diff --git a/.changeset/short-flowers-pump.md b/.changeset/short-flowers-pump.md new file mode 100644 index 000000000..5c23e5274 --- /dev/null +++ b/.changeset/short-flowers-pump.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fix warning when reconnecting during CRUD uploads and using the Rust client. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 669441df3..d5e83691d 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1071,7 +1071,9 @@ The next upload iteration will be delayed.`); await control(PowerSyncControlCommand.START, JSON.stringify(options)); this.notifyCompletedUploads = () => { - controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); + if (controlInvocations && !controlInvocations?.closed) { + controlInvocations.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); + } }; await receivingLines; } finally { diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 561b81cf8..c63163a6f 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -849,6 +849,80 @@ function defineSyncTests(impl: SyncClientImplementation) { ); }); } + + mockSyncServiceTest('can reconnect based on query changes', async ({ syncService }) => { + // Test for https://discord.com/channels/1138230179878154300/1399340612435710034/1399340612435710034 + const logger = createLogger('test', { logLevel: Logger.TRACE }); + const logMessages: string[] = []; + (logger as any).invoke = (level, args) => { + console.log(...args); + logMessages.push(util.format(...args)); + }; + + const powersync = await syncService.createDatabase({ logger }); + powersync.watchWithCallback('SELECT * FROM lists', [], { + onResult(results) { + const param = results.rows?.length ?? 0; + + powersync.connect(new TestConnector(), { ...options, params: { a: param } }); + } + }); + + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + expect(syncService.connectedListeners[0]).toMatchObject({ + parameters: { a: 0 } + }); + + await powersync.execute('insert into lists (id, name) values (?, ?);', ['local_list', 'local']); + + await vi.waitFor(() => + expect(syncService.connectedListeners[0]).toMatchObject({ + parameters: { a: 1 } + }) + ); + + syncService.pushLine({ + checkpoint: { + write_checkpoint: '1', + last_op_id: '1', + buckets: [bucket('a', 1)] + } + }); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: 'local_list', + object_type: 'lists', + data: '{"name": "local"}' + }, + { + checksum: 0, + op_id: '2', + op: 'PUT', + object_id: 'my_list', + object_type: 'lists', + data: '{"name": "r"}' + } + ] + } + }); + syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } }); + + await vi.waitFor(() => + expect(syncService.connectedListeners[0]).toMatchObject({ + parameters: { a: 2 } + }) + ); + + expect(logMessages).not.toEqual( + expect.arrayContaining([expect.stringContaining('Cannot enqueue data into closed stream')]) + ); + }); } function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {