Skip to content

Commit f5df9dc

Browse files
cleanup logic
1 parent 5ef22b1 commit f5df9dc

File tree

4 files changed

+101
-28
lines changed

4 files changed

+101
-28
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
464464
protected async connectInternal() {
465465
let appliedOptions: PowerSyncConnectionOptions | null = null;
466466

467+
// This method ensures a disconnect before any connection attempt
468+
await this.disconnectInternal();
469+
467470
/**
468471
* This portion creates a sync implementation which can be racy when disconnecting or
469472
* if multiple tabs on web are in use.
@@ -474,13 +477,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
474477
if (this.closed) {
475478
throw new Error('Cannot connect using a closed client');
476479
}
480+
481+
// Always await this if present since we will be populating a new sync implementation shortly
482+
await this.disconnectingPromise;
483+
477484
if (!this.pendingConnectionOptions) {
478485
// A disconnect could have cleared this.
479486
return;
480487
}
481-
482488
// get pending options and clear it in order for other connect attempts to queue other options
483-
484489
const { connector, options } = this.pendingConnectionOptions;
485490
appliedOptions = options;
486491
this.pendingConnectionOptions = null;
@@ -510,6 +515,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
510515
return;
511516
}
512517

518+
// It might be possible that a disconnect triggered between the last check
519+
// and this point. Awaiting here allows the sync stream to be cleared if disconnected.
520+
await this.disconnectingPromise;
521+
513522
this.syncStreamImplementation?.triggerCrudUpload();
514523
this.options.logger?.debug('Attempting to connect to PowerSync instance');
515524
await this.syncStreamImplementation?.connect(appliedOptions!);
@@ -519,25 +528,44 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
519528
* Connects to stream of events from the PowerSync instance.
520529
*/
521530
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
522-
// This overrides options if present.
531+
// Keep track if there were pending operations before this call
532+
const hadPendingOptions = !!this.pendingConnectionOptions;
533+
534+
// Update pending options to the latest values
523535
this.pendingConnectionOptions = {
524536
connector,
525537
options: options ?? {}
526538
};
527539

528540
await this.waitForReady();
529-
await this.disconnectInternal();
530541

531-
const chain = (result) => {
542+
// Disconnecting here provides aborting in progress connection attempts.
543+
// The connectInternal method will clear pending options once it starts connecting (with the options).
544+
// We only need to trigger a disconnect here if we have already reached the point of connecting.
545+
// If we do already have pending options, a disconnect has already been performed.
546+
// The connectInternal method also does a sanity disconnect to prevent straggler connections.
547+
if (!hadPendingOptions) {
548+
await this.disconnectInternal();
549+
}
550+
551+
// Triggers a connect which checks if pending options are available after the connect completes.
552+
// The completion can be for a successful, unsuccessful or aborted connection attempt.
553+
// If pending options are available another connection will be triggered.
554+
const checkConnection = async (): Promise<void> => {
532555
if (this.pendingConnectionOptions) {
533-
return this.connectInternal().then(chain);
556+
// Pending options have been placed while connecting.
557+
// Need to reconnect.
558+
this.connectingPromise = this.connectInternal().finally(checkConnection);
559+
return this.connectingPromise;
534560
} else {
561+
// Clear the connecting promise, done.
535562
this.connectingPromise = null;
536-
return result;
563+
return;
537564
}
538565
};
539566

540-
return this.connectingPromise ?? this.connectInternal().then(chain);
567+
this.connectingPromise ??= this.connectInternal().finally(checkConnection);
568+
return this.connectingPromise;
541569
}
542570

543571
/**
@@ -548,10 +576,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
548576
protected async disconnectInternal() {
549577
if (this.disconnectingPromise) {
550578
// A disconnect is already in progress
551-
return await this.disconnectingPromise;
579+
return this.disconnectingPromise;
552580
}
581+
553582
// Wait if a sync stream implementation is being created before closing it
554-
// (it must be assigned before we can properly dispose it)
583+
// (syncStreamImplementation must be assigned before we can properly dispose it)
555584
await this.syncStreamInitPromise;
556585

557586
this.disconnectingPromise = (async () => {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -347,16 +347,7 @@ The next upload iteration will be delayed.`);
347347

348348
// Return a promise that resolves when the connection status is updated
349349
return new Promise<void>((resolve) => {
350-
let disposer: () => void;
351-
352-
const complete = () => {
353-
disposer?.();
354-
resolve();
355-
};
356-
357-
controller.signal.addEventListener('abort', complete, { once: true });
358-
359-
disposer = this.registerListener({
350+
const disposer = this.registerListener({
360351
statusUpdated: (update) => {
361352
// This is triggered as soon as a connection is read from
362353
if (typeof update.connected == 'undefined') {
@@ -366,13 +357,15 @@ The next upload iteration will be delayed.`);
366357

367358
if (update.connected == false) {
368359
/**
369-
* This function does not reject if initial connect attempt failed
360+
* This function does not reject if initial connect attempt failed.
361+
* Connected can be false if the connection attempt was aborted or if the initial connection
362+
* attempt failed.
370363
*/
371364
this.logger.warn('Initial connect attempt did not successfully connect to server');
372365
}
373366

374-
controller.signal.removeEventListener('abort', complete);
375-
complete();
367+
disposer();
368+
resolve();
376369
}
377370
});
378371
});
@@ -530,6 +523,10 @@ The next upload iteration will be delayed.`);
530523

531524
const clientId = await this.options.adapter.getClientId();
532525

526+
if (signal.aborted) {
527+
return;
528+
}
529+
533530
this.logger.debug('Requesting stream from server');
534531

535532
const syncOptions: SyncStreamOptions = {
@@ -557,6 +554,7 @@ The next upload iteration will be delayed.`);
557554
this.logger.debug('Stream established. Processing events');
558555

559556
while (!stream.closed) {
557+
console.log('waiting for stream line');
560558
const line = await stream.read();
561559
if (!line) {
562560
// The stream has closed while waiting

packages/web/tests/stream.test.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { BucketChecksum, PowerSyncConnectionOptions, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
1+
import {
2+
BucketChecksum,
3+
DataStream,
4+
PowerSyncConnectionOptions,
5+
WASQLiteOpenFactory,
6+
WASQLiteVFS
7+
} from '@powersync/web';
28
import { describe, expect, it, onTestFinished, vi } from 'vitest';
39
import { TestConnector } from './utils/MockStreamOpenFactory';
410
import { ConnectedDatabaseUtils, generateConnectedDatabase } from './utils/generateConnectedDatabase';
@@ -160,12 +166,25 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
160166

161167
it('PowerSync reconnect multiple connect calls', async () => {
162168
// This initially performs a connect call
163-
const { powersync, waitForStream } = await createConnectedDatabase();
169+
const { powersync, remote } = await createConnectedDatabase();
164170
expect(powersync.connected).toBe(true);
165171

166172
const spy = vi.spyOn(powersync as any, 'generateSyncStreamImplementation');
167173

168-
// connect many times
174+
// Keep track of all connection stream to check if they are correctly closed later
175+
const generatedStreams: DataStream<any>[] = [];
176+
177+
// This method is used for all mocked connections
178+
const basePostStream = remote.postStream;
179+
const postSpy = vi.spyOn(remote, 'postStream').mockImplementation(async (...options) => {
180+
// Simulate a connection delay
181+
await new Promise((r) => setTimeout(r, 100));
182+
const stream = await basePostStream.call(remote, ...options);
183+
generatedStreams.push(stream);
184+
return stream;
185+
});
186+
187+
// Connect many times. The calls here are not awaited and have no async calls in between.
169188
const connectionAttempts = 10;
170189
for (let i = 1; i <= connectionAttempts; i++) {
171190
powersync.connect(new TestConnector(), { params: { count: i } });
@@ -183,9 +202,9 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
183202
// are in the same for loop
184203
expect(spy.mock.calls.length).lessThan(connectionAttempts);
185204

186-
// Now with random delays
205+
// Now with random awaited delays between unawaited calls
187206
for (let i = connectionAttempts; i >= 0; i--) {
188-
await new Promise((r) => setTimeout(r, Math.random() * 100));
207+
await new Promise((r) => setTimeout(r, Math.random() * 10));
189208
powersync.connect(new TestConnector(), { params: { count: i } });
190209
}
191210

@@ -196,6 +215,28 @@ function describeStreamingTests(createConnectedDatabase: () => Promise<Connected
196215
},
197216
{ timeout: 2000, interval: 100 }
198217
);
218+
219+
console.log(`Generated stream a total of ${spy.mock.calls.length} times`);
220+
expect(spy.mock.calls.length).lessThanOrEqual(2 * connectionAttempts);
221+
222+
// The last request should make a network request with the client params
223+
await vi.waitFor(
224+
() => {
225+
expect(postSpy.mock.lastCall?.[0].data.parameters!['count']).equals(0);
226+
// The async postStream call's invocation is added to the count of calls
227+
// before the generated stream is added (there is a delay)
228+
// expect that the stream has been generated and tracked.
229+
expect(postSpy.mock.calls.length).equals(generatedStreams.length);
230+
},
231+
{ timeout: 1000, interval: 100 }
232+
);
233+
234+
const lastConnectionStream = generatedStreams.pop();
235+
expect(lastConnectionStream).toBeDefined();
236+
expect(lastConnectionStream?.closed).false;
237+
238+
// All streams except the last one (which has been popped off already) should be closed
239+
expect(generatedStreams.every((i) => i.closed)).true;
199240
});
200241

201242
it('Should trigger upload connector when connected', async () => {

packages/web/tests/utils/MockStreamOpenFactory.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ export class MockRemote extends AbstractRemote {
108108
logger: this.logger
109109
});
110110

111+
if (options.abortSignal?.aborted) {
112+
stream.close();
113+
}
114+
options.abortSignal?.addEventListener('abort', () => stream.close());
115+
111116
const l = stream.registerListener({
112117
lowWater: async () => {
113118
try {

0 commit comments

Comments
 (0)