Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2e9df4b
improve queue behaviour
stevensJourney May 22, 2025
c4eccb8
cleanup
stevensJourney May 22, 2025
5ef22b1
simplify logic
stevensJourney May 22, 2025
f5df9dc
cleanup logic
stevensJourney May 26, 2025
1d155d9
Merge branch 'main' into connection-dequeue
stevensJourney May 26, 2025
be6b2b4
cleanup
stevensJourney May 26, 2025
828fae9
cleanup
stevensJourney May 26, 2025
8a911b7
Update packages/web/tests/stream.test.ts
stevensJourney May 26, 2025
601b484
cleanup common rollup
stevensJourney May 26, 2025
b234bca
cleanup listener
stevensJourney May 26, 2025
308bef4
cleanup
stevensJourney May 26, 2025
e561dda
use shared ConnectionManager in order to queue connections for multip…
stevensJourney May 28, 2025
4e4350f
cleanup connection manager
stevensJourney May 28, 2025
57289b7
remove only call
stevensJourney May 28, 2025
3d57e28
fix upload test
stevensJourney May 28, 2025
5074bb7
Merge branch 'main' into connection-dequeue
stevensJourney May 28, 2025
a0dfaf6
test timeout on ci
stevensJourney May 28, 2025
fe2bba9
stability improvements
stevensJourney May 28, 2025
9d37979
Fix reconnect bugs
stevensJourney May 29, 2025
bf881fc
code cleanup
stevensJourney May 29, 2025
b054bf1
Add code comments
stevensJourney May 29, 2025
5f62829
fix react native ios reconnect
stevensJourney May 29, 2025
eb4ef3b
Merge branch 'main' into connection-dequeue
stevensJourney May 29, 2025
5419661
fix typo
stevensJourney May 29, 2025
c165795
add logger to RN demo
stevensJourney May 29, 2025
b495ca7
fix multiple tab connection contention issue
stevensJourney Jun 2, 2025
73ce560
Merge branch 'main' into connection-dequeue
stevensJourney Jun 2, 2025
7659533
improve retry delay logic. Update diagnostics app error detection.
stevensJourney Jun 4, 2025
13a58e4
Report Sync errors in all pages. Fix bug where changes in sync errors…
stevensJourney Jun 4, 2025
641c9b4
update changeset
stevensJourney Jun 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/empty-pants-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/node': minor
'@powersync/web': minor
---

Improved behvaiour when connect is called multiple times in quick succession. Updating client parameters should now be more responsive.
176 changes: 156 additions & 20 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import {
UpdateNotification,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { throttleTrailing } from '../utils/async.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';
Expand All @@ -32,7 +33,6 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -112,6 +112,11 @@ export interface PowerSyncCloseOptions {
disconnect?: boolean;
}

type StoredConnectionOptions = {
connector: PowerSyncBackendConnector;
options: PowerSyncConnectionOptions;
};

const POWERSYNC_TABLE_MATCH = /(^ps_data__|^ps_data_local__)/;

const DEFAULT_DISCONNECT_CLEAR_OPTIONS: DisconnectAndClearOptions = {
Expand Down Expand Up @@ -176,6 +181,29 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

private _database: DBAdapter;

/**
* Tracks active connection attempts
*/
protected connectingPromise: Promise<void> | null;
/**
* Tracks actively instantiating a streaming sync implementation.
*/
protected syncStreamInitPromise: Promise<void> | null;
/**
* Active disconnect operation. Calling disconnect multiple times
* will resolve to the same operation.
*/
protected disconnectingPromise: Promise<void> | null;
/**
* Tracks the last parameters supplied to `connect` calls.
* Calling `connect` multiple times in succession will result in:
* - 1 pending connection operation which will be aborted.
* - updating the last set of parameters while waiting for the pending
* attempt to be aborted
* - internally connecting with the last set of parameters
*/
protected pendingConnectionOptions: StoredConnectionOptions | null;

constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
constructor(options: PowerSyncDatabaseOptionsWithOpenFactory);
constructor(options: PowerSyncDatabaseOptionsWithSettings);
Expand Down Expand Up @@ -206,6 +234,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this._schema = schema;
this.ready = false;
this.sdkVersion = '';
this.connectingPromise = null;
this.syncStreamInitPromise = null;
this.pendingConnectionOptions = null;
// Start async init
this._isReadyPromise = this.initialize();
}
Expand Down Expand Up @@ -425,34 +456,117 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
};
}

/**
* Locking mechanism for exclusively running critical portions of connect/disconnect operations.
* Locking here is mostly only important on web for multiple tab scenarios.
*/
protected abstract runExclusive<T>(callback: () => Promise<T>): Promise<T>;

protected async connectInternal() {
let appliedOptions: PowerSyncConnectionOptions | null = null;

// This method ensures a disconnect before any connection attempt
await this.disconnectInternal();

/**
* This portion creates a sync implementation which can be racy when disconnecting or
* if multiple tabs on web are in use.
* This is protected in an exclusive lock.
* The promise tracks the creation which is used to synchronize disconnect attempts.
*/
this.syncStreamInitPromise = this.runExclusive(async () => {
if (this.closed) {
throw new Error('Cannot connect using a closed client');
}

// Always await this if present since we will be populating a new sync implementation shortly
await this.disconnectingPromise;

if (!this.pendingConnectionOptions) {
// A disconnect could have cleared this.
return;
}
// get pending options and clear it in order for other connect attempts to queue other options
const { connector, options } = this.pendingConnectionOptions;
appliedOptions = options;
this.pendingConnectionOptions = null;

this.syncStreamImplementation = this.generateSyncStreamImplementation(
connector,
this.resolvedConnectionOptions(options)
);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
...status.toJSON(),
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
});
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
}
});

await this.syncStreamImplementation.waitForReady();
});

await this.syncStreamInitPromise;
this.syncStreamInitPromise = null;

if (!appliedOptions) {
// A disconnect could have cleared the options which did not create a syncStreamImplementation
return;
}

// It might be possible that a disconnect triggered between the last check
// and this point. Awaiting here allows the sync stream to be cleared if disconnected.
await this.disconnectingPromise;

this.syncStreamImplementation?.triggerCrudUpload();
this.options.logger?.debug('Attempting to connect to PowerSync instance');
await this.syncStreamImplementation?.connect(appliedOptions!);
}

/**
* Connects to stream of events from the PowerSync instance.
*/
async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) {
// Keep track if there were pending operations before this call
const hadPendingOptions = !!this.pendingConnectionOptions;

// Update pending options to the latest values
this.pendingConnectionOptions = {
connector,
options: options ?? {}
};

await this.waitForReady();

// close connection if one is open
await this.disconnect();
if (this.closed) {
throw new Error('Cannot connect using a closed client');
// Disconnecting here provides aborting in progress connection attempts.
// The connectInternal method will clear pending options once it starts connecting (with the options).
// We only need to trigger a disconnect here if we have already reached the point of connecting.
// If we do already have pending options, a disconnect has already been performed.
// The connectInternal method also does a sanity disconnect to prevent straggler connections.
if (!hadPendingOptions) {
await this.disconnectInternal();
}

const resolvedConnectOptions = this.resolvedConnectionOptions(options);

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
...status.toJSON(),
hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt
});
this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus));
// Triggers a connect which checks if pending options are available after the connect completes.
// The completion can be for a successful, unsuccessful or aborted connection attempt.
// If pending options are available another connection will be triggered.
const checkConnection = async (): Promise<void> => {
if (this.pendingConnectionOptions) {
// Pending options have been placed while connecting.
// Need to reconnect.
this.connectingPromise = this.connectInternal().finally(checkConnection);
return this.connectingPromise;
} else {
// Clear the connecting promise, done.
this.connectingPromise = null;
return;
}
});
};

await this.syncStreamImplementation.waitForReady();
this.syncStreamImplementation.triggerCrudUpload();
await this.syncStreamImplementation.connect(options);
this.connectingPromise ??= this.connectInternal().finally(checkConnection);
return this.connectingPromise;
}

/**
Expand All @@ -462,6 +576,28 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async disconnect() {
await this.waitForReady();
// This will help abort pending connects
this.pendingConnectionOptions = null;
await this.disconnectInternal();
}

protected async disconnectInternal() {
if (this.disconnectingPromise) {
// A disconnect is already in progress
return this.disconnectingPromise;
}

// Wait if a sync stream implementation is being created before closing it
// (syncStreamImplementation must be assigned before we can properly dispose it)
await this.syncStreamInitPromise;

this.disconnectingPromise = this.performDisconnect();

await this.disconnectingPromise;
this.disconnectingPromise = null;
}

protected async performDisconnect() {
await this.syncStreamImplementation?.disconnect();
this.syncStatusListenerDisposer?.();
await this.syncStreamImplementation?.dispose();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import Logger, { ILogger } from 'js-logger';

import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';
import { DataStream } from 'src/utils/DataStream.js';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
import {
BucketRequest,
StreamingSyncLine,
Expand All @@ -19,8 +21,6 @@ import {
isStreamingSyncCheckpointPartiallyComplete,
isStreamingSyncData
} from './streaming-sync-types.js';
import { DataStream } from 'src/utils/DataStream.js';
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -341,12 +341,13 @@ The next upload iteration will be delayed.`);
await this.disconnect();
}

this.abortController = new AbortController();
const controller = new AbortController();
this.abortController = controller;
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);

// Return a promise that resolves when the connection status is updated
return new Promise<void>((resolve) => {
const l = this.registerListener({
const disposer = this.registerListener({
statusUpdated: (update) => {
// This is triggered as soon as a connection is read from
if (typeof update.connected == 'undefined') {
Expand All @@ -356,13 +357,15 @@ The next upload iteration will be delayed.`);

if (update.connected == false) {
/**
* This function does not reject if initial connect attempt failed
* This function does not reject if initial connect attempt failed.
* Connected can be false if the connection attempt was aborted or if the initial connection
* attempt failed.
*/
this.logger.warn('Initial connect attempt did not successfully connect to server');
}

disposer();
resolve();
l();
}
});
});
Expand Down Expand Up @@ -520,6 +523,10 @@ The next upload iteration will be delayed.`);

const clientId = await this.options.adapter.getClientId();

if (signal.aborted) {
return;
}

this.logger.debug('Requesting stream from server');

const syncOptions: SyncStreamOptions = {
Expand Down
12 changes: 9 additions & 3 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@ import {
AdditionalConnectionOptions,
BucketStorageAdapter,
DBAdapter,
DEFAULT_REMOTE_LOGGER,
PowerSyncBackendConnector,
PowerSyncConnectionOptions,
PowerSyncDatabaseOptions,
PowerSyncDatabaseOptionsWithSettings,
RequiredAdditionalConnectionOptions,
SqliteBucketStorage,
SQLOpenFactory
} from '@powersync/common';

import { NodeRemote } from '../sync/stream/NodeRemote.js';
import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js';

import Lock from 'async-lock';
import { Dispatcher } from 'undici';
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
import { NodeSQLOpenOptions } from './options.js';
import { Dispatcher } from 'undici';

export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
Expand Down Expand Up @@ -57,8 +56,11 @@ export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAd
* ```
*/
export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
protected lock: Lock;

constructor(options: NodePowerSyncDatabaseOptions) {
super(options);
this.lock = new Lock();
}

async _initialize(): Promise<void> {
Expand All @@ -83,6 +85,10 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return super.connect(connector, options);
}

protected async runExclusive<T>(callback: () => Promise<T>): Promise<T> {
return await this.lock.acquire(`lock-${this.database.name}`, callback);
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: NodeAdditionalConnectionOptions
Expand Down
7 changes: 7 additions & 0 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type RequiredAdditionalConnectionOptions,
SqliteBucketStorage
} from '@powersync/common';
import Lock from 'async-lock';
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation';
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
Expand All @@ -27,6 +28,8 @@ import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick
* ```
*/
export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
protected lock = new Lock();

async _initialize(): Promise<void> {}

/**
Expand All @@ -42,6 +45,10 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
}

protected async runExclusive<T>(callback: () => Promise<T>): Promise<T> {
return await this.lock.acquire(`lock-${this.database.name}`, callback);
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
Expand Down
Loading