Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/sweet-chairs-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

MongoDB: Fix replication of undefined values causing missing documents
18 changes: 13 additions & 5 deletions modules/module-mongodb/src/replication/MongoRelation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ export function constructAfterRecord(document: mongo.Document): SqliteRow {

export function toMongoSyncRulesValue(data: any): SqliteValue {
const autoBigNum = true;
if (data == null) {
// null or undefined
return data;
if (data === null) {
return null;
} else if (typeof data == 'undefined') {
// We consider `undefined` in top-level fields as missing replicated value,
// so use null instead.
return null;
} else if (typeof data == 'string') {
return data;
} else if (typeof data == 'number') {
Expand Down Expand Up @@ -95,8 +98,13 @@ function filterJsonData(data: any, depth = 0): any {
// This is primarily to prevent infinite recursion
throw new Error(`json nested object depth exceeds the limit of ${DEPTH_LIMIT}`);
}
if (data == null) {
return data; // null or undefined
if (data === null) {
return data;
} else if (typeof data == 'undefined') {
// For nested data, keep as undefined.
// In arrays, this is converted to null.
// In objects, the key is excluded.
return undefined;
} else if (typeof data == 'string') {
return data;
} else if (typeof data == 'number') {
Expand Down
79 changes: 69 additions & 10 deletions modules/module-mongodb/test/src/mongo_test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,52 @@ describe('mongo data types', () => {
mongo.ObjectId.createFromHexString('66e834cc91d805df11fa0ecb'),
'mydb',
{ foo: 'bar' }
),
undefined: undefined
)
}
]);
}

async function insertUndefined(db: mongo.Db, collection: string, array?: boolean) {
// MongoDB has deprecated the `undefined` value, making it really
// difficult to insert one into the database.
// mapReduce is also deprecated, but it's one way to still generate
// the value.
const mapInput = db.collection('map_input');
await mapInput.insertOne({ test: 'test' });
const fin = array ? `return { result: [undefined] }` : `return { result: undefined }`;
await db.command({
mapReduce: 'map_input',
map: new mongo.Code(`function () {
// We only need to emit once for a single result:
emit(5, {});
}`),
reduce: new mongo.Code(`function (key, values) {
// Return an object whose property is explicitly set to undefined
return undefined;
}`),
finalize: new mongo.Code(`function (key, reducedVal) {
${fin};
}`),
out: { merge: 'map_output' }
});

await db
.collection('map_output')
.aggregate([
{ $set: { undefined: '$value.result' } },
{ $project: { undefined: 1 } },
{
$merge: {
into: collection
}
}
])
.toArray();

await mapInput.drop();
await db.collection('map_output').drop();
}

async function insertNested(collection: mongo.Collection) {
await collection.insertMany([
{
Expand Down Expand Up @@ -118,9 +158,11 @@ describe('mongo data types', () => {
js: '{"code":"testcode","scope":null}',
js2: '{"code":"testcode","scope":{"foo":"bar"}}',
pointer: '{"collection":"mycollection","oid":"66e834cc91d805df11fa0ecb","fields":{}}',
pointer2: '{"collection":"mycollection","oid":"66e834cc91d805df11fa0ecb","db":"mydb","fields":{"foo":"bar"}}',
undefined: null
pointer2: '{"collection":"mycollection","oid":"66e834cc91d805df11fa0ecb","db":"mydb","fields":{"foo":"bar"}}'
});

// This must specifically be null, and not undefined.
expect(transformed[4].undefined).toBeNull();
}

function checkResultsNested(transformed: Record<string, any>[]) {
Expand Down Expand Up @@ -158,20 +200,27 @@ describe('mongo data types', () => {
js: '[{"code":"testcode","scope":null}]',
pointer: '[{"collection":"mycollection","oid":"66e834cc91d805df11fa0ecb","fields":{}}]',
minKey: '[null]',
maxKey: '[null]',
maxKey: '[null]'
});

expect(transformed[4]).toMatchObject({
undefined: '[null]'
});
}

test('test direct queries', async () => {
const { db, client } = await connectMongoData();

const collection = db.collection('test_data');
try {
await setupTable(db);

await insert(collection);
await insertUndefined(db, 'test_data');

const transformed = [...ChangeStream.getQueryData(await db.collection('test_data').find().toArray())];
const rawResults = await db.collection('test_data').find().toArray();
// It is tricky to save "undefined" with mongo, so we check that it succeeded.
expect(rawResults[4].undefined).toBeUndefined();
const transformed = [...ChangeStream.getQueryData(rawResults)];

checkResults(transformed);
} finally {
Expand All @@ -186,8 +235,11 @@ describe('mongo data types', () => {
await setupTable(db);

await insertNested(collection);
await insertUndefined(db, 'test_data_arrays', true);

const transformed = [...ChangeStream.getQueryData(await db.collection('test_data_arrays').find().toArray())];
const rawResults = await db.collection('test_data_arrays').find().toArray();
expect(rawResults[4].undefined).toEqual([undefined]);
const transformed = [...ChangeStream.getQueryData(rawResults)];

checkResultsNested(transformed);
} finally {
Expand All @@ -212,8 +264,9 @@ describe('mongo data types', () => {
await stream.tryNext();

await insert(collection);
await insertUndefined(db, 'test_data');

const transformed = await getReplicationTx(stream, 4);
const transformed = await getReplicationTx(stream, 5);

checkResults(transformed);
} finally {
Expand All @@ -236,8 +289,9 @@ describe('mongo data types', () => {
await stream.tryNext();

await insertNested(collection);
await insertUndefined(db, 'test_data_arrays', true);

const transformed = await getReplicationTx(stream, 4);
const transformed = await getReplicationTx(stream, 5);

checkResultsNested(transformed);
} finally {
Expand All @@ -256,6 +310,7 @@ describe('mongo data types', () => {
const collection = db.collection('test_data');
await setupTable(db);
await insert(collection);
await insertUndefined(db, 'test_data');

const schema = await adapter.getConnectionSchema();
const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0];
Expand Down Expand Up @@ -440,6 +495,10 @@ bucket_definitions:
async function getReplicationTx(replicationStream: mongo.ChangeStream, count: number) {
let transformed: SqliteRow[] = [];
for await (const doc of replicationStream) {
// Specifically filter out map_input / map_output collections
if (!(doc as any)?.ns?.coll?.startsWith('test_data')) {
continue;
}
transformed.push(constructAfterRecord((doc as any).fullDocument));
if (transformed.length == count) {
break;
Expand Down
6 changes: 3 additions & 3 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
// However, it will be valid by the end of the transaction.
//
// In this case, we don't save the op, but we do save the current data.
if (afterId && after && util.isCompleteRow(after)) {
if (afterId && after && util.isCompleteRow(this.storeCurrentData, after)) {
// Insert or update
if (sourceTable.syncData) {
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
Expand Down Expand Up @@ -722,8 +722,8 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
table: sourceTable,
data: {
op: tag,
after: after && util.isCompleteRow(after) ? after : undefined,
before: before && util.isCompleteRow(before) ? before : undefined
after: after && util.isCompleteRow(this.storeCurrentData, after) ? after : undefined,
before: before && util.isCompleteRow(this.storeCurrentData, before) ? before : undefined
},
event
})
Expand Down
11 changes: 10 additions & 1 deletion packages/service-core/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,16 @@ export function hasToastedValues(row: sync_rules.ToastableSqliteRow) {
return false;
}

export function isCompleteRow(row: sync_rules.ToastableSqliteRow): row is sync_rules.SqliteRow {
/**
* Returns true if we have a complete row.
*
* If we don't store data, we assume we always have a complete row.
*/
export function isCompleteRow(storeData: boolean, row: sync_rules.ToastableSqliteRow): row is sync_rules.SqliteRow {
if (!storeData) {
// Assume the row is complete - no need to check
return true;
}
return !hasToastedValues(row);
}

Expand Down
Loading