From 8e99e243ed1e0f92696e40f65426303ca790a85d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 1 Sep 2025 17:36:56 +0200 Subject: [PATCH 1/4] Add failing tests. --- .../1741697235857-bucket-state-index.ts | 8 +- .../MongoTestStorageFactoryGenerator.ts | 7 +- .../src/storage/implementation/db.ts | 14 ++ .../src/storage/implementation/models.ts | 4 + .../test/src/storage_compacting.test.ts | 125 +++++++++++++++++- 5 files changed, 143 insertions(+), 15 deletions(-) diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts index 33220d9cc..9db091b15 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts @@ -11,13 +11,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); try { - await db.bucket_state.createIndex( - { - '_id.g': 1, - last_op: 1 - }, - { name: INDEX_NAME, unique: true } - ); + await db.createBucketStateIndex(); } finally { await db.client.close(); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts index 72be4547e..4aa149bfb 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts @@ -18,13 +18,14 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag await db.db.createCollection('bucket_parameters'); } - // Full migrations are not currently run for tests, so we manually create this - await db.createCheckpointEventsCollection(); - if (!options?.doNotClear) { await db.clear(); } + // Full migrations are not currently run for tests, so we manually create the important ones + await db.createCheckpointEventsCollection(); + await db.createBucketStateIndex(); + return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions); }; }; diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index dc5f4738e..e6b08352f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -127,6 +127,20 @@ export class PowerSyncMongo { max: 50 // max number of documents }); } + + /** + * Only use in migrations and tests. + */ + async createBucketStateIndex() { + // TODO: Implement a better mechanism to use migrations in tests + await this.bucket_state.createIndex( + { + '_id.g': 1, + last_op: 1 + }, + { name: 'bucket_updates', unique: true } + ); + } } export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 998deec2c..0d2b92426 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -97,6 +97,10 @@ export interface BucketStateDocument { g: number; b: string; }; + /** + * Important: There is an unique index on {'_id.g': 1, last_op: 1}. + * That means the last_op must match an actual op in the bucket, and not the commit checkpoint. + */ last_op: bigint; /** * If set, this can be treated as "cache" of a checksum at a specific point. diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index a02a10811..8962d0cc2 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -1,7 +1,122 @@ -import { register } from '@powersync/service-core-tests'; -import { describe } from 'vitest'; +import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests'; +import { describe, expect, test } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; +import { storage, SyncRulesBucketStorage } from '@powersync/service-core'; -describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY)); -describe('Mongo Sync Parameter Storage Compact', () => - register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY)); +describe('Mongo Sync Bucket Storage Compact', () => { + register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY); + + describe('with blank bucket_state', () => { + // This can happen when migrating from older service versions, that did not populate bucket_state yet. + const populate = async (bucketStorage: SyncRulesBucketStorage) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + owner_id: 'u1' + }, + afterReplicaId: test_utils.rid('t1') + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't2', + owner_id: 'u2' + }, + afterReplicaId: test_utils.rid('t2') + }); + + await batch.commit('1/1'); + }); + + return bucketStorage.getCheckpoint(); + }; + + const setup = async () => { + await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + by_user: + parameters: select request.user_id() as user_id + data: [select * from test where owner_id = bucket.user_id] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + const { checkpoint } = await populate(bucketStorage); + + return { bucketStorage, checkpoint, factory }; + }; + + test('full compact', async () => { + const { bucketStorage, checkpoint, factory } = await setup(); + + // Simulate bucket_state from old version not being available + await factory.db.bucket_state.deleteMany({}); + + await bucketStorage.compact({ + clearBatchLimit: 200, + moveBatchLimit: 10, + moveBatchQueryLimit: 10, + maxOpId: checkpoint, + signal: null as any + }); + + const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user["u1"]', 'by_user["u2"]']); + expect(checksumAfter.get('by_user["u1"]')).toEqual({ + bucket: 'by_user["u1"]', + checksum: -659469718, + count: 1 + }); + expect(checksumAfter.get('by_user["u2"]')).toEqual({ + bucket: 'by_user["u2"]', + checksum: 430217650, + count: 1 + }); + }); + + test('populatePersistentChecksumCache', async () => { + // Populate old sync rules version + const { factory } = await setup(); + + // Not populate another version (bucket definition name changed) + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + by_user2: + parameters: select request.user_id() as user_id + data: [select * from test where owner_id = bucket.user_id] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await populate(bucketStorage); + const { checkpoint } = await bucketStorage.getCheckpoint(); + + await bucketStorage.populatePersistentChecksumCache({ + maxOpId: checkpoint, + signal: new AbortController().signal + }); + + const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']); + expect(checksumAfter.get('by_user2["u1"]')).toEqual({ + bucket: 'by_user2["u1"]', + checksum: -659469718, + count: 1 + }); + expect(checksumAfter.get('by_user2["u2"]')).toEqual({ + bucket: 'by_user2["u2"]', + checksum: 430217650, + count: 1 + }); + }); + }); +}); + +describe('Mongo Sync Parameter Storage Compact', () => { + register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY); +}); From 76846b75a9d0f55144c03669ded6b9a2e6a1dc8d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 1 Sep 2025 17:42:13 +0200 Subject: [PATCH 2/4] Remove upsert on bucket_state, to avoid unique index failures. --- .../storage/implementation/MongoCompactor.ts | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index ab776ccea..24f4483f5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -329,15 +329,11 @@ export class MongoCompactor { count: 0, bytes: 0 } - }, - $setOnInsert: { - // Only set this if we're creating the document. - // In all other cases, the replication process will have a set a more accurate id. - last_op: this.maxOpId } }, - // We generally expect this to have been created before, but do handle cases of old unchanged buckets - upsert: true + // We generally expect this to have been created before. + // We don't create new ones here, to avoid issues with the unique index on bucket_updates. + upsert: false } }); } @@ -551,15 +547,11 @@ export class MongoCompactor { checksum: BigInt(bucketChecksum.checksum), bytes: null } - }, - $setOnInsert: { - // Only set this if we're creating the document. - // In all other cases, the replication process will have a set a more accurate id. - last_op: this.maxOpId } }, - // We generally expect this to have been created before, but do handle cases of old unchanged buckets - upsert: true + // We don't create new ones here - it gets tricky to get the last_op right with the unique index on: + // bucket_updates: {'id.g': 1, 'last_op': 1} + upsert: false } }); } From 7c2875a7a7192a5f364fb71d37fe299a435b5e08 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 1 Sep 2025 17:47:00 +0200 Subject: [PATCH 3/4] Filter by group_id for populateChecksums. --- .../storage/implementation/MongoCompactor.ts | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 24f4483f5..b3b9830d4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -6,7 +6,6 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js'; import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js'; import { cacheKey } from './OperationBatch.js'; -import { readSingleBatch } from './util.js'; interface CurrentBucketState { /** Bucket name */ @@ -480,15 +479,25 @@ export class MongoCompactor { * Subset of compact, only populating checksums where relevant. */ async populateChecksums() { - let lastId: BucketStateDocument['_id'] | null = null; + // This is updated after each batch + let lowerBound: BucketStateDocument['_id'] = { + g: this.group_id, + b: new mongo.MinKey() as any + }; + // This is static + const upperBound: BucketStateDocument['_id'] = { + g: this.group_id, + b: new mongo.MaxKey() as any + }; while (!this.signal?.aborted) { // By filtering buckets, we effectively make this "resumeable". - let filter: mongo.Filter = { + const filter: mongo.Filter = { + _id: { + $gt: lowerBound, + $lt: upperBound + }, compacted_state: { $exists: false } }; - if (lastId) { - filter._id = { $gt: lastId }; - } const bucketsWithoutChecksums = await this.db.bucket_state .find(filter, { @@ -511,7 +520,7 @@ export class MongoCompactor { await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b)); - lastId = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id; + lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id; } } From 6c61b310a8441e8354278a003aa2214cc1183544 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 1 Sep 2025 17:47:56 +0200 Subject: [PATCH 4/4] Add changeset. --- .changeset/tasty-peaches-give.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/tasty-peaches-give.md diff --git a/.changeset/tasty-peaches-give.md b/.changeset/tasty-peaches-give.md new file mode 100644 index 000000000..7e5491640 --- /dev/null +++ b/.changeset/tasty-peaches-give.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Fix "E11000 duplicate key error collection: powersync_demo.bucket_state" in some cases on sync rules deploy