diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 61224848e..730f39813 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -517,14 +517,13 @@ export class ChangeStream {
     const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
     logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
 
-    // TODO: Use changeStreamSplitLargeEvent
-
     const filters = this.getSourceNamespaceFilters();
 
     const pipeline: mongo.Document[] = [
       {
        $match: filters.$match
-      }
+      },
+      { $changeStreamSplitLargeEvent: {} }
     ];
 
     let fullDocument: 'required' | 'updateLookup';
@@ -568,22 +567,49 @@ export class ChangeStream {
       // no data to replicate.
       let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
 
+      let splitDocument: mongo.ChangeStreamDocument | null = null;
+
       while (true) {
         if (this.abort_signal.aborted) {
           break;
         }
 
-        const changeDocument = await stream.tryNext();
+        const originalChangeDocument = await stream.tryNext();
 
-        if (changeDocument == null || this.abort_signal.aborted) {
+        if (originalChangeDocument == null || this.abort_signal.aborted) {
           continue;
         }
 
         await touch();
 
-        if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) {
+        if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) {
           continue;
         }
 
+        let changeDocument = originalChangeDocument;
+        if (originalChangeDocument?.splitEvent != null) {
+          // Handle split events from $changeStreamSplitLargeEvent.
+          // This is only relevant for very large update operations.
+          const splitEvent = originalChangeDocument?.splitEvent;
+
+          if (splitDocument == null) {
+            splitDocument = originalChangeDocument;
+          } else {
+            splitDocument = Object.assign(splitDocument, originalChangeDocument);
+          }
+
+          if (splitEvent.fragment == splitEvent.of) {
+            // Got all fragments
+            changeDocument = splitDocument;
+            splitDocument = null;
+          } else {
+            // Wait for more fragments
+            continue;
+          }
+        } else if (splitDocument != null) {
+          // We were waiting for fragments, but got a different event
+          throw new Error(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
+        }
+
         // console.log('event', changeDocument);
         if (
diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts
index ddab9b8f0..0380fce6b 100644
--- a/modules/module-mongodb/test/src/change_stream.test.ts
+++ b/modules/module-mongodb/test/src/change_stream.test.ts
@@ -352,32 +352,41 @@ bucket_definitions:
     expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]);
   });
 
-  // Not correctly implemented yet
-  test.skip('large record', async () => {
+  test('large record', async () => {
+    // Test a large update.
+
+    // Without $changeStreamSplitLargeEvent, we get this error:
+    // MongoServerError: PlanExecutor error during aggregation :: caused by :: BSONObj size: 33554925 (0x20001ED) is invalid.
+    // Size must be between 0 and 16793600(16MB)
+
     await using context = await ChangeStreamTestContext.open(factory);
     await context.updateSyncRules(`bucket_definitions:
   global:
     data:
-      - SELECT _id as id, description, other FROM "test_data"`);
+      - SELECT _id as id, name, other FROM "test_data"`);
     const { db } = context;
 
     await context.replicateSnapshot();
 
-    // 16MB
-    const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex');
-
     const collection = db.collection('test_data');
-    const result = await collection.insertOne({ description: largeDescription });
+    const result = await collection.insertOne({ name: 't1' });
     const test_id = result.insertedId;
 
-    await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } });
+    // 12MB field.
+    // The field appears twice in the ChangeStream event, so the total size
+    // is > 16MB.
+
+    // We don't actually have this description field in the sync rules.
+    // That causes other issues, not relevant for this specific test.
+    const largeDescription = crypto.randomBytes(12000000 / 2).toString('hex');
+
+    await collection.updateOne({ _id: test_id }, { $set: { description: largeDescription } });
     context.startStreaming();
     const data = await context.getBucketData('global[]');
 
     expect(data.length).toEqual(2);
-    const row = JSON.parse(data[0].data as string);
-    delete row.description;
-    expect(row).toEqual({ id: test_id.toHexString() });
+    const row1 = JSON.parse(data[0].data as string);
+    expect(row1).toEqual({ id: test_id.toHexString(), name: 't1' });
     delete data[0].data;
     expect(data[0]).toMatchObject({
       object_id: test_id.toHexString(),
@@ -385,6 +394,15 @@ bucket_definitions:
       op: 'PUT',
       op_id: '1'
     });
+    const row2 = JSON.parse(data[1].data as string);
+    expect(row2).toEqual({ id: test_id.toHexString(), name: 't1' });
+    delete data[1].data;
+    expect(data[1]).toMatchObject({
+      object_id: test_id.toHexString(),
+      object_type: 'test_data',
+      op: 'PUT',
+      op_id: '2'
+    });
   });
 
   test('collection not in sync rules', async () => {