Merged
37 changes: 17 additions & 20 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -176,19 +176,22 @@ export class ChangeStream {
const sourceTables = this.sync_rules.getSourceTables();
await this.client.connect();

// We need to get the snapshot time before taking the initial snapshot.
const hello = await this.defaultDb.command({ hello: 1 });
const startTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
const snapshotTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
if (hello.msg == 'isdbgrid') {
throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).');
} else if (hello.setName == null) {
throw new Error('Standalone MongoDB instances are not supported - use a replicaset.');
} else if (startTime == null) {
} else if (snapshotTime == null) {
// Not known where this would happen apart from the above cases
throw new Error('MongoDB lastWrite timestamp not found.');
}
const session = await this.client.startSession({
snapshot: true
});
// We previously used {snapshot: true} for the snapshot session.
// While it gives nice consistency guarantees, it fails when the
// snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
// expiring the snapshot.
const session = await this.client.startSession();
try {
await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName },
@@ -209,17 +212,9 @@
await touch();
}

const snapshotTime = session.clusterTime?.clusterTime ?? startTime;

if (snapshotTime != null) {
const lsn = getMongoLsn(snapshotTime);
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
// keepalive() does an auto-commit if there is data
await batch.flush();
await batch.keepalive(lsn);
} else {
throw new Error(`No snapshot clusterTime available.`);
}
const lsn = getMongoLsn(snapshotTime);
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
);
} finally {
@@ -293,7 +288,7 @@ export class ChangeStream {

const db = this.client.db(table.schema);
const collection = db.collection(table.table);
const query = collection.find({}, { session });
const query = collection.find({}, { session, readConcern: { level: 'majority' } });

const cursor = query.stream();

Expand Down Expand Up @@ -566,7 +561,10 @@ export class ChangeStream {
stream.close();
});

let waitForCheckpointLsn: string | null = null;
// Always start with a checkpoint.
// This helps us to clear errors when restarting, even if there is
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);

while (true) {
if (this.abort_signal.aborted) {
Expand Down Expand Up @@ -596,8 +594,7 @@ export class ChangeStream {
if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
waitForCheckpointLsn = null;
}
await batch.flush();
await batch.keepalive(lsn);
await batch.commit(lsn);
} else if (
changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
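For reference, a condensed sketch of the revised snapshot-time logic above, pulled out of the class for illustration. The `hello` response shape and error messages follow the diff, but the helper and its name (`getSnapshotTime`) are hypothetical and not part of this PR.

```ts
import * as mongo from 'mongodb';

// Illustrative only (not part of this PR): derive the snapshot timestamp from
// the `hello` command before starting the initial snapshot. The change stream
// later resumes from this timestamp, so writes that happen while the snapshot
// is running are still replicated afterwards.
async function getSnapshotTime(db: mongo.Db): Promise<mongo.Timestamp> {
  const hello = await db.command({ hello: 1 });
  if (hello.msg == 'isdbgrid') {
    throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).');
  } else if (hello.setName == null) {
    throw new Error('Standalone MongoDB instances are not supported - use a replicaset.');
  }
  const snapshotTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp | undefined;
  if (snapshotTime == null) {
    // Not known where this would happen apart from the above cases.
    throw new Error('MongoDB lastWrite timestamp not found.');
  }
  return snapshotTime;
}
```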
38 changes: 37 additions & 1 deletion modules/module-mongodb/test/src/change_stream.test.ts
@@ -4,7 +4,7 @@ import { BucketStorageFactory } from '@powersync/service-core';
import * as crypto from 'crypto';
import * as mongo from 'mongodb';
import { setTimeout } from 'node:timers/promises';
import { describe, expect, test } from 'vitest';
import { describe, expect, test, vi } from 'vitest';
import { ChangeStreamTestContext } from './change_stream_utils.js';
import { PostImagesOption } from '@module/types/types.js';

@@ -460,4 +460,40 @@ bucket_definitions:
message: expect.stringContaining('stream was configured to require a post-image for all update events')
});
});

test('recover from error', async () => {
await using context = await ChangeStreamTestContext.open(factory);
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT _id as id, description, num FROM "test_data"`);

await db.createCollection('test_data', {
changeStreamPreAndPostImages: { enabled: false }
});

const collection = db.collection('test_data');
await collection.insertOne({ description: 'test1', num: 1152921504606846976n });

await context.replicateSnapshot();

// Simulate an error
await context.storage!.reportError(new Error('simulated error'));
expect((await context.factory.getActiveSyncRulesContent())?.last_fatal_error).toEqual('simulated error');

// startStreaming() should automatically clear the error.
context.startStreaming();

// getBucketData() creates a checkpoint that clears the error, so we don't do that
// Just wait, and check that the error is cleared automatically.
await vi.waitUntil(
async () => {
const error = (await context.factory.getActiveSyncRulesContent())?.last_fatal_error;
return error == null;
},
{ timeout: 2_000 }
);
});
}
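The vi.waitUntil() block above could also be expressed as a small helper. A hypothetical sketch for illustration only; the helper name (`waitForErrorCleared`) is assumed and not part of this PR.

```ts
import { vi } from 'vitest';
import { BucketStorageFactory } from '@powersync/service-core';

// Hypothetical helper (not in this PR): poll until the stored last_fatal_error
// is cleared, mirroring the vi.waitUntil() usage in the test above.
async function waitForErrorCleared(factory: BucketStorageFactory, timeout = 2_000) {
  await vi.waitUntil(
    async () => {
      const error = (await factory.getActiveSyncRulesContent())?.last_fatal_error;
      return error == null;
    },
    { timeout }
  );
}
```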
38 changes: 35 additions & 3 deletions modules/module-mongodb/test/src/change_stream_utils.ts
@@ -91,7 +91,7 @@ export class ChangeStreamTestContext {
getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 15_000 }),
this.streamPromise
]);
if (typeof checkpoint == undefined) {
if (typeof checkpoint == 'undefined') {
// This indicates an issue with the test setup - streamingPromise completed instead
// of getClientCheckpoint()
throw new Error('Test failure - streamingPromise completed');
@@ -105,14 +105,32 @@
return fromAsync(this.storage!.getBucketDataBatch(checkpoint, map));
}

async getBucketData(bucket: string, start?: string, options?: { timeout?: number }) {
async getBucketData(
bucket: string,
start?: string,
options?: { timeout?: number; limit?: number; chunkLimitBytes?: number }
) {
start ??= '0';
let checkpoint = await this.getCheckpoint(options);
const map = new Map<string, string>([[bucket, start]]);
const batch = this.storage!.getBucketDataBatch(checkpoint, map);
const batch = this.storage!.getBucketDataBatch(checkpoint, map, {
limit: options?.limit,
chunkLimitBytes: options?.chunkLimitBytes
});
const batches = await fromAsync(batch);
return batches[0]?.batch.data ?? [];
}

async getChecksums(buckets: string[], options?: { timeout?: number }) {
let checkpoint = await this.getCheckpoint(options);
return this.storage!.getChecksums(checkpoint, buckets);
}

async getChecksum(bucket: string, options?: { timeout?: number }) {
let checkpoint = await this.getCheckpoint(options);
const map = await this.storage!.getChecksums(checkpoint, [bucket]);
return map.get(bucket);
}
}

export async function getClientCheckpoint(
@@ -144,3 +162,17 @@ export async function getClientCheckpoint(

throw new Error(`Timeout while waiting for checkpoint ${lsn}. Last checkpoint: ${lastCp?.lsn}`);
}

export async function setSnapshotHistorySeconds(client: mongo.MongoClient, seconds: number) {
const { minSnapshotHistoryWindowInSeconds: currentValue } = await client
.db('admin')
.command({ getParameter: 1, minSnapshotHistoryWindowInSeconds: 1 });

await client.db('admin').command({ setParameter: 1, minSnapshotHistoryWindowInSeconds: seconds });

return {
async [Symbol.asyncDispose]() {
await client.db('admin').command({ setParameter: 1, minSnapshotHistoryWindowInSeconds: currentValue });
}
};
}
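setSnapshotHistorySeconds() returns an object implementing Symbol.asyncDispose, so tests can temporarily lower minSnapshotHistoryWindowInSeconds and have the previous server value restored automatically. A minimal usage sketch, assuming a connected client; the wrapper function (`withShortSnapshotHistory`) is hypothetical and not part of this PR.

```ts
import * as mongo from 'mongodb';
import { setSnapshotHistorySeconds } from './change_stream_utils.js';

// Illustrative usage (not part of this PR): lower the snapshot history window
// for the duration of `run`, then restore the original server parameter when
// the scope exits, even if `run` throws.
async function withShortSnapshotHistory(client: mongo.MongoClient, run: () => Promise<void>) {
  await using _restore = await setSnapshotHistorySeconds(client, 1);
  await run();
}
```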
109 changes: 109 additions & 0 deletions modules/module-mongodb/test/src/slow_tests.test.ts
@@ -0,0 +1,109 @@
import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
import { BucketStorageFactory } from '@powersync/service-core';
import * as mongo from 'mongodb';
import { setTimeout } from 'node:timers/promises';
import { describe, expect, test } from 'vitest';
import { ChangeStreamTestContext, setSnapshotHistorySeconds } from './change_stream_utils.js';
import { env } from './env.js';

type StorageFactory = () => Promise<BucketStorageFactory>;

const BASIC_SYNC_RULES = `
bucket_definitions:
global:
data:
- SELECT _id as id, description FROM "test_data"
`;

describe('change stream slow tests - mongodb', { timeout: 60_000 }, function () {
if (env.CI || env.SLOW_TESTS) {
defineSlowTests(MONGO_STORAGE_FACTORY);
} else {
// Need something in this file.
test('no-op', () => {});
}
});

function defineSlowTests(factory: StorageFactory) {
test('replicating snapshot with lots of data', async () => {
await using context = await ChangeStreamTestContext.open(factory);
// Test with low minSnapshotHistoryWindowInSeconds, to trigger:
// > Read timestamp .. is older than the oldest available timestamp.
// This happened when we had {snapshot: true} in the initial
// snapshot session.
await using _ = await setSnapshotHistorySeconds(context.client, 1);
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT _id as id, description, num FROM "test_data1"
- SELECT _id as id, description, num FROM "test_data2"
`);

const collection1 = db.collection('test_data1');
const collection2 = db.collection('test_data2');

let operations: mongo.AnyBulkWriteOperation[] = [];
for (let i = 0; i < 10_000; i++) {
operations.push({ insertOne: { document: { description: `pre${i}`, num: i } } });
}
await collection1.bulkWrite(operations);
await collection2.bulkWrite(operations);

await context.replicateSnapshot();
context.startStreaming();
const checksum = await context.getChecksum('global[]');
expect(checksum).toMatchObject({
count: 20_000
});
});

test('writes concurrently with snapshot', async () => {
// If there is an issue with snapshotTime (the start LSN for the
// changestream), we may miss updates, which this test would
// hopefully catch.

await using context = await ChangeStreamTestContext.open(factory);
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT _id as id, description, num FROM "test_data"
`);

const collection = db.collection('test_data');

let operations: mongo.AnyBulkWriteOperation[] = [];
for (let i = 0; i < 5_000; i++) {
operations.push({ insertOne: { document: { description: `pre${i}`, num: i } } });
}
await collection.bulkWrite(operations);

const snapshotPromise = context.replicateSnapshot();

for (let i = 49; i >= 0; i--) {
await collection.updateMany(
{ num: { $gte: i * 100, $lt: i * 100 + 100 } },
{ $set: { description: 'updated' + i } }
);
await setTimeout(20);
}

await snapshotPromise;
context.startStreaming();

const data = await context.getBucketData('global[]', undefined, { limit: 50_000, chunkLimitBytes: 60_000_000 });

const preDocuments = data.filter((d) => JSON.parse(d.data! as string).description.startsWith('pre')).length;
const updatedDocuments = data.filter((d) => JSON.parse(d.data! as string).description.startsWith('updated')).length;

// If the test works properly, preDocuments should be around 2000-3000.
// The total should be around 9000-9900.
// However, it is very sensitive to timing, so we allow a wide range.
// updatedDocuments must be at least 5000, otherwise something broke.
expect(updatedDocuments).toBeGreaterThanOrEqual(5_000);
expect(preDocuments).toBeLessThanOrEqual(5_000);
});
}