Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/sixty-melons-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-image': patch
---

[Postgres Storage] Fix op_id_sequence intialization edge case
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ export class PostgresSyncRulesStorage

await callback(batch);
await batch.flush();
if (batch.last_flushed_op) {
if (batch.last_flushed_op != null) {
return { flushed_op: batch.last_flushed_op };
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,7 @@ export class PostgresBucketBatch
return null;
}

const currentSequence = await this.db.sql`
SELECT
LAST_VALUE AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
return currentSequence!.value;
return this.getLastOpIdSequence(this.db);
}

async drop(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
Expand Down Expand Up @@ -262,13 +256,7 @@ export class PostgresBucketBatch
const lastOp = await this.withReplicationTransaction(async (db) => {
resumeBatch = await this.replicateBatch(db, batch);

const sequence = await db.sql`
SELECT
LAST_VALUE AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
return sequence!.value;
return this.getLastOpIdSequence(db);
});

// null if done, set if we need another flush
Expand Down Expand Up @@ -895,6 +883,23 @@ export class PostgresBucketBatch
`.execute();
}
}

private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
// To cater for this case, we check is_called, and default to 0 if no value has been generated.
const sequence = await db.sql`
SELECT
(
CASE
WHEN is_called THEN last_value
ELSE 0
END
) AS value
FROM
op_id_sequence;
`.first<{ value: bigint }>();
return sequence!.value;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1878,4 +1878,54 @@ bucket_definitions:
}
});
});

test('op_id initialization edge case', async () => {
// Test syncing a batch of data that is small in count,
// but large enough in size to be split over multiple returned chunks.
// Similar to the above test, but splits over 1MB chunks.
const sync_rules = test_utils.testRules(
`
bucket_definitions:
global:
data:
- SELECT id FROM test
- SELECT id FROM test_ignore WHERE false
`
);
await using factory = await generateStorageFactory();
const bucketStorage = factory.getInstance(sync_rules);

const sourceTable = test_utils.makeTestTable('test', ['id']);
const sourceTableIgnore = test_utils.makeTestTable('test_ignore', ['id']);

const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
// This saves a record to current_data, but not bucket_data.
// This causes a checkpoint to be created without increasing the op_id sequence.
await batch.save({
sourceTable: sourceTableIgnore,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 'test1'
},
afterReplicaId: test_utils.rid('test1')
});
});

const checkpoint1 = result1!.flushed_op;

const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.save({
sourceTable: sourceTable,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 'test2'
},
afterReplicaId: test_utils.rid('test2')
});
});

const checkpoint2 = result2!.flushed_op;
// we expect 0n and 1n, or 1n and 2n.
expect(checkpoint2).toBeGreaterThan(checkpoint1);
});
}