diff --git a/.changeset/rotten-items-explode.md b/.changeset/rotten-items-explode.md
new file mode 100644
index 000000000..2f19091b1
--- /dev/null
+++ b/.changeset/rotten-items-explode.md
@@ -0,0 +1,5 @@
+---
+'@powersync/service-module-postgres-storage': patch
+---
+
+Fix an issue where compacting might fail with an "unexpected PUT operation" error.
diff --git a/.changeset/rotten-nails-cheat.md b/.changeset/rotten-nails-cheat.md
new file mode 100644
index 000000000..7fdc69585
--- /dev/null
+++ b/.changeset/rotten-nails-cheat.md
@@ -0,0 +1,5 @@
+---
+'@powersync/service-core-tests': minor
+---
+
+Added a compacting test for interleaved bucket operations.
diff --git a/.changeset/six-dragons-whisper.md b/.changeset/six-dragons-whisper.md
new file mode 100644
index 000000000..6a2007da3
--- /dev/null
+++ b/.changeset/six-dragons-whisper.md
@@ -0,0 +1,8 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+---
+
+Unified compacting options across storage providers.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index 06a14df52..5e79d8df7 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -32,14 +32,7 @@ interface CurrentBucketState {
 /**
  * Additional options, primarily for testing.
  */
-export interface MongoCompactOptions extends storage.CompactOptions {
-  /** Minimum of 2 */
-  clearBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchQueryLimit?: number;
-}
+export interface MongoCompactOptions extends storage.CompactOptions {}
 
 const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
 const DEFAULT_MOVE_BATCH_LIMIT = 2000;
diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
index 803c873e7..285b7232c 100644
--- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
+++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
@@ -1,11 +1,5 @@
-import { MongoCompactOptions } from '@module/storage/implementation/MongoCompactor.js';
 import { register } from '@powersync/service-core-tests';
 import { describe } from 'vitest';
 import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
 
-describe('Mongo Sync Bucket Storage Compact', () =>
-  register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY, {
-    clearBatchLimit: 2,
-    moveBatchLimit: 1,
-    moveBatchQueryLimit: 1
-  }));
+describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
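Both storage modules previously declared their own batch-limit options; after this change they are empty extensions of `storage.CompactOptions`, so the limits live in one shared interface (see the `SyncRulesBucketStorage.ts` hunk at the end of this diff). A minimal sketch of what the unification means for callers, assuming `storage` is the namespace exported by `@powersync/service-core`, as used elsewhere in this diff:

```ts
import { storage } from '@powersync/service-core';

// One options literal now type-checks for either compactor, since
// MongoCompactOptions and PostgresCompactOptions add no fields of their own.
const options: storage.CompactOptions = {
  clearBatchLimit: 2,
  moveBatchLimit: 1,
  moveBatchQueryLimit: 1
};
```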
diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
index f88657dc6..67173c9f5 100644
--- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
@@ -35,14 +35,7 @@ interface CurrentBucketState {
 /**
  * Additional options, primarily for testing.
  */
-export interface PostgresCompactOptions extends storage.CompactOptions {
-  /** Minimum of 2 */
-  clearBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchLimit?: number;
-  /** Minimum of 1 */
-  moveBatchQueryLimit?: number;
-}
+export interface PostgresCompactOptions extends storage.CompactOptions {}
 
 const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
 const DEFAULT_MOVE_BATCH_LIMIT = 2000;
@@ -99,15 +92,19 @@ export class PostgresCompactor {
     let bucketLower: string | null = null;
     let bucketUpper: string | null = null;
+    const MAX_CHAR = String.fromCodePoint(0xffff);
 
-    if (bucket?.includes('[')) {
+    if (bucket == null) {
+      bucketLower = '';
+      bucketUpper = MAX_CHAR;
+    } else if (bucket?.includes('[')) {
       // Exact bucket name
       bucketLower = bucket;
       bucketUpper = bucket;
     } else if (bucket) {
       // Bucket definition name
       bucketLower = `${bucket}[`;
-      bucketUpper = `${bucket}[\uFFFF`;
+      bucketUpper = `${bucket}[${MAX_CHAR}`;
     }
 
     let upperOpIdLimit = BIGINT_MAX;
@@ -126,10 +123,16 @@
         bucket_data
       WHERE
         group_id = ${{ type: 'int4', value: this.group_id }}
-        AND bucket_name LIKE COALESCE(${{ type: 'varchar', value: bucketLower }}, '%')
-        AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
+        AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
+        AND (
+          (
+            bucket_name = ${{ type: 'varchar', value: bucketUpper }}
+            AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
+          )
+          OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
+        )
       ORDER BY
-        bucket_name,
+        bucket_name DESC,
         op_id DESC
       LIMIT
         ${{ type: 'int4', value: this.moveBatchQueryLimit }}
@@ -145,7 +148,9 @@
       }
 
       // Set upperBound for the next batch
-      upperOpIdLimit = batch[batch.length - 1].op_id;
+      const lastBatchItem = batch[batch.length - 1];
+      upperOpIdLimit = lastBatchItem.op_id;
+      bucketUpper = lastBatchItem.bucket_name;
 
       for (const doc of batch) {
         if (currentState == null || doc.bucket_name != currentState.bucket) {
diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts
index f0b02b696..f0e09e89b 100644
--- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts
+++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts
@@ -2,4 +2,4 @@ import { register } from '@powersync/service-core-tests';
 import { describe } from 'vitest';
 import { POSTGRES_STORAGE_FACTORY } from './util.js';
 
-describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY, {}));
+describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));
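The reworked query is the core of the "unexpected PUT operation" fix: instead of a `LIKE` filter with a single `op_id` cursor, it pages through operations in `(bucket_name DESC, op_id DESC)` order and resumes each batch strictly before the `(bucketUpper, upperOpIdLimit)` cursor taken from the last row of the previous batch, with `COLLATE "C"` keeping the comparison binary so it matches the `MAX_CHAR` bounds. A sketch of the same cursor predicate evaluated in memory, illustrative only; JavaScript's `<` compares strings by UTF-16 code unit, which stands in for the binary collation here:

```ts
interface BucketOp {
  bucket_name: string;
  op_id: bigint;
}

// True if `row` still lies ahead of the cursor in the (bucket_name DESC, op_id DESC)
// scan, i.e. it belongs in the next batch. Mirrors the WHERE clause above.
function beforeCursor(row: BucketOp, bucketUpper: string, upperOpIdLimit: bigint): boolean {
  return (row.bucket_name === bucketUpper && row.op_id < upperOpIdLimit) || row.bucket_name < bucketUpper;
}

// After a batch ending at ('global[]', 500n), only earlier op_ids in the same
// bucket, or buckets that sort before it, are eligible for the next batch.
console.log(beforeCursor({ bucket_name: 'global[]', op_id: 499n }, 'global[]', 500n)); // true
console.log(beforeCursor({ bucket_name: 'global[]', op_id: 500n }, 'global[]', 500n)); // false
```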
diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts
index 97c5ed1b1..fe5872c52 100644
--- a/packages/service-core-tests/src/tests/register-compacting-tests.ts
+++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts
@@ -15,10 +15,7 @@ const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
  * compactTests(() => new MongoStorageFactory(), { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 }));
  * ```
  */
-export function registerCompactTests(
-  generateStorageFactory: storage.TestStorageFactory,
-  compactOptions: CompactOptions
-) {
+export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) {
   test('compacting (1)', async () => {
     const sync_rules = test_utils.testRules(`
bucket_definitions:
@@ -87,7 +84,11 @@
       }
     ]);
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -204,7 +205,11 @@
       }
     ]);
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]))
@@ -285,7 +290,11 @@
     });
     const checkpoint2 = result2!.flushed_op;
 
-    await bucketStorage.compact(compactOptions);
+    await bucketStorage.compact({
+      clearBatchLimit: 2,
+      moveBatchLimit: 1,
+      moveBatchQueryLimit: 1
+    });
 
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', '0']]))
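With the harness no longer injecting `compactOptions`, each test states its limits inline. The small values are deliberate: they sit at the documented minimums (`clearBatchLimit` of 2, both move limits of 1), forcing the compactor to flush after almost every operation and exercising the batch-boundary paths. A hypothetical shared constant, not part of this patch, makes the intent explicit:

```ts
// Hypothetical helper (not in this patch): the smallest allowed limits, so
// compacting is split into as many batches as possible.
const STRESS_COMPACT_OPTIONS = {
  clearBatchLimit: 2, // minimum of 2
  moveBatchLimit: 1, // minimum of 1
  moveBatchQueryLimit: 1 // minimum of 1
} as const;

// Usage: await bucketStorage.compact(STRESS_COMPACT_OPTIONS);
```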
@@ -307,4 +316,130 @@
       checksum: 1874612650
     });
   });
+
+  test('compacting (4)', async () => {
+    const sync_rules = test_utils.testRules(/* yaml */
+      ` bucket_definitions:
+        grouped:
+          # The parameter query here is not important.
+          # We specifically don't want to create bucket_parameter records here,
+          # since the op_ids for bucket_data could vary between storage implementations.
+          parameters: select 'b' as b
+          data:
+            - select * from test where b = bucket.b`);
+
+    await using factory = await generateStorageFactory();
+    const bucketStorage = factory.getInstance(sync_rules);
+
+    const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      /**
+       * Repeatedly create operations which fall into different buckets.
+       * The bucket operations are purposely interleaved as the op_id increases.
+       * A large number of operations is created here.
+       * The configured window of compacting operations is 100. This means the initial window will
+       * contain operations from multiple buckets.
+       */
+      for (let count = 0; count < 100; count++) {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'start'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'intermediate'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't2',
+            b: 'b2',
+            value: 'start'
+          },
+          afterReplicaId: test_utils.rid('t2')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't1',
+            b: 'b1',
+            value: 'final'
+          },
+          afterReplicaId: test_utils.rid('t1')
+        });
+
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't2',
+            b: 'b2',
+            value: 'final'
+          },
+          afterReplicaId: test_utils.rid('t2')
+        });
+      }
+    });
+
+    const checkpoint = result!.flushed_op;
+
+    await bucketStorage.compact({
+      clearBatchLimit: 100,
+      moveBatchLimit: 100,
+      moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
+    });
+
+    const batchAfter = await test_utils.fromAsync(
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([
+          ['grouped["b1"]', '0'],
+          ['grouped["b2"]', '0']
+        ])
+      )
+    );
+    const dataAfter = batchAfter.flatMap((b) => b.batch.data);
+
+    // The op_ids will vary between MongoDB and Postgres storage
+    expect(dataAfter).toMatchObject(
+      expect.arrayContaining([
+        { op_id: '497', op: 'CLEAR', checksum: -937074151 },
+        {
+          op_id: '499',
+          op: 'PUT',
+          object_type: 'test',
+          object_id: 't1',
+          checksum: 52221819,
+          subkey: '6544e3899293153fa7b38331/117ab485-4b42-58a2-ab32-0053a22c3423',
+          data: '{"id":"t1","b":"b1","value":"final"}'
+        },
+        { op_id: '498', op: 'CLEAR', checksum: -234380197 },
+        {
+          op_id: '500',
+          op: 'PUT',
+          object_type: 'test',
+          object_id: 't2',
+          checksum: 2126669493,
+          subkey: '6544e3899293153fa7b38331/ec27c691-b47a-5d92-927a-9944feb89eee',
+          data: '{"id":"t2","b":"b2","value":"final"}'
+        }
+      ])
+    );
+  });
 }
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 5aeb1f68e..f80b511ad 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -179,6 +179,15 @@ export interface CompactOptions {
    * These can be individual bucket names, or bucket definition names.
    */
   compactBuckets?: string[];
+
+  /** Minimum of 2 */
+  clearBatchLimit?: number;
+
+  /** Minimum of 1 */
+  moveBatchLimit?: number;
+
+  /** Minimum of 1 */
+  moveBatchQueryLimit?: number;
 }
 
 export interface TerminateOptions {
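After this change, `compact()` on any `SyncRulesBucketStorage` accepts the same tuning fields, and each provider falls back to its own defaults when they are omitted. A hedged usage sketch; the function and the `moveBatchQueryLimit` value are illustrative, while 5000 and 2000 match the `DEFAULT_CLEAR_BATCH_LIMIT` and `DEFAULT_MOVE_BATCH_LIMIT` constants visible in both compactors above:

```ts
import { storage } from '@powersync/service-core';

// Illustrative only: compact a single bucket definition with explicit limits.
async function compactGrouped(bucketStorage: storage.SyncRulesBucketStorage) {
  await bucketStorage.compact({
    compactBuckets: ['grouped'], // a bucket definition name, per the CompactOptions docs
    clearBatchLimit: 5000, // DEFAULT_CLEAR_BATCH_LIMIT
    moveBatchLimit: 2000, // DEFAULT_MOVE_BATCH_LIMIT
    moveBatchQueryLimit: 2000 // illustrative; this default is not shown in the diff
  });
}
```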