From e15bb9b6b1811611dba1f9df7e3f6ed9e79c3040 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Mar 2026 13:42:31 +0200 Subject: [PATCH] Compact using secondaries. --- .../storage/implementation/MongoCompactor.ts | 109 +++++++++++++----- 1 file changed, 81 insertions(+), 28 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index d957f17b1..579c20e21 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -68,6 +68,24 @@ const DIRTY_BUCKET_SCAN_BATCH_SIZE = 2_000; /** This default is primarily for tests. */ const DEFAULT_MEMORY_LIMIT_MB = 64; +interface SessionOptions { + session: mongo.ClientSession; + readPreference: mongo.ReadPreferenceLike; + readConcern?: mongo.ReadConcernLike; + writeConcern?: mongo.WriteConcern; +} + +/** + * For compacting, it is fine to read data that is a little stale. + * + * We use causal sessions and secondaryPreferred read preference, so that we can read from secondaries, but still have a consistent view of the data. + */ +const DEFAULT_SESSION_OPTIONS: Omit<SessionOptions, 'session'> = { + readPreference: 'secondaryPreferred', + readConcern: { level: 'majority' }, + writeConcern: { w: 'majority' } +}; + export class MongoCompactor { private updates: mongo.AnyBulkWriteOperation[] = []; private bucketStateUpdates: mongo.AnyBulkWriteOperation[] = []; @@ -106,20 +124,25 @@ export class MongoCompactor { * See /docs/compacting-operations.md for details. */ async compact() { + const session = this.db.client.startSession({ causalConsistency: true }); + const sessionOptions: SessionOptions = { + session, + ...DEFAULT_SESSION_OPTIONS + }; if (this.buckets) { for (let bucket of this.buckets) { // We can make this more efficient later on by iterating // through the buckets in a single query. 
// That makes batching more tricky, so we leave for later. - await this.compactSingleBucket(bucket); + await this.compactSingleBucket(sessionOptions, bucket); } } else { - await this.compactDirtyBuckets(); + await this.compactDirtyBuckets(sessionOptions); } } - private async compactDirtyBuckets() { - for await (let buckets of this.dirtyBucketBatches({ + private async compactDirtyBuckets(sessionOptions: SessionOptions) { + for await (let buckets of this.dirtyBucketBatches(sessionOptions, { minBucketChanges: this.minBucketChanges, minChangeRatio: this.minChangeRatio })) { @@ -131,12 +154,12 @@ export class MongoCompactor { } for (let { bucket } of buckets) { - await this.compactSingleBucket(bucket); + await this.compactSingleBucket(sessionOptions, bucket); } } } - private async compactSingleBucket(bucket: string) { + private async compactSingleBucket(sessionOptions: SessionOptions, bucket: string) { const idLimitBytes = this.idLimitBytes; let currentState: CurrentBucketState = { @@ -195,7 +218,8 @@ export class MongoCompactor { { // batchSize is 1 more than limit to auto-close the cursor. // See https://github.com/mongodb/node-mongodb-native/pull/4580 - batchSize: this.moveBatchQueryLimit + 1 + batchSize: this.moveBatchQueryLimit + 1, + ...sessionOptions } ); // We don't limit to a single batch here, since that often causes MongoDB to scan through more than it returns. 
@@ -277,7 +301,7 @@ export class MongoCompactor { } if (this.updates.length + this.bucketStateUpdates.length >= this.moveBatchLimit) { - await this.flush(); + await this.flush(sessionOptions); } } @@ -291,15 +315,15 @@ export class MongoCompactor { `Inserting CLEAR at ${this.group_id}:${bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` ); // Need flush() before clear() - await this.flush(); - await this.clearBucket(currentState); + await this.flush(sessionOptions); + await this.clearBucket(sessionOptions, currentState); } // Do this _after_ clearBucket so that we have accurate counts. this.updateBucketChecksums(currentState); // Need another flush after updateBucketChecksums() - await this.flush(); + await this.flush(sessionOptions); } /** @@ -343,7 +367,7 @@ export class MongoCompactor { }); } - private async flush() { + private async flush(sessionOptions: SessionOptions) { if (this.updates.length > 0) { logger.info(`Compacting ${this.updates.length} ops`); await this.db.bucket_data.bulkWrite(this.updates, { @@ -351,14 +375,16 @@ export class MongoCompactor { // Since checksums are not affected, these operations can happen in any order, // and it's fine if the operations are partially applied. // Each individual operation is atomic. - ordered: false + ordered: false, + ...sessionOptions }); this.updates = []; } if (this.bucketStateUpdates.length > 0) { logger.info(`Updating ${this.bucketStateUpdates.length} bucket states`); await this.db.bucket_state.bulkWrite(this.bucketStateUpdates, { - ordered: false + ordered: false, + ...sessionOptions }); this.bucketStateUpdates = []; } @@ -371,7 +397,7 @@ export class MongoCompactor { * @param bucket bucket name * @param op op_id of the last non-PUT operation, which will be converted to CLEAR. 
*/ - private async clearBucket(currentState: CurrentBucketState) { + private async clearBucket(sessionOptions: SessionOptions, currentState: CurrentBucketState) { const bucket = currentState.bucket; const clearOp = currentState.lastNotPut!; @@ -390,7 +416,10 @@ } }; + // We start a new transaction here with a new session, but do want this to be up to date with the parent session. const session = this.db.client.startSession(); + session.advanceClusterTime(sessionOptions.session.clusterTime!); + session.advanceOperationTime(sessionOptions.session.operationTime!); try { let done = false; while (!done && !this.signal?.aborted) { @@ -486,8 +515,19 @@ */ async populateChecksums(options: { minBucketChanges: number }): Promise<number> { let count = 0; + const session = this.db.client.startSession({ causalConsistency: true }); + await using _ = { + async [Symbol.asyncDispose]() { + await session.endSession(); + } + }; + const sessionOptions: SessionOptions = { + session, + ...DEFAULT_SESSION_OPTIONS + }; + while (!this.signal?.aborted) { - const buckets = await this.dirtyBucketBatchForChecksums(options); + const buckets = await this.dirtyBucketBatchForChecksums(sessionOptions, options); if (buckets.length == 0 || this.signal?.aborted) { // All done break; } @@ -508,7 +548,10 @@ logger.info( `Calculating checksums for batch of ${buckets.length} buckets, estimated count of ${totalCountEstimate}` ); - await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket)); + await this.updateChecksumsBatch( + sessionOptions, + checkBuckets.map((b) => b.bucket) + ); logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`); count += checkBuckets.length; } @@ -523,10 +566,13 @@ * minBucketChanges: minimum number of changes for a bucket to be included in the results. 
* minChangeRatio: minimum ratio of changes to total ops for a bucket to be included in the results, number between 0 and 1. */ - private async *dirtyBucketBatches(options: { - minBucketChanges: number; - minChangeRatio: number; - }): AsyncGenerator<{ bucket: string; estimatedCount: number }[]> { + private async *dirtyBucketBatches( + sessionOptions: SessionOptions, + options: { + minBucketChanges: number; + minChangeRatio: number; + } + ): AsyncGenerator<{ bucket: string; estimatedCount: number }[]> { // Previously, we used an index on {_id.g: 1, estimate_since_compact.count: 1} to only buckets with changes. // This works well if there are only a small number of buckets with changes. // However, if buckets are continuosly modified while we are compacting, we get the same buckets over and over again. @@ -582,7 +628,10 @@ export class MongoCompactor { } } ], - { maxTimeMS: MONGO_OPERATION_TIMEOUT_MS } + { + maxTimeMS: MONGO_OPERATION_TIMEOUT_MS, + ...sessionOptions + } ) .toArray(); @@ -623,9 +672,12 @@ export class MongoCompactor { * * We currently don't get new data while doing populateChecksums, so we don't need to worry about buckets changing while processing. 
*/ - private async dirtyBucketBatchForChecksums(options: { - minBucketChanges: number; - }): Promise<{ bucket: string; estimatedCount: number }[]> { + private async dirtyBucketBatchForChecksums( + sessionOptions: SessionOptions, + options: { + minBucketChanges: number; + } + ): Promise<{ bucket: string; estimatedCount: number }[]> { if (options.minBucketChanges <= 0) { throw new ReplicationAssertionError('minBucketChanges must be >= 1'); } @@ -646,7 +698,8 @@ export class MongoCompactor { 'estimate_since_compact.count': -1 }, limit: 200, - maxTimeMS: MONGO_OPERATION_TIMEOUT_MS + maxTimeMS: MONGO_OPERATION_TIMEOUT_MS, + ...sessionOptions } ) .toArray(); @@ -657,7 +710,7 @@ export class MongoCompactor { })); } - private async updateChecksumsBatch(buckets: string[]) { + private async updateChecksumsBatch(sessionOptions: SessionOptions, buckets: string[]) { const checksums = await this.storage.checksums.computePartialChecksumsDirect( buckets.map((bucket) => { return { @@ -703,6 +756,6 @@ export class MongoCompactor { }); } - await this.flush(); + await this.flush(sessionOptions); } }