Skip to content

Commit ef85649

Browse files
committed
Invalidate changestream if postImage is not available.
1 parent 0b9d0e7 commit ef85649

File tree

3 files changed

+87
-5
lines changed

3 files changed

+87
-5
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,15 @@ interface InitResult {
2525
needsInitialSync: boolean;
2626
}
2727

28-
export class MissingReplicationSlotError extends Error {
28+
/**
29+
* Thrown when the change stream is not valid anymore, and replication
30+
* must be restarted.
31+
*
32+
* Possible reasons:
33+
* * Some change stream documents do not have postImages.
34+
* * startAfter/resumeToken is not valid anymore.
35+
*/
36+
export class ChangeStreamInvalidatedError extends Error {
2937
constructor(message: string) {
3038
super(message);
3139
}
@@ -482,6 +490,21 @@ export class ChangeStream {
482490
}
483491

484492
async streamChanges() {
493+
try {
494+
await this.streamChangesInternal();
495+
} catch (e) {
496+
if (
497+
e instanceof mongo.MongoServerError &&
498+
e.codeName == 'NoMatchingDocument' &&
499+
e.errmsg?.includes('post-image was not found')
500+
) {
501+
throw new ChangeStreamInvalidatedError(e.errmsg);
502+
}
503+
throw e;
504+
}
505+
}
506+
507+
async streamChangesInternal() {
485508
// Auto-activate as soon as initial replication is done
486509
await this.storage.autoActivate();
487510

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { container } from '@powersync/lib-services-framework';
2-
import { MissingReplicationSlotError, ChangeStream } from './ChangeStream.js';
2+
import { ChangeStreamInvalidatedError, ChangeStream } from './ChangeStream.js';
33

44
import { replication } from '@powersync/service-core';
55
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
@@ -40,7 +40,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
4040
});
4141
this.logger.error(`Replication failed`, e);
4242

43-
if (e instanceof MissingReplicationSlotError) {
43+
if (e instanceof ChangeStreamInvalidatedError) {
4444
// This stops replication on this slot, and creates a new slot
4545
await this.options.storage.factory.slotRemoved(this.slotName);
4646
}
@@ -84,8 +84,10 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
8484
// Without this additional log, the cause may not be visible in the logs.
8585
this.logger.error(`cause`, e.cause);
8686
}
87-
if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
88-
throw new MissingReplicationSlotError(e.message);
87+
if (e instanceof ChangeStreamInvalidatedError) {
88+
throw e;
89+
} else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
90+
throw new ChangeStreamInvalidatedError(e.message);
8991
} else {
9092
// Report the error if relevant, before retrying
9193
container.reporter.captureException(e, {

modules/module-mongodb/test/src/change_stream.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,4 +402,61 @@ bucket_definitions:
402402

403403
expect(data).toMatchObject([]);
404404
});
405+
406+
test('postImages - new collection with postImages enabled', async () => {
407+
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' });
408+
const { db } = context;
409+
await context.updateSyncRules(`
410+
bucket_definitions:
411+
global:
412+
data:
413+
- SELECT _id as id, description FROM "test_%"`);
414+
415+
await context.replicateSnapshot();
416+
417+
await db.createCollection('test_data', {
418+
// enabled: true here - everything should work
419+
changeStreamPreAndPostImages: { enabled: true }
420+
});
421+
const collection = db.collection('test_data');
422+
const result = await collection.insertOne({ description: 'test1' });
423+
const test_id = result.insertedId;
424+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
425+
426+
context.startStreaming();
427+
428+
const data = await context.getBucketData('global[]');
429+
expect(data).toMatchObject([
430+
putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }),
431+
putOp('test_data', { id: test_id!.toHexString(), description: 'test2' })
432+
]);
433+
});
434+
435+
test.only('postImages - new collection with postImages disabled', async () => {
436+
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' });
437+
const { db } = context;
438+
await context.updateSyncRules(`
439+
bucket_definitions:
440+
global:
441+
data:
442+
- SELECT _id as id, description FROM "test_data%"`);
443+
444+
await context.replicateSnapshot();
445+
446+
await db.createCollection('test_data', {
447+
// enabled: false here, but autoConfigure will enable it.
448+
// Unfortunately, that is too late, and replication must be restarted.
449+
changeStreamPreAndPostImages: { enabled: false }
450+
});
451+
const collection = db.collection('test_data');
452+
const result = await collection.insertOne({ description: 'test1' });
453+
const test_id = result.insertedId;
454+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
455+
456+
context.startStreaming();
457+
458+
await expect(() => context.getBucketData('global[]')).rejects.toMatchObject({
459+
message: expect.stringContaining('stream was configured to require a post-image for all update events')
460+
});
461+
});
405462
}

0 commit comments

Comments
 (0)