Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-news-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fix applying bucket state around partial syncs.
4 changes: 2 additions & 2 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
}

const buckets = checkpoint.buckets;
let buckets = checkpoint.buckets;
if (priority !== undefined) {
buckets.filter((b) => hasMatchingPriority(priority, b));
buckets = buckets.filter((b) => hasMatchingPriority(priority, b));
}
const bucketNames = buckets.map((b) => b.bucket);
await this.writeTransaction(async (tx) => {
Expand Down
49 changes: 41 additions & 8 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('Sync', () => {
mockSyncServiceTest('without priorities', async ({ syncService }) => {
const database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
Expand Down Expand Up @@ -96,7 +96,7 @@ describe('Sync', () => {
mockSyncServiceTest('interrupted sync', async ({ syncService }) => {
let database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
Expand All @@ -111,12 +111,12 @@ describe('Sync', () => {

// Close this database before sending the checkpoint...
await database.close();
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));

// And open a new one
database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

// Send same checkpoint again
syncService.pushLine({
Expand All @@ -135,7 +135,7 @@ describe('Sync', () => {
mockSyncServiceTest('interrupted sync with new checkpoint', async ({ syncService }) => {
let database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
Expand All @@ -150,10 +150,10 @@ describe('Sync', () => {

// Re-open database
await database.close();
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(0));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

// Send checkpoint with new data
syncService.pushLine({
Expand All @@ -171,7 +171,7 @@ describe('Sync', () => {
mockSyncServiceTest('different priorities', async ({ syncService }) => {
let database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toEqual(1));
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
Expand Down Expand Up @@ -219,6 +219,39 @@ describe('Sync', () => {
pushCheckpointComplete(syncService);
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
});

mockSyncServiceTest('uses correct state when reconnecting', async ({syncService}) => {
let database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
last_op_id: '10',
buckets: [
bucket('a', 5, {priority: 0}),
bucket('b', 5, {priority: 3}),
]
}
});

// Sync priority 0 completely, start with rest
pushDataLine(syncService, 'a', 5);
pushDataLine(syncService, 'b', 1);
pushCheckpointComplete(syncService, 0);
await database.waitForFirstSync({priority: 0});

await database.close();
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

expect(syncService.connectedListeners[0].buckets).toStrictEqual([
{"name": "a", "after": "10"},
{"name": "b", "after": "6"},
]);
});
});
});

Expand Down
27 changes: 18 additions & 9 deletions packages/node/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import path from 'node:path';
import { onTestFinished, test } from 'vitest';
import {
AbstractPowerSyncDatabase,
AbstractRemoteOptions,
column,
NodePowerSyncDatabaseOptions,
PowerSyncBackendConnector,
Expand Down Expand Up @@ -76,20 +75,30 @@ export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDataba
// TODO: Unify this with the test setup for the web SDK.
export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockSyncService }>({
syncService: async ({ tmpdir }, use) => {
const listeners: ReadableStreamDefaultController<StreamingSyncLine>[] = [];
interface Listener {
request: any,
stream: ReadableStreamDefaultController<StreamingSyncLine>,
}

const listeners: Listener[] = [];

const inMemoryFetch: typeof fetch = async (info, init?) => {
const request = new Request(info, init);
if (request.url.endsWith('/sync/stream')) {
let thisController: ReadableStreamDefaultController<StreamingSyncLine> | null = null;
const body = await request.json();
let listener: Listener | null = null;

const syncLines = new ReadableStream<StreamingSyncLine>({
start(controller) {
thisController = controller;
listeners.push(controller);
listener = {
request: body,
stream: controller,
};

listeners.push(listener);
},
cancel() {
listeners.splice(listeners.indexOf(thisController!), 1);
listeners.splice(listeners.indexOf(listener!), 1);
}
});

Expand Down Expand Up @@ -120,11 +129,11 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS

await use({
get connectedListeners() {
return listeners.length;
return listeners.map((e) => e.request);
},
pushLine(line) {
for (const listener of listeners) {
listener.enqueue(line);
listener.stream.enqueue(line);
}
},
createDatabase: newConnection
Expand All @@ -134,7 +143,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS

export interface MockSyncService {
pushLine: (line: StreamingSyncLine) => void;
connectedListeners: number;
connectedListeners: any[];
createDatabase: () => Promise<PowerSyncDatabase>;
}

Expand Down