From 81b8cbbf52d67de14ff94f7482bd3c9da77230e7 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 12 Nov 2024 14:21:14 +0200
Subject: [PATCH 1/3] Automatically clear errors when restarting replication.

---
 .../src/replication/ChangeStream.ts           |  5 ++-
 .../test/src/change_stream.test.ts            | 38 ++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 250217c95..290711a50 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -564,7 +564,10 @@ export class ChangeStream {
       stream.close();
     });
 
-    let waitForCheckpointLsn: string | null = null;
+    // Always start with a checkpoint.
+    // This helps us to clear errors when restarting, even if there is
+    // no data to replicate.
+    let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
 
     while (true) {
       if (this.abort_signal.aborted) {
diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts
index bff0a6857..ddab9b8f0 100644
--- a/modules/module-mongodb/test/src/change_stream.test.ts
+++ b/modules/module-mongodb/test/src/change_stream.test.ts
@@ -4,7 +4,7 @@ import { BucketStorageFactory } from '@powersync/service-core';
 import * as crypto from 'crypto';
 import * as mongo from 'mongodb';
 import { setTimeout } from 'node:timers/promises';
-import { describe, expect, test } from 'vitest';
+import { describe, expect, test, vi } from 'vitest';
 
 import { ChangeStreamTestContext } from './change_stream_utils.js';
 import { PostImagesOption } from '@module/types/types.js';
@@ -460,4 +460,40 @@ bucket_definitions:
       message: expect.stringContaining('stream was configured to require a post-image for all update events')
     });
   });
+
+  test('recover from error', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data"`);
+
+    await db.createCollection('test_data', {
+      changeStreamPreAndPostImages: { enabled: false }
+    });
+
+    const collection = db.collection('test_data');
+    await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
+
+    await context.replicateSnapshot();
+
+    // Simulate an error
+    await context.storage!.reportError(new Error('simulated error'));
+    expect((await context.factory.getActiveSyncRulesContent())?.last_fatal_error).toEqual('simulated error');
+
+    // startStreaming() should automatically clear the error.
+    context.startStreaming();
+
+    // getBucketData() creates a checkpoint that clears the error, so we don't call it here.
+    // Just wait, and check that the error is cleared automatically.
+    await vi.waitUntil(
+      async () => {
+        const error = (await context.factory.getActiveSyncRulesContent())?.last_fatal_error;
+        return error == null;
+      },
+      { timeout: 2_000 }
+    );
+  });
 }

From b8ccb407613c104f1d0d4ce50a4f0ea43d5c6213 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 12 Nov 2024 14:32:01 +0200
Subject: [PATCH 2/3] Use commit instead of keepalive.
---
 modules/module-mongodb/src/replication/ChangeStream.ts | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 290711a50..1972784e4 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -214,9 +214,7 @@ export class ChangeStream {
           if (snapshotTime != null) {
             const lsn = getMongoLsn(snapshotTime);
             logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
-            // keepalive() does an auto-commit if there is data
-            await batch.flush();
-            await batch.keepalive(lsn);
+            await batch.commit(lsn);
           } else {
             throw new Error(`No snapshot clusterTime available.`);
           }
@@ -597,8 +595,7 @@ export class ChangeStream {
           if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
             waitForCheckpointLsn = null;
           }
-          await batch.flush();
-          await batch.keepalive(lsn);
+          await batch.commit(lsn);
         } else if (
           changeDocument.operationType == 'insert' ||
           changeDocument.operationType == 'update' ||

From b4452aa67257315f3300fdb4be2a78c2222f7fca Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 12 Nov 2024 16:18:58 +0200
Subject: [PATCH 3/3] Fix initial snapshot implementation.

---
 .../src/replication/ChangeStream.ts           |  27 ++---
 .../test/src/change_stream_utils.ts           |  38 +++++-
 .../test/src/slow_tests.test.ts               | 109 ++++++++++++++++++
 3 files changed, 156 insertions(+), 18 deletions(-)
 create mode 100644 modules/module-mongodb/test/src/slow_tests.test.ts

diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 1972784e4..b21533b7e 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -176,19 +176,22 @@ export class ChangeStream {
     const sourceTables = this.sync_rules.getSourceTables();
     await this.client.connect();
 
+    // We need to get the snapshot time before taking the initial snapshot.
     const hello = await this.defaultDb.command({ hello: 1 });
-    const startTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
+    const snapshotTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
     if (hello.msg == 'isdbgrid') {
       throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).');
     } else if (hello.setName == null) {
       throw new Error('Standalone MongoDB instances are not supported - use a replicaset.');
-    } else if (startTime == null) {
+    } else if (snapshotTime == null) {
       // Not known where this would happen apart from the above cases
       throw new Error('MongoDB lastWrite timestamp not found.');
     }
-    const session = await this.client.startSession({
-      snapshot: true
-    });
+    // We previously used {snapshot: true} for the snapshot session.
+    // While it gives nice consistency guarantees, it fails when the
+    // snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
+    // expiring the snapshot.
+    const session = await this.client.startSession();
     try {
       await this.storage.startBatch(
         { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName },
@@ -209,15 +212,9 @@ export class ChangeStream {
             await touch();
           }
 
-          const snapshotTime = session.clusterTime?.clusterTime ?? startTime;
-
-          if (snapshotTime != null) {
-            const lsn = getMongoLsn(snapshotTime);
-            logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
-            await batch.commit(lsn);
-          } else {
-            throw new Error(`No snapshot clusterTime available.`);
-          }
+          const lsn = getMongoLsn(snapshotTime);
+          logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
+          await batch.commit(lsn);
         }
       );
     } finally {
@@ -289,7 +286,7 @@ export class ChangeStream {
     const db = this.client.db(table.schema);
     const collection = db.collection(table.table);
 
-    const query = collection.find({}, { session });
+    const query = collection.find({}, { session, readConcern: { level: 'majority' } });
 
     const cursor = query.stream();
 
diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index 76735d380..77a5d9647 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -91,7 +91,7 @@ export class ChangeStreamTestContext {
       getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 15_000 }),
      this.streamPromise
     ]);
-    if (typeof checkpoint == undefined) {
+    if (typeof checkpoint == 'undefined') {
       // This indicates an issue with the test setup - streamingPromise completed instead
       // of getClientCheckpoint()
       throw new Error('Test failure - streamingPromise completed');
@@ -105,14 +105,32 @@ export class ChangeStreamTestContext {
     return fromAsync(this.storage!.getBucketDataBatch(checkpoint, map));
   }
 
-  async getBucketData(bucket: string, start?: string, options?: { timeout?: number }) {
+  async getBucketData(
+    bucket: string,
+    start?: string,
+    options?: { timeout?: number; limit?: number; chunkLimitBytes?: number }
+  ) {
     start ??= '0';
     let checkpoint = await this.getCheckpoint(options);
     const map = new Map([[bucket, start]]);
-    const batch = this.storage!.getBucketDataBatch(checkpoint, map);
+    const batch = this.storage!.getBucketDataBatch(checkpoint, map, {
+      limit: options?.limit,
+      chunkLimitBytes: options?.chunkLimitBytes
+    });
     const batches = await fromAsync(batch);
     return batches[0]?.batch.data ?? [];
   }
+
+  async getChecksums(buckets: string[], options?: { timeout?: number }) {
+    let checkpoint = await this.getCheckpoint(options);
+    return this.storage!.getChecksums(checkpoint, buckets);
+  }
+
+  async getChecksum(bucket: string, options?: { timeout?: number }) {
+    let checkpoint = await this.getCheckpoint(options);
+    const map = await this.storage!.getChecksums(checkpoint, [bucket]);
+    return map.get(bucket);
+  }
 }
 
 export async function getClientCheckpoint(
@@ -144,3 +162,17 @@ export async function getClientCheckpoint(
   }
   throw new Error(`Timeout while waiting for checkpoint ${lsn}. Last checkpoint: ${lastCp?.lsn}`);
 }
+
+export async function setSnapshotHistorySeconds(client: mongo.MongoClient, seconds: number) {
+  const { minSnapshotHistoryWindowInSeconds: currentValue } = await client
+    .db('admin')
+    .command({ getParameter: 1, minSnapshotHistoryWindowInSeconds: 1 });
+
+  await client.db('admin').command({ setParameter: 1, minSnapshotHistoryWindowInSeconds: seconds });
+
+  return {
+    async [Symbol.asyncDispose]() {
+      await client.db('admin').command({ setParameter: 1, minSnapshotHistoryWindowInSeconds: currentValue });
+    }
+  };
+}
diff --git a/modules/module-mongodb/test/src/slow_tests.test.ts b/modules/module-mongodb/test/src/slow_tests.test.ts
new file mode 100644
index 000000000..535e967c4
--- /dev/null
+++ b/modules/module-mongodb/test/src/slow_tests.test.ts
@@ -0,0 +1,109 @@
+import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
+import { BucketStorageFactory } from '@powersync/service-core';
+import * as mongo from 'mongodb';
+import { setTimeout } from 'node:timers/promises';
+import { describe, expect, test } from 'vitest';
+
+import { ChangeStreamTestContext, setSnapshotHistorySeconds } from './change_stream_utils.js';
+import { env } from './env.js';
+
+type StorageFactory = () => Promise<BucketStorageFactory>;
+
+const BASIC_SYNC_RULES = `
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description FROM "test_data"
+`;
+
+describe('change stream slow tests - mongodb', { timeout: 60_000 }, function () {
+  if (env.CI || env.SLOW_TESTS) {
+    defineSlowTests(MONGO_STORAGE_FACTORY);
+  } else {
+    // Need something in this file.
+    test('no-op', () => {});
+  }
+});
+
+function defineSlowTests(factory: StorageFactory) {
+  test('replicating snapshot with lots of data', async () => {
+    await using context = await ChangeStreamTestContext.open(factory);
+    // Test with a low minSnapshotHistoryWindowInSeconds, to trigger:
+    // > Read timestamp .. is older than the oldest available timestamp.
+    // This happened when we had {snapshot: true} in the initial
+    // snapshot session.
+    await using _ = await setSnapshotHistorySeconds(context.client, 1);
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data1"
+      - SELECT _id as id, description, num FROM "test_data2"
+  `);
+
+    const collection1 = db.collection('test_data1');
+    const collection2 = db.collection('test_data2');
+
+    let operations: mongo.AnyBulkWriteOperation[] = [];
+    for (let i = 0; i < 10_000; i++) {
+      operations.push({ insertOne: { document: { description: `pre${i}`, num: i } } });
+    }
+    await collection1.bulkWrite(operations);
+    await collection2.bulkWrite(operations);
+
+    await context.replicateSnapshot();
+    context.startStreaming();
+
+    const checksum = await context.getChecksum('global[]');
+    expect(checksum).toMatchObject({
+      count: 20_000
+    });
+  });
+
+  test('writes concurrently with snapshot', async () => {
+    // If there is an issue with snapshotTime (the start LSN for the
+    // changestream), we may miss updates, which this test would
+    // hopefully catch.
+
+    await using context = await ChangeStreamTestContext.open(factory);
+    const { db } = context;
+    await context.updateSyncRules(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT _id as id, description, num FROM "test_data"
+  `);
+
+    const collection = db.collection('test_data');
+
+    let operations: mongo.AnyBulkWriteOperation[] = [];
+    for (let i = 0; i < 5_000; i++) {
+      operations.push({ insertOne: { document: { description: `pre${i}`, num: i } } });
+    }
+    await collection.bulkWrite(operations);
+
+    const snapshotPromise = context.replicateSnapshot();
+
+    for (let i = 49; i >= 0; i--) {
+      await collection.updateMany(
+        { num: { $gte: i * 100, $lt: i * 100 + 100 } },
+        { $set: { description: 'updated' + i } }
+      );
+      await setTimeout(20);
+    }
+
+    await snapshotPromise;
+    context.startStreaming();
+
+    const data = await context.getBucketData('global[]', undefined, { limit: 50_000, chunkLimitBytes: 60_000_000 });
+
+    const preDocuments = data.filter((d) => JSON.parse(d.data! as string).description.startsWith('pre')).length;
+    const updatedDocuments = data.filter((d) => JSON.parse(d.data! as string).description.startsWith('updated')).length;
+
+    // If the test works properly, preDocuments should be around 2000-3000,
+    // and the total around 9000-9900. However, this is very sensitive to
+    // timing, so we allow a wide range. updatedDocuments must be at least
+    // 5000, otherwise something broke.
+    expect(updatedDocuments).toBeGreaterThanOrEqual(5_000);
+    expect(preDocuments).toBeLessThanOrEqual(5_000);
+  });
+}
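A note on the resource-management pattern behind `setSnapshotHistorySeconds` in patch 3: the helper returns an object implementing `Symbol.asyncDispose`, so an `await using` binding restores the server parameter when the enclosing scope exits, even if the test body throws first. The sketch below shows the same pattern in isolation. It is illustrative only: the `withServerParameter` helper and its call site are not part of these patches, and it assumes a TypeScript 5.2+ toolchain where `await using` and `Symbol.asyncDispose` are available, as the new tests already do.

import * as mongo from 'mongodb';

// Temporarily override a MongoDB server parameter, restoring the previous
// value when the returned object is disposed. This mirrors the shape of
// setSnapshotHistorySeconds above; the generic parameter name is hypothetical.
async function withServerParameter(client: mongo.MongoClient, name: string, value: number) {
  const admin = client.db('admin');
  // getParameter returns the current value under the parameter's own key.
  const { [name]: currentValue } = await admin.command({ getParameter: 1, [name]: 1 });
  await admin.command({ setParameter: 1, [name]: value });
  return {
    // Invoked automatically when an `await using` binding leaves scope.
    async [Symbol.asyncDispose]() {
      await admin.command({ setParameter: 1, [name]: currentValue });
    }
  };
}

async function runWithShortSnapshotHistory(client: mongo.MongoClient) {
  // Keep only 1 second of snapshot history for the duration of this scope,
  // as the slow test does to reproduce the expired-snapshot error.
  await using _param = await withServerParameter(client, 'minSnapshotHistoryWindowInSeconds', 1);
  // ... run the long-running snapshot here; the parameter is restored on
  // scope exit, even if this block throws.
}

The disposal-based approach avoids try/finally boilerplate in each test and guarantees the replica set is left in its original configuration between test runs.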