Skip to content

Commit 9d37979

Browse files
Fix reconnect bugs
1 parent fe2bba9 commit 9d37979

File tree

3 files changed

+82
-52
lines changed

3 files changed

+82
-52
lines changed

packages/common/src/client/ConnectionManager.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import {
1111
*/
1212
export interface ConnectionManagerSyncImplementationResult {
1313
sync: StreamingSyncImplementation;
14+
/**
15+
* Additional cleanup function which is called after the sync stream implementation
16+
* is disposed.
17+
*/
1418
onDispose: () => Promise<void> | void;
1519
}
1620

@@ -65,7 +69,12 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
6569
protected pendingConnectionOptions: StoredConnectionOptions | null;
6670

6771
syncStreamImplementation: StreamingSyncImplementation | null;
68-
syncDisposer: (() => Promise<void> | void) | null;
72+
73+
/**
74+
* Additional cleanup function which is called after the sync stream implementation
75+
* is disposed.
76+
*/
77+
protected syncDisposer: (() => Promise<void> | void) | null;
6978

7079
constructor(protected options: ConnectionManagerOptions) {
7180
super();
@@ -82,6 +91,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
8291
}
8392

8493
async close() {
94+
await this.syncStreamImplementation?.dispose();
8595
await this.syncDisposer?.();
8696
}
8797

@@ -101,7 +111,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
101111
// If we do already have pending options, a disconnect has already been performed.
102112
// The connectInternal method also does a sanity disconnect to prevent straggler connections.
103113
// We should also disconnect if we have already completed a connection attempt.
104-
if (!hadPendingOptions) {
114+
if (!hadPendingOptions || this.syncStreamImplementation) {
105115
await this.disconnectInternal();
106116
}
107117

@@ -145,11 +155,14 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
145155
return;
146156
}
147157

158+
if (this.disconnectingPromise) {
159+
return;
160+
}
161+
148162
const { connector, options } = this.pendingConnectionOptions;
149163
appliedOptions = options;
150164

151165
this.pendingConnectionOptions = null;
152-
153166
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);
154167
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
155168
this.syncStreamImplementation = sync;
@@ -195,17 +208,17 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
195208
return this.disconnectingPromise;
196209
}
197210

198-
// Wait if a sync stream implementation is being created before closing it
199-
// (syncStreamImplementation must be assigned before we can properly dispose it)
200-
await this.syncStreamInitPromise;
201-
202211
this.disconnectingPromise = this.performDisconnect();
203212

204213
await this.disconnectingPromise;
205214
this.disconnectingPromise = null;
206215
}
207216

208217
protected async performDisconnect() {
218+
// Wait if a sync stream implementation is being created before closing it
219+
// (syncStreamImplementation must be assigned before we can properly dispose it)
220+
await this.syncStreamInitPromise;
221+
209222
// Keep reference to the sync stream implementation and disposer
210223
// The class members will be cleared before we trigger the disconnect
211224
// to prevent any further calls to the sync stream implementation.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ The next upload iteration will be delayed.`);
449449
await this.streamingSyncIteration(nestedAbortController.signal, options);
450450
// Continue immediately, streamingSyncIteration will wait before completing if necessary.
451451
} catch (ex) {
452+
let delay = true;
452453
/**
453454
* Either:
454455
* - A network request failed with a failed connection or not OKAY response code.
@@ -459,6 +460,8 @@ The next upload iteration will be delayed.`);
459460
*/
460461
if (ex instanceof AbortOperation) {
461462
this.logger.warn(ex);
463+
delay = false;
464+
// A disconnect was requested, we should not delay since there is no explicit retry
462465
} else {
463466
this.logger.error(ex);
464467
}
@@ -470,7 +473,9 @@ The next upload iteration will be delayed.`);
470473
});
471474

472475
// On error, wait a little before retrying
473-
await this.delayRetry();
476+
if (delay) {
477+
await this.delayRetry();
478+
}
474479
} finally {
475480
if (!signal.aborted) {
476481
nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.'));

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProv
3030
import { BroadcastLogger } from './BroadcastLogger';
3131

3232
/**
33+
* @internal
3334
* Manual message events for shared sync clients
3435
*/
3536
export enum SharedSyncClientEvent {
@@ -42,6 +43,9 @@ export enum SharedSyncClientEvent {
4243
CLOSE_ACK = 'close-ack'
4344
}
4445

46+
/**
47+
* @internal
48+
*/
4549
export type ManualSharedSyncPayload = {
4650
event: SharedSyncClientEvent;
4751
data: any; // TODO update in future
@@ -85,6 +89,7 @@ export type RemoteOperationAbortController = {
8589
* We provide this unused placeholder when connecting with the ConnectionManager.
8690
*/
8791
const CONNECTOR_PLACEHOLDER = {} as PowerSyncBackendConnector;
92+
8893
/**
8994
* @internal
9095
* Shared sync implementation which runs inside a shared webworker
@@ -133,23 +138,25 @@ export class SharedSyncImplementation
133138
this.broadCastLogger = new BroadcastLogger(this.ports);
134139

135140
this.connectionManager = new ConnectionManager({
136-
createSyncImplementation: async (connector, options) => {
137-
await this.waitForReady();
138-
if (!this.dbAdapter) {
139-
await this.openInternalDB();
140-
}
141-
142-
const sync = this.generateStreamingImplementation();
143-
const onDispose = sync.registerListener({
144-
statusChanged: (status) => {
145-
this.updateAllStatuses(status.toJSON());
141+
createSyncImplementation: async () => {
142+
return this.portMutex.runExclusive(async () => {
143+
await this.waitForReady();
144+
if (!this.dbAdapter) {
145+
await this.openInternalDB();
146146
}
147-
});
148147

149-
return {
150-
sync,
151-
onDispose
152-
};
148+
const sync = await this.generateStreamingImplementation();
149+
const onDispose = sync.registerListener({
150+
statusChanged: (status) => {
151+
this.updateAllStatuses(status.toJSON());
152+
}
153+
});
154+
155+
return {
156+
sync,
157+
onDispose
158+
};
159+
});
153160
},
154161
logger: this.logger
155162
});
@@ -190,15 +197,17 @@ export class SharedSyncImplementation
190197
async setParams(params: SharedSyncInitOptions) {
191198
await this.portMutex.runExclusive(async () => {
192199
if (this.syncParams) {
200+
// Cannot modify already existing sync implementation params
201+
// But we can ask for a DB adapter, if required, at this point.
202+
193203
if (!this.dbAdapter) {
194204
await this.openInternalDB();
195205
}
196-
// Cannot modify already existing sync implementation
197206
return;
198207
}
199208

209+
// First time setting params
200210
this.syncParams = params;
201-
202211
if (params.streamOptions?.flags?.broadcastLogs) {
203212
this.logger = this.broadCastLogger;
204213
}
@@ -229,17 +238,12 @@ export class SharedSyncImplementation
229238
* connects.
230239
*/
231240
async connect(options?: PowerSyncConnectionOptions) {
232-
await this.portMutex.runExclusive(async () => {
233-
// Keep track of the last connect options if we need to reconnect due to a lost client
234-
this.lastConnectOptions = options;
235-
return this.connectionManager.connect(CONNECTOR_PLACEHOLDER, options);
236-
});
241+
this.lastConnectOptions = options;
242+
return this.connectionManager.connect(CONNECTOR_PLACEHOLDER, options);
237243
}
238244

239245
async disconnect() {
240-
await this.portMutex.runExclusive(async () => {
241-
await this.connectionManager.disconnect();
242-
});
246+
return this.connectionManager.disconnect();
243247
}
244248

245249
/**
@@ -266,11 +270,11 @@ export class SharedSyncImplementation
266270
* clients.
267271
*/
268272
async removePort(port: MessagePort) {
269-
return await this.portMutex.runExclusive(async () => {
273+
const { trackedPort, shouldReconnect } = await this.portMutex.runExclusive(async () => {
270274
const index = this.ports.findIndex((p) => p.port == port);
271275
if (index < 0) {
272276
this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`);
273-
return;
277+
return {};
274278
}
275279

276280
const trackedPort = this.ports[index];
@@ -291,26 +295,35 @@ export class SharedSyncImplementation
291295

292296
const shouldReconnect = !!this.connectionManager.syncStreamImplementation && this.ports.length > 0;
293297

294-
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
295-
if (shouldReconnect) {
296-
await this.connectionManager.disconnect();
297-
}
298+
return {
299+
shouldReconnect,
300+
trackedPort
301+
};
302+
});
298303

299-
// Clearing the adapter will result in a new one being opened in connect
300-
this.dbAdapter = null;
304+
if (!trackedPort) {
305+
// We could not find the port to remove
306+
return () => {};
307+
}
301308

302-
if (shouldReconnect) {
303-
await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions);
304-
}
309+
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
310+
if (shouldReconnect) {
311+
await this.connectionManager.disconnect();
305312
}
306313

307-
if (trackedPort.db) {
308-
await trackedPort.db.close();
314+
// Clearing the adapter will result in a new one being opened in connect
315+
this.dbAdapter = null;
316+
317+
if (shouldReconnect) {
318+
await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions);
309319
}
310-
this.logger.debug(`Port ${port} removed from shared sync implementation.`);
311-
// Release proxy
312-
return () => trackedPort.clientProvider[Comlink.releaseProxy]();
313-
});
320+
}
321+
322+
if (trackedPort.db) {
323+
await trackedPort.db.close();
324+
}
325+
// Release proxy
326+
return () => trackedPort.clientProvider[Comlink.releaseProxy]();
314327
}
315328

316329
triggerCrudUpload() {
@@ -351,7 +364,6 @@ export class SharedSyncImplementation
351364
protected generateStreamingImplementation() {
352365
// This should only be called after initialization has completed
353366
const syncParams = this.syncParams!;
354-
355367
// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
356368
return new WebStreamingSyncImplementation({
357369
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
@@ -454,7 +466,7 @@ export class SharedSyncImplementation
454466
* A function only used for unit tests which updates the internal
455467
* sync stream client and all tab client's sync status
456468
*/
457-
private _testUpdateAllStatuses(status: SyncStatusOptions) {
469+
private async _testUpdateAllStatuses(status: SyncStatusOptions) {
458470
if (!this.connectionManager.syncStreamImplementation) {
459471
// This is just for testing purposes
460472
this.connectionManager.syncStreamImplementation = this.generateStreamingImplementation();

0 commit comments

Comments
 (0)