From 9370f38bcacb56f3dd7c03867acf5d9591e7b388 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 27 May 2025 12:46:31 +0200 Subject: [PATCH 1/4] Ignore checkpoints from different replication streams. --- .../src/api/MongoRouteAPIAdapter.ts | 8 +-- .../src/replication/ChangeStream.ts | 60 +++++++++++++++---- .../src/replication/MongoRelation.ts | 22 +++++-- .../test/src/change_stream_utils.ts | 4 +- packages/service-core/src/api/RouteAPI.ts | 5 -- 5 files changed, 70 insertions(+), 29 deletions(-) diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts index 40e561343..958517b99 100644 --- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules'; import * as service_types from '@powersync/service-types'; import { MongoManager } from '../replication/MongoManager.js'; -import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js'; +import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js'; import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js'; import * as types from '../types/types.js'; import { escapeRegExp } from '../utils.js'; @@ -206,10 +206,6 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { return undefined; } - async getReplicationHead(): Promise { - return createCheckpoint(this.client, this.db); - } - async createReplicationHead(callback: ReplicationHeadCallback): Promise { const session = this.client.startSession(); try { @@ -224,7 +220,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI { // Trigger a change on the changestream. await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate( { - _id: 'checkpoint' as any + _id: STANDALONE_CHECKPOINT_ID as any }, { $inc: { i: 1 } diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 1b01a4a28..240b0db86 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -15,7 +15,13 @@ import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; import { escapeRegExp } from '../utils.js'; import { MongoManager } from './MongoManager.js'; -import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js'; +import { + constructAfterRecord, + createCheckpoint, + getCacheIdentifier, + getMongoRelation, + STANDALONE_CHECKPOINT_ID +} from './MongoRelation.js'; import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; export interface ChangeStreamOptions { @@ -69,6 +75,8 @@ export class ChangeStream { private relation_cache = new Map(); + private checkpointStreamId = new mongo.ObjectId(); + constructor(options: ChangeStreamOptions) { this.storage = options.storage; this.metrics = options.metrics; @@ -247,6 +255,11 @@ export class ChangeStream { await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { changeStreamPreAndPostImages: { enabled: true } }); + } else { + // Clear the collection on startup, to keep it clean + // We never query this collection directly, and don't want to keep the data around. + // We only use this to get data into the oplog/changestream. + await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({}); } } @@ -434,7 +447,7 @@ export class ChangeStream { await batch.truncate([result.table]); await this.snapshotTable(batch, result.table); - const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb); + const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn); return table; @@ -601,7 +614,11 @@ export class ChangeStream { // Always start with a checkpoint. // This helps us to clear errors when restarting, even if there is // no data to replicate. - let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb); + let waitForCheckpointLsn: string | null = await createCheckpoint( + this.client, + this.defaultDb, + this.checkpointStreamId + ); let splitDocument: mongo.ChangeStreamDocument | null = null; @@ -700,13 +717,9 @@ export class ChangeStream { } } - if ( - (changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' || - changeDocument.operationType == 'drop') && - changeDocument.ns.coll == CHECKPOINTS_COLLECTION - ) { + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + + if (ns?.coll == CHECKPOINTS_COLLECTION) { /** * Dropping the database does not provide an `invalidate` event. * We typically would receive `drop` events for the collection which we @@ -727,6 +740,31 @@ export class ChangeStream { ); } + if ( + !( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' + ) + ) { + continue; + } + + // We handle two types of checkpoint events: + // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these + // immediately, regardless of where they were created. + // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate + // limiting of commits, so we specifically want to exclude checkpoints from other streams. + // + // It may be useful to also throttle commits due to standalone checkpoints in the future. + // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. + + const checkpointId = changeDocument._id as string | mongo.ObjectId; + if ( + !(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(this.checkpointStreamId)) + ) { + continue; + } const { comparable: lsn } = new MongoLSN({ timestamp: changeDocument.clusterTime!, resume_token: changeDocument._id @@ -743,7 +781,7 @@ export class ChangeStream { changeDocument.operationType == 'delete' ) { if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb); + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); } const rel = getMongoRelation(changeDocument.ns); const table = await this.getRelation(batch, rel, { diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index 62127eb48..ace529d71 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -147,15 +147,27 @@ function filterJsonData(data: any, depth = 0): any { } } -export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise { +/** + * Id for checkpoints not associated with any specific replication stream. + * + * Use this for write checkpoints, or any other case where we want to process + * the checkpoint immeidately, and not wait for batching. + */ +export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint'; + +export async function createCheckpoint( + client: mongo.MongoClient, + db: mongo.Db, + id: mongo.ObjectId | string +): Promise { const session = client.startSession(); try { - // Note: If multiple PowerSync instances are replicating the same source database, - // they'll modify the same checkpoint document. This is fine - it could create - // more replication load than required, but won't break anything. + // We use an unique id per process, and clear documents on startup. + // This is so that we can filter events for our own process only, and ignore + // events from other processes. await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate( { - _id: 'checkpoint' as any + _id: id as any }, { $inc: { i: 1 } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index aa7b669ae..a7a662258 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -12,7 +12,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js'; import { MongoManager } from '@module/replication/MongoManager.js'; -import { createCheckpoint } from '@module/replication/MongoRelation.js'; +import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js'; import { NormalizedMongoConnectionConfig } from '@module/types/types.js'; import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js'; @@ -160,7 +160,7 @@ export async function getClientCheckpoint( options?: { timeout?: number } ): Promise { const start = Date.now(); - const lsn = await createCheckpoint(client, db); + const lsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID); // This old API needs a persisted checkpoint id. // Since we don't use LSNs anymore, the only way to get that is to wait. diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index 14967dba6..38a37d59f 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -49,11 +49,6 @@ export interface RouteAPI { */ getReplicationLag(options: ReplicationLagOptions): Promise; - /** - * Get the current LSN or equivalent replication HEAD position identifier - */ - getReplicationHead(): Promise; - /** * Get the current LSN or equivalent replication HEAD position identifier. * From ce213f954bfba9847f301def37104cac87a37220 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 28 May 2025 10:45:55 +0200 Subject: [PATCH 2/4] Add changeset. --- .changeset/beige-clouds-cry.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/beige-clouds-cry.md diff --git a/.changeset/beige-clouds-cry.md b/.changeset/beige-clouds-cry.md new file mode 100644 index 000000000..0187e9b8a --- /dev/null +++ b/.changeset/beige-clouds-cry.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[MongoDB] Fix replication batching From ba2896a2c9143e86fcf1f904bd8a623d50285282 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 28 May 2025 11:02:00 +0200 Subject: [PATCH 3/4] Fix checkpoint comparison. --- modules/module-mongodb/src/replication/ChangeStream.ts | 4 +--- modules/module-mongodb/src/replication/MongoRelation.ts | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 240b0db86..4ce1ed1a8 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -760,9 +760,7 @@ export class ChangeStream { // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. const checkpointId = changeDocument._id as string | mongo.ObjectId; - if ( - !(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(this.checkpointStreamId)) - ) { + if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) { continue; } const { comparable: lsn } = new MongoLSN({ diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index ace529d71..736ff149d 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -151,7 +151,7 @@ function filterJsonData(data: any, depth = 0): any { * Id for checkpoints not associated with any specific replication stream. * * Use this for write checkpoints, or any other case where we want to process - * the checkpoint immeidately, and not wait for batching. + * the checkpoint immediately, and not wait for batching. */ export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint'; From 5b689f81309195196609ee43e0e585136a1beeb7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 28 May 2025 11:43:57 +0200 Subject: [PATCH 4/4] Use documentKey._id, not the resume token. --- modules/module-mongodb/src/replication/ChangeStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 4ce1ed1a8..958872216 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -759,7 +759,7 @@ export class ChangeStream { // It may be useful to also throttle commits due to standalone checkpoints in the future. // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. - const checkpointId = changeDocument._id as string | mongo.ObjectId; + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) { continue; }