From cb6a5b5cf3016f88190e0a7f4ea1632d1f8b9711 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 10:16:34 +0200 Subject: [PATCH 1/7] Persist resumeTokens without events. --- modules/module-mongodb/src/common/MongoLSN.ts | 28 ++++++++++++++ .../src/replication/ChangeStream.ts | 25 ++++++++++++- .../test/src/resume_token.test.ts | 37 +++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 modules/module-mongodb/test/src/resume_token.test.ts diff --git a/modules/module-mongodb/src/common/MongoLSN.ts b/modules/module-mongodb/src/common/MongoLSN.ts index a91fc3f11..6fc347bc5 100644 --- a/modules/module-mongodb/src/common/MongoLSN.ts +++ b/modules/module-mongodb/src/common/MongoLSN.ts @@ -41,6 +41,14 @@ export class MongoLSN { }; } + static fromResumeToken(resumeToken: mongo.ResumeToken): MongoLSN { + const timestamp = parseResumeTokenTimestamp(resumeToken); + return new MongoLSN({ + timestamp, + resume_token: resumeToken + }); + } + static ZERO = MongoLSN.fromSerialized(ZERO_LSN); constructor(protected options: MongoLSNSpecification) {} @@ -72,3 +80,23 @@ export class MongoLSN { return this.comparable; } } + +/** + * Given a resumeToken in the form {_data: 'hex data'}, this parses the cluster timestamp. + * All other data in the token is ignored. + * + * @param resumeToken + * @returns a parsed timestamp + */ +export function parseResumeTokenTimestamp(resumeToken: mongo.ResumeToken): mongo.Timestamp { + const hex = (resumeToken as any)._data as string; + const buffer = Buffer.from(hex, 'hex'); + const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength); + if (view.getUint8(0) != 130) { + throw new Error(`Invalid resume token: ${hex}`); + } + const t = view.getUint32(1); + const i = view.getUint32(5); + + return mongo.Timestamp.fromBits(i, t); +} diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 3a47ca435..f57dd8184 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -597,6 +597,8 @@ export class ChangeStream { let flexDbNameWorkaroundLogged = false; + let lastEmptyResume = performance.now(); + while (true) { if (this.abort_signal.aborted) { break; @@ -608,9 +610,30 @@ export class ChangeStream { break; } - if (originalChangeDocument == null || this.abort_signal.aborted) { + if (this.abort_signal.aborted) { + break; + } + + if (originalChangeDocument == null) { + // We get a new null document after `maxAwaitTimeMS` if there were no other events. + // In this case, stream.resumeToken is the resume token associated with the last response. + // stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next() + // does update it. + // From observed behavior, the actual resumeToken changes around once every 10 seconds. + // If we don't update it on empty events, we do keep consistency, but resuming the stream + // with old tokens may cause connection timeouts. + // We throttle this further by only persisting a keepalive once a minute. + // We add an additional check for waitForCheckpointLsn == null, to make sure we're not + // doing a keepalive in the middle of a transaction. + if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { + const { comparable: lsn } = MongoLSN.fromResumeToken(stream.resumeToken); + await batch.keepalive(lsn); + await touch(); + lastEmptyResume = performance.now(); + } continue; } + await touch(); if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { diff --git a/modules/module-mongodb/test/src/resume_token.test.ts b/modules/module-mongodb/test/src/resume_token.test.ts new file mode 100644 index 000000000..e9a3fe46b --- /dev/null +++ b/modules/module-mongodb/test/src/resume_token.test.ts @@ -0,0 +1,37 @@ +// Write tests for parseResumeTokenTimestamp +import { parseResumeTokenTimestamp } from '@module/common/MongoLSN.js'; +import { mongo } from '@powersync/lib-service-mongodb'; +import { describe, expect, it } from 'vitest'; + +describe('parseResumeTokenTimestamp', () => { + it('parses a valid resume token (1)', () => { + const timestamp = parseResumeTokenTimestamp({ _data: '826811D298000000012B0429296E1404' }); + expect(timestamp.t).toEqual(1745998488); + expect(timestamp.i).toEqual(1); + }); + + it('parses a valid resume token (2)', () => { + const timestamp = parseResumeTokenTimestamp({ + _data: + '8267B4B1F8000000322B042C0100296E5A10041831DD5EEE2B4D6495A610E5430872B6463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004' + }); + expect(timestamp.t).toEqual(1739895288); + expect(timestamp.i).toEqual(50); + }); + + it('parses a valid resume token (3)', () => { + const timestamp = parseResumeTokenTimestamp({ + _data: + '826811D228000000022B042C0100296E5A10048725A7954ED247538A4851BAB78B0560463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004' + }); + expect(timestamp.t).toEqual(1745998376); + expect(timestamp.i).toEqual(2); + }); + + it('throws for invalid prefix', () => { + const hex = 'FF0102030405060708'; + const resumeToken: any = { _data: hex }; + + expect(() => parseResumeTokenTimestamp(resumeToken)).toThrowError(/^Invalid resume token/); + }); +}); From ced3fa72aa79b1ce9a35c9e525d21a1babf41c0f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 10:17:03 +0200 Subject: [PATCH 2/7] Increase maxAwaitTimeMS to reduce overhead. --- modules/module-mongodb/src/replication/ChangeStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index f57dd8184..5ed0c12e7 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -557,7 +557,7 @@ export class ChangeStream { const streamOptions: mongo.ChangeStreamOptions = { showExpandedEvents: true, - maxAwaitTimeMS: 200, + maxAwaitTimeMS: 10_000, fullDocument: fullDocument }; From 3b2bcc5312d3c9ab16786de3d4f6baebeab6b67c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 10:42:39 +0200 Subject: [PATCH 3/7] Improve error message when the change stream request times out. --- libs/lib-mongodb/src/db/mongo.ts | 4 ++++ .../src/replication/ChangeStream.ts | 14 +++++++++++--- packages/service-errors/src/codes.ts | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/libs/lib-mongodb/src/db/mongo.ts b/libs/lib-mongodb/src/db/mongo.ts index c91b80fd0..5062043b0 100644 --- a/libs/lib-mongodb/src/db/mongo.ts +++ b/libs/lib-mongodb/src/db/mongo.ts @@ -88,3 +88,7 @@ export async function waitForAuth(db: mongo.Db) { export const isMongoServerError = (error: any): error is mongo.MongoServerError => { return error instanceof mongo.MongoServerError || error?.name == 'MongoServerError'; }; + +export const isMongoNetworkTimeoutError = (error: any): error is mongo.MongoNetworkTimeoutError => { + return error instanceof mongo.MongoNetworkTimeoutError || error?.name == 'MongoNetworkTimeoutError'; +}; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 5ed0c12e7..58ad58cb8 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,4 +1,4 @@ -import { mongo } from '@powersync/lib-service-mongodb'; +import { isMongoNetworkTimeoutError, mongo } from '@powersync/lib-service-mongodb'; import { container, DatabaseConnectionError, @@ -10,13 +10,13 @@ import { } from '@powersync/lib-services-framework'; import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core'; import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules'; +import { ReplicationMetric } from '@powersync/service-types'; import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; import { escapeRegExp } from '../utils.js'; import { MongoManager } from './MongoManager.js'; import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js'; import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; -import { ReplicationMetric } from '@powersync/service-types'; export interface ChangeStreamOptions { connections: MongoManager; @@ -604,7 +604,15 @@ export class ChangeStream { break; } - const originalChangeDocument = await stream.tryNext(); + const originalChangeDocument = await stream.tryNext().catch((e) => { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". + // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } + }); // The stream was closed, we will only ever receive `null` from it if (!originalChangeDocument && stream.closed) { break; diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index 55c83c31a..d44da05ec 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -253,6 +253,22 @@ export enum ErrorCode { */ PSYNC_S1344 = 'PSYNC_S1344', + /** + * Failed to read MongoDB Change Stream to to a timeout. + * + * This may happen if there is a significant delay on the source database in reading the change stream. + * + * If this is not resolved after retries, replication may need to be restarted from scratch. + */ + PSYNC_S1345 = 'PSYNC_S1345', + + /** + * Failed to read MongoDB Change Stream. + * + * See the error cause for more details. + */ + PSYNC_S1346 = 'PSYNC_S1346', + // ## PSYNC_S14xx: MongoDB storage replication issues /** From 8335e29ccfda2f7cd464cac61ba48522f0207142 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 10:50:21 +0200 Subject: [PATCH 4/7] Add changeset. --- .changeset/friendly-rings-accept.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/friendly-rings-accept.md diff --git a/.changeset/friendly-rings-accept.md b/.changeset/friendly-rings-accept.md new file mode 100644 index 000000000..bdf24dec8 --- /dev/null +++ b/.changeset/friendly-rings-accept.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-errors': patch +'@powersync/service-module-mongodb': patch +'@powersync/lib-service-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[MongoDB] Fix resume token handling when no events are received From 42f45f28fe6ecd88f61fbed51e214214d0a59ad9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 12:55:24 +0200 Subject: [PATCH 5/7] Fix error mapping for ChangeStreamInvalidatedError. --- .../src/replication/ChangeStream.ts | 28 +++++++++++++------ .../replication/ChangeStreamReplicationJob.ts | 2 -- .../module-mongodb/test/src/resume.test.ts | 4 +-- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 58ad58cb8..ce0a199e7 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,4 +1,4 @@ -import { isMongoNetworkTimeoutError, mongo } from '@powersync/lib-service-mongodb'; +import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; import { container, DatabaseConnectionError, @@ -605,13 +605,7 @@ export class ChangeStream { } const originalChangeDocument = await stream.tryNext().catch((e) => { - if (isMongoNetworkTimeoutError(e)) { - // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". - // We wrap the error to make it more useful. - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else { - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); - } + throw mapChangeStreamError(e); }); // The stream was closed, we will only ever receive `null` from it if (!originalChangeDocument && stream.closed) { @@ -793,3 +787,21 @@ async function touch() { // or reduce PING_INTERVAL here. return container.probes.touch(); } + +function mapChangeStreamError(e: any) { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". + // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if ( + isMongoServerError(e) && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg, e); + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } +} diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index c8e8fc8f0..497c48307 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -86,8 +86,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ } if (e instanceof ChangeStreamInvalidatedError) { throw e; - } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new ChangeStreamInvalidatedError(e.message, e); } else { // Report the error if relevant, before retrying container.reporter.captureException(e, { diff --git a/modules/module-mongodb/test/src/resume.test.ts b/modules/module-mongodb/test/src/resume.test.ts index f5e3944ac..b89622be6 100644 --- a/modules/module-mongodb/test/src/resume.test.ts +++ b/modules/module-mongodb/test/src/resume.test.ts @@ -8,6 +8,7 @@ import { describe, expect, test, vi } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; import { env } from './env.js'; import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js'; +import { ChangeStreamInvalidatedError } from '@module/replication/ChangeStream.js'; describe('mongo lsn', () => { test('LSN with resume tokens should be comparable', () => { @@ -145,8 +146,7 @@ function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Pr context2.storage = factory.getInstance(activeContent!); const error = await context2.startStreaming().catch((ex) => ex); - expect(error).exist; // The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError - expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError')); + expect(error).toBeInstanceOf(ChangeStreamInvalidatedError); }); } From 7bdaaa020ffa6a858aeaea88864b6d42997b22aa Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 12:58:19 +0200 Subject: [PATCH 6/7] Address review comments. --- modules/module-mongodb/test/src/resume_token.test.ts | 2 -- packages/service-errors/src/codes.ts | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/module-mongodb/test/src/resume_token.test.ts b/modules/module-mongodb/test/src/resume_token.test.ts index e9a3fe46b..aa7a85858 100644 --- a/modules/module-mongodb/test/src/resume_token.test.ts +++ b/modules/module-mongodb/test/src/resume_token.test.ts @@ -1,6 +1,4 @@ -// Write tests for parseResumeTokenTimestamp import { parseResumeTokenTimestamp } from '@module/common/MongoLSN.js'; -import { mongo } from '@powersync/lib-service-mongodb'; import { describe, expect, it } from 'vitest'; describe('parseResumeTokenTimestamp', () => { diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index d44da05ec..d1eef75d0 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -254,7 +254,7 @@ export enum ErrorCode { PSYNC_S1344 = 'PSYNC_S1344', /** - * Failed to read MongoDB Change Stream to to a timeout. + * Failed to read MongoDB Change Stream due to a timeout. * * This may happen if there is a significant delay on the source database in reading the change stream. * From 23dd6be322c69decdf9ad9e0ec0483451f6e9bd5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Apr 2025 13:33:30 +0200 Subject: [PATCH 7/7] Reduce maxAwaitTimeMS in tests to cover MongoDB 6.0. --- .../module-mongodb/src/replication/ChangeStream.ts | 12 +++++++++++- .../module-mongodb/test/src/change_stream_utils.ts | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index ce0a199e7..1b01a4a28 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -23,6 +23,13 @@ export interface ChangeStreamOptions { storage: storage.SyncRulesBucketStorage; metrics: MetricsEngine; abort_signal: AbortSignal; + /** + * Override maxAwaitTimeMS for testing. + * + * In most cases, the default of 10_000 is fine. However, for MongoDB 6.0, this can cause a delay + * in closing the stream. To cover that case, reduce the timeout for tests. + */ + maxAwaitTimeMS?: number; } interface InitResult { @@ -56,6 +63,8 @@ export class ChangeStream { private readonly defaultDb: mongo.Db; private readonly metrics: MetricsEngine; + private readonly maxAwaitTimeMS: number; + private abort_signal: AbortSignal; private relation_cache = new Map(); @@ -65,6 +74,7 @@ export class ChangeStream { this.metrics = options.metrics; this.group_id = options.storage.group_id; this.connections = options.connections; + this.maxAwaitTimeMS = options.maxAwaitTimeMS ?? 10_000; this.client = this.connections.client; this.defaultDb = this.connections.db; this.sync_rules = options.storage.getParsedSyncRules({ @@ -557,7 +567,7 @@ export class ChangeStream { const streamOptions: mongo.ChangeStreamOptions = { showExpandedEvents: true, - maxAwaitTimeMS: 10_000, + maxAwaitTimeMS: this.maxAwaitTimeMS, fullDocument: fullDocument }; diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 3ef945e95..bdff3da83 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -85,7 +85,10 @@ export class ChangeStreamTestContext { storage: this.storage, metrics: METRICS_HELPER.metricsEngine, connections: this.connectionManager, - abort_signal: this.abortController.signal + abort_signal: this.abortController.signal, + // Specifically reduce this from the default for tests on MongoDB <= 6.0, otherwise it can take + // a long time to abort the stream. + maxAwaitTimeMS: 200 }; this._walStream = new ChangeStream(options); return this._walStream!;