Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ const DIRTY_BUCKET_SCAN_BATCH_SIZE = 2_000;
/** This default is primarily for tests. */
const DEFAULT_MEMORY_LIMIT_MB = 64;

/**
 * Options threaded through every read and write in a single compact run,
 * tying them all to one MongoDB client session.
 */
interface SessionOptions {
  // The shared session; started with causal consistency so reads from
  // secondaries still observe this run's own writes in order.
  session: mongo.ClientSession;
  // Read preference for all queries in the run (e.g. 'secondaryPreferred').
  readPreference: mongo.ReadPreferenceLike;
  readConcern?: mongo.ReadConcernLike;
  writeConcern?: mongo.WriteConcern;
}

/**
 * Default session options applied to every compact operation.
 *
 * For compacting, it is fine to read data that is a little stale.
 *
 * We use causal sessions and secondaryPreferred read preference, so that we
 * can read from secondaries, but still have a consistent view of the data.
 * The 'majority' read and write concerns back the causal-consistency
 * guarantees across the replica set (see MongoDB causal consistency docs).
 */
const DEFAULT_SESSION_OPTIONS: Omit<SessionOptions, 'session'> = {
readPreference: 'secondaryPreferred',
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' }
};

export class MongoCompactor {
private updates: mongo.AnyBulkWriteOperation<BucketDataDocument>[] = [];
private bucketStateUpdates: mongo.AnyBulkWriteOperation<BucketStateDocument>[] = [];
Expand Down Expand Up @@ -106,20 +124,25 @@ export class MongoCompactor {
* See /docs/compacting-operations.md for details.
*/
async compact() {
  // Causal consistency: reads (possibly from secondaries) still observe this
  // run's own writes in order. See DEFAULT_SESSION_OPTIONS.
  const session = this.db.client.startSession({ causalConsistency: true });
  // FIX: the session was started but never ended, leaking it back to the
  // driver's session pool only on client close. Dispose of it the same way
  // populateChecksums() does.
  await using _ = {
    async [Symbol.asyncDispose]() {
      await session.endSession();
    }
  };
  const sessionOptions: SessionOptions = {
    session,
    ...DEFAULT_SESSION_OPTIONS
  };
  if (this.buckets) {
    for (let bucket of this.buckets) {
      // We can make this more efficient later on by iterating
      // through the buckets in a single query.
      // That makes batching more tricky, so we leave for later.
      await this.compactSingleBucket(sessionOptions, bucket);
    }
  } else {
    await this.compactDirtyBuckets(sessionOptions);
  }
}

private async compactDirtyBuckets() {
for await (let buckets of this.dirtyBucketBatches({
private async compactDirtyBuckets(sessionOptions: SessionOptions) {
for await (let buckets of this.dirtyBucketBatches(sessionOptions, {
minBucketChanges: this.minBucketChanges,
minChangeRatio: this.minChangeRatio
})) {
Expand All @@ -131,12 +154,12 @@ export class MongoCompactor {
}

for (let { bucket } of buckets) {
await this.compactSingleBucket(bucket);
await this.compactSingleBucket(sessionOptions, bucket);
}
}
}

private async compactSingleBucket(bucket: string) {
private async compactSingleBucket(sessionOptions: SessionOptions, bucket: string) {
const idLimitBytes = this.idLimitBytes;

let currentState: CurrentBucketState = {
Expand Down Expand Up @@ -195,7 +218,8 @@ export class MongoCompactor {
{
// batchSize is 1 more than limit to auto-close the cursor.
// See https://github.com/mongodb/node-mongodb-native/pull/4580
batchSize: this.moveBatchQueryLimit + 1
batchSize: this.moveBatchQueryLimit + 1,
...sessionOptions
}
);
// We don't limit to a single batch here, since that often causes MongoDB to scan through more than it returns.
Expand Down Expand Up @@ -277,7 +301,7 @@ export class MongoCompactor {
}

if (this.updates.length + this.bucketStateUpdates.length >= this.moveBatchLimit) {
await this.flush();
await this.flush(sessionOptions);
}
}

Expand All @@ -291,15 +315,15 @@ export class MongoCompactor {
`Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
);
// Need flush() before clear()
await this.flush();
await this.clearBucket(currentState);
await this.flush(sessionOptions);
await this.clearBucket(sessionOptions, currentState);
}

// Do this _after_ clearBucket so that we have accurate counts.
this.updateBucketChecksums(currentState);

// Need another flush after updateBucketChecksums()
await this.flush();
await this.flush(sessionOptions);
}

/**
Expand Down Expand Up @@ -343,22 +367,24 @@ export class MongoCompactor {
});
}

private async flush() {
private async flush(sessionOptions: SessionOptions) {
if (this.updates.length > 0) {
logger.info(`Compacting ${this.updates.length} ops`);
await this.db.bucket_data.bulkWrite(this.updates, {
// Order is not important.
// Since checksums are not affected, these operations can happen in any order,
// and it's fine if the operations are partially applied.
// Each individual operation is atomic.
ordered: false
ordered: false,
...sessionOptions
});
this.updates = [];
}
if (this.bucketStateUpdates.length > 0) {
logger.info(`Updating ${this.bucketStateUpdates.length} bucket states`);
await this.db.bucket_state.bulkWrite(this.bucketStateUpdates, {
ordered: false
ordered: false,
...sessionOptions
});
this.bucketStateUpdates = [];
}
Expand All @@ -371,7 +397,7 @@ export class MongoCompactor {
* @param bucket bucket name
* @param op op_id of the last non-PUT operation, which will be converted to CLEAR.
*/
private async clearBucket(currentState: CurrentBucketState) {
private async clearBucket(sessionOptions: SessionOptions, currentState: CurrentBucketState) {
const bucket = currentState.bucket;
const clearOp = currentState.lastNotPut!;

Expand All @@ -390,7 +416,10 @@ export class MongoCompactor {
}
};

// We start a new transaction here with a new session, but do want this to be up to date with the parent session.
const session = this.db.client.startSession();
session.advanceClusterTime(sessionOptions.session.clusterTime!);
session.advanceOperationTime(sessionOptions.session.operationTime!);
try {
let done = false;
while (!done && !this.signal?.aborted) {
Expand Down Expand Up @@ -486,8 +515,19 @@ export class MongoCompactor {
*/
async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
let count = 0;
const session = this.db.client.startSession({ causalConsistency: true });
await using _ = {
async [Symbol.asyncDispose]() {
await session.endSession();
}
};
const sessionOptions: SessionOptions = {
session,
...DEFAULT_SESSION_OPTIONS
};

while (!this.signal?.aborted) {
const buckets = await this.dirtyBucketBatchForChecksums(options);
const buckets = await this.dirtyBucketBatchForChecksums(sessionOptions, options);
if (buckets.length == 0 || this.signal?.aborted) {
// All done
break;
Expand All @@ -508,7 +548,10 @@ export class MongoCompactor {
logger.info(
`Calculating checksums for batch of ${buckets.length} buckets, estimated count of ${totalCountEstimate}`
);
await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket));
await this.updateChecksumsBatch(
sessionOptions,
checkBuckets.map((b) => b.bucket)
);
logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`);
count += checkBuckets.length;
}
Expand All @@ -523,10 +566,13 @@ export class MongoCompactor {
* minBucketChanges: minimum number of changes for a bucket to be included in the results.
* minChangeRatio: minimum ratio of changes to total ops for a bucket to be included in the results, number between 0 and 1.
*/
private async *dirtyBucketBatches(options: {
minBucketChanges: number;
minChangeRatio: number;
}): AsyncGenerator<{ bucket: string; estimatedCount: number }[]> {
private async *dirtyBucketBatches(
sessionOptions: SessionOptions,
options: {
minBucketChanges: number;
minChangeRatio: number;
}
): AsyncGenerator<{ bucket: string; estimatedCount: number }[]> {
// Previously, we used an index on {_id.g: 1, estimate_since_compact.count: 1} to only buckets with changes.
// This works well if there are only a small number of buckets with changes.
// However, if buckets are continuously modified while we are compacting, we get the same buckets over and over again.
Expand Down Expand Up @@ -582,7 +628,10 @@ export class MongoCompactor {
}
}
],
{ maxTimeMS: MONGO_OPERATION_TIMEOUT_MS }
{
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
...sessionOptions
}
)
.toArray();

Expand Down Expand Up @@ -623,9 +672,12 @@ export class MongoCompactor {
*
* We currently don't get new data while doing populateChecksums, so we don't need to worry about buckets changing while processing.
*/
private async dirtyBucketBatchForChecksums(options: {
minBucketChanges: number;
}): Promise<{ bucket: string; estimatedCount: number }[]> {
private async dirtyBucketBatchForChecksums(
sessionOptions: SessionOptions,
options: {
minBucketChanges: number;
}
): Promise<{ bucket: string; estimatedCount: number }[]> {
if (options.minBucketChanges <= 0) {
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
}
Expand All @@ -646,7 +698,8 @@ export class MongoCompactor {
'estimate_since_compact.count': -1
},
limit: 200,
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
...sessionOptions
}
)
.toArray();
Expand All @@ -657,7 +710,7 @@ export class MongoCompactor {
}));
}

private async updateChecksumsBatch(buckets: string[]) {
private async updateChecksumsBatch(sessionOptions: SessionOptions, buckets: string[]) {
const checksums = await this.storage.checksums.computePartialChecksumsDirect(
buckets.map((bucket) => {
return {
Expand Down Expand Up @@ -703,6 +756,6 @@ export class MongoCompactor {
});
}

await this.flush();
await this.flush(sessionOptions);
}
}
Loading