Skip to content

Commit d053e84

Browse files
[Feature] MongoDB Resume Tokens (#196)
1 parent a4e387c commit d053e84

File tree

8 files changed

+312
-46
lines changed

8 files changed

+312
-46
lines changed

.changeset/polite-goats-eat.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-errors': minor
3+
'@powersync/service-module-mongodb': minor
4+
'@powersync/service-image': minor
5+
---
6+
7+
Added support for MongoDB resume tokens. This should help detect Change Stream error edge cases such as changing the replication connection details after replication has begun.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { mongo } from '@powersync/lib-service-mongodb';
2+
import { storage } from '@powersync/service-core';
3+
4+
export type MongoLSNSpecification = {
5+
timestamp: mongo.Timestamp;
6+
/**
7+
* The ResumeToken type here is an alias for `unknown`.
8+
* The docs mention the contents should be of the form:
9+
* ```typescript
10+
* {
11+
* "_data" : <BinData|string>
12+
* }
13+
* ```
14+
* We use BSON serialization to store the resume token.
15+
*/
16+
resume_token?: mongo.ResumeToken;
17+
};
18+
19+
export const ZERO_LSN = '0000000000000000';
20+
21+
const DELIMINATOR = '|';
22+
23+
/**
24+
* Represent a Logical Sequence Number (LSN) for MongoDB replication sources.
25+
* This stores a combination of the cluster timestamp and optional Change Stream resume token.
26+
*/
27+
export class MongoLSN {
28+
static fromSerialized(comparable: string): MongoLSN {
29+
return new MongoLSN(MongoLSN.deserialize(comparable));
30+
}
31+
32+
private static deserialize(comparable: string): MongoLSNSpecification {
33+
const [timestampString, resumeString] = comparable.split(DELIMINATOR);
34+
35+
const a = parseInt(timestampString.substring(0, 8), 16);
36+
const b = parseInt(timestampString.substring(8, 16), 16);
37+
38+
return {
39+
timestamp: mongo.Timestamp.fromBits(b, a),
40+
resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null
41+
};
42+
}
43+
44+
static ZERO = MongoLSN.fromSerialized(ZERO_LSN);
45+
46+
constructor(protected options: MongoLSNSpecification) {}
47+
48+
get timestamp() {
49+
return this.options.timestamp;
50+
}
51+
52+
get resumeToken() {
53+
return this.options.resume_token;
54+
}
55+
56+
get comparable() {
57+
const { timestamp, resumeToken } = this;
58+
59+
const a = timestamp.high.toString(16).padStart(8, '0');
60+
const b = timestamp.low.toString(16).padStart(8, '0');
61+
62+
const segments = [`${a}${b}`];
63+
64+
if (resumeToken) {
65+
segments.push(storage.serializeBson({ resumeToken }).toString('base64'));
66+
}
67+
68+
return segments.join(DELIMINATOR);
69+
}
70+
71+
toString() {
72+
return this.comparable;
73+
}
74+
}

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
22
import {
33
container,
4+
DatabaseConnectionError,
45
ErrorCode,
56
logger,
67
ReplicationAbortedError,
@@ -9,20 +10,13 @@ import {
910
} from '@powersync/lib-services-framework';
1011
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
1112
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
13+
import { MongoLSN } from '../common/MongoLSN.js';
1214
import { PostImagesOption } from '../types/types.js';
1315
import { escapeRegExp } from '../utils.js';
1416
import { MongoManager } from './MongoManager.js';
15-
import {
16-
constructAfterRecord,
17-
createCheckpoint,
18-
getMongoLsn,
19-
getMongoRelation,
20-
mongoLsnToTimestamp
21-
} from './MongoRelation.js';
17+
import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
2218
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
2319

24-
export const ZERO_LSN = '0000000000000000';
25-
2620
export interface ChangeStreamOptions {
2721
connections: MongoManager;
2822
storage: storage.SyncRulesBucketStorage;
@@ -41,9 +35,9 @@ interface InitResult {
4135
* * Some change stream documents do not have postImages.
4236
* * startAfter/resumeToken is not valid anymore.
4337
*/
44-
export class ChangeStreamInvalidatedError extends Error {
45-
constructor(message: string) {
46-
super(message);
38+
export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
39+
constructor(message: string, cause: any) {
40+
super(ErrorCode.PSYNC_S1344, message, cause);
4741
}
4842
}
4943

@@ -207,7 +201,7 @@ export class ChangeStream {
207201
const session = await this.client.startSession();
208202
try {
209203
await this.storage.startBatch(
210-
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
204+
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
211205
async (batch) => {
212206
// Start by resolving all tables.
213207
// This checks postImage configuration, and that should fail as
@@ -220,12 +214,12 @@ export class ChangeStream {
220214

221215
for (let table of allSourceTables) {
222216
await this.snapshotTable(batch, table, session);
223-
await batch.markSnapshotDone([table], ZERO_LSN);
217+
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
224218

225219
await touch();
226220
}
227221

228-
const lsn = getMongoLsn(snapshotTime);
222+
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
229223
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
230224
await batch.commit(lsn);
231225
}
@@ -516,7 +510,7 @@ export class ChangeStream {
516510
e.codeName == 'NoMatchingDocument' &&
517511
e.errmsg?.includes('post-image was not found')
518512
) {
519-
throw new ChangeStreamInvalidatedError(e.errmsg);
513+
throw new ChangeStreamInvalidatedError(e.errmsg, e);
520514
}
521515
throw e;
522516
}
@@ -527,10 +521,13 @@ export class ChangeStream {
527521
await this.storage.autoActivate();
528522

529523
await this.storage.startBatch(
530-
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
524+
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
531525
async (batch) => {
532-
const lastLsn = batch.lastCheckpointLsn;
533-
const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
526+
const { lastCheckpointLsn } = batch;
527+
const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null;
528+
const startAfter = lastLsn?.timestamp;
529+
const resumeAfter = lastLsn?.resumeToken;
530+
534531
logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
535532

536533
const filters = this.getSourceNamespaceFilters();
@@ -554,12 +551,21 @@ export class ChangeStream {
554551
}
555552

556553
const streamOptions: mongo.ChangeStreamOptions = {
557-
startAtOperationTime: startAfter,
558554
showExpandedEvents: true,
559555
useBigInt64: true,
560556
maxAwaitTimeMS: 200,
561557
fullDocument: fullDocument
562558
};
559+
560+
/**
561+
* Only one of these options can be supplied at a time.
562+
*/
563+
if (resumeAfter) {
564+
streamOptions.resumeAfter = resumeAfter;
565+
} else {
566+
streamOptions.startAtOperationTime = startAfter;
567+
}
568+
563569
let stream: mongo.ChangeStream<mongo.Document>;
564570
if (filters.multipleDatabases) {
565571
// Requires readAnyDatabase@admin on Atlas
@@ -579,7 +585,7 @@ export class ChangeStream {
579585
});
580586

581587
// Always start with a checkpoint.
582-
// This helps us to clear erorrs when restarting, even if there is
588+
// This helps us to clear errors when restarting, even if there is
583589
// no data to replicate.
584590
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
585591

@@ -592,6 +598,11 @@ export class ChangeStream {
592598

593599
const originalChangeDocument = await stream.tryNext();
594600

601+
// The stream was closed, we will only ever receive `null` from it
602+
if (!originalChangeDocument && stream.closed) {
603+
break;
604+
}
605+
595606
if (originalChangeDocument == null || this.abort_signal.aborted) {
596607
continue;
597608
}
@@ -626,15 +637,38 @@ export class ChangeStream {
626637
throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
627638
}
628639

629-
// console.log('event', changeDocument);
630-
631640
if (
632641
(changeDocument.operationType == 'insert' ||
633642
changeDocument.operationType == 'update' ||
634-
changeDocument.operationType == 'replace') &&
643+
changeDocument.operationType == 'replace' ||
644+
changeDocument.operationType == 'drop') &&
635645
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
636646
) {
637-
const lsn = getMongoLsn(changeDocument.clusterTime!);
647+
/**
648+
* Dropping the database does not provide an `invalidate` event.
649+
* We typically would receive `drop` events for the collection which we
650+
* would process below.
651+
*
652+
* However we don't commit the LSN after collections are dropped.
653+
* This prevents the `startAfter` or `resumeToken` from advancing past the drop events.
654+
* The stream also closes after the drop events.
655+
* This causes an infinite loop of processing the collection drop events.
656+
*
657+
* This check here invalidates the change stream if our `_checkpoints` collection
658+
* is dropped. This allows for detecting when the DB is dropped.
659+
*/
660+
if (changeDocument.operationType == 'drop') {
661+
throw new ChangeStreamInvalidatedError(
662+
'Internal collections have been dropped',
663+
new Error('_checkpoints collection was dropped')
664+
);
665+
}
666+
667+
const { comparable: lsn } = new MongoLSN({
668+
timestamp: changeDocument.clusterTime!,
669+
resume_token: changeDocument._id
670+
});
671+
638672
if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
639673
waitForCheckpointLsn = null;
640674
}

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { mongo } from '@powersync/lib-service-mongodb';
1+
import { isMongoServerError } from '@powersync/lib-service-mongodb';
22
import { container } from '@powersync/lib-services-framework';
33
import { replication } from '@powersync/service-core';
44

@@ -85,8 +85,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
8585
}
8686
if (e instanceof ChangeStreamInvalidatedError) {
8787
throw e;
88-
} else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
89-
throw new ChangeStreamInvalidatedError(e.message);
88+
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
89+
throw new ChangeStreamInvalidatedError(e.message, e);
9090
} else {
9191
// Report the error if relevant, before retrying
9292
container.reporter.captureException(e, {

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ import { storage } from '@powersync/service-core';
33
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
44
import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';
55

6-
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
76
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
7+
import { MongoLSN } from '../common/MongoLSN.js';
8+
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
89

910
export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor {
1011
return {
@@ -15,21 +16,6 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
1516
} satisfies storage.SourceEntityDescriptor;
1617
}
1718

18-
export function getMongoLsn(timestamp: mongo.Timestamp) {
19-
const a = timestamp.high.toString(16).padStart(8, '0');
20-
const b = timestamp.low.toString(16).padStart(8, '0');
21-
return a + b;
22-
}
23-
24-
export function mongoLsnToTimestamp(lsn: string | null) {
25-
if (lsn == null) {
26-
return null;
27-
}
28-
const a = parseInt(lsn.substring(0, 8), 16);
29-
const b = parseInt(lsn.substring(8, 16), 16);
30-
return mongo.Timestamp.fromBits(b, a);
31-
}
32-
3319
export function constructAfterRecord(document: mongo.Document): SqliteRow {
3420
let record: SqliteRow = {};
3521
for (let key of Object.keys(document)) {
@@ -174,7 +160,7 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db):
174160
);
175161
const time = session.operationTime!;
176162
// TODO: Use the above when we support custom write checkpoints
177-
return getMongoLsn(time);
163+
return new MongoLSN({ timestamp: time }).comparable;
178164
} finally {
179165
await session.endSession();
180166
}

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export class ChangeStreamTestContext {
8585
}
8686

8787
startStreaming() {
88-
this.streamPromise = this.walStream.streamChanges();
88+
return (this.streamPromise = this.walStream.streamChanges());
8989
}
9090

9191
async getCheckpoint(options?: { timeout?: number }) {

0 commit comments

Comments
 (0)