diff --git a/.changeset/polite-news-sneeze.md b/.changeset/polite-news-sneeze.md new file mode 100644 index 000000000..2d71b1d7f --- /dev/null +++ b/.changeset/polite-news-sneeze.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fix applying bucket state around partial syncs. diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index cd6686a77..d626f06d8 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -154,9 +154,9 @@ export class SqliteBucketStorage extends BaseObserver imp return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures }; } - const buckets = checkpoint.buckets; + let buckets = checkpoint.buckets; if (priority !== undefined) { - buckets.filter((b) => hasMatchingPriority(priority, b)); + buckets = buckets.filter((b) => hasMatchingPriority(priority, b)); } const bucketNames = buckets.map((b) => b.bucket); await this.writeTransaction(async (tx) => { diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index 4eff5523f..cb4d5a603 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -61,7 +61,7 @@ describe('Sync', () => { mockSyncServiceTest('without priorities', async ({ syncService }) => { const database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ checkpoint: { @@ -96,7 +96,7 @@ describe('Sync', () => { mockSyncServiceTest('interrupted sync', async ({ syncService }) => { let database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ checkpoint: { @@ -111,12 +111,12 @@ describe('Sync', () => { // Close this database before sending the checkpoint... await database.close(); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); // And open a new one database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); // Send same checkpoint again syncService.pushLine({ @@ -135,7 +135,7 @@ describe('Sync', () => { mockSyncServiceTest('interrupted sync with new checkpoint', async ({ syncService }) => { let database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ checkpoint: { @@ -150,10 +150,10 @@ describe('Sync', () => { // Re-open database await database.close(); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); // Send checkpoint with new data syncService.pushLine({ @@ -171,7 +171,7 @@ describe('Sync', () => { mockSyncServiceTest('different priorities', async ({ syncService }) => { let database = await syncService.createDatabase(); database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); - await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1)); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ checkpoint: { @@ -219,6 +219,39 @@ describe('Sync', () => { pushCheckpointComplete(syncService); await waitForSyncStatus(database, (s) => s.downloadProgress == null); }); + + mockSyncServiceTest('uses correct state when reconnecting', async ({syncService}) => { + let database = await syncService.createDatabase(); + database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ + checkpoint: { + last_op_id: '10', + buckets: [ + bucket('a', 5, {priority: 0}), + bucket('b', 5, {priority: 3}), + ] + } + }); + + // Sync priority 0 completely, start with rest + pushDataLine(syncService, 'a', 5); + pushDataLine(syncService, 'b', 1); + pushCheckpointComplete(syncService, 0); + await database.waitForFirstSync({priority: 0}); + + await database.close(); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); + database = await syncService.createDatabase(); + database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + expect(syncService.connectedListeners[0].buckets).toStrictEqual([ + {"name": "a", "after": "10"}, + {"name": "b", "after": "6"}, + ]); + }); }); }); diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 5c2c28988..6e85a3b9f 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -4,7 +4,6 @@ import path from 'node:path'; import { onTestFinished, test } from 'vitest'; import { AbstractPowerSyncDatabase, - AbstractRemoteOptions, column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, @@ -76,20 +75,30 @@ export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDataba // TODO: Unify this with the test setup for the web SDK. export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockSyncService }>({ syncService: async ({ tmpdir }, use) => { - const listeners: ReadableStreamDefaultController[] = []; + interface Listener { + request: any, + stream: ReadableStreamDefaultController, + } + + const listeners: Listener[] = []; const inMemoryFetch: typeof fetch = async (info, init?) => { const request = new Request(info, init); if (request.url.endsWith('/sync/stream')) { - let thisController: ReadableStreamDefaultController | null = null; + const body = await request.json(); + let listener: Listener | null = null; const syncLines = new ReadableStream({ start(controller) { - thisController = controller; - listeners.push(controller); + listener = { + request: body, + stream: controller, + }; + + listeners.push(listener); }, cancel() { - listeners.splice(listeners.indexOf(thisController!), 1); + listeners.splice(listeners.indexOf(listener!), 1); } }); @@ -120,11 +129,11 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS await use({ get connectedListeners() { - return listeners.length; + return listeners.map((e) => e.request); }, pushLine(line) { for (const listener of listeners) { - listener.enqueue(line); + listener.stream.enqueue(line); } }, createDatabase: newConnection @@ -134,7 +143,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS export interface MockSyncService { pushLine: (line: StreamingSyncLine) => void; - connectedListeners: number; + connectedListeners: any[]; createDatabase: () => Promise; }