Skip to content

Commit 08f6ae8

Browse files
authored
Merge pull request #251 from powersync-ja/fix-changestream-resumetoken
[MongoDB] Fix resume token handling when no events are received
1 parent 192eaf5 commit 08f6ae8

File tree

9 files changed

+156
-10
lines changed

9 files changed

+156
-10
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@powersync/service-errors': patch
3+
'@powersync/service-module-mongodb': patch
4+
'@powersync/lib-service-mongodb': patch
5+
'@powersync/service-core': patch
6+
'@powersync/service-image': patch
7+
---
8+
9+
[MongoDB] Fix resume token handling when no events are received

libs/lib-mongodb/src/db/mongo.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,7 @@ export async function waitForAuth(db: mongo.Db) {
8888
export const isMongoServerError = (error: any): error is mongo.MongoServerError => {
8989
return error instanceof mongo.MongoServerError || error?.name == 'MongoServerError';
9090
};
91+
92+
export const isMongoNetworkTimeoutError = (error: any): error is mongo.MongoNetworkTimeoutError => {
93+
return error instanceof mongo.MongoNetworkTimeoutError || error?.name == 'MongoNetworkTimeoutError';
94+
};

modules/module-mongodb/src/common/MongoLSN.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ export class MongoLSN {
4141
};
4242
}
4343

44+
static fromResumeToken(resumeToken: mongo.ResumeToken): MongoLSN {
45+
const timestamp = parseResumeTokenTimestamp(resumeToken);
46+
return new MongoLSN({
47+
timestamp,
48+
resume_token: resumeToken
49+
});
50+
}
51+
4452
static ZERO = MongoLSN.fromSerialized(ZERO_LSN);
4553

4654
constructor(protected options: MongoLSNSpecification) {}
@@ -72,3 +80,23 @@ export class MongoLSN {
7280
return this.comparable;
7381
}
7482
}
83+
84+
/**
85+
* Given a resumeToken in the form {_data: 'hex data'}, this parses the cluster timestamp.
86+
* All other data in the token is ignored.
87+
*
88+
* @param resumeToken
89+
* @returns a parsed timestamp
90+
*/
91+
export function parseResumeTokenTimestamp(resumeToken: mongo.ResumeToken): mongo.Timestamp {
92+
const hex = (resumeToken as any)._data as string;
93+
const buffer = Buffer.from(hex, 'hex');
94+
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
95+
if (view.getUint8(0) != 130) {
96+
throw new Error(`Invalid resume token: ${hex}`);
97+
}
98+
const t = view.getUint32(1);
99+
const i = view.getUint32(5);
100+
101+
return mongo.Timestamp.fromBits(i, t);
102+
}

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { mongo } from '@powersync/lib-service-mongodb';
1+
import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
22
import {
33
container,
44
DatabaseConnectionError,
@@ -10,19 +10,26 @@ import {
1010
} from '@powersync/lib-services-framework';
1111
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
1212
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
13+
import { ReplicationMetric } from '@powersync/service-types';
1314
import { MongoLSN } from '../common/MongoLSN.js';
1415
import { PostImagesOption } from '../types/types.js';
1516
import { escapeRegExp } from '../utils.js';
1617
import { MongoManager } from './MongoManager.js';
1718
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
1819
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
19-
import { ReplicationMetric } from '@powersync/service-types';
2020

2121
export interface ChangeStreamOptions {
2222
connections: MongoManager;
2323
storage: storage.SyncRulesBucketStorage;
2424
metrics: MetricsEngine;
2525
abort_signal: AbortSignal;
26+
/**
27+
* Override maxAwaitTimeMS for testing.
28+
*
29+
* In most cases, the default of 10_000 is fine. However, for MongoDB 6.0, this can cause a delay
30+
* in closing the stream. To cover that case, reduce the timeout for tests.
31+
*/
32+
maxAwaitTimeMS?: number;
2633
}
2734

2835
interface InitResult {
@@ -56,6 +63,8 @@ export class ChangeStream {
5663
private readonly defaultDb: mongo.Db;
5764
private readonly metrics: MetricsEngine;
5865

66+
private readonly maxAwaitTimeMS: number;
67+
5968
private abort_signal: AbortSignal;
6069

6170
private relation_cache = new Map<string | number, storage.SourceTable>();
@@ -65,6 +74,7 @@ export class ChangeStream {
6574
this.metrics = options.metrics;
6675
this.group_id = options.storage.group_id;
6776
this.connections = options.connections;
77+
this.maxAwaitTimeMS = options.maxAwaitTimeMS ?? 10_000;
6878
this.client = this.connections.client;
6979
this.defaultDb = this.connections.db;
7080
this.sync_rules = options.storage.getParsedSyncRules({
@@ -557,7 +567,7 @@ export class ChangeStream {
557567

558568
const streamOptions: mongo.ChangeStreamOptions = {
559569
showExpandedEvents: true,
560-
maxAwaitTimeMS: 200,
570+
maxAwaitTimeMS: this.maxAwaitTimeMS,
561571
fullDocument: fullDocument
562572
};
563573

@@ -597,20 +607,45 @@ export class ChangeStream {
597607

598608
let flexDbNameWorkaroundLogged = false;
599609

610+
let lastEmptyResume = performance.now();
611+
600612
while (true) {
601613
if (this.abort_signal.aborted) {
602614
break;
603615
}
604616

605-
const originalChangeDocument = await stream.tryNext();
617+
const originalChangeDocument = await stream.tryNext().catch((e) => {
618+
throw mapChangeStreamError(e);
619+
});
606620
// The stream was closed, we will only ever receive `null` from it
607621
if (!originalChangeDocument && stream.closed) {
608622
break;
609623
}
610624

611-
if (originalChangeDocument == null || this.abort_signal.aborted) {
625+
if (this.abort_signal.aborted) {
626+
break;
627+
}
628+
629+
if (originalChangeDocument == null) {
630+
// We get a new null document after `maxAwaitTimeMS` if there were no other events.
631+
// In this case, stream.resumeToken is the resume token associated with the last response.
632+
// stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next()
633+
// does update it.
634+
// From observed behavior, the actual resumeToken changes around once every 10 seconds.
635+
// If we don't update it on empty events, we do keep consistency, but resuming the stream
636+
// with old tokens may cause connection timeouts.
637+
// We throttle this further by only persisting a keepalive once a minute.
638+
// We add an additional check for waitForCheckpointLsn == null, to make sure we're not
639+
// doing a keepalive in the middle of a transaction.
640+
if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) {
641+
const { comparable: lsn } = MongoLSN.fromResumeToken(stream.resumeToken);
642+
await batch.keepalive(lsn);
643+
await touch();
644+
lastEmptyResume = performance.now();
645+
}
612646
continue;
613647
}
648+
614649
await touch();
615650

616651
if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) {
@@ -762,3 +797,21 @@ async function touch() {
762797
// or reduce PING_INTERVAL here.
763798
return container.probes.touch();
764799
}
800+
801+
function mapChangeStreamError(e: any) {
802+
if (isMongoNetworkTimeoutError(e)) {
803+
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
804+
// We wrap the error to make it more useful.
805+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
806+
} else if (
807+
isMongoServerError(e) &&
808+
e.codeName == 'NoMatchingDocument' &&
809+
e.errmsg?.includes('post-image was not found')
810+
) {
811+
throw new ChangeStreamInvalidatedError(e.errmsg, e);
812+
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
813+
throw new ChangeStreamInvalidatedError(e.message, e);
814+
} else {
815+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
816+
}
817+
}

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
8686
}
8787
if (e instanceof ChangeStreamInvalidatedError) {
8888
throw e;
89-
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
90-
throw new ChangeStreamInvalidatedError(e.message, e);
9189
} else {
9290
// Report the error if relevant, before retrying
9391
container.reporter.captureException(e, {

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ export class ChangeStreamTestContext {
8585
storage: this.storage,
8686
metrics: METRICS_HELPER.metricsEngine,
8787
connections: this.connectionManager,
88-
abort_signal: this.abortController.signal
88+
abort_signal: this.abortController.signal,
89+
// Specifically reduce this from the default for tests on MongoDB <= 6.0, otherwise it can take
90+
// a long time to abort the stream.
91+
maxAwaitTimeMS: 200
8992
};
9093
this._walStream = new ChangeStream(options);
9194
return this._walStream!;

modules/module-mongodb/test/src/resume.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { describe, expect, test, vi } from 'vitest';
88
import { ChangeStreamTestContext } from './change_stream_utils.js';
99
import { env } from './env.js';
1010
import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';
11+
import { ChangeStreamInvalidatedError } from '@module/replication/ChangeStream.js';
1112

1213
describe('mongo lsn', () => {
1314
test('LSN with resume tokens should be comparable', () => {
@@ -145,8 +146,7 @@ function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Pr
145146
context2.storage = factory.getInstance(activeContent!);
146147

147148
const error = await context2.startStreaming().catch((ex) => ex);
148-
expect(error).exist;
149149
// The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError
150-
expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError'));
150+
expect(error).toBeInstanceOf(ChangeStreamInvalidatedError);
151151
});
152152
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { parseResumeTokenTimestamp } from '@module/common/MongoLSN.js';
2+
import { describe, expect, it } from 'vitest';
3+
4+
describe('parseResumeTokenTimestamp', () => {
5+
it('parses a valid resume token (1)', () => {
6+
const timestamp = parseResumeTokenTimestamp({ _data: '826811D298000000012B0429296E1404' });
7+
expect(timestamp.t).toEqual(1745998488);
8+
expect(timestamp.i).toEqual(1);
9+
});
10+
11+
it('parses a valid resume token (2)', () => {
12+
const timestamp = parseResumeTokenTimestamp({
13+
_data:
14+
'8267B4B1F8000000322B042C0100296E5A10041831DD5EEE2B4D6495A610E5430872B6463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004'
15+
});
16+
expect(timestamp.t).toEqual(1739895288);
17+
expect(timestamp.i).toEqual(50);
18+
});
19+
20+
it('parses a valid resume token (3)', () => {
21+
const timestamp = parseResumeTokenTimestamp({
22+
_data:
23+
'826811D228000000022B042C0100296E5A10048725A7954ED247538A4851BAB78B0560463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004'
24+
});
25+
expect(timestamp.t).toEqual(1745998376);
26+
expect(timestamp.i).toEqual(2);
27+
});
28+
29+
it('throws for invalid prefix', () => {
30+
const hex = 'FF0102030405060708';
31+
const resumeToken: any = { _data: hex };
32+
33+
expect(() => parseResumeTokenTimestamp(resumeToken)).toThrowError(/^Invalid resume token/);
34+
});
35+
});

packages/service-errors/src/codes.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,22 @@ export enum ErrorCode {
253253
*/
254254
PSYNC_S1344 = 'PSYNC_S1344',
255255

256+
/**
257+
* Failed to read MongoDB Change Stream due to a timeout.
258+
*
259+
* This may happen if there is a significant delay on the source database in reading the change stream.
260+
*
261+
* If this is not resolved after retries, replication may need to be restarted from scratch.
262+
*/
263+
PSYNC_S1345 = 'PSYNC_S1345',
264+
265+
/**
266+
* Failed to read MongoDB Change Stream.
267+
*
268+
* See the error cause for more details.
269+
*/
270+
PSYNC_S1346 = 'PSYNC_S1346',
271+
256272
// ## PSYNC_S14xx: MongoDB storage replication issues
257273

258274
/**

0 commit comments

Comments
 (0)