Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-years-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix missing checkpoint complete line for empty sync iterations.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,62 @@ exports[`sync - mongodb > expiring token 2`] = `
]
`;

exports[`sync - mongodb > sends checkpoint complete line for empty checkpoint 1`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "mybucket[]",
"checksum": -1221282404,
"count": 1,
"priority": 3,
},
],
"last_op_id": "1",
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "mybucket[]",
"data": [
{
"checksum": 3073684892,
"data": "{"id":"t1","description":"sync"}",
"object_id": "t1",
"object_type": "test",
"op": "PUT",
"op_id": "1",
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
},
],
"has_more": false,
"next_after": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
{
"checkpoint_diff": {
"last_op_id": "1",
"removed_buckets": [],
"updated_buckets": [],
"write_checkpoint": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
]
`;

exports[`sync - mongodb > sync buckets in order 1`] = `
[
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,62 @@ exports[`sync - postgres > expiring token 2`] = `
]
`;

exports[`sync - postgres > sends checkpoint complete line for empty checkpoint 1`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "mybucket[]",
"checksum": -1221282404,
"count": 1,
"priority": 3,
},
],
"last_op_id": "1",
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "mybucket[]",
"data": [
{
"checksum": 3073684892,
"data": "{"id":"t1","description":"sync"}",
"object_id": "t1",
"object_type": "test",
"op": "PUT",
"op_id": "1",
"subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1",
},
],
"has_more": false,
"next_after": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
{
"checkpoint_diff": {
"last_op_id": "1",
"removed_buckets": [],
"updated_buckets": [],
"write_checkpoint": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
]
`;

exports[`sync - postgres > sync buckets in order 1`] = `
[
{
Expand Down
63 changes: 63 additions & 0 deletions packages/service-core-tests/src/tests/register-sync-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,69 @@ bucket_definitions:
expect(sentRows).toBe(10002);
});

test('sends checkpoint complete line for empty checkpoint', async () => {
  await using f = await factory();

  // Activate basic sync rules and seed a single row so that the very first
  // checkpoint contains bucket data.
  const syncRules = await f.updateSyncRules({
    content: BASIC_SYNC_RULES
  });
  const bucketStorage = f.getInstance(syncRules);
  await bucketStorage.autoActivate();

  await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
    await batch.save({
      sourceTable: TEST_TABLE,
      tag: storage.SaveOperationTag.INSERT,
      after: {
        id: 't1',
        description: 'sync'
      },
      afterReplicaId: 't1'
    });
    await batch.commit('0/1');
  });

  const stream = sync.streamResponse({
    bucketStorage: bucketStorage,
    syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
    params: {
      buckets: [],
      include_checksum: true,
      raw_data: true
    },
    tracker,
    syncParams: new RequestParameters({ sub: '' }, {}),
    token: { exp: Date.now() / 1000 + 100000 } as any
  });

  const receivedLines: any[] = [];
  let completedCheckpoints = 0;

  for await (const rawLine of stream) {
    // String lines carry serialized JSON; normalize them before inspecting.
    const line = typeof rawLine === 'string' ? JSON.parse(rawLine) : rawLine;
    receivedLines.push(line);

    const isCheckpointComplete =
      typeof line === 'object' && line !== null && 'checkpoint_complete' in line;
    if (!isCheckpointComplete) {
      continue;
    }

    completedCheckpoints += 1;
    if (completedCheckpoints > 1) {
      // Second completion observed — the empty checkpoint also produced a
      // checkpoint_complete line, which is what this test verifies.
      break;
    }

    // After the first completed checkpoint, register a write checkpoint and
    // commit a batch with no row changes. This produces a checkpoint with no
    // bucket data, which must still be followed by a checkpoint_complete line.
    await bucketStorage.createManagedWriteCheckpoint({ user_id: '', heads: { '1': '1/0' } });
    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
      await batch.commit('1/0');
    });
  }

  expect(receivedLines).toMatchSnapshot();
});

test('sync legacy non-raw data', async () => {
const f = await factory();

Expand Down
21 changes: 15 additions & 6 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ async function* streamResponseInner(
bucketsByPriority.sort((a, b) => a[0] - b[0]); // Sort from high to lower priorities
const lowestPriority = bucketsByPriority.at(-1)?.[0];

// Ensure that we have at least one priority batch: After sending the checkpoint line, clients expect to
// receive a sync complete message after the synchronization is done (which happens in the last
// bucketDataInBatches iteration). Without any batch, the line is missing and clients might not complete their
// sync properly.
const priorityBatches: [BucketPriority | null, BucketDescription[]][] = bucketsByPriority;
if (priorityBatches.length == 0) {
priorityBatches.push([null, []]);
}

function maybeRaceForNewCheckpoint() {
if (syncedOperations >= 1000 && nextCheckpointPromise === undefined) {
nextCheckpointPromise = (async () => {
Expand All @@ -179,7 +188,7 @@ async function* streamResponseInner(

// This incrementally updates dataBuckets with each individual bucket position.
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
for (const [priority, buckets] of bucketsByPriority) {
for (const [priority, buckets] of priorityBatches) {
const isLast = priority === lowestPriority;
if (abortCheckpointSignal.aborted) {
break;
Expand All @@ -196,9 +205,9 @@ async function* streamResponseInner(
abort_connection: signal,
abort_batch: abortCheckpointSignal,
user_id: syncParams.user_id,
// Passing undefined will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
// sync complete message.
forPriority: !isLast ? priority : undefined
// Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
// sync complete message instead.
forPriority: !isLast ? priority : null
});
}

Expand Down Expand Up @@ -227,7 +236,7 @@ interface BucketDataRequest {
*/
abort_batch: AbortSignal;
user_id?: string;
forPriority?: BucketPriority;
forPriority: BucketPriority | null;
onRowsSent: (amount: number) => void;
}

Expand Down Expand Up @@ -372,7 +381,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
// More data should be available immediately for a new checkpoint.
yield { data: null, done: true };
} else {
if (request.forPriority !== undefined) {
if (request.forPriority != null) {
const line: util.StreamingSyncCheckpointPartiallyComplete = {
partial_checkpoint_complete: {
last_op_id: checkpoint,
Expand Down
Loading