Skip to content

Commit 6469c23

Browse files
committed
Rust: Trigger CRUD upload on connect
1 parent 8a9a5dc commit 6469c23

File tree

5 files changed

+67
-7
lines changed

5 files changed

+67
-7
lines changed

.changeset/light-clocks-hang.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Rust client: Properly upload CRUD entries made while offline.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ The next upload iteration will be delayed.`);
840840
const adapter = this.options.adapter;
841841
const remote = this.options.remote;
842842
let receivingLines: Promise<void> | null = null;
843+
let hadSyncLine = false;
843844

844845
const abortController = new AbortController();
845846
signal.addEventListener('abort', () => abortController.abort());
@@ -884,6 +885,11 @@ The next upload iteration will be delayed.`);
884885
return;
885886
}
886887

888+
if (!hadSyncLine) {
889+
syncImplementation.triggerCrudUpload();
890+
hadSyncLine = true;
891+
}
892+
887893
await control(line.command, line.payload);
888894
}
889895
} finally {

packages/common/src/utils/DataStream.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
3939
protected isClosed: boolean;
4040

4141
protected processingPromise: Promise<void> | null;
42+
protected notifyDataAdded: ((_: null) => void) | null;
4243

4344
protected logger: ILogger;
4445

@@ -90,6 +91,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
9091
}
9192

9293
this.dataQueue.push(data);
94+
this.notifyDataAdded?.(null);
9395

9496
this.processQueue();
9597
}
@@ -151,7 +153,11 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
151153
await this.iterateAsyncErrored(async (l) => l.highWater?.());
152154
}
153155

154-
return (this.processingPromise = this._processQueue());
156+
const promise = (this.processingPromise = this._processQueue());
157+
promise.finally(() => {
158+
return (this.processingPromise = null);
159+
});
160+
return promise;
155161
}
156162

157163
/**
@@ -178,7 +184,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
178184

179185
protected async _processQueue() {
180186
if (this.isClosed || !this.hasDataReader()) {
181-
Promise.resolve().then(() => (this.processingPromise = null));
187+
await Promise.resolve();
182188
return;
183189
}
184190

@@ -188,10 +194,13 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
188194
}
189195

190196
if (this.dataQueue.length <= this.lowWatermark) {
191-
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
192-
}
197+
const dataAdded = new Promise((resolve) => {
198+
this.notifyDataAdded = resolve;
199+
});
193200

194-
this.processingPromise = null;
201+
await Promise.race([this.iterateAsyncErrored(async (l) => l.lowWater?.()), dataAdded]);
202+
this.notifyDataAdded = null;
203+
}
195204

196205
if (this.dataQueue.length) {
197206
// Next tick

packages/node/tests/sync.test.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
401401

402402
mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
403403
let database = await syncService.createDatabase();
404-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
404+
database.connect(new TestConnector(), options);
405405
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
406406

407407
syncService.pushLine({
@@ -419,7 +419,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
419419
await database.close();
420420
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
421421
database = await syncService.createDatabase();
422-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
422+
database.connect(new TestConnector(), options);
423423
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
424424

425425
// A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
@@ -436,6 +436,39 @@ function defineSyncTests(impl: SyncClientImplementation) {
436436
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
437437
});
438438
});
439+
440+
mockSyncServiceTest('should upload after connecting', async ({ syncService }) => {
441+
let database = await syncService.createDatabase();
442+
443+
database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']);
444+
const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator]();
445+
let rows = (await query.next()).value.rows._array;
446+
expect(rows).toStrictEqual([{ name: 'local write' }]);
447+
448+
database.connect(new TestConnector(), options);
449+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
450+
451+
syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } });
452+
syncService.pushLine({
453+
data: {
454+
bucket: 'a',
455+
data: [
456+
{
457+
checksum: 0,
458+
op_id: '1',
459+
op: 'PUT',
460+
object_id: '1',
461+
object_type: 'lists',
462+
data: '{"name": "from server"}'
463+
}
464+
]
465+
}
466+
});
467+
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });
468+
469+
rows = (await query.next()).value.rows._array;
470+
expect(rows).toStrictEqual([{ name: 'from server' }]);
471+
});
439472
}
440473

441474
function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {

packages/node/tests/utils.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
116116
});
117117

118118
return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 });
119+
} else if (request.url.indexOf('/write-checkpoint2.json') != -1) {
120+
return new Response(
121+
JSON.stringify({
122+
data: { write_checkpoint: '1' }
123+
}),
124+
{ status: 200 }
125+
);
119126
} else {
120127
return new Response('Not found', { status: 404 });
121128
}

0 commit comments

Comments
 (0)