7 changes: 7 additions & 0 deletions .changeset/polite-goats-eat.md
@@ -0,0 +1,7 @@
---
'@powersync/service-errors': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-image': minor
---

Added support for MongoDB resume tokens. This helps detect Change Stream edge-case errors, such as the replication connection details being changed after replication has begun.
74 changes: 74 additions & 0 deletions modules/module-mongodb/src/common/MongoLSN.ts
@@ -0,0 +1,74 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { storage } from '@powersync/service-core';

export type MongoLSNSpecification = {
timestamp: mongo.Timestamp;
/**
* The ResumeToken type here is an alias for `unknown`.
* The MongoDB docs indicate the contents take the form:
* ```typescript
* {
* "_data" : <BinData|string>
* }
* ```
* We use BSON serialization to store the resume token.
*/
resume_token?: mongo.ResumeToken;
};

export const ZERO_LSN = '0000000000000000';

const DELIMITER = '|';

/**
* Represents a Logical Sequence Number (LSN) for MongoDB replication sources.
* It stores a combination of the cluster timestamp and an optional Change Stream resume token.
*/
export class MongoLSN {
static fromSerialized(comparable: string): MongoLSN {
return new MongoLSN(MongoLSN.deserialize(comparable));
}

private static deserialize(comparable: string): MongoLSNSpecification {
const [timestampString, resumeString] = comparable.split(DELIMITER);

const a = parseInt(timestampString.substring(0, 8), 16); // high bits (seconds)
const b = parseInt(timestampString.substring(8, 16), 16); // low bits (increment)

return {
timestamp: mongo.Timestamp.fromBits(b, a),
resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null
};
}

static ZERO = MongoLSN.fromSerialized(ZERO_LSN);

constructor(protected options: MongoLSNSpecification) {}

get timestamp() {
return this.options.timestamp;
}

get resumeToken() {
return this.options.resume_token;
}

get comparable() {
const { timestamp, resumeToken } = this;

const a = timestamp.high.toString(16).padStart(8, '0'); // seconds
const b = timestamp.low.toString(16).padStart(8, '0'); // increment

const segments = [`${a}${b}`];

if (resumeToken) {
segments.push(storage.serializeBson({ resumeToken }).toString('base64'));
}

return segments.join(DELIMITER);
}

toString() {
return this.comparable;
}
}
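A minimal usage sketch of the class above, assuming `MongoLSN` is imported from the new module; the resume token payload here is a hypothetical placeholder:

```typescript
import { mongo } from '@powersync/lib-service-mongodb';
import { MongoLSN } from './common/MongoLSN.js';

// Encode a cluster timestamp (and an optional resume token) into a comparable LSN string.
const lsn = new MongoLSN({
  timestamp: mongo.Timestamp.fromBits(0, 1700000000), // fromBits(lowBits, highBits)
  resume_token: { _data: '826508...' } // hypothetical resume token payload
});

// 16 hex digits for the timestamp, then '|' and the base64 BSON-encoded token:
// '6553f10000000000|...'
const serialized = lsn.comparable;

// Round-trip: both parts are recovered from the string.
const restored = MongoLSN.fromSerialized(serialized);
console.log(restored.timestamp.equals(lsn.timestamp)); // true
console.log(restored.resumeToken); // { _data: '826508...' }
```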
84 changes: 59 additions & 25 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -1,6 +1,7 @@
import { mongo } from '@powersync/lib-service-mongodb';
import {
container,
DatabaseConnectionError,
ErrorCode,
logger,
ReplicationAbortedError,
@@ -9,20 +10,13 @@ import {
} from '@powersync/lib-services-framework';
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import {
constructAfterRecord,
createCheckpoint,
getMongoLsn,
getMongoRelation,
mongoLsnToTimestamp
} from './MongoRelation.js';
import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export const ZERO_LSN = '0000000000000000';

export interface ChangeStreamOptions {
connections: MongoManager;
storage: storage.SyncRulesBucketStorage;
@@ -41,9 +35,9 @@ interface InitResult {
* * Some change stream documents do not have postImages.
* * startAfter/resumeToken is not valid anymore.
*/
export class ChangeStreamInvalidatedError extends Error {
constructor(message: string) {
super(message);
export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
constructor(message: string, cause: any) {
super(ErrorCode.PSYNC_S1344, message, cause);
}
}
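A hedged sketch of how a caller might react to this error class: an invalidated stream means the resume point is permanently gone, so replication must restart from a fresh snapshot rather than retry in place. The `resnapshot` helper is hypothetical:

```typescript
import { ChangeStreamInvalidatedError } from './replication/ChangeStream.js';

// Hypothetical helper standing in for "clear stored sync state and start a fresh snapshot".
declare function resnapshot(): Promise<void>;

async function replicateWithRecovery(replicate: () => Promise<void>): Promise<void> {
  try {
    await replicate();
  } catch (e) {
    if (e instanceof ChangeStreamInvalidatedError) {
      // The startAfter timestamp / resume token is permanently unusable:
      // retrying in place would fail forever, so re-snapshot instead.
      await resnapshot();
      return;
    }
    // Other errors may be transient; let generic retry handling deal with them.
    throw e;
  }
}
```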

Expand Down Expand Up @@ -207,7 +201,7 @@ export class ChangeStream {
const session = await this.client.startSession();
try {
await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
@@ -220,12 +214,12 @@

for (let table of allSourceTables) {
await this.snapshotTable(batch, table, session);
await batch.markSnapshotDone([table], ZERO_LSN);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);

await touch();
}

const lsn = getMongoLsn(snapshotTime);
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
@@ -516,7 +510,7 @@ export class ChangeStream {
e.codeName == 'NoMatchingDocument' &&
e.errmsg?.includes('post-image was not found')
) {
throw new ChangeStreamInvalidatedError(e.errmsg);
throw new ChangeStreamInvalidatedError(e.errmsg, e);
}
throw e;
}
@@ -527,10 +521,13 @@
await this.storage.autoActivate();

await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
const lastLsn = batch.lastCheckpointLsn;
const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
const { lastCheckpointLsn } = batch;
const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null;
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);

const filters = this.getSourceNamespaceFilters();
@@ -554,12 +551,21 @@
}

const streamOptions: mongo.ChangeStreamOptions = {
startAtOperationTime: startAfter,
showExpandedEvents: true,
useBigInt64: true,
maxAwaitTimeMS: 200,
fullDocument: fullDocument
};

/**
* Only one of these options can be supplied at a time.
*/
if (resumeAfter) {
streamOptions.resumeAfter = resumeAfter;
} else {
streamOptions.startAtOperationTime = startAfter;
}

let stream: mongo.ChangeStream<mongo.Document>;
if (filters.multipleDatabases) {
// Requires readAnyDatabase@admin on Atlas
Expand All @@ -579,7 +585,7 @@ export class ChangeStream {
});

// Always start with a checkpoint.
// This helps us to clear erorrs when restarting, even if there is
// This helps us to clear errors when restarting, even if there is
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
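A condensed sketch of the option selection above, assuming the driver types re-exported by `@powersync/lib-service-mongodb`; the MongoDB driver rejects a stream configured with both `resumeAfter` and `startAtOperationTime`, so the stored resume token takes precedence when present:

```typescript
import { mongo } from '@powersync/lib-service-mongodb';

// Build change stream options from a previously stored LSN; both resume
// mechanisms are optional, but the driver only accepts one at a time.
function buildStreamOptions(
  startAfter?: mongo.Timestamp,
  resumeAfter?: mongo.ResumeToken
): mongo.ChangeStreamOptions {
  const options: mongo.ChangeStreamOptions = {
    showExpandedEvents: true,
    useBigInt64: true,
    maxAwaitTimeMS: 200,
    fullDocument: 'updateLookup' // simplified; the real value depends on postImage config
  };
  if (resumeAfter) {
    // The resume token pins the exact stream position, so it takes precedence.
    options.resumeAfter = resumeAfter;
  } else if (startAfter) {
    options.startAtOperationTime = startAfter;
  }
  return options;
}
```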

@@ -592,6 +598,11 @@

const originalChangeDocument = await stream.tryNext();

// The stream was closed; we will only ever receive `null` from it.
if (!originalChangeDocument && stream.closed) {
break;
}

if (originalChangeDocument == null || this.abort_signal.aborted) {
continue;
}
@@ -626,15 +637,38 @@
throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
}

// console.log('event', changeDocument);

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace') &&
changeDocument.operationType == 'replace' ||
changeDocument.operationType == 'drop') &&
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
) {
const lsn = getMongoLsn(changeDocument.clusterTime!);
/**
* Dropping the database does not provide an `invalidate` event.
* We would typically receive `drop` events for the collections, which we
* would process below.
*
* However, we don't commit the LSN after collections are dropped.
* This prevents the `startAfter` or `resumeToken` from advancing past the drop events.
* The stream also closes after the drop events, causing an infinite loop of
* reprocessing the collection drop events.
*
* This check invalidates the change stream if our `_checkpoints` collection
* is dropped, which allows us to detect when the database is dropped.
*/
if (changeDocument.operationType == 'drop') {
throw new ChangeStreamInvalidatedError(
'Internal collections have been dropped',
new Error('_checkpoints collection was dropped')
);
}

const { comparable: lsn } = new MongoLSN({
timestamp: changeDocument.clusterTime!,
resume_token: changeDocument._id
});

if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
waitForCheckpointLsn = null;
}
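The drop-detection branch above, distilled into a standalone sketch; the `CHECKPOINTS_COLLECTION` value is assumed to be `_checkpoints`, per the comment in the diff:

```typescript
import { mongo } from '@powersync/lib-service-mongodb';
import { ChangeStreamInvalidatedError } from './ChangeStream.js';

const CHECKPOINTS_COLLECTION = '_checkpoints'; // assumed value of the imported constant

// A `drop` of the checkpoints collection is treated as the signal that the
// source database itself was dropped, since MongoDB emits no `invalidate`
// event for a database drop.
function checkForDroppedDatabase(change: mongo.ChangeStreamDocument): void {
  if (change.operationType === 'drop' && change.ns.coll === CHECKPOINTS_COLLECTION) {
    throw new ChangeStreamInvalidatedError(
      'Internal collections have been dropped',
      new Error('_checkpoints collection was dropped')
    );
  }
}
```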
modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
@@ -1,4 +1,4 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { isMongoServerError } from '@powersync/lib-service-mongodb';
import { container } from '@powersync/lib-services-framework';
import { replication } from '@powersync/service-core';

Expand Down Expand Up @@ -85,8 +85,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
}
if (e instanceof ChangeStreamInvalidatedError) {
throw e;
} else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message);
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message, e);
} else {
// Report the error if relevant, before retrying
container.reporter.captureException(e, {
20 changes: 3 additions & 17 deletions modules/module-mongodb/src/replication/MongoRelation.ts
@@ -3,8 +3,9 @@ import { storage } from '@powersync/service-core';
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';

import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
import { MongoLSN } from '../common/MongoLSN.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor {
return {
@@ -15,21 +16,6 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
} satisfies storage.SourceEntityDescriptor;
}

export function getMongoLsn(timestamp: mongo.Timestamp) {
const a = timestamp.high.toString(16).padStart(8, '0');
const b = timestamp.low.toString(16).padStart(8, '0');
return a + b;
}

export function mongoLsnToTimestamp(lsn: string | null) {
if (lsn == null) {
return null;
}
const a = parseInt(lsn.substring(0, 8), 16);
const b = parseInt(lsn.substring(8, 16), 16);
return mongo.Timestamp.fromBits(b, a);
}

export function constructAfterRecord(document: mongo.Document): SqliteRow {
let record: SqliteRow = {};
for (let key of Object.keys(document)) {
@@ -174,7 +160,7 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db):
);
const time = session.operationTime!;
// TODO: Use the above when we support custom write checkpoints
return getMongoLsn(time);
return new MongoLSN({ timestamp: time }).comparable;
} finally {
await session.endSession();
}
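Because the removed `getMongoLsn` helper and the new `MongoLSN.comparable` getter emit the same zero-padded big-endian hex prefix, LSNs persisted before this change still order correctly against new ones. A small sketch of that ordering property, assuming the import paths shown:

```typescript
import { mongo } from '@powersync/lib-service-mongodb';
import { MongoLSN } from '../common/MongoLSN.js';

// Zero-padded big-endian hex means lexicographic order matches timestamp order,
// which is what comparisons like `lsn >= waitForCheckpointLsn` rely on.
const earlier = new MongoLSN({ timestamp: mongo.Timestamp.fromBits(5, 100) }); // t=100, i=5
const later = new MongoLSN({ timestamp: mongo.Timestamp.fromBits(0, 101) }); // t=101, i=0

console.log(earlier.comparable); // '0000006400000005'
console.log(later.comparable); // '0000006500000000'
console.log(earlier.comparable < later.comparable); // true
```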
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
@@ -85,7 +85,7 @@
}

startStreaming() {
this.streamPromise = this.walStream.streamChanges();
return (this.streamPromise = this.walStream.streamChanges());
}

async getCheckpoint(options?: { timeout?: number }) {