diff --git a/.changeset/hot-pets-itch.md b/.changeset/hot-pets-itch.md
new file mode 100644
index 000000000..947bdb05f
--- /dev/null
+++ b/.changeset/hot-pets-itch.md
@@ -0,0 +1,6 @@
+---
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-module-mongodb': patch
+---
+
+Improve initial replication performance for MongoDB by avoiding sessions.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
index ce9fca320..0a30859ce 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
@@ -270,7 +270,7 @@ export class MongoBucketBatch
       }
     }
 
-    return resumeBatch;
+    return resumeBatch?.hasData() ? resumeBatch : null;
   }
 
   private saveOperation(
diff --git a/modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts
index 886f98e78..43772a46c 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts
@@ -41,6 +41,10 @@ export class OperationBatch {
     return this.batch.length >= MAX_BATCH_COUNT || this.currentSize > MAX_RECORD_BATCH_SIZE;
   }
 
+  hasData() {
+    return this.length > 0;
+  }
+
   /**
    *
    * @param sizes Map of source key to estimated size of the current_data document, or undefined if current_data is not persisted.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts
index 4eaae0d60..29441fe61 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts
@@ -245,6 +245,7 @@ export class PersistedBatch {
   }
 
   async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
+    const startAt = performance.now();
     if (this.bucketData.length > 0) {
       await db.bucket_data.bulkWrite(this.bucketData, {
         session,
@@ -267,10 +268,11 @@ export class PersistedBatch {
       });
     }
 
+    const duration = performance.now() - startAt;
     logger.info(
       `powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
         this.currentData.length
-      } updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
+      } updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}`
     );
 
     this.bucketData = [];
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index e7930942f..4bb5c828f 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -8,7 +8,14 @@ import {
   ReplicationAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
-import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
+import {
+  BSON_DESERIALIZE_DATA_OPTIONS,
+  Metrics,
+  SaveOperationTag,
+  SourceEntityDescriptor,
+  SourceTable,
+  storage
+} from '@powersync/service-core';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
 import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
@@ -193,39 +200,31 @@ export class ChangeStream {
       // Not known where this would happen apart from the above cases
       throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.');
     }
-    // We previously used {snapshot: true} for the snapshot session.
-    // While it gives nice consistency guarantees, it fails when the
-    // snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
-    // expiring the snapshot.
-    const session = await this.client.startSession();
-    try {
-      await this.storage.startBatch(
-        { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
-        async (batch) => {
-          // Start by resolving all tables.
-          // This checks postImage configuration, and that should fail as
-          // earlier as possible.
-          let allSourceTables: SourceTable[] = [];
-          for (let tablePattern of sourceTables) {
-            const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
-            allSourceTables.push(...tables);
-          }
-
-          for (let table of allSourceTables) {
-            await this.snapshotTable(batch, table, session);
-            await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
+    await this.storage.startBatch(
+      { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
+      async (batch) => {
+        // Start by resolving all tables.
+        // This checks postImage configuration, and that should fail as
+        // early as possible.
+        let allSourceTables: SourceTable[] = [];
+        for (let tablePattern of sourceTables) {
+          const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
+          allSourceTables.push(...tables);
+        }
 
-            await touch();
-          }
+        for (let table of allSourceTables) {
+          await this.snapshotTable(batch, table);
+          await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
 
-          const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
-          logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
-          await batch.commit(lsn);
+          await touch();
         }
-      );
-    } finally {
-      session.endSession();
-    }
+
+        const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
+        logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
+        await batch.commit(lsn);
+      }
+    );
   }
 
   private async setupCheckpointsCollection() {
@@ -283,48 +282,52 @@ export class ChangeStream {
     }
   }
 
-  private async snapshotTable(
-    batch: storage.BucketStorageBatch,
-    table: storage.SourceTable,
-    session?: mongo.ClientSession
-  ) {
+  private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
     logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
     const estimatedCount = await this.estimatedCount(table);
     let at = 0;
-    let lastLogIndex = 0;
-
     const db = this.client.db(table.schema);
     const collection = db.collection(table.table);
-    const query = collection.find({}, { session, readConcern: { level: 'majority' } });
-
-    const cursor = query.stream();
-
-    for await (let document of cursor) {
-      if (this.abort_signal.aborted) {
-        throw new ReplicationAbortedError(`Aborted initial replication`);
-      }
-
-      const record = constructAfterRecord(document);
+    const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });
+
+    let lastBatch = performance.now();
+    // hasNext() is the call that triggers fetching of the next batch,
+    // then we read it with readBufferedDocuments(). This gives us semi-explicit
+    // control over the fetching of each batch, and avoids a separate promise per document
+    let hasNextPromise = cursor.hasNext();
+    while (await hasNextPromise) {
+      const docBatch = cursor.readBufferedDocuments();
+      // Pre-fetch next batch, so that we can read and write concurrently
+      hasNextPromise = cursor.hasNext();
+      for (let document of docBatch) {
+        if (this.abort_signal.aborted) {
+          throw new ReplicationAbortedError(`Aborted initial replication`);
+        }
 
-      // This auto-flushes when the batch reaches its size limit
-      await batch.save({
-        tag: SaveOperationTag.INSERT,
-        sourceTable: table,
-        before: undefined,
-        beforeReplicaId: undefined,
-        after: record,
-        afterReplicaId: document._id
-      });
+        const record = constructAfterRecord(document);
 
-      at += 1;
-      if (at - lastLogIndex >= 5000) {
-        logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
-        lastLogIndex = at;
+        // This auto-flushes when the batch reaches its size limit
+        await batch.save({
+          tag: SaveOperationTag.INSERT,
+          sourceTable: table,
+          before: undefined,
+          beforeReplicaId: undefined,
+          after: record,
+          afterReplicaId: document._id
+        });
       }
-      Metrics.getInstance().rows_replicated_total.add(1);
+      at += docBatch.length;
+      Metrics.getInstance().rows_replicated_total.add(docBatch.length);
+      const duration = performance.now() - lastBatch;
+      lastBatch = performance.now();
+      logger.info(
+        `${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount} in ${duration.toFixed(0)}ms`
+      );
 
       await touch();
     }
+    // In case the loop was interrupted, make sure we await the last promise.
+    await hasNextPromise;
 
     await batch.flush();
     logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`);