diff --git a/.changeset/sixty-melons-bow.md b/.changeset/sixty-melons-bow.md
new file mode 100644
index 000000000..87ec8c2a5
--- /dev/null
+++ b/.changeset/sixty-melons-bow.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-image': patch
+---
+
+[Postgres Storage] Fix op_id_sequence initialization edge case
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 7da3e59e7..e785549e8 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -336,7 +336,7 @@ export class PostgresSyncRulesStorage
     await callback(batch);
     await batch.flush();

-    if (batch.last_flushed_op) {
+    if (batch.last_flushed_op != null) {
       return { flushed_op: batch.last_flushed_op };
     } else {
       return null;
diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
index 1bb964719..4bddd5430 100644
--- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
+++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
@@ -208,13 +208,7 @@ export class PostgresBucketBatch
       return null;
     }

-    const currentSequence = await this.db.sql`
-      SELECT
-        LAST_VALUE AS value
-      FROM
-        op_id_sequence;
-    `.first<{ value: bigint }>();
-    return currentSequence!.value;
+    return this.getLastOpIdSequence(this.db);
   }

   async drop(sourceTables: storage.SourceTable[]): Promise<void> {
@@ -262,13 +256,7 @@ export class PostgresBucketBatch
     const lastOp = await this.withReplicationTransaction(async (db) => {
       resumeBatch = await this.replicateBatch(db, batch);

-      const sequence = await db.sql`
-        SELECT
-          LAST_VALUE AS value
-        FROM
-          op_id_sequence;
-      `.first<{ value: bigint }>();
-      return sequence!.value;
+      return this.getLastOpIdSequence(db);
     });

     // null if done, set if we need another flush
@@ -895,6 +883,23 @@ export class PostgresBucketBatch
       `.execute();
     }
   }
+
+  private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
+    // When no op_id has been generated yet, last_value = 1 and nextval() will also return 1.
+    // To cater for this case, we check is_called, and default to 0 if no value has been generated.
+    const sequence = await db.sql`
+      SELECT
+        (
+          CASE
+            WHEN is_called THEN last_value
+            ELSE 0
+          END
+        ) AS value
+      FROM
+        op_id_sequence;
+    `.first<{ value: bigint }>();
+    return sequence!.value;
+  }
 }

 /**
diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
index d64631294..f46a4cf4a 100644
--- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts
+++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
@@ -1878,4 +1878,54 @@ bucket_definitions:
       }
     });
   });
+
+  test('op_id initialization edge case', async () => {
+    // Covers an op_id_sequence initialization edge case: the first batch only writes
+    // to current_data (no bucket_data ops), so a checkpoint is created before the
+    // op_id sequence has ever been used.
+    const sync_rules = test_utils.testRules(
+      `
+bucket_definitions:
+  global:
+    data:
+      - SELECT id FROM test
+      - SELECT id FROM test_ignore WHERE false
+`
+    );
+    await using factory = await generateStorageFactory();
+    const bucketStorage = factory.getInstance(sync_rules);
+
+    const sourceTable = test_utils.makeTestTable('test', ['id']);
+    const sourceTableIgnore = test_utils.makeTestTable('test_ignore', ['id']);
+
+    const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      // This saves a record to current_data, but not bucket_data.
+      // This causes a checkpoint to be created without increasing the op_id sequence.
+      await batch.save({
+        sourceTable: sourceTableIgnore,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 'test1'
+        },
+        afterReplicaId: test_utils.rid('test1')
+      });
+    });
+
+    const checkpoint1 = result1!.flushed_op;
+
+    const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.save({
+        sourceTable: sourceTable,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 'test2'
+        },
+        afterReplicaId: test_utils.rid('test2')
+      });
+    });
+
+    const checkpoint2 = result2!.flushed_op;
+    // We expect checkpoint1 and checkpoint2 to be 0n and 1n, or 1n and 2n.
+    expect(checkpoint2).toBeGreaterThan(checkpoint1);
+  });
 }
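Note on the `PostgresSyncRulesStorage` change above: op ids are `bigint` values, and `0n` is falsy in JavaScript/TypeScript, so the old truthiness check would report "nothing flushed" when the last flushed op id was legitimately `0n` (which the sequence fix now makes possible). A minimal standalone sketch of the difference; the variable names are illustrative, not from the codebase:

```ts
// Illustrates why `if (batch.last_flushed_op)` was replaced with a `!= null` check:
// 0n is falsy, but it is still a valid flushed op id.
const candidates: Array<bigint | null> = [0n, 1n, null];

for (const lastFlushedOp of candidates) {
  const truthyCheck = lastFlushedOp ? 'flushed' : 'nothing flushed'; // wrong for 0n
  const nullCheck = lastFlushedOp != null ? 'flushed' : 'nothing flushed'; // correct
  console.log(lastFlushedOp, { truthyCheck, nullCheck });
}
// 0n   -> { truthyCheck: 'nothing flushed', nullCheck: 'flushed' }
// 1n   -> { truthyCheck: 'flushed',         nullCheck: 'flushed' }
// null -> { truthyCheck: 'nothing flushed', nullCheck: 'nothing flushed' }
```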
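Note on the sequence semantics behind `getLastOpIdSequence`: for a Postgres sequence that has never been used, `last_value` is already 1 while `is_called` is false, and the first `nextval()` also returns 1, so `last_value` alone cannot distinguish "no op generated yet" from "op 1 generated". A rough standalone sketch of that behavior, using node-postgres (`pg`) purely for illustration; it is not part of this diff, and connection settings are assumed to come from the usual `PG*` environment variables:

```ts
import { Client } from 'pg';

async function demonstrateSequenceEdgeCase(): Promise<void> {
  const client = new Client(); // uses PGHOST/PGUSER/PGDATABASE/... env vars
  await client.connect();
  try {
    // A brand-new sequence, analogous to op_id_sequence right after creation.
    await client.query('CREATE TEMP SEQUENCE demo_op_id_sequence');

    const before = await client.query('SELECT last_value, is_called FROM demo_op_id_sequence');
    console.log(before.rows[0]); // { last_value: '1', is_called: false }  (nothing generated yet)

    await client.query(`SELECT nextval('demo_op_id_sequence')`); // first call returns 1

    const after = await client.query('SELECT last_value, is_called FROM demo_op_id_sequence');
    console.log(after.rows[0]); // { last_value: '1', is_called: true }  (op 1 has been generated)
  } finally {
    await client.end();
  }
}

demonstrateSequenceEdgeCase();
```

This is why the helper defaults to 0 when `is_called` is false: a never-used sequence means "no ops generated", which in turn is why the storage layer must treat `0n` as a valid flushed op id, matching the 0n/1n expectation in the new test.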