Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/light-clocks-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/common': patch
'@powersync/node': patch
'@powersync/web': patch
'@powersync/react-native': patch
---

Rust client: Properly upload CRUD entries made while offline.
1 change: 0 additions & 1 deletion packages/common/src/client/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {

this.logger.debug('Attempting to connect to PowerSync instance');
await this.syncStreamImplementation?.connect(appliedOptions!);
this.syncStreamImplementation?.triggerCrudUpload();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ The next upload iteration will be delayed.`);
const adapter = this.options.adapter;
const remote = this.options.remote;
let receivingLines: Promise<void> | null = null;
let hadSyncLine = false;

const abortController = new AbortController();
signal.addEventListener('abort', () => abortController.abort());
Expand Down Expand Up @@ -885,6 +886,11 @@ The next upload iteration will be delayed.`);
}

await control(line.command, line.payload);

if (!hadSyncLine) {
syncImplementation.triggerCrudUpload();
hadSyncLine = true;
}
}
} finally {
const activeInstructions = controlInvocations;
Expand Down
19 changes: 14 additions & 5 deletions packages/common/src/utils/DataStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
protected isClosed: boolean;

protected processingPromise: Promise<void> | null;
protected notifyDataAdded: (() => void) | null;

protected logger: ILogger;

Expand Down Expand Up @@ -90,6 +91,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
}

this.dataQueue.push(data);
this.notifyDataAdded?.();

this.processQueue();
}
Expand Down Expand Up @@ -151,7 +153,11 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
await this.iterateAsyncErrored(async (l) => l.highWater?.());
}

return (this.processingPromise = this._processQueue());
const promise = (this.processingPromise = this._processQueue());
promise.finally(() => {
return (this.processingPromise = null);
});
return promise;
}

/**
Expand All @@ -178,7 +184,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL

protected async _processQueue() {
if (this.isClosed || !this.hasDataReader()) {
Promise.resolve().then(() => (this.processingPromise = null));
await Promise.resolve();
return;
}

Expand All @@ -188,10 +194,13 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
}

if (this.dataQueue.length <= this.lowWatermark) {
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
}
const dataAdded = new Promise<void>((resolve) => {
this.notifyDataAdded = resolve;
});

this.processingPromise = null;
await Promise.race([this.iterateAsyncErrored(async (l) => l.lowWater?.()), dataAdded]);
this.notifyDataAdded = null;
}

if (this.dataQueue.length) {
// Next tick
Expand Down
140 changes: 138 additions & 2 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ function defineSyncTests(impl: SyncClientImplementation) {

mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
let database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
database.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
Expand All @@ -442,7 +442,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
await database.close();
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
database = await syncService.createDatabase();
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
database.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

// A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
Expand All @@ -459,6 +459,142 @@ function defineSyncTests(impl: SyncClientImplementation) {
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
});
});

mockSyncServiceTest('should upload after connecting', async ({ syncService }) => {
let database = await syncService.createDatabase();

await database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']);
const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator]();
let rows = (await query.next()).value.rows._array;
expect(rows).toStrictEqual([{ name: 'local write' }]);

database.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } });
syncService.pushLine({
data: {
bucket: 'a',
data: [
{
checksum: 0,
op_id: '1',
op: 'PUT',
object_id: '1',
object_type: 'lists',
data: '{"name": "from server"}'
}
]
}
});
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });

rows = (await query.next()).value.rows._array;
expect(rows).toStrictEqual([{ name: 'from server' }]);
});

mockSyncServiceTest('should update sync state incrementally', async ({ syncService }) => {
const powersync = await syncService.createDatabase();
powersync.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

const buckets: BucketChecksum[] = [];
for (let prio = 0; prio <= 3; prio++) {
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio });
}
syncService.pushLine({
checkpoint: {
last_op_id: '4',
buckets
}
});

let operationId = 1;
const addRow = (prio: number) => {
syncService.pushLine({
data: {
bucket: `prio${prio}`,
data: [
{
checksum: prio + 10,
data: JSON.stringify({ name: 'row' }),
op: 'PUT',
op_id: (operationId++).toString(),
object_id: `prio${prio}`,
object_type: 'lists'
}
]
}
});
};

const syncCompleted = vi.fn();
powersync.waitForFirstSync().then(syncCompleted);

// Emit partial sync complete for each priority but the last.
for (var prio = 0; prio < 3; prio++) {
const partialSyncCompleted = vi.fn();
powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted);
expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false);
expect(partialSyncCompleted).not.toHaveBeenCalled();
expect(syncCompleted).not.toHaveBeenCalled();

addRow(prio);
syncService.pushLine({
partial_checkpoint_complete: {
last_op_id: operationId.toString(),
priority: prio
}
});

await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => {
return status.statusForPriority(prio).hasSynced === true;
});
await new Promise((r) => setTimeout(r));
expect(partialSyncCompleted).toHaveBeenCalledOnce();

expect(await powersync.getAll('select * from lists')).toHaveLength(prio + 1);
}

// Then, complete the sync.
addRow(3);
syncService.pushLine({ checkpoint_complete: { last_op_id: operationId.toString() } });
await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500);
expect(await powersync.getAll('select * from lists')).toHaveLength(4);
});

mockSyncServiceTest('Should remember sync state', async ({ syncService }) => {
const powersync = await syncService.createDatabase();
powersync.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

const buckets: BucketChecksum[] = [];
for (let prio = 0; prio <= 3; prio++) {
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 });
}
syncService.pushLine({
checkpoint: {
last_op_id: '0',
buckets
}
});
syncService.pushLine({
partial_checkpoint_complete: {
last_op_id: '0',
priority: 0
}
});

await powersync.waitForFirstSync({ priority: 0 });

// Open another database instance.
const another = await syncService.createDatabase();
await another.init();

expect(another.currentStatus.priorityStatusEntries).toHaveLength(1);
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
await another.waitForFirstSync({ priority: 0 });
});
}

function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {
Expand Down
7 changes: 7 additions & 0 deletions packages/node/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
});

return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 });
} else if (request.url.indexOf('/write-checkpoint2.json') != -1) {
return new Response(
JSON.stringify({
data: { write_checkpoint: '1' }
}),
{ status: 200 }
);
} else {
return new Response('Not found', { status: 404 });
}
Expand Down
122 changes: 0 additions & 122 deletions packages/web/tests/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,127 +303,5 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
}
);
});

it('Should upload after reconnecting', async () => {
const { powersync, connect, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

await powersync.disconnect();

// Status should update after uploads are completed
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(powersync.currentStatus.dataFlowStatus.uploading).false;
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);
});

it('Should update sync state incrementally', async () => {
const { powersync, remote } = await createConnectedDatabase();
expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false);

const buckets: BucketChecksum[] = [];
for (let prio = 0; prio <= 3; prio++) {
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio });
}
remote.enqueueLine({
checkpoint: {
last_op_id: '4',
buckets
}
});

let operationId = 1;
const addRow = (prio: number) => {
remote.enqueueLine({
data: {
bucket: `prio${prio}`,
data: [
{
checksum: prio + 10,
data: JSON.stringify({ name: 'row' }),
op: 'PUT',
op_id: (operationId++).toString(),
object_id: `prio${prio}`,
object_type: 'users'
}
]
}
});
};

const syncCompleted = vi.fn();
powersync.waitForFirstSync().then(syncCompleted);

// Emit partial sync complete for each priority but the last.
for (var prio = 0; prio < 3; prio++) {
const partialSyncCompleted = vi.fn();
powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted);
expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false);
expect(partialSyncCompleted).not.toHaveBeenCalled();
expect(syncCompleted).not.toHaveBeenCalled();

addRow(prio);
remote.enqueueLine({
partial_checkpoint_complete: {
last_op_id: operationId.toString(),
priority: prio
}
});

await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => {
return status.statusForPriority(prio).hasSynced === true;
});
await new Promise((r) => setTimeout(r));
expect(partialSyncCompleted).toHaveBeenCalledOnce();

expect(await powersync.getAll('select * from users')).toHaveLength(prio + 1);
}

// Then, complete the sync.
addRow(3);
remote.enqueueLine({ checkpoint_complete: { last_op_id: operationId.toString() } });
await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500);
expect(await powersync.getAll('select * from users')).toHaveLength(4);
});

it('Should remember sync state', async () => {
const { powersync, remote, openAnother } = await createConnectedDatabase();
expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false);

const buckets: BucketChecksum[] = [];
for (let prio = 0; prio <= 3; prio++) {
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 });
}
remote.enqueueLine({
checkpoint: {
last_op_id: '0',
buckets
}
});
remote.enqueueLine({
partial_checkpoint_complete: {
last_op_id: '0',
priority: 0
}
});

await powersync.waitForFirstSync({ priority: 0 });

// Open another database instance.
const another = openAnother();
onTestFinished(async () => {
await another.close();
});
await another.init();

expect(another.currentStatus.priorityStatusEntries).toHaveLength(1);
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
await another.waitForFirstSync({ priority: 0 });
});
};
}