diff --git a/.changeset/seven-mangos-sleep.md b/.changeset/seven-mangos-sleep.md
new file mode 100644
index 00000000..e15a4bc1
--- /dev/null
+++ b/.changeset/seven-mangos-sleep.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+[MongoDB Storage] Only compact modified buckets. Add partial index on bucket_state to handle large numbers of buckets when pre-calculating checksums or compacting.
diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1760433882550-bucket-state-index2.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1760433882550-bucket-state-index2.ts
new file mode 100644
index 00000000..e9fd01c6
--- /dev/null
+++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1760433882550-bucket-state-index2.ts
@@ -0,0 +1,34 @@
+import { migrations } from '@powersync/service-core';
+import * as storage from '../../../storage/storage-index.js';
+import { MongoStorageConfig } from '../../../types/types.js';
+
+const INDEX_NAME = 'dirty_buckets';
+
+export const up: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    await db.createBucketStateIndex2();
+  } finally {
+    await db.client.close();
+  }
+};
+
+export const down: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    if (await db.bucket_state.indexExists(INDEX_NAME)) {
+      await db.bucket_state.dropIndex(INDEX_NAME);
+    }
+  } finally {
+    await db.client.close();
+  }
+};
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index e81d566c..b1f2deb4 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -99,11 +99,19 @@ export class MongoCompactor {
         await this.compactInternal(bucket);
       }
     } else {
-      await this.compactInternal(undefined);
+      await this.compactDirtyBuckets();
     }
   }
 
-  async compactInternal(bucket: string | undefined) {
+  private async compactDirtyBuckets() {
+    for await (let buckets of this.iterateDirtyBuckets()) {
+      for (let bucket of buckets) {
+        await this.compactInternal(bucket);
+      }
+    }
+  }
+
+  private async compactInternal(bucket: string | undefined) {
     const idLimitBytes = this.idLimitBytes;
 
     let currentState: CurrentBucketState | null = null;
@@ -483,6 +491,16 @@ export class MongoCompactor {
    * Subset of compact, only populating checksums where relevant.
    */
   async populateChecksums() {
+    for await (let buckets of this.iterateDirtyBuckets()) {
+      const start = Date.now();
+      logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
+
+      await this.updateChecksumsBatch(buckets);
+      logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
+    }
+  }
+
+  private async *iterateDirtyBuckets(): AsyncGenerator<string[]> {
     // This is updated after each batch
     let lowerBound: BucketStateDocument['_id'] = {
       g: this.group_id,
@@ -500,10 +518,11 @@ export class MongoCompactor {
           $gt: lowerBound,
           $lt: upperBound
         },
-        compacted_state: { $exists: false }
+        // Partial index exists on this
+        'estimate_since_compact.count': { $gt: 0 }
       };
 
-      const bucketsWithoutChecksums = await this.db.bucket_state
+      const dirtyBuckets = await this.db.bucket_state
         .find(filter, {
           projection: {
             _id: 1
@@ -512,19 +531,19 @@ export class MongoCompactor {
             _id: 1
           },
           limit: 5_000,
-          maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
+          maxTimeMS: MONGO_OPERATION_TIMEOUT_MS,
+          // Make sure we use the partial index
+          hint: 'dirty_buckets'
         })
         .toArray();
-      if (bucketsWithoutChecksums.length == 0) {
-        // All done
+
+      if (dirtyBuckets.length == 0) {
         break;
       }
 
-      logger.info(`Calculating checksums for batch of ${bucketsWithoutChecksums.length} buckets`);
-
-      await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b));
+      yield dirtyBuckets.map((bucket) => bucket._id.b);
 
-      lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
+      lowerBound = dirtyBuckets[dirtyBuckets.length - 1]._id;
     }
   }
 
@@ -559,6 +578,10 @@ export class MongoCompactor {
                 count: bucketChecksum.count,
                 checksum: BigInt(bucketChecksum.checksum),
                 bytes: null
+              },
+              estimate_since_compact: {
+                count: 0,
+                bytes: 0
               }
             }
           },
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts
index 4aa149bf..d85354be 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts
@@ -25,6 +25,7 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
     // Full migrations are not currently run for tests, so we manually create the important ones
     await db.createCheckpointEventsCollection();
     await db.createBucketStateIndex();
+    await db.createBucketStateIndex2();
 
     return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
   };
diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts
index e6b08352..326d138b 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/db.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts
@@ -141,6 +141,19 @@ export class PowerSyncMongo {
       { name: 'bucket_updates', unique: true }
     );
   }
+  /**
+   * Only use in migrations and tests.
+   */
+  async createBucketStateIndex2() {
+    // TODO: Implement a better mechanism to use migrations in tests
+    await this.bucket_state.createIndex(
+      {
+        _id: 1,
+        'estimate_since_compact.count': 1
+      },
+      { name: 'dirty_buckets', partialFilterExpression: { 'estimate_since_compact.count': { $gt: 0 } } }
+    );
+  }
 }
 
 export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {