Skip to content

Commit e561dda

Browse files
use shared ConnectionManager in order to queue connections for multipletab scenarios
1 parent 308bef4 commit e561dda

File tree

7 files changed

+448
-304
lines changed

7 files changed

+448
-304
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 34 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { BaseObserver } from '../utils/BaseObserver.js';
1717
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1818
import { throttleTrailing } from '../utils/async.js';
1919
import { mutexRunExclusive } from '../utils/mutex.js';
20+
import { ConnectionManager } from './ConnectionManager.js';
2021
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2122
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2223
import { runOnSchemaChange } from './runOnSchemaChange.js';
@@ -112,11 +113,6 @@ export interface PowerSyncCloseOptions {
112113
disconnect?: boolean;
113114
}
114115

115-
type StoredConnectionOptions = {
116-
connector: PowerSyncBackendConnector;
117-
options: PowerSyncConnectionOptions;
118-
};
119-
120116
const POWERSYNC_TABLE_MATCH = /(^ps_data__|^ps_data_local__)/;
121117

122118
const DEFAULT_DISCONNECT_CLEAR_OPTIONS: DisconnectAndClearOptions = {
@@ -170,40 +166,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
170166
*/
171167
currentStatus: SyncStatus;
172168

173-
syncStreamImplementation?: StreamingSyncImplementation;
174169
sdkVersion: string;
175170

176171
protected bucketStorageAdapter: BucketStorageAdapter;
177-
private syncStatusListenerDisposer?: () => void;
178172
protected _isReadyPromise: Promise<void>;
173+
protected connectionManager: ConnectionManager;
174+
175+
get syncStreamImplementation() {
176+
return this.connectionManager.syncStreamImplementation;
177+
}
179178

180179
protected _schema: Schema;
181180

182181
private _database: DBAdapter;
183182

184-
/**
185-
* Tracks active connection attempts
186-
*/
187-
protected connectingPromise: Promise<void> | null;
188-
/**
189-
* Tracks actively instantiating a streaming sync implementation.
190-
*/
191-
protected syncStreamInitPromise: Promise<void> | null;
192-
/**
193-
* Active disconnect operation. Calling disconnect multiple times
194-
* will resolve to the same operation.
195-
*/
196-
protected disconnectingPromise: Promise<void> | null;
197-
/**
198-
* Tracks the last parameters supplied to `connect` calls.
199-
* Calling `connect` multiple times in succession will result in:
200-
* - 1 pending connection operation which will be aborted.
201-
* - updating the last set of parameters while waiting for the pending
202-
* attempt to be aborted
203-
* - internally connecting with the last set of parameters
204-
*/
205-
protected pendingConnectionOptions: StoredConnectionOptions | null;
206-
207183
protected connectionMutex: Mutex;
208184

209185
constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
@@ -236,11 +212,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
236212
this._schema = schema;
237213
this.ready = false;
238214
this.sdkVersion = '';
239-
this.connectingPromise = null;
240-
this.syncStreamInitPromise = null;
241-
this.pendingConnectionOptions = null;
242215
this.connectionMutex = new Mutex();
243216
// Start async init
217+
this.connectionManager = new ConnectionManager({
218+
createSyncImplementation: async (connector, options) => {
219+
await this.waitForReady();
220+
221+
return this.runExclusive(async () => {
222+
const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options));
223+
const onDispose = sync.registerListener({
224+
statusChanged: (status) => {
225+
this.currentStatus = new SyncStatus({
226+
...status.toJSON(),
227+
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
228+
});
229+
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
230+
}
231+
});
232+
await sync.waitForReady();
233+
234+
return {
235+
sync,
236+
onDispose
237+
};
238+
});
239+
},
240+
logger: this.logger
241+
});
244242
this._isReadyPromise = this.initialize();
245243
}
246244

@@ -467,111 +465,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
467465
return this.connectionMutex.runExclusive(callback);
468466
}
469467

470-
protected async connectInternal() {
471-
let appliedOptions: PowerSyncConnectionOptions | null = null;
472-
473-
// This method ensures a disconnect before any connection attempt
474-
await this.disconnectInternal();
475-
476-
/**
477-
* This portion creates a sync implementation which can be racy when disconnecting or
478-
* if multiple tabs on web are in use.
479-
* This is protected in an exclusive lock.
480-
* The promise tracks the creation which is used to synchronize disconnect attempts.
481-
*/
482-
this.syncStreamInitPromise = this.runExclusive(async () => {
483-
if (this.closed) {
484-
throw new Error('Cannot connect using a closed client');
485-
}
486-
487-
// Always await this if present since we will be populating a new sync implementation shortly
488-
await this.disconnectingPromise;
489-
490-
if (!this.pendingConnectionOptions) {
491-
// A disconnect could have cleared this.
492-
return;
493-
}
494-
// get pending options and clear it in order for other connect attempts to queue other options
495-
const { connector, options } = this.pendingConnectionOptions;
496-
appliedOptions = options;
497-
this.pendingConnectionOptions = null;
498-
499-
this.syncStreamImplementation = this.generateSyncStreamImplementation(
500-
connector,
501-
this.resolvedConnectionOptions(options)
502-
);
503-
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
504-
statusChanged: (status) => {
505-
this.currentStatus = new SyncStatus({
506-
...status.toJSON(),
507-
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
508-
});
509-
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
510-
}
511-
});
512-
513-
await this.syncStreamImplementation.waitForReady();
514-
});
515-
516-
await this.syncStreamInitPromise;
517-
this.syncStreamInitPromise = null;
518-
519-
if (!appliedOptions) {
520-
// A disconnect could have cleared the options which did not create a syncStreamImplementation
521-
return;
522-
}
523-
524-
// It might be possible that a disconnect triggered between the last check
525-
// and this point. Awaiting here allows the sync stream to be cleared if disconnected.
526-
await this.disconnectingPromise;
527-
528-
this.syncStreamImplementation?.triggerCrudUpload();
529-
this.options.logger?.debug('Attempting to connect to PowerSync instance');
530-
await this.syncStreamImplementation?.connect(appliedOptions!);
531-
}
532-
533468
/**
534469
* Connects to stream of events from the PowerSync instance.
535470
*/
536471
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
537-
// Keep track if there were pending operations before this call
538-
const hadPendingOptions = !!this.pendingConnectionOptions;
539-
540-
// Update pending options to the latest values
541-
this.pendingConnectionOptions = {
542-
connector,
543-
options: options ?? {}
544-
};
545-
546-
await this.waitForReady();
547-
548-
// Disconnecting here provides aborting in progress connection attempts.
549-
// The connectInternal method will clear pending options once it starts connecting (with the options).
550-
// We only need to trigger a disconnect here if we have already reached the point of connecting.
551-
// If we do already have pending options, a disconnect has already been performed.
552-
// The connectInternal method also does a sanity disconnect to prevent straggler connections.
553-
if (!hadPendingOptions) {
554-
await this.disconnectInternal();
555-
}
556-
557-
// Triggers a connect which checks if pending options are available after the connect completes.
558-
// The completion can be for a successful, unsuccessful or aborted connection attempt.
559-
// If pending options are available another connection will be triggered.
560-
const checkConnection = async (): Promise<void> => {
561-
if (this.pendingConnectionOptions) {
562-
// Pending options have been placed while connecting.
563-
// Need to reconnect.
564-
this.connectingPromise = this.connectInternal().finally(checkConnection);
565-
return this.connectingPromise;
566-
} else {
567-
// Clear the connecting promise, done.
568-
this.connectingPromise = null;
569-
return;
570-
}
571-
};
572-
573-
this.connectingPromise ??= this.connectInternal().finally(checkConnection);
574-
return this.connectingPromise;
472+
return this.connectionManager.connect(connector, options);
575473
}
576474

577475
/**
@@ -581,32 +479,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
581479
*/
582480
async disconnect() {
583481
await this.waitForReady();
584-
// This will help abort pending connects
585-
this.pendingConnectionOptions = null;
586-
await this.disconnectInternal();
587-
}
588-
589-
protected async disconnectInternal() {
590-
if (this.disconnectingPromise) {
591-
// A disconnect is already in progress
592-
return this.disconnectingPromise;
593-
}
594-
595-
// Wait if a sync stream implementation is being created before closing it
596-
// (syncStreamImplementation must be assigned before we can properly dispose it)
597-
await this.syncStreamInitPromise;
598-
599-
this.disconnectingPromise = this.performDisconnect();
600-
601-
await this.disconnectingPromise;
602-
this.disconnectingPromise = null;
603-
}
604-
605-
protected async performDisconnect() {
606-
await this.syncStreamImplementation?.disconnect();
607-
this.syncStatusListenerDisposer?.();
608-
await this.syncStreamImplementation?.dispose();
609-
this.syncStreamImplementation = undefined;
482+
return this.connectionManager.disconnect();
610483
}
611484

612485
/**
@@ -653,7 +526,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
653526
await this.disconnect();
654527
}
655528

656-
await this.syncStreamImplementation?.dispose();
529+
await this.connectionManager.close();
657530
await this.database.close();
658531
this.closed = true;
659532
}

0 commit comments

Comments
 (0)