Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/shy-pugs-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix MongoDB initial replication with mixed \_id types.
1 change: 1 addition & 0 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ export class ChangeStream {
while (true) {
const { docs: docBatch, lastKey } = await nextChunkPromise;
if (docBatch.length == 0) {
// No more data - stop iterating
break;
}

Expand Down
15 changes: 14 additions & 1 deletion modules/module-mongodb/src/replication/MongoSnapshotQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
let cursor = this.lastCursor;
let newCursor = false;
if (cursor == null || cursor.closed) {
const filter: mongo.Filter<mongo.Document> = this.lastKey == null ? {} : { _id: { $gt: this.lastKey as any } };
// This is subtly but importantly different from doing { _id: { $gt: this.lastKey } }.
// If there are separate BSON types of _id, then the form above only returns documents in the same type,
// while the $expr form will return all documents with _id greater than the lastKey, matching sort order.
// Both forms use indexes efficiently.
// For details, see:
// https://www.mongodb.com/docs/manual/reference/bson-type-comparison-order/#comparison-sort-order
// The $literal is necessary to ensure that the lastKey is treated as a literal value, and doesn't attempt
// any parsing as an operator.
// Starting in MongoDB 5.0, this filter can use the _id index. Source:
// https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements
const filter: mongo.Filter<mongo.Document> =
this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } };
cursor = this.collection.find(filter, {
batchSize: this.batchSize,
readConcern: 'majority',
Expand All @@ -38,8 +49,10 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
if (!hasNext) {
this.lastCursor = null;
if (newCursor) {
// We just created a new cursor and it has no results - we have finished the end of the query.
return { docs: [], lastKey: null };
} else {
// The cursor may have hit the batch limit - retry
return this.nextChunk();
}
}
Expand Down
25 changes: 25 additions & 0 deletions modules/module-mongodb/test/src/chunked_snapshot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ function defineBatchTests(factory: TestStorageFactory) {
});
});

test('chunked snapshot (mixed)', async () => {
await testChunkedSnapshot({
generateId(i) {
// Alternatingly return a number, string or nested document.
// This caused issues in the past due to comparison operators only checking fields matching the same type.
if (i % 3 == 0) {
return i;
} else if (i % 3 == 1) {
return `${i}`;
} else {
return { n: i };
}
},
idToSqlite(id: number | string) {
if (typeof id == 'number') {
return BigInt(id);
} else if (typeof id == 'string') {
return id;
} else {
return JSON.stringify(id);
}
}
});
});

async function testChunkedSnapshot(options: {
generateId: (i: number) => any;
idToSqlite?: (id: any) => SqliteJsonValue;
Expand Down