From 7769c5c93b3d4b20eec6ec20c582bac0b139a7b7 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 16:22:59 -0600 Subject: [PATCH 1/5] Rust: Trigger CRUD upload on connect --- .changeset/light-clocks-hang.md | 5 +++ .../AbstractStreamingSyncImplementation.ts | 6 +++ packages/common/src/utils/DataStream.ts | 19 +++++++--- packages/node/tests/sync.test.ts | 37 ++++++++++++++++++- packages/node/tests/utils.ts | 7 ++++ 5 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 .changeset/light-clocks-hang.md diff --git a/.changeset/light-clocks-hang.md b/.changeset/light-clocks-hang.md new file mode 100644 index 000000000..fc866ab8e --- /dev/null +++ b/.changeset/light-clocks-hang.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Rust client: Properly upload CRUD entries made while offline. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index e8aac6aaf..ed39b7bdf 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -840,6 +840,7 @@ The next upload iteration will be delayed.`); const adapter = this.options.adapter; const remote = this.options.remote; let receivingLines: Promise | null = null; + let hadSyncLine = false; const abortController = new AbortController(); signal.addEventListener('abort', () => abortController.abort()); @@ -884,6 +885,11 @@ The next upload iteration will be delayed.`); return; } + if (!hadSyncLine) { + syncImplementation.triggerCrudUpload(); + hadSyncLine = true; + } + await control(line.command, line.payload); } } finally { diff --git a/packages/common/src/utils/DataStream.ts b/packages/common/src/utils/DataStream.ts index 9dcf1564a..08e506a1c 100644 --- a/packages/common/src/utils/DataStream.ts +++ b/packages/common/src/utils/DataStream.ts @@ -39,6 +39,7 @@ export class DataStream extends BaseObserver | null; + protected notifyDataAdded: ((_: null) => void) | null; protected logger: ILogger; @@ -90,6 +91,7 @@ export class DataStream extends BaseObserver extends BaseObserver l.highWater?.()); } - return (this.processingPromise = this._processQueue()); + const promise = (this.processingPromise = this._processQueue()); + promise.finally(() => { + return (this.processingPromise = null); + }); + return promise; } /** @@ -178,7 +184,7 @@ export class DataStream extends BaseObserver (this.processingPromise = null)); + await Promise.resolve(); return; } @@ -188,10 +194,13 @@ export class DataStream extends BaseObserver l.lowWater?.()); - } + const dataAdded = new Promise((resolve) => { + this.notifyDataAdded = resolve; + }); - this.processingPromise = null; + await Promise.race([this.iterateAsyncErrored(async (l) => l.lowWater?.()), dataAdded]); + this.notifyDataAdded = null; + } if (this.dataQueue.length) { // Next tick diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index ad1efb255..a7b8edfa5 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -424,7 +424,7 @@ function defineSyncTests(impl: SyncClientImplementation) { mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -442,7 +442,7 @@ function defineSyncTests(impl: SyncClientImplementation) { await database.close(); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); // A sync rule deploy could reset buckets, making the new bucket smaller than the existing one. @@ -459,6 +459,39 @@ function defineSyncTests(impl: SyncClientImplementation) { await waitForSyncStatus(database, (s) => s.downloadProgress == null); }); }); + + mockSyncServiceTest('should upload after connecting', async ({ syncService }) => { + let database = await syncService.createDatabase(); + + database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']); + const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator](); + let rows = (await query.next()).value.rows._array; + expect(rows).toStrictEqual([{ name: 'local write' }]); + + database.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } }); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: '1', + object_type: 'lists', + data: '{"name": "from server"}' + } + ] + } + }); + syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } }); + + rows = (await query.next()).value.rows._array; + expect(rows).toStrictEqual([{ name: 'from server' }]); + }); } function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 9eebc48d4..4246f9f76 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -116,6 +116,13 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ }); return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 }); + } else if (request.url.indexOf('/write-checkpoint2.json') != -1) { + return new Response( + JSON.stringify({ + data: { write_checkpoint: '1' } + }), + { status: 200 } + ); } else { return new Response('Not found', { status: 404 }); } From ccf36f79c3a259d0d23e6406866919d4f9c56ece Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 17:00:14 -0600 Subject: [PATCH 2/5] Remove duplicate upload logic --- packages/common/src/client/ConnectionManager.ts | 1 - .../sync/stream/AbstractStreamingSyncImplementation.ts | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 1ade81c36..48900e624 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -194,7 +194,6 @@ export class ConnectionManager extends BaseObserver { this.logger.debug('Attempting to connect to PowerSync instance'); await this.syncStreamImplementation?.connect(appliedOptions!); - this.syncStreamImplementation?.triggerCrudUpload(); } /** diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index ed39b7bdf..bf295adce 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -885,12 +885,12 @@ The next upload iteration will be delayed.`); return; } + await control(line.command, line.payload); + if (!hadSyncLine) { syncImplementation.triggerCrudUpload(); hadSyncLine = true; } - - await control(line.command, line.payload); } } finally { const activeInstructions = controlInvocations; @@ -999,6 +999,7 @@ The next upload iteration will be delayed.`); ); this.notifyCompletedUploads = () => { + console.log('notify completed'); controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); }; await receivingLines; From 7dcaef3138f1f3f9a43beafddb31395efb57bccf Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 08:22:32 -0600 Subject: [PATCH 3/5] Review feedback --- .changeset/light-clocks-hang.md | 3 +++ .../sync/stream/AbstractStreamingSyncImplementation.ts | 1 - packages/common/src/utils/DataStream.ts | 6 +++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.changeset/light-clocks-hang.md b/.changeset/light-clocks-hang.md index fc866ab8e..a6f1ae6f1 100644 --- a/.changeset/light-clocks-hang.md +++ b/.changeset/light-clocks-hang.md @@ -1,5 +1,8 @@ --- '@powersync/common': patch +'@powersync/node': patch +'@powersync/web': patch +'@powersync/react-native': patch --- Rust client: Properly upload CRUD entries made while offline. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index bf295adce..9ea24685f 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -999,7 +999,6 @@ The next upload iteration will be delayed.`); ); this.notifyCompletedUploads = () => { - console.log('notify completed'); controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); }; await receivingLines; diff --git a/packages/common/src/utils/DataStream.ts b/packages/common/src/utils/DataStream.ts index 08e506a1c..35b93c7e1 100644 --- a/packages/common/src/utils/DataStream.ts +++ b/packages/common/src/utils/DataStream.ts @@ -39,7 +39,7 @@ export class DataStream extends BaseObserver | null; - protected notifyDataAdded: ((_: null) => void) | null; + protected notifyDataAdded: (() => void) | null; protected logger: ILogger; @@ -91,7 +91,7 @@ export class DataStream extends BaseObserver extends BaseObserver { + const dataAdded = new Promise((resolve) => { this.notifyDataAdded = resolve; }); From 10fe492d69cb97c604f4115d1bef5b7dc3dfa01c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 09:30:49 -0600 Subject: [PATCH 4/5] Deflake node test --- packages/node/tests/sync.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index a7b8edfa5..368434c39 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -463,7 +463,7 @@ function defineSyncTests(impl: SyncClientImplementation) { mockSyncServiceTest('should upload after connecting', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']); + await database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']); const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator](); let rows = (await query.next()).value.rows._array; expect(rows).toStrictEqual([{ name: 'local write' }]); From 6c35bcce71d85a8e2cd6a11bd25814e793b32113 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 10:06:44 -0600 Subject: [PATCH 5/5] Move more tests into node --- packages/node/tests/sync.test.ts | 103 +++++++++++++++++++++++++ packages/web/tests/stream.test.ts | 122 ------------------------------ 2 files changed, 103 insertions(+), 122 deletions(-) diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 368434c39..e01f121d7 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -492,6 +492,109 @@ function defineSyncTests(impl: SyncClientImplementation) { rows = (await query.next()).value.rows._array; expect(rows).toStrictEqual([{ name: 'from server' }]); }); + + mockSyncServiceTest('should update sync state incrementally', async ({ syncService }) => { + const powersync = await syncService.createDatabase(); + powersync.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + const buckets: BucketChecksum[] = []; + for (let prio = 0; prio <= 3; prio++) { + buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio }); + } + syncService.pushLine({ + checkpoint: { + last_op_id: '4', + buckets + } + }); + + let operationId = 1; + const addRow = (prio: number) => { + syncService.pushLine({ + data: { + bucket: `prio${prio}`, + data: [ + { + checksum: prio + 10, + data: JSON.stringify({ name: 'row' }), + op: 'PUT', + op_id: (operationId++).toString(), + object_id: `prio${prio}`, + object_type: 'lists' + } + ] + } + }); + }; + + const syncCompleted = vi.fn(); + powersync.waitForFirstSync().then(syncCompleted); + + // Emit partial sync complete for each priority but the last. + for (var prio = 0; prio < 3; prio++) { + const partialSyncCompleted = vi.fn(); + powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted); + expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false); + expect(partialSyncCompleted).not.toHaveBeenCalled(); + expect(syncCompleted).not.toHaveBeenCalled(); + + addRow(prio); + syncService.pushLine({ + partial_checkpoint_complete: { + last_op_id: operationId.toString(), + priority: prio + } + }); + + await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => { + return status.statusForPriority(prio).hasSynced === true; + }); + await new Promise((r) => setTimeout(r)); + expect(partialSyncCompleted).toHaveBeenCalledOnce(); + + expect(await powersync.getAll('select * from lists')).toHaveLength(prio + 1); + } + + // Then, complete the sync. + addRow(3); + syncService.pushLine({ checkpoint_complete: { last_op_id: operationId.toString() } }); + await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500); + expect(await powersync.getAll('select * from lists')).toHaveLength(4); + }); + + mockSyncServiceTest('Should remember sync state', async ({ syncService }) => { + const powersync = await syncService.createDatabase(); + powersync.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + const buckets: BucketChecksum[] = []; + for (let prio = 0; prio <= 3; prio++) { + buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 }); + } + syncService.pushLine({ + checkpoint: { + last_op_id: '0', + buckets + } + }); + syncService.pushLine({ + partial_checkpoint_complete: { + last_op_id: '0', + priority: 0 + } + }); + + await powersync.waitForFirstSync({ priority: 0 }); + + // Open another database instance. + const another = await syncService.createDatabase(); + await another.init(); + + expect(another.currentStatus.priorityStatusEntries).toHaveLength(1); + expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy(); + await another.waitForFirstSync({ priority: 0 }); + }); } function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index fec4a70da..08e1e86de 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -303,127 +303,5 @@ function describeStreamingTests(createConnectedDatabase: () => Promise { - const { powersync, connect, uploadSpy } = await createConnectedDatabase(); - expect(powersync.connected).toBe(true); - - await powersync.disconnect(); - - // Status should update after uploads are completed - await vi.waitFor( - () => { - // to-have-been-called seems to not work after failing a check - expect(powersync.currentStatus.dataFlowStatus.uploading).false; - }, - { - timeout: UPLOAD_TIMEOUT_MS - } - ); - }); - - it('Should update sync state incrementally', async () => { - const { powersync, remote } = await createConnectedDatabase(); - expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false); - - const buckets: BucketChecksum[] = []; - for (let prio = 0; prio <= 3; prio++) { - buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio }); - } - remote.enqueueLine({ - checkpoint: { - last_op_id: '4', - buckets - } - }); - - let operationId = 1; - const addRow = (prio: number) => { - remote.enqueueLine({ - data: { - bucket: `prio${prio}`, - data: [ - { - checksum: prio + 10, - data: JSON.stringify({ name: 'row' }), - op: 'PUT', - op_id: (operationId++).toString(), - object_id: `prio${prio}`, - object_type: 'users' - } - ] - } - }); - }; - - const syncCompleted = vi.fn(); - powersync.waitForFirstSync().then(syncCompleted); - - // Emit partial sync complete for each priority but the last. - for (var prio = 0; prio < 3; prio++) { - const partialSyncCompleted = vi.fn(); - powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted); - expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false); - expect(partialSyncCompleted).not.toHaveBeenCalled(); - expect(syncCompleted).not.toHaveBeenCalled(); - - addRow(prio); - remote.enqueueLine({ - partial_checkpoint_complete: { - last_op_id: operationId.toString(), - priority: prio - } - }); - - await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => { - return status.statusForPriority(prio).hasSynced === true; - }); - await new Promise((r) => setTimeout(r)); - expect(partialSyncCompleted).toHaveBeenCalledOnce(); - - expect(await powersync.getAll('select * from users')).toHaveLength(prio + 1); - } - - // Then, complete the sync. - addRow(3); - remote.enqueueLine({ checkpoint_complete: { last_op_id: operationId.toString() } }); - await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500); - expect(await powersync.getAll('select * from users')).toHaveLength(4); - }); - - it('Should remember sync state', async () => { - const { powersync, remote, openAnother } = await createConnectedDatabase(); - expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false); - - const buckets: BucketChecksum[] = []; - for (let prio = 0; prio <= 3; prio++) { - buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 }); - } - remote.enqueueLine({ - checkpoint: { - last_op_id: '0', - buckets - } - }); - remote.enqueueLine({ - partial_checkpoint_complete: { - last_op_id: '0', - priority: 0 - } - }); - - await powersync.waitForFirstSync({ priority: 0 }); - - // Open another database instance. - const another = openAnother(); - onTestFinished(async () => { - await another.close(); - }); - await another.init(); - - expect(another.currentStatus.priorityStatusEntries).toHaveLength(1); - expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy(); - await another.waitForFirstSync({ priority: 0 }); - }); }; }