Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/sixty-melons-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-image': patch
---

[Postgres Storage] Fix op_id_sequence initialization edge case
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ export class PostgresSyncRulesStorage

await callback(batch);
await batch.flush();
if (batch.last_flushed_op) {
if (batch.last_flushed_op != null) {
return { flushed_op: batch.last_flushed_op };
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,7 @@ export class PostgresBucketBatch
return null;
}

const currentSequence = await this.db.sql`
SELECT
LAST_VALUE AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
return currentSequence!.value;
return this.getLastOpIdSequence(this.db);
}

async drop(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
Expand Down Expand Up @@ -262,13 +256,7 @@ export class PostgresBucketBatch
const lastOp = await this.withReplicationTransaction(async (db) => {
resumeBatch = await this.replicateBatch(db, batch);

const sequence = await db.sql`
SELECT
LAST_VALUE AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
return sequence!.value;
return this.getLastOpIdSequence(db);
});

// null if done, set if we need another flush
Expand Down Expand Up @@ -895,6 +883,23 @@ export class PostgresBucketBatch
`.execute();
}
}

private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
// Returns the last op_id value handed out by the `op_id_sequence` Postgres sequence.
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
// To cater for this case, we check is_called, and default to 0 if no value has been generated.
// (A freshly-created Postgres sequence reports last_value = start value with is_called = false,
// so reading last_value alone cannot distinguish "no ops yet" from "one op generated".)
const sequence = await db.sql`
SELECT
(
CASE
WHEN is_called THEN last_value
ELSE 0
END
) AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
// NOTE(review): the non-null assertion assumes op_id_sequence always exists, so the
// single-row sequence relation always yields exactly one row — verify against schema setup.
return sequence!.value;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1878,4 +1878,54 @@ bucket_definitions:
}
});
});

test('op_id initialization edge case', async () => {
// Regression test for the op_id_sequence initialization edge case:
// a batch that only writes to current_data (no bucket_data ops) creates a
// checkpoint without advancing op_id_sequence. An uncalled Postgres sequence
// reports last_value = 1, which previously made the first two checkpoints
// indistinguishable. After the fix, the first checkpoint reads as 0.
const sync_rules = test_utils.testRules(
`
bucket_definitions:
global:
data:
- SELECT id FROM test
- SELECT id FROM test_ignore WHERE false
`
);
await using factory = await generateStorageFactory();
const bucketStorage = factory.getInstance(sync_rules);

const sourceTable = test_utils.makeTestTable('test', ['id']);
const sourceTableIgnore = test_utils.makeTestTable('test_ignore', ['id']);

const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
// This saves a record to current_data, but not bucket_data
// (the `WHERE false` rule filters it out of every bucket).
// This causes a checkpoint to be created without increasing the op_id sequence.
await batch.save({
sourceTable: sourceTableIgnore,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 'test1'
},
afterReplicaId: test_utils.rid('test1')
});
});

const checkpoint1 = result1!.flushed_op;

// Second batch performs a real bucket_data insert, which advances the sequence.
const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.save({
sourceTable: sourceTable,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 'test2'
},
afterReplicaId: test_utils.rid('test2')
});
});

const checkpoint2 = result2!.flushed_op;
// We expect (0n, 1n) or (1n, 2n) depending on storage implementation;
// either way checkpoint2 must be strictly greater than checkpoint1.
expect(checkpoint2).toBeGreaterThan(checkpoint1);
});
}