Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-readers-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Rust sync client: Fix `connect()` resolving before a connection is made.
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,18 @@ export abstract class AbstractStreamingSyncImplementation
let checkedCrudItem: CrudEntry | undefined;

while (true) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
try {
/**
* This is the first item in the FIFO CRUD queue.
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});

if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Expand Down Expand Up @@ -410,23 +411,15 @@ The next upload iteration will be delayed.`);
this.abortController = controller;
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);

// Return a promise that resolves when the connection status is updated
// Return a promise that resolves when the connection status is updated to indicate that we're connected.
return new Promise<void>((resolve) => {
const disposer = this.registerListener({
statusUpdated: (update) => {
// This is triggered as soon as a connection is read from
if (typeof update.connected == 'undefined') {
// only concern with connection updates
return;
}

if (update.connected == false) {
/**
* This function does not reject if initial connect attempt failed.
* Connected can be false if the connection attempt was aborted or if the initial connection
* attempt failed.
*/
statusChanged: (status) => {
if (status.dataFlowStatus.downloadError != null) {
this.logger.warn('Initial connect attempt did not successfully connect to server');
} else if (status.connecting) {
// Still connecting.
return;
}

disposer();
Expand Down Expand Up @@ -889,6 +882,10 @@ The next upload iteration will be delayed.`);
);
}

// The rust client will set connected: true after the first sync line because that's when it gets invoked, but
// we're already connected here and can report that.
syncImplementation.updateSyncStatus({ connected: true });

try {
while (!controlInvocations.closed) {
const line = await controlInvocations.read();
Expand Down
43 changes: 43 additions & 0 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,49 @@ function defineSyncTests(impl: SyncClientImplementation) {
expect(Math.abs(lastSyncedAt - now)).toBeLessThan(5000);
});

mockSyncServiceTest('connect() waits for connection', async ({ syncService }) => {
const database = await syncService.createDatabase();
let connectCompleted = false;
database.connect(new TestConnector(), options).then(() => {
connectCompleted = true;
});
expect(connectCompleted).toBeFalsy();

await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
// We want connected: true once we have a connection
await vi.waitFor(() => connectCompleted);
expect(database.currentStatus.dataFlowStatus.downloading).toBeFalsy();

syncService.pushLine({
checkpoint: {
last_op_id: '10',
buckets: [bucket('a', 10)]
}
});

await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
});

mockSyncServiceTest('does not set uploading status without local writes', async ({ syncService }) => {
const database = await syncService.createDatabase();
database.registerListener({
statusChanged(status) {
expect(status.dataFlowStatus.uploading).toBeFalsy();
}
});

database.connect(new TestConnector(), options);
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));

syncService.pushLine({
checkpoint: {
last_op_id: '10',
buckets: [bucket('a', 10)]
}
});
await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
});

describe('reports progress', () => {
let lastOpId = 0;

Expand Down