diff --git a/.changeset/beige-clouds-cry.md b/.changeset/beige-clouds-cry.md
new file mode 100644
index 000000000..0187e9b8a
--- /dev/null
+++ b/.changeset/beige-clouds-cry.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+[MongoDB] Fix replication batching
diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
index 40e561343..958517b99 100644
--- a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
+++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
 import * as service_types from '@powersync/service-types';
 
 import { MongoManager } from '../replication/MongoManager.js';
-import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js';
+import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
 import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
 import * as types from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
@@ -206,10 +206,6 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
     return undefined;
   }
 
-  async getReplicationHead(): Promise<string> {
-    return createCheckpoint(this.client, this.db);
-  }
-
   async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
     const session = this.client.startSession();
     try {
@@ -224,7 +220,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
       // Trigger a change on the changestream.
       await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
         {
-          _id: 'checkpoint' as any
+          _id: STANDALONE_CHECKPOINT_ID as any
         },
         {
           $inc: { i: 1 }
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 1b01a4a28..958872216 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -15,7 +15,13 @@ import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
 import { MongoManager } from './MongoManager.js';
-import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
+import {
+  constructAfterRecord,
+  createCheckpoint,
+  getCacheIdentifier,
+  getMongoRelation,
+  STANDALONE_CHECKPOINT_ID
+} from './MongoRelation.js';
 import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 
 export interface ChangeStreamOptions {
@@ -69,6 +75,8 @@ export class ChangeStream {
 
   private relation_cache = new Map();
 
+  private checkpointStreamId = new mongo.ObjectId();
+
   constructor(options: ChangeStreamOptions) {
     this.storage = options.storage;
     this.metrics = options.metrics;
@@ -247,6 +255,11 @@ export class ChangeStream {
       await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
         changeStreamPreAndPostImages: { enabled: true }
      });
+    } else {
+      // Clear the collection on startup, to keep it clean.
+      // We never query this collection directly, and don't want to keep the data around.
+      // We only use this to get data into the oplog/changestream.
+      await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({});
     }
   }
 
@@ -434,7 +447,7 @@ export class ChangeStream {
 
       await batch.truncate([result.table]);
       await this.snapshotTable(batch, result.table);
-      const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb);
+      const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
       const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn);
 
       return table;
@@ -601,7 +614,11 @@ export class ChangeStream {
     // Always start with a checkpoint.
     // This helps us to clear errors when restarting, even if there is
     // no data to replicate.
-    let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
+    let waitForCheckpointLsn: string | null = await createCheckpoint(
+      this.client,
+      this.defaultDb,
+      this.checkpointStreamId
+    );
 
     let splitDocument: mongo.ChangeStreamDocument | null = null;
 
@@ -700,13 +717,9 @@ export class ChangeStream {
         }
       }
 
-      if (
-        (changeDocument.operationType == 'insert' ||
-          changeDocument.operationType == 'update' ||
-          changeDocument.operationType == 'replace' ||
-          changeDocument.operationType == 'drop') &&
-        changeDocument.ns.coll == CHECKPOINTS_COLLECTION
-      ) {
+      const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined;
+
+      if (ns?.coll == CHECKPOINTS_COLLECTION) {
         /**
          * Dropping the database does not provide an `invalidate` event.
          * We typically would receive `drop` events for the collection which we
@@ -727,6 +740,29 @@ export class ChangeStream {
           );
         }
 
+        if (
+          !(
+            changeDocument.operationType == 'insert' ||
+            changeDocument.operationType == 'update' ||
+            changeDocument.operationType == 'replace'
+          )
+        ) {
+          continue;
+        }
+
+        // We handle two types of checkpoint events:
+        // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these
+        //    immediately, regardless of where they were created.
+        // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate
+        //    limiting of commits, so we specifically want to exclude checkpoints from other streams.
+        //
+        // It may be useful to also throttle commits due to standalone checkpoints in the future.
+        // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now.
+
+        const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId;
+        if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) {
+          continue;
+        }
         const { comparable: lsn } = new MongoLSN({
           timestamp: changeDocument.clusterTime!,
           resume_token: changeDocument._id
@@ -743,7 +779,7 @@ export class ChangeStream {
         changeDocument.operationType == 'delete'
       ) {
         if (waitForCheckpointLsn == null) {
-          waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
+          waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId);
         }
         const rel = getMongoRelation(changeDocument.ns);
         const table = await this.getRelation(batch, rel, {
diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts
index 62127eb48..736ff149d 100644
--- a/modules/module-mongodb/src/replication/MongoRelation.ts
+++ b/modules/module-mongodb/src/replication/MongoRelation.ts
@@ -147,15 +147,27 @@ function filterJsonData(data: any, depth = 0): any {
   }
 }
 
-export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
+/**
+ * Id for checkpoints not associated with any specific replication stream.
+ *
+ * Use this for write checkpoints, or any other case where we want to process
+ * the checkpoint immediately, and not wait for batching.
+ */
+export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';
+
+export async function createCheckpoint(
+  client: mongo.MongoClient,
+  db: mongo.Db,
+  id: mongo.ObjectId | string
+): Promise<string> {
   const session = client.startSession();
   try {
-    // Note: If multiple PowerSync instances are replicating the same source database,
-    // they'll modify the same checkpoint document. This is fine - it could create
-    // more replication load than required, but won't break anything.
+    // We use a unique id per process, and clear documents on startup.
+    // This is so that we can filter events for our own process only, and ignore
+    // events from other processes.
     await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
       {
-        _id: 'checkpoint' as any
+        _id: id as any
       },
       {
         $inc: { i: 1 }
diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index aa7b669ae..a7a662258 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -12,7 +12,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
 
 import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
 import { MongoManager } from '@module/replication/MongoManager.js';
-import { createCheckpoint } from '@module/replication/MongoRelation.js';
+import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js';
 import { NormalizedMongoConnectionConfig } from '@module/types/types.js';
 
 import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js';
@@ -160,7 +160,7 @@ export async function getClientCheckpoint(
   options?: { timeout?: number }
 ): Promise {
   const start = Date.now();
-  const lsn = await createCheckpoint(client, db);
+  const lsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID);
   // This old API needs a persisted checkpoint id.
   // Since we don't use LSNs anymore, the only way to get that is to wait.
 
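The filtering rule added to ChangeStream.ts above can be restated outside the diff context. The sketch below is illustrative only: the helper name shouldProcessCheckpoint is not part of the patch, and it assumes the same STANDALONE_CHECKPOINT_ID value and the per-stream ObjectId that the patch stores in checkpointStreamId.

import { ObjectId } from 'mongodb';

// Assumed to mirror STANDALONE_CHECKPOINT_ID from MongoRelation.ts in the patch above.
const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';

// A checkpoint event is processed when it is either a standalone checkpoint
// (for example a write checkpoint), or a batch checkpoint created by this stream itself.
// Batch checkpoints from other streams are ignored, which is what gives each stream
// its own commit rate limiting as described in the comments in the diff.
export function shouldProcessCheckpoint(checkpointId: string | ObjectId, ownStreamId: ObjectId): boolean {
  if (checkpointId === STANDALONE_CHECKPOINT_ID) {
    return true;
  }
  return checkpointId instanceof ObjectId && ownStreamId.equals(checkpointId);
}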
diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts
index 14967dba6..38a37d59f 100644
--- a/packages/service-core/src/api/RouteAPI.ts
+++ b/packages/service-core/src/api/RouteAPI.ts
@@ -49,11 +49,6 @@ export interface RouteAPI {
    */
   getReplicationLag(options: ReplicationLagOptions): Promise;
 
-  /**
-   * Get the current LSN or equivalent replication HEAD position identifier
-   */
-  getReplicationHead(): Promise<string>;
-
   /**
    * Get the current LSN or equivalent replication HEAD position identifier.
    *
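For reference, the checkpoint documents touched by this patch exist only to generate oplog/changestream traffic, which is why ChangeStream.ts can clear the collection on startup. Below is a minimal sketch of the bump operation, assuming the mongodb Node driver; the collection name shown is illustrative (the module imports CHECKPOINTS_COLLECTION from replication-utils.ts), and the real createCheckpoint additionally runs in a session and derives an LSN, which is omitted here.

import { MongoClient, ObjectId } from 'mongodb';

// Illustrative name only; see CHECKPOINTS_COLLECTION in replication-utils.ts.
const CHECKPOINTS_COLLECTION = '_powersync_checkpoints';

// Increment a counter on a checkpoint document so that an update event shows up on the
// changestream. A per-stream ObjectId keeps batch checkpoints private to one replication
// stream, while the shared standalone id is used for write checkpoints.
async function bumpCheckpoint(client: MongoClient, dbName: string, id: ObjectId | string): Promise<void> {
  await client
    .db(dbName)
    .collection(CHECKPOINTS_COLLECTION)
    .findOneAndUpdate({ _id: id as any }, { $inc: { i: 1 } }, { upsert: true, returnDocument: 'after' });
}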