Skip to content

Commit 335b04d

Browse files
authored
Merge branch 'main' into undici-errors
2 parents 547029b + 3590864 commit 335b04d

File tree

7 files changed

+173
-130
lines changed

7 files changed

+173
-130
lines changed

.changeset/light-clocks-hang.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/node': patch
4+
'@powersync/web': patch
5+
'@powersync/react-native': patch
6+
---
7+
8+
Rust client: Properly upload CRUD entries made while offline.

packages/common/src/client/ConnectionManager.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
194194

195195
this.logger.debug('Attempting to connect to PowerSync instance');
196196
await this.syncStreamImplementation?.connect(appliedOptions!);
197-
this.syncStreamImplementation?.triggerCrudUpload();
198197
}
199198

200199
/**

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ The next upload iteration will be delayed.`);
840840
const adapter = this.options.adapter;
841841
const remote = this.options.remote;
842842
let receivingLines: Promise<void> | null = null;
843+
let hadSyncLine = false;
843844

844845
const abortController = new AbortController();
845846
signal.addEventListener('abort', () => abortController.abort());
@@ -885,6 +886,11 @@ The next upload iteration will be delayed.`);
885886
}
886887

887888
await control(line.command, line.payload);
889+
890+
if (!hadSyncLine) {
891+
syncImplementation.triggerCrudUpload();
892+
hadSyncLine = true;
893+
}
888894
}
889895
} finally {
890896
const activeInstructions = controlInvocations;

packages/common/src/utils/DataStream.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
3939
protected isClosed: boolean;
4040

4141
protected processingPromise: Promise<void> | null;
42+
protected notifyDataAdded: (() => void) | null;
4243

4344
protected logger: ILogger;
4445

@@ -90,6 +91,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
9091
}
9192

9293
this.dataQueue.push(data);
94+
this.notifyDataAdded?.();
9395

9496
this.processQueue();
9597
}
@@ -151,7 +153,11 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
151153
await this.iterateAsyncErrored(async (l) => l.highWater?.());
152154
}
153155

154-
return (this.processingPromise = this._processQueue());
156+
const promise = (this.processingPromise = this._processQueue());
157+
promise.finally(() => {
158+
return (this.processingPromise = null);
159+
});
160+
return promise;
155161
}
156162

157163
/**
@@ -178,7 +184,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
178184

179185
protected async _processQueue() {
180186
if (this.isClosed || !this.hasDataReader()) {
181-
Promise.resolve().then(() => (this.processingPromise = null));
187+
await Promise.resolve();
182188
return;
183189
}
184190

@@ -188,10 +194,13 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
188194
}
189195

190196
if (this.dataQueue.length <= this.lowWatermark) {
191-
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
192-
}
197+
const dataAdded = new Promise<void>((resolve) => {
198+
this.notifyDataAdded = resolve;
199+
});
193200

194-
this.processingPromise = null;
201+
await Promise.race([this.iterateAsyncErrored(async (l) => l.lowWater?.()), dataAdded]);
202+
this.notifyDataAdded = null;
203+
}
195204

196205
if (this.dataQueue.length) {
197206
// Next tick

packages/node/tests/sync.test.ts

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
424424

425425
mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
426426
let database = await syncService.createDatabase();
427-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
427+
database.connect(new TestConnector(), options);
428428
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
429429

430430
syncService.pushLine({
@@ -442,7 +442,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
442442
await database.close();
443443
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
444444
database = await syncService.createDatabase();
445-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
445+
database.connect(new TestConnector(), options);
446446
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
447447

448448
// A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
@@ -459,6 +459,142 @@ function defineSyncTests(impl: SyncClientImplementation) {
459459
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
460460
});
461461
});
462+
463+
mockSyncServiceTest('should upload after connecting', async ({ syncService }) => {
464+
let database = await syncService.createDatabase();
465+
466+
await database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']);
467+
const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator]();
468+
let rows = (await query.next()).value.rows._array;
469+
expect(rows).toStrictEqual([{ name: 'local write' }]);
470+
471+
database.connect(new TestConnector(), options);
472+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
473+
474+
syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } });
475+
syncService.pushLine({
476+
data: {
477+
bucket: 'a',
478+
data: [
479+
{
480+
checksum: 0,
481+
op_id: '1',
482+
op: 'PUT',
483+
object_id: '1',
484+
object_type: 'lists',
485+
data: '{"name": "from server"}'
486+
}
487+
]
488+
}
489+
});
490+
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });
491+
492+
rows = (await query.next()).value.rows._array;
493+
expect(rows).toStrictEqual([{ name: 'from server' }]);
494+
});
495+
496+
mockSyncServiceTest('should update sync state incrementally', async ({ syncService }) => {
497+
const powersync = await syncService.createDatabase();
498+
powersync.connect(new TestConnector(), options);
499+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
500+
501+
const buckets: BucketChecksum[] = [];
502+
for (let prio = 0; prio <= 3; prio++) {
503+
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio });
504+
}
505+
syncService.pushLine({
506+
checkpoint: {
507+
last_op_id: '4',
508+
buckets
509+
}
510+
});
511+
512+
let operationId = 1;
513+
const addRow = (prio: number) => {
514+
syncService.pushLine({
515+
data: {
516+
bucket: `prio${prio}`,
517+
data: [
518+
{
519+
checksum: prio + 10,
520+
data: JSON.stringify({ name: 'row' }),
521+
op: 'PUT',
522+
op_id: (operationId++).toString(),
523+
object_id: `prio${prio}`,
524+
object_type: 'lists'
525+
}
526+
]
527+
}
528+
});
529+
};
530+
531+
const syncCompleted = vi.fn();
532+
powersync.waitForFirstSync().then(syncCompleted);
533+
534+
// Emit partial sync complete for each priority but the last.
535+
for (var prio = 0; prio < 3; prio++) {
536+
const partialSyncCompleted = vi.fn();
537+
powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted);
538+
expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false);
539+
expect(partialSyncCompleted).not.toHaveBeenCalled();
540+
expect(syncCompleted).not.toHaveBeenCalled();
541+
542+
addRow(prio);
543+
syncService.pushLine({
544+
partial_checkpoint_complete: {
545+
last_op_id: operationId.toString(),
546+
priority: prio
547+
}
548+
});
549+
550+
await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => {
551+
return status.statusForPriority(prio).hasSynced === true;
552+
});
553+
await new Promise((r) => setTimeout(r));
554+
expect(partialSyncCompleted).toHaveBeenCalledOnce();
555+
556+
expect(await powersync.getAll('select * from lists')).toHaveLength(prio + 1);
557+
}
558+
559+
// Then, complete the sync.
560+
addRow(3);
561+
syncService.pushLine({ checkpoint_complete: { last_op_id: operationId.toString() } });
562+
await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500);
563+
expect(await powersync.getAll('select * from lists')).toHaveLength(4);
564+
});
565+
566+
mockSyncServiceTest('Should remember sync state', async ({ syncService }) => {
567+
const powersync = await syncService.createDatabase();
568+
powersync.connect(new TestConnector(), options);
569+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
570+
571+
const buckets: BucketChecksum[] = [];
572+
for (let prio = 0; prio <= 3; prio++) {
573+
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 });
574+
}
575+
syncService.pushLine({
576+
checkpoint: {
577+
last_op_id: '0',
578+
buckets
579+
}
580+
});
581+
syncService.pushLine({
582+
partial_checkpoint_complete: {
583+
last_op_id: '0',
584+
priority: 0
585+
}
586+
});
587+
588+
await powersync.waitForFirstSync({ priority: 0 });
589+
590+
// Open another database instance.
591+
const another = await syncService.createDatabase();
592+
await another.init();
593+
594+
expect(another.currentStatus.priorityStatusEntries).toHaveLength(1);
595+
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
596+
await another.waitForFirstSync({ priority: 0 });
597+
});
462598
}
463599

464600
function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {

packages/node/tests/utils.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
116116
});
117117

118118
return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 });
119+
} else if (request.url.indexOf('/write-checkpoint2.json') != -1) {
120+
return new Response(
121+
JSON.stringify({
122+
data: { write_checkpoint: '1' }
123+
}),
124+
{ status: 200 }
125+
);
119126
} else {
120127
return new Response('Not found', { status: 404 });
121128
}

packages/web/tests/stream.test.ts

Lines changed: 0 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -303,127 +303,5 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
303303
}
304304
);
305305
});
306-
307-
it('Should upload after reconnecting', async () => {
308-
const { powersync, connect, uploadSpy } = await createConnectedDatabase();
309-
expect(powersync.connected).toBe(true);
310-
311-
await powersync.disconnect();
312-
313-
// Status should update after uploads are completed
314-
await vi.waitFor(
315-
() => {
316-
// to-have-been-called seems to not work after failing a check
317-
expect(powersync.currentStatus.dataFlowStatus.uploading).false;
318-
},
319-
{
320-
timeout: UPLOAD_TIMEOUT_MS
321-
}
322-
);
323-
});
324-
325-
it('Should update sync state incrementally', async () => {
326-
const { powersync, remote } = await createConnectedDatabase();
327-
expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false);
328-
329-
const buckets: BucketChecksum[] = [];
330-
for (let prio = 0; prio <= 3; prio++) {
331-
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 10 + prio });
332-
}
333-
remote.enqueueLine({
334-
checkpoint: {
335-
last_op_id: '4',
336-
buckets
337-
}
338-
});
339-
340-
let operationId = 1;
341-
const addRow = (prio: number) => {
342-
remote.enqueueLine({
343-
data: {
344-
bucket: `prio${prio}`,
345-
data: [
346-
{
347-
checksum: prio + 10,
348-
data: JSON.stringify({ name: 'row' }),
349-
op: 'PUT',
350-
op_id: (operationId++).toString(),
351-
object_id: `prio${prio}`,
352-
object_type: 'users'
353-
}
354-
]
355-
}
356-
});
357-
};
358-
359-
const syncCompleted = vi.fn();
360-
powersync.waitForFirstSync().then(syncCompleted);
361-
362-
// Emit partial sync complete for each priority but the last.
363-
for (var prio = 0; prio < 3; prio++) {
364-
const partialSyncCompleted = vi.fn();
365-
powersync.waitForFirstSync({ priority: prio }).then(partialSyncCompleted);
366-
expect(powersync.currentStatus.statusForPriority(prio).hasSynced).toBe(false);
367-
expect(partialSyncCompleted).not.toHaveBeenCalled();
368-
expect(syncCompleted).not.toHaveBeenCalled();
369-
370-
addRow(prio);
371-
remote.enqueueLine({
372-
partial_checkpoint_complete: {
373-
last_op_id: operationId.toString(),
374-
priority: prio
375-
}
376-
});
377-
378-
await powersync.syncStreamImplementation!.waitUntilStatusMatches((status) => {
379-
return status.statusForPriority(prio).hasSynced === true;
380-
});
381-
await new Promise((r) => setTimeout(r));
382-
expect(partialSyncCompleted).toHaveBeenCalledOnce();
383-
384-
expect(await powersync.getAll('select * from users')).toHaveLength(prio + 1);
385-
}
386-
387-
// Then, complete the sync.
388-
addRow(3);
389-
remote.enqueueLine({ checkpoint_complete: { last_op_id: operationId.toString() } });
390-
await vi.waitFor(() => expect(syncCompleted).toHaveBeenCalledOnce(), 500);
391-
expect(await powersync.getAll('select * from users')).toHaveLength(4);
392-
});
393-
394-
it('Should remember sync state', async () => {
395-
const { powersync, remote, openAnother } = await createConnectedDatabase();
396-
expect(powersync.currentStatus.dataFlowStatus.downloading).toBe(false);
397-
398-
const buckets: BucketChecksum[] = [];
399-
for (let prio = 0; prio <= 3; prio++) {
400-
buckets.push({ bucket: `prio${prio}`, priority: prio, checksum: 0 });
401-
}
402-
remote.enqueueLine({
403-
checkpoint: {
404-
last_op_id: '0',
405-
buckets
406-
}
407-
});
408-
remote.enqueueLine({
409-
partial_checkpoint_complete: {
410-
last_op_id: '0',
411-
priority: 0
412-
}
413-
});
414-
415-
await powersync.waitForFirstSync({ priority: 0 });
416-
417-
// Open another database instance.
418-
const another = openAnother();
419-
onTestFinished(async () => {
420-
await another.close();
421-
});
422-
await another.init();
423-
424-
expect(another.currentStatus.priorityStatusEntries).toHaveLength(1);
425-
expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy();
426-
await another.waitForFirstSync({ priority: 0 });
427-
});
428306
};
429307
}

0 commit comments

Comments
 (0)