From 310d8d9c91fe665dc1f768ce78a7dfb82852ae9f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 14 Jul 2025 15:50:18 +0200 Subject: [PATCH 1/3] Clear pendingValidatedCheckpoint on new checkpoints. --- .../sync/stream/AbstractStreamingSyncImplementation.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 8bb2e8509..a74abe412 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -716,6 +716,8 @@ The next upload iteration will be delayed.`); if (isStreamingSyncCheckpoint(line)) { targetCheckpoint = line.checkpoint; + // New checkpoint - existing validated checkpoint is no longer valid + pendingValidatedCheckpoint = null; const bucketsToDelete = new Set(bucketMap.keys()); const newBuckets = new Map(); for (const checksum of line.checkpoint.buckets) { @@ -737,7 +739,13 @@ The next upload iteration will be delayed.`); if (result.endIteration) { return; } else if (!result.applied) { + // "Could not apply checkpoint due to local data". We need to retry after + // finishing uploads. pendingValidatedCheckpoint = targetCheckpoint; + } else { + // Nothing to retry later. This would likely already be null from the last + // checksum or checksum_diff operation, but we make sure. + pendingValidatedCheckpoint = null; } } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { const priority = line.partial_checkpoint_complete.priority; @@ -773,6 +781,8 @@ The next upload iteration will be delayed.`); if (targetCheckpoint == null) { throw new Error('Checkpoint diff without previous checkpoint'); } + // New checkpoint - existing validated checkpoint is no longer valid + pendingValidatedCheckpoint = null; const diff = line.checkpoint_diff; const newBuckets = new Map(); for (const checksum of targetCheckpoint.buckets) { From 1007b9b9ffefb2242ee7232b7d4ab84cd51b432c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 17:20:02 +0200 Subject: [PATCH 2/3] Add test --- packages/node/src/db/PowerSyncDatabase.ts | 6 +- packages/node/tests/sync.test.ts | 80 +++++++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index a7c6c9e00..459053abf 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -78,7 +78,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { connector: PowerSyncBackendConnector, options: NodeAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { - const remote = new NodeRemote(connector, this.options.logger, { + const logger = this.options.logger; + const remote = new NodeRemote(connector, logger, { dispatcher: options.dispatcher, ...(this.options as NodePowerSyncDatabaseOptions).remoteOptions }); @@ -92,7 +93,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { }, retryDelayMs: this.options.retryDelayMs, crudUploadThrottleMs: this.options.crudUploadThrottleMs, - identifier: this.database.name + identifier: this.database.name, + logger: logger }); } } diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 2b513a0c1..5d2f7199d 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -1,9 +1,11 @@ import { describe, vi, expect, beforeEach } from 'vitest'; +import util from 'node:util'; import { MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils'; import { AbstractPowerSyncDatabase, BucketChecksum, + createLogger, OplogEntryJSON, PowerSyncConnectionOptions, ProgressWithOperations, @@ -537,6 +539,84 @@ function defineSyncTests(impl: SyncClientImplementation) { expect(rows).toStrictEqual([{ name: 'from server' }]); }); + mockSyncServiceTest('handles uploads across checkpoints', async ({ syncService }) => { + const logger = createLogger('test', { logLevel: Logger.TRACE }); + const logMessages: string[] = []; + (logger as any).invoke = (level, args) => { + console.log(...args); + logMessages.push(util.format(...args)); + }; + + // Regression test for https://github.com/powersync-ja/powersync-js/pull/665 + let database = await syncService.createDatabase({ logger }); + const connector = new TestConnector(); + let finishUpload: () => void; + const finishUploadPromise = new Promise((resolve, reject) => { + finishUpload = resolve; + }); + connector.uploadData = async (db) => { + const batch = await db.getCrudBatch(); + if (batch != null) { + await finishUploadPromise; + await batch.complete(); + } + }; + + await database.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?);', ['local']); + database.connect(connector, options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } }); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: '1', + object_type: 'lists', + data: '{"name": "s1"}' + } + ] + } + }); + // 1. Could not apply checkpoint due to local data. We will retry [...] after that upload is completed. + syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } }); + await vi.waitFor(() => { + expect(logMessages).toEqual(expect.arrayContaining([expect.stringContaining('due to local data')])); + }); + + // 2. Send additional checkpoint while we're still busy uploading + syncService.pushLine({ checkpoint: { last_op_id: '2', write_checkpoint: '2', buckets: [bucket('a', 2)] } }); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '2', + op: 'PUT', + object_id: '2', + object_type: 'lists', + data: '{"name": "s2"}' + } + ] + } + }); + syncService.pushLine({ checkpoint_complete: { last_op_id: '2' } }); + + // 3. Crud upload complete + finishUpload!(); + + // 4. Ensure the database is applying the second checkpoint + await vi.waitFor(async () => { + const rows = await database.getAll('SELECT * FROM lists WHERE name = ?', ['s2']); + expect(rows).toHaveLength(1); + }); + }); + mockSyncServiceTest('should update sync state incrementally', async ({ syncService }) => { const powersync = await syncService.createDatabase(); powersync.connect(new TestConnector(), options); From 489ab327738a43e55e584dd87d9a59266e2a061f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Jul 2025 17:21:02 +0200 Subject: [PATCH 3/3] Simplify --- packages/node/src/db/PowerSyncDatabase.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index 459053abf..da1325941 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -94,7 +94,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { retryDelayMs: this.options.retryDelayMs, crudUploadThrottleMs: this.options.crudUploadThrottleMs, identifier: this.database.name, - logger: logger + logger }); } }