Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/violet-badgers-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix checksum calculation issues with large buckets.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ export class MongoChecksums {
},
last_op: { $max: '$_id.o' }
}
}
},
// Sort the aggregated results (100 max, so should be fast).
// This is important to identify which buckets we have partial data for.
{ $sort: { _id: 1 } }
],
{ session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
)
Expand Down Expand Up @@ -284,7 +287,6 @@ export class MongoChecksums {
// All done for this bucket
requests.delete(bucket);
}
batchCount++;
}
if (!limitReached) {
break;
Expand Down
48 changes: 48 additions & 0 deletions modules/module-mongodb-storage/test/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { describe } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
import { env } from './env.js';
import { MongoTestStorageFactoryGenerator } from '@module/storage/implementation/MongoTestStorageFactoryGenerator.js';
import { MongoChecksumOptions } from '@module/storage/implementation/MongoChecksums.js';

describe('Mongo Sync Bucket Storage - Parameters', () =>
register.registerDataStorageParameterTests(INITIALIZED_MONGO_STORAGE_FACTORY));
Expand Down Expand Up @@ -42,3 +43,50 @@ describe('Mongo Sync Bucket Storage - split buckets', () =>
}
})
));

describe('Mongo Sync Bucket Storage - checksum calculations', () => {
  // This test tests 4 buckets x 4 operations in each.
  // We specifically use operationBatchLimit values that do not have factors in common with 4,
  // as well as some that do, to exercise batch boundaries that fall mid-bucket.
  const params: MongoChecksumOptions[] = [
    { bucketBatchLimit: 100, operationBatchLimit: 3 },
    { bucketBatchLimit: 10, operationBatchLimit: 7 },
    { bucketBatchLimit: 3, operationBatchLimit: 1 },
    { bucketBatchLimit: 1, operationBatchLimit: 3 },
    { bucketBatchLimit: 2, operationBatchLimit: 4 },
    { bucketBatchLimit: 4, operationBatchLimit: 12 }
  ];
  // Register one describe block per parameter combination, named "bucketBatchLimit|operationBatchLimit".
  for (const options of params) {
    describe(`${options.bucketBatchLimit}|${options.operationBatchLimit}`, () => {
      register.testChecksumBatching(
        MongoTestStorageFactoryGenerator({
          url: env.MONGO_TEST_URL,
          isCI: env.CI,
          internalOptions: {
            checksumOptions: options
          }
        })
      );
    });
  }
});
Original file line number Diff line number Diff line change
Expand Up @@ -1235,4 +1235,59 @@ bucket_definitions:
const checksums2 = [...(await bucketStorage.getChecksums(checkpoint + 1n, ['global[]'])).values()];
expect(checksums2).toEqual([{ bucket: 'global[]', checksum: 1917136889, count: 1 }]);
});

testChecksumBatching(generateStorageFactory);
}

/**
 * This specifically tests an issue we ran into with MongoDB storage.
 *
 * Exposed as a separate test so we can test with more storage parameters
 * (see the MongoDB storage tests, which run this with varying batch limits).
 *
 * Writes 4 buckets with 4 operations each, then verifies that checksums are
 * computed correctly for all buckets in a single getChecksums call.
 *
 * @param generateStorageFactory factory used to create the storage under test
 */
export function testChecksumBatching(generateStorageFactory: storage.TestStorageFactory) {
  test('checksums for multiple buckets', async () => {
    await using factory = await generateStorageFactory();
    // One bucket per user_id; each row lands in exactly one bucket.
    const syncRules = await factory.updateSyncRules({
      content: `
bucket_definitions:
  user:
    parameters: select request.user_id() as user_id
    data:
      - select id, description from test where user_id = bucket.user_id
`
    });
    const bucketStorage = factory.getInstance(syncRules);

    const sourceTable = TEST_TABLE;
    // Insert 4 users x 4 rows = 16 operations, 4 per bucket.
    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
      for (let u of ['u1', 'u2', 'u3', 'u4']) {
        for (let t of ['t1', 't2', 't3', 't4']) {
          const id = `${t}_${u}`;
          await batch.save({
            sourceTable,
            tag: storage.SaveOperationTag.INSERT,
            after: {
              id,
              description: `${t} description`,
              user_id: u
            },
            afterReplicaId: test_utils.rid(id)
          });
        }
      }
      await batch.commit('1/1');
    });
    const { checkpoint } = await bucketStorage.getCheckpoint();

    // Clear the cache so checksums are computed from scratch, exercising the
    // batched checksum calculation rather than cached values.
    bucketStorage.clearChecksumCache();
    const buckets = ['user["u1"]', 'user["u2"]', 'user["u3"]', 'user["u4"]'];
    const checksums = [...(await bucketStorage.getChecksums(checkpoint, buckets)).values()];
    // Sort for a deterministic comparison; getChecksums ordering is not asserted here.
    checksums.sort((a, b) => a.bucket.localeCompare(b.bucket));
    // NOTE(review): the checksum values below are pinned regression values —
    // presumably captured from a known-good run; verify if the checksum
    // algorithm ever changes intentionally.
    expect(checksums).toEqual([
      { bucket: 'user["u1"]', count: 4, checksum: 346204588 },
      { bucket: 'user["u2"]', count: 4, checksum: 5261081 },
      { bucket: 'user["u3"]', count: 4, checksum: 134760718 },
      { bucket: 'user["u4"]', count: 4, checksum: -302639724 }
    ]);
  });
}