diff --git a/.changeset/smart-years-grin.md b/.changeset/smart-years-grin.md new file mode 100644 index 000000000..76df8bc96 --- /dev/null +++ b/.changeset/smart-years-grin.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Fix missing checkpoint complete line for empty sync iterations. diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap index efb28bcd3..717e52de5 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -127,6 +127,62 @@ exports[`sync - mongodb > expiring token 2`] = ` ] `; +exports[`sync - mongodb > sends checkpoint complete line for empty checkpoint 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": -1221282404, + "count": 1, + "priority": 3, + }, + ], + "last_op_id": "1", + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "mybucket[]", + "data": [ + { + "checksum": 3073684892, + "data": "{"id":"t1","description":"sync"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, + { + "checkpoint_diff": { + "last_op_id": "1", + "removed_buckets": [], + "updated_buckets": [], + "write_checkpoint": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + exports[`sync - mongodb > sync buckets in order 1`] = ` [ { diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 52af71a0c..0fc987919 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -127,6 +127,62 @@ exports[`sync - postgres > expiring token 2`] = ` ] `; +exports[`sync - postgres > sends checkpoint complete line for empty checkpoint 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": -1221282404, + "count": 1, + "priority": 3, + }, + ], + "last_op_id": "1", + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "mybucket[]", + "data": [ + { + "checksum": 3073684892, + "data": "{"id":"t1","description":"sync"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, + { + "checkpoint_diff": { + "last_op_id": "1", + "removed_buckets": [], + "updated_buckets": [], + "write_checkpoint": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + exports[`sync - postgres > sync buckets in order 1`] = ` [ { diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 72548e6ac..3cf33dd75 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -253,6 +253,69 @@ bucket_definitions: expect(sentRows).toBe(10002); }); + test('sends checkpoint complete line for empty checkpoint', async () => { + await using f = await factory(); + + const syncRules = await f.updateSyncRules({ + content: BASIC_SYNC_RULES + }); + const bucketStorage = f.getInstance(syncRules); + await bucketStorage.autoActivate(); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + description: 'sync' + }, + afterReplicaId: 't1' + }); + await batch.commit('0/1'); + }); + + const stream = sync.streamResponse({ + bucketStorage: bucketStorage, + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + tracker, + syncParams: new RequestParameters({ sub: '' }, {}), + token: { exp: Date.now() / 1000 + 100000 } as any + }); + + const lines: any[] = []; + let receivedCompletions = 0; + + for await (let next of stream) { + if (typeof next == 'string') { + next = JSON.parse(next); + } + lines.push(next); + + if (typeof next === 'object' && next !== null) { + if ('checkpoint_complete' in next) { + receivedCompletions++; + if (receivedCompletions == 1) { + // Trigger an empty bucket update. + await bucketStorage.createManagedWriteCheckpoint({user_id: '', heads: {'1': '1/0'}}); + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.commit('1/0'); + }); + } else { + break; + } + } + } + } + + expect(lines).toMatchSnapshot(); + }); + test('sync legacy non-raw data', async () => { const f = await factory(); diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index ae1e0c4ed..7c9ccb20e 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -155,6 +155,15 @@ async function* streamResponseInner( bucketsByPriority.sort((a, b) => a[0] - b[0]); // Sort from high to lower priorities const lowestPriority = bucketsByPriority.at(-1)?.[0]; + // Ensure that we have at least one priority batch: After sending the checkpoint line, clients expect to + // receive a sync complete message after the synchronization is done (which happens in the last + // bucketDataInBatches iteration). Without any batch, the line is missing and clients might not complete their + // sync properly. + const priorityBatches: [BucketPriority | null, BucketDescription[]][] = bucketsByPriority; + if (priorityBatches.length == 0) { + priorityBatches.push([null, []]); + } + function maybeRaceForNewCheckpoint() { if (syncedOperations >= 1000 && nextCheckpointPromise === undefined) { nextCheckpointPromise = (async () => { @@ -179,7 +188,7 @@ async function* streamResponseInner( // This incrementally updates dataBuckets with each individual bucket position. // At the end of this, we can be sure that all buckets have data up to the checkpoint. - for (const [priority, buckets] of bucketsByPriority) { + for (const [priority, buckets] of priorityBatches) { const isLast = priority === lowestPriority; if (abortCheckpointSignal.aborted) { break; @@ -196,9 +205,9 @@ async function* streamResponseInner( abort_connection: signal, abort_batch: abortCheckpointSignal, user_id: syncParams.user_id, - // Passing undefined will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial - // sync complete message. - forPriority: !isLast ? priority : undefined + // Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial + // sync complete message instead. + forPriority: !isLast ? priority : null }); } @@ -227,7 +236,7 @@ interface BucketDataRequest { */ abort_batch: AbortSignal; user_id?: string; - forPriority?: BucketPriority; + forPriority: BucketPriority | null; onRowsSent: (amount: number) => void; } @@ -372,7 +381,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator