Skip to content

Commit 2e9df4b

Browse files
improve queue behaviour
1 parent 38d36e2 commit 2e9df4b

File tree

8 files changed

+207
-50
lines changed

8 files changed

+207
-50
lines changed

.changeset/empty-pants-give.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
'@powersync/node': minor
5+
'@powersync/web': minor
6+
---
7+
8+
Improved behvaiour when connect is called multiple times in quick succession. Updating client parameters should now be more responsive.

packages/common/rollup.config.mjs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import commonjs from '@rollup/plugin-commonjs';
22
import inject from '@rollup/plugin-inject';
33
import json from '@rollup/plugin-json';
44
import nodeResolve from '@rollup/plugin-node-resolve';
5-
import terser from '@rollup/plugin-terser';
65

76
export default (commandLineArgs) => {
87
const sourcemap = (commandLineArgs.sourceMap || 'true') == 'true';
@@ -26,8 +25,8 @@ export default (commandLineArgs) => {
2625
ReadableStream: ['web-streams-polyfill/ponyfill', 'ReadableStream'],
2726
// Used by can-ndjson-stream
2827
TextDecoder: ['text-encoding', 'TextDecoder']
29-
}),
30-
terser()
28+
})
29+
// terser()
3130
],
3231
// This makes life easier
3332
external: [

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 153 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import {
99
UpdateNotification,
1010
isBatchedUpdateNotification
1111
} from '../db/DBAdapter.js';
12+
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
1213
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
1314
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1415
import { Schema } from '../db/schema/Schema.js';
1516
import { BaseObserver } from '../utils/BaseObserver.js';
1617
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
17-
import { mutexRunExclusive } from '../utils/mutex.js';
1818
import { throttleTrailing } from '../utils/async.js';
19+
import { mutexRunExclusive } from '../utils/mutex.js';
1920
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2021
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2122
import { runOnSchemaChange } from './runOnSchemaChange.js';
@@ -32,7 +33,6 @@ import {
3233
type PowerSyncConnectionOptions,
3334
type RequiredAdditionalConnectionOptions
3435
} from './sync/stream/AbstractStreamingSyncImplementation.js';
35-
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
3636

3737
export interface DisconnectAndClearOptions {
3838
/** When set to false, data in local-only tables is preserved. */
@@ -112,6 +112,11 @@ export interface PowerSyncCloseOptions {
112112
disconnect?: boolean;
113113
}
114114

115+
type StoredConnectionOptions = {
116+
connector: PowerSyncBackendConnector;
117+
options: PowerSyncConnectionOptions;
118+
};
119+
115120
const POWERSYNC_TABLE_MATCH = /(^ps_data__|^ps_data_local__)/;
116121

117122
const DEFAULT_DISCONNECT_CLEAR_OPTIONS: DisconnectAndClearOptions = {
@@ -176,6 +181,29 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
176181

177182
private _database: DBAdapter;
178183

184+
/**
185+
* Tracks active connection attempts
186+
*/
187+
protected connectingPromise: Promise<void> | null;
188+
/**
189+
* Tracks actively instantiating a streaming sync implementation.
190+
*/
191+
protected syncStreamInitPromise: Promise<void> | null;
192+
/**
193+
* Active disconnect operation. Call disconnect multiple times
194+
* will resolve to the same operation.
195+
*/
196+
protected disconnectingPromise: Promise<void> | null;
197+
/**
198+
* Tracks the last parameters supplied to `connect` calls.
199+
* Calling `connect` multiple times in succession will result in:
200+
* - 1 pending connection operation which will be aborted.
201+
* - updating the last set of parameters while waiting for the pending
202+
* attempt to be aborted
203+
* - internally connecting with the last set of parameters
204+
*/
205+
protected pendingConnectionOptions: StoredConnectionOptions | null;
206+
179207
constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
180208
constructor(options: PowerSyncDatabaseOptionsWithOpenFactory);
181209
constructor(options: PowerSyncDatabaseOptionsWithSettings);
@@ -206,6 +234,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
206234
this._schema = schema;
207235
this.ready = false;
208236
this.sdkVersion = '';
237+
this.connectingPromise = null;
238+
this.syncStreamInitPromise = null;
239+
this.pendingConnectionOptions = null;
209240
// Start async init
210241
this._isReadyPromise = this.initialize();
211242
}
@@ -425,34 +456,133 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
425456
};
426457
}
427458

459+
/**
460+
* Locking mechagnism for exclusively running critical portions of connect/disconnect operations.
461+
*/
462+
protected abstract connectExclusive(callback: () => Promise<void>): Promise<void>;
463+
464+
protected async connectInternal() {
465+
await this.disconnectInternal();
466+
467+
let appliedOptions: PowerSyncConnectionOptions | null = null;
468+
469+
/**
470+
* This portion creates a sync implementation which can be racy when disconnecting or
471+
* if multiple tabs on web are in use.
472+
* This is protected in an exclusive lock.
473+
* The promise tracks the creation which is used to synchronize disconnect attempts.
474+
*/
475+
this.syncStreamInitPromise = this.connectExclusive(async () => {
476+
if (this.closed) {
477+
throw new Error('Cannot connect using a closed client');
478+
}
479+
if (!this.pendingConnectionOptions) {
480+
// A disconnect could have cleared this.
481+
return;
482+
}
483+
484+
// get pending options and clear it in order for other connect attempts to queue other options
485+
486+
const { connector, options } = this.pendingConnectionOptions;
487+
appliedOptions = options;
488+
this.pendingConnectionOptions = null;
489+
490+
this.syncStreamImplementation = this.generateSyncStreamImplementation(
491+
connector,
492+
this.resolvedConnectionOptions(options)
493+
);
494+
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
495+
statusChanged: (status) => {
496+
this.currentStatus = new SyncStatus({
497+
...status.toJSON(),
498+
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
499+
});
500+
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
501+
}
502+
});
503+
504+
await this.syncStreamImplementation.waitForReady();
505+
});
506+
507+
await this.syncStreamInitPromise;
508+
this.syncStreamInitPromise = null;
509+
510+
if (!appliedOptions) {
511+
// A disconnect could have cleared the options which did not create a syncStreamImplementation
512+
return;
513+
}
514+
515+
this.syncStreamImplementation?.triggerCrudUpload();
516+
this.options.logger?.debug('Attempting to connect to PowerSync instance');
517+
await this.syncStreamImplementation?.connect(appliedOptions!);
518+
}
519+
428520
/**
429521
* Connects to stream of events from the PowerSync instance.
430522
*/
431523
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
432524
await this.waitForReady();
433525

434-
// close connection if one is open
435-
await this.disconnect();
436-
if (this.closed) {
437-
throw new Error('Cannot connect using a closed client');
438-
}
526+
// A pending connection should be present if this is true
527+
// The options also have not been used yet if this is true.
528+
// We can update this referrence in order to update the next connection attempt.
529+
const hadPendingConnectionOptions = !!this.pendingConnectionOptions;
439530

440-
const resolvedConnectOptions = this.resolvedConnectionOptions(options);
531+
// This overrides options if present.
532+
this.pendingConnectionOptions = {
533+
connector,
534+
options: options ?? {}
535+
};
441536

442-
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
443-
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
444-
statusChanged: (status) => {
445-
this.currentStatus = new SyncStatus({
446-
...status.toJSON(),
447-
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
448-
});
449-
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
537+
if (hadPendingConnectionOptions) {
538+
// A connection attempt is already queued, but it hasn't used the options yet.
539+
// The params have been updated and will be used when connecting.
540+
if (!this.connectingPromise) {
541+
throw new Error(`Pending connection options were found without a pending connect operation`);
450542
}
451-
});
543+
return await this.connectingPromise;
544+
} else if (this.connectingPromise) {
545+
// If we didn't have pending options, we are busy with a connect.
546+
// i.e. the pending connect used the options already and is busy proceeding.
547+
// The call which creates the connectingPromise should check if there are pendingConnectionOptions and automatically
548+
// schedule a connect. See below:
549+
} else {
550+
// No pending options or pending operation. Start one
551+
this.connectingPromise = this.connectInternal().finally(() => {
552+
if (this.pendingConnectionOptions) {
553+
return this.connectInternal();
554+
}
555+
this.connectingPromise = null;
556+
return;
557+
});
558+
return await this.connectingPromise;
559+
}
560+
}
452561

453-
await this.syncStreamImplementation.waitForReady();
454-
this.syncStreamImplementation.triggerCrudUpload();
455-
await this.syncStreamImplementation.connect(options);
562+
/**
563+
* Close the sync connection.
564+
*
565+
* Use {@link connect} to connect again.
566+
*/
567+
protected async disconnectInternal() {
568+
if (this.disconnectingPromise) {
569+
// A disconnect is already in progress
570+
return await this.disconnectingPromise;
571+
}
572+
573+
// Wait if a sync stream implementation is being created before closing it
574+
// (it must be assigned before we can properly dispose it)
575+
await this.syncStreamInitPromise;
576+
577+
this.disconnectingPromise = (async () => {
578+
await this.syncStreamImplementation?.disconnect();
579+
this.syncStatusListenerDisposer?.();
580+
await this.syncStreamImplementation?.dispose();
581+
this.syncStreamImplementation = undefined;
582+
})();
583+
584+
await this.disconnectingPromise;
585+
this.disconnectingPromise = null;
456586
}
457587

458588
/**
@@ -462,10 +592,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
462592
*/
463593
async disconnect() {
464594
await this.waitForReady();
465-
await this.syncStreamImplementation?.disconnect();
466-
this.syncStatusListenerDisposer?.();
467-
await this.syncStreamImplementation?.dispose();
468-
this.syncStreamImplementation = undefined;
595+
// This will help abort pending connects
596+
this.pendingConnectionOptions = null;
597+
await this.disconnectInternal();
469598
}
470599

471600
/**

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,21 @@ import {
55
AdditionalConnectionOptions,
66
BucketStorageAdapter,
77
DBAdapter,
8-
DEFAULT_REMOTE_LOGGER,
98
PowerSyncBackendConnector,
109
PowerSyncConnectionOptions,
1110
PowerSyncDatabaseOptions,
1211
PowerSyncDatabaseOptionsWithSettings,
13-
RequiredAdditionalConnectionOptions,
1412
SqliteBucketStorage,
1513
SQLOpenFactory
1614
} from '@powersync/common';
1715

1816
import { NodeRemote } from '../sync/stream/NodeRemote.js';
1917
import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js';
2018

19+
import Lock from 'async-lock';
20+
import { Dispatcher } from 'undici';
2121
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
2222
import { NodeSQLOpenOptions } from './options.js';
23-
import { Dispatcher } from 'undici';
2423

2524
export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
2625
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
@@ -57,8 +56,11 @@ export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAd
5756
* ```
5857
*/
5958
export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
59+
protected connectionLock: Lock;
60+
6061
constructor(options: NodePowerSyncDatabaseOptions) {
6162
super(options);
63+
this.connectionLock = new Lock();
6264
}
6365

6466
async _initialize(): Promise<void> {
@@ -83,6 +85,10 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
8385
return super.connect(connector, options);
8486
}
8587

88+
protected async connectExclusive(callback: () => Promise<void>): Promise<void> {
89+
await this.connectionLock.acquire(`connection-lock-${this.database.name}`, callback);
90+
}
91+
8692
protected generateSyncStreamImplementation(
8793
connector: PowerSyncBackendConnector,
8894
options: NodeAdditionalConnectionOptions

packages/react-native/src/db/PowerSyncDatabase.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
type RequiredAdditionalConnectionOptions,
99
SqliteBucketStorage
1010
} from '@powersync/common';
11+
import Lock from 'async-lock';
1112
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
1213
import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation';
1314
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
@@ -27,6 +28,8 @@ import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick
2728
* ```
2829
*/
2930
export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
31+
protected connectionLock = new Lock();
32+
3033
async _initialize(): Promise<void> {}
3134

3235
/**
@@ -42,6 +45,10 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
4245
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
4346
}
4447

48+
protected async connectExclusive(callback: () => Promise<void>): Promise<void> {
49+
await this.connectionLock.acquire(`connection-lock-${this.database.name}`, callback);
50+
}
51+
4552
protected generateSyncStreamImplementation(
4653
connector: PowerSyncBackendConnector,
4754
options: RequiredAdditionalConnectionOptions

packages/web/src/db/PowerSyncDatabase.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import {
22
type BucketStorageAdapter,
33
type PowerSyncBackendConnector,
44
type PowerSyncCloseOptions,
5-
type PowerSyncConnectionOptions,
65
type RequiredAdditionalConnectionOptions,
76
AbstractPowerSyncDatabase,
87
DBAdapter,
@@ -172,16 +171,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
172171
});
173172
}
174173

175-
connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions): Promise<void> {
176-
/**
177-
* Using React strict mode might cause calls to connect to fire multiple times
178-
* Connect is wrapped inside a lock in order to prevent race conditions internally between multiple
179-
* connection attempts.
180-
*/
181-
return this.runExclusive(() => {
182-
this.options.logger?.debug('Attempting to connect to PowerSync instance');
183-
return super.connect(connector, options);
184-
});
174+
protected async connectExclusive(callback: () => Promise<void>): Promise<void> {
175+
await this.runExclusive(callback);
185176
}
186177

187178
protected generateBucketStorageAdapter(): BucketStorageAdapter {

0 commit comments

Comments
 (0)