Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/friendly-rings-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-errors': patch
'@powersync/service-module-mongodb': patch
'@powersync/lib-service-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB] Fix resume token handling when no events are received
4 changes: 4 additions & 0 deletions libs/lib-mongodb/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,7 @@ export async function waitForAuth(db: mongo.Db) {
/**
 * Type guard for `mongo.MongoServerError`.
 *
 * Accepts both true instances and error-like objects whose `name`
 * property matches, mirroring the driver's own duck-typed checks.
 */
export const isMongoServerError = (error: any): error is mongo.MongoServerError => {
  if (error instanceof mongo.MongoServerError) {
    return true;
  }
  return error?.name == 'MongoServerError';
};

/**
 * Type guard for `mongo.MongoNetworkTimeoutError`.
 *
 * Accepts both true instances and error-like objects whose `name`
 * property matches, mirroring `isMongoServerError` above.
 */
export const isMongoNetworkTimeoutError = (error: any): error is mongo.MongoNetworkTimeoutError => {
  if (error instanceof mongo.MongoNetworkTimeoutError) {
    return true;
  }
  return error?.name == 'MongoNetworkTimeoutError';
};
28 changes: 28 additions & 0 deletions modules/module-mongodb/src/common/MongoLSN.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ export class MongoLSN {
};
}

/**
 * Build a MongoLSN from a change stream resume token, deriving the
 * cluster timestamp from the token payload itself.
 */
static fromResumeToken(resumeToken: mongo.ResumeToken): MongoLSN {
  return new MongoLSN({
    timestamp: parseResumeTokenTimestamp(resumeToken),
    resume_token: resumeToken
  });
}

static ZERO = MongoLSN.fromSerialized(ZERO_LSN);

constructor(protected options: MongoLSNSpecification) {}
Expand Down Expand Up @@ -72,3 +80,23 @@ export class MongoLSN {
return this.comparable;
}
}

/**
* Given a resumeToken in the form {_data: 'hex data'}, this parses the cluster timestamp.
* All other data in the token is ignored.
*
* @param resumeToken
* @returns a parsed timestamp
*/
/**
 * Given a resumeToken in the form {_data: 'hex data'}, this parses the cluster timestamp.
 * All other data in the token is ignored.
 *
 * Layout as read here: byte 0 is a prefix that must equal 130 (0x82), followed by a
 * big-endian 4-byte seconds value (t) and a big-endian 4-byte increment (i).
 *
 * @param resumeToken a change stream resume token carrying a `_data` hex string
 * @returns a parsed timestamp
 * @throws Error if `_data` is missing, too short, or has an unexpected prefix
 */
export function parseResumeTokenTimestamp(resumeToken: mongo.ResumeToken): mongo.Timestamp {
  const hex = (resumeToken as any)?._data;
  if (typeof hex != 'string') {
    // A missing payload would otherwise surface as a TypeError from Buffer.from(undefined).
    throw new Error(`Invalid resume token: ${hex}`);
  }
  const buffer = Buffer.from(hex, 'hex');
  // Need 1 prefix byte + 4 bytes (t) + 4 bytes (i); a shorter token would otherwise
  // surface as an opaque RangeError from the DataView reads below.
  if (buffer.byteLength < 9) {
    throw new Error(`Invalid resume token: ${hex}`);
  }
  const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
  if (view.getUint8(0) != 130) {
    throw new Error(`Invalid resume token: ${hex}`);
  }
  const t = view.getUint32(1); // seconds component (big-endian)
  const i = view.getUint32(5); // increment within the second

  return mongo.Timestamp.fromBits(i, t);
}
63 changes: 58 additions & 5 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
import {
container,
DatabaseConnectionError,
Expand All @@ -10,19 +10,26 @@ import {
} from '@powersync/lib-services-framework';
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { ReplicationMetric } from '@powersync/service-types';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { ReplicationMetric } from '@powersync/service-types';

/**
 * Constructor options for ChangeStream.
 */
export interface ChangeStreamOptions {
// Source MongoDB connection manager; the stream reads its client/db. TODO confirm full contract in MongoManager.
connections: MongoManager;
// Bucket storage that replicated changes and sync rules are read from / written to.
storage: storage.SyncRulesBucketStorage;
// Metrics engine used to record replication counters.
metrics: MetricsEngine;
// Signal used to stop the replication loop.
abort_signal: AbortSignal;
/**
* Override maxAwaitTimeMS for testing.
*
* In most cases, the default of 10_000 is fine. However, for MongoDB 6.0, this can cause a delay
* in closing the stream. To cover that case, reduce the timeout for tests.
*/
maxAwaitTimeMS?: number;
}

interface InitResult {
Expand Down Expand Up @@ -56,6 +63,8 @@ export class ChangeStream {
private readonly defaultDb: mongo.Db;
private readonly metrics: MetricsEngine;

private readonly maxAwaitTimeMS: number;

private abort_signal: AbortSignal;

private relation_cache = new Map<string | number, storage.SourceTable>();
Expand All @@ -65,6 +74,7 @@ export class ChangeStream {
this.metrics = options.metrics;
this.group_id = options.storage.group_id;
this.connections = options.connections;
this.maxAwaitTimeMS = options.maxAwaitTimeMS ?? 10_000;
this.client = this.connections.client;
this.defaultDb = this.connections.db;
this.sync_rules = options.storage.getParsedSyncRules({
Expand Down Expand Up @@ -557,7 +567,7 @@ export class ChangeStream {

const streamOptions: mongo.ChangeStreamOptions = {
showExpandedEvents: true,
maxAwaitTimeMS: 200,
maxAwaitTimeMS: this.maxAwaitTimeMS,
fullDocument: fullDocument
};

Expand Down Expand Up @@ -597,20 +607,45 @@ export class ChangeStream {

let flexDbNameWorkaroundLogged = false;

let lastEmptyResume = performance.now();

while (true) {
if (this.abort_signal.aborted) {
break;
}

const originalChangeDocument = await stream.tryNext();
const originalChangeDocument = await stream.tryNext().catch((e) => {
throw mapChangeStreamError(e);
});
// The stream was closed, we will only ever receive `null` from it
if (!originalChangeDocument && stream.closed) {
break;
}

if (originalChangeDocument == null || this.abort_signal.aborted) {
if (this.abort_signal.aborted) {
break;
}

if (originalChangeDocument == null) {
// We get a new null document after `maxAwaitTimeMS` if there were no other events.
// In this case, stream.resumeToken is the resume token associated with the last response.
// stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next()
// does update it.
// From observed behavior, the actual resumeToken changes around once every 10 seconds.
// If we don't update it on empty events, we do keep consistency, but resuming the stream
// with old tokens may cause connection timeouts.
// We throttle this further by only persisting a keepalive once a minute.
// We add an additional check for waitForCheckpointLsn == null, to make sure we're not
// doing a keepalive in the middle of a transaction.
if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) {
const { comparable: lsn } = MongoLSN.fromResumeToken(stream.resumeToken);
await batch.keepalive(lsn);
await touch();
lastEmptyResume = performance.now();
}
continue;
}

await touch();

if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) {
Expand Down Expand Up @@ -762,3 +797,21 @@ async function touch() {
// or reduce PING_INTERVAL here.
return container.probes.touch();
}

/**
 * Translate a raw error from reading the MongoDB ChangeStream into one of
 * our service error types. This function always throws and never returns
 * normally — the `never` return type lets callers (and the compiler) rely
 * on that.
 *
 * @param e the error raised while reading the change stream
 * @throws DatabaseConnectionError PSYNC_S1345 for network timeouts,
 *   PSYNC_S1346 for any other read failure
 * @throws ChangeStreamInvalidatedError when the stream cannot be resumed
 *   (missing post-image, or the NonResumableChangeStreamError label)
 */
function mapChangeStreamError(e: any): never {
  if (isMongoNetworkTimeoutError(e)) {
    // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
    // We wrap the error to make it more useful.
    throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
  } else if (
    isMongoServerError(e) &&
    e.codeName == 'NoMatchingDocument' &&
    e.errmsg?.includes('post-image was not found')
  ) {
    // The configured post-image is no longer available; replication must restart.
    throw new ChangeStreamInvalidatedError(e.errmsg, e);
  } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
    throw new ChangeStreamInvalidatedError(e.message, e);
  } else {
    throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
}
if (e instanceof ChangeStreamInvalidatedError) {
throw e;
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message, e);
} else {
// Report the error if relevant, before retrying
container.reporter.captureException(e, {
Expand Down
5 changes: 4 additions & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ export class ChangeStreamTestContext {
storage: this.storage,
metrics: METRICS_HELPER.metricsEngine,
connections: this.connectionManager,
abort_signal: this.abortController.signal
abort_signal: this.abortController.signal,
// Specifically reduce this from the default for tests on MongoDB <= 6.0, otherwise it can take
// a long time to abort the stream.
maxAwaitTimeMS: 200
};
this._walStream = new ChangeStream(options);
return this._walStream!;
Expand Down
4 changes: 2 additions & 2 deletions modules/module-mongodb/test/src/resume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { describe, expect, test, vi } from 'vitest';
import { ChangeStreamTestContext } from './change_stream_utils.js';
import { env } from './env.js';
import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';
import { ChangeStreamInvalidatedError } from '@module/replication/ChangeStream.js';

describe('mongo lsn', () => {
test('LSN with resume tokens should be comparable', () => {
Expand Down Expand Up @@ -145,8 +146,7 @@ function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Pr
context2.storage = factory.getInstance(activeContent!);

const error = await context2.startStreaming().catch((ex) => ex);
expect(error).exist;
// The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError
expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError'));
expect(error).toBeInstanceOf(ChangeStreamInvalidatedError);
});
}
35 changes: 35 additions & 0 deletions modules/module-mongodb/test/src/resume_token.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { parseResumeTokenTimestamp } from '@module/common/MongoLSN.js';
import { describe, expect, it } from 'vitest';

// Unit tests for parseResumeTokenTimestamp: each hex _data payload below is a real
// resume token shape; the expected t (seconds) and i (increment) values are pinned.
describe('parseResumeTokenTimestamp', () => {
// Minimal token: prefix byte 0x82 followed directly by the timestamp.
it('parses a valid resume token (1)', () => {
const timestamp = parseResumeTokenTimestamp({ _data: '826811D298000000012B0429296E1404' });
expect(timestamp.t).toEqual(1745998488);
expect(timestamp.i).toEqual(1);
});

// Full-length token with trailing operationType/documentKey data, which must be ignored.
it('parses a valid resume token (2)', () => {
const timestamp = parseResumeTokenTimestamp({
_data:
'8267B4B1F8000000322B042C0100296E5A10041831DD5EEE2B4D6495A610E5430872B6463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004'
});
expect(timestamp.t).toEqual(1739895288);
expect(timestamp.i).toEqual(50);
});

it('parses a valid resume token (3)', () => {
const timestamp = parseResumeTokenTimestamp({
_data:
'826811D228000000022B042C0100296E5A10048725A7954ED247538A4851BAB78B0560463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B657900463C5F6964003C636865636B706F696E7400000004'
});
expect(timestamp.t).toEqual(1745998376);
expect(timestamp.i).toEqual(2);
});

// First byte is not the expected 0x82 prefix, so parsing must reject the token.
it('throws for invalid prefix', () => {
const hex = 'FF0102030405060708';
const resumeToken: any = { _data: hex };

expect(() => parseResumeTokenTimestamp(resumeToken)).toThrowError(/^Invalid resume token/);
});
});
16 changes: 16 additions & 0 deletions packages/service-errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,22 @@ export enum ErrorCode {
*/
PSYNC_S1344 = 'PSYNC_S1344',

/**
* Failed to read MongoDB Change Stream due to a timeout.
*
* This may happen if there is a significant delay on the source database in reading the change stream.
*
* If this is not resolved after retries, replication may need to be restarted from scratch.
*/
PSYNC_S1345 = 'PSYNC_S1345',

/**
* Failed to read MongoDB Change Stream.
*
* See the error cause for more details.
*/
PSYNC_S1346 = 'PSYNC_S1346',

// ## PSYNC_S14xx: MongoDB storage replication issues

/**
Expand Down