Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/shy-pugs-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix MongoDB intial replication with mixed \_id types.
1 change: 1 addition & 0 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ export class ChangeStream {
while (true) {
const { docs: docBatch, lastKey } = await nextChunkPromise;
if (docBatch.length == 0) {
// No more data - stop iterating
break;
}

Expand Down
15 changes: 14 additions & 1 deletion modules/module-mongodb/src/replication/MongoSnapshotQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
let cursor = this.lastCursor;
let newCursor = false;
if (cursor == null || cursor.closed) {
const filter: mongo.Filter<mongo.Document> = this.lastKey == null ? {} : { _id: { $gt: this.lastKey as any } };
// This is subtly but importantly different from doing { _id: { $gt: this.lastKey } }.
// If there are separate BSON types of _id, then the form above only returns documents in the same type,
// while the $expr form will return all documents with _id greater than the lastKey, matching sort order.
// Both forms use indexes efficiently.
// For details, see:
// https://www.mongodb.com/docs/manual/reference/bson-type-comparison-order/#comparison-sort-order
// The $literal is necessary to ensure that the lastKey is treated as a literal value, and doesn't attempt
// any parsing as an operator.
// Starting in MongoDB 5.0, this filter can use the _id index. Source:
// https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements
const filter: mongo.Filter<mongo.Document> =
this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } };
cursor = this.collection.find(filter, {
batchSize: this.batchSize,
readConcern: 'majority',
Expand All @@ -38,8 +49,10 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
if (!hasNext) {
this.lastCursor = null;
if (newCursor) {
// We just created a new cursor and it has no results - we have finished the end of the query.
return { docs: [], lastKey: null };
} else {
// The cursor may have hit the batch limit - retry
return this.nextChunk();
}
}
Expand Down
25 changes: 25 additions & 0 deletions modules/module-mongodb/test/src/chunked_snapshot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ function defineBatchTests(factory: TestStorageFactory) {
});
});

test('chunked snapshot (mixed)', async () => {
await testChunkedSnapshot({
generateId(i) {
// Alternatingly return a number, string or nested document.
// This caused issues in the past due to comparison operators only checking fields matching the same type.
if (i % 3 == 0) {
return i;
} else if (i % 3 == 1) {
return `${i}`;
} else {
return { n: i };
}
},
idToSqlite(id: number | string) {
if (typeof id == 'number') {
return BigInt(id);
} else if (typeof id == 'string') {
return id;
} else {
return JSON.stringify(id);
}
}
});
});

async function testChunkedSnapshot(options: {
generateId: (i: number) => any;
idToSqlite?: (id: any) => SqliteJsonValue;
Expand Down