diff --git a/.changeset/cyan-penguins-smell.md b/.changeset/cyan-penguins-smell.md new file mode 100644 index 000000000..ac7428965 --- /dev/null +++ b/.changeset/cyan-penguins-smell.md @@ -0,0 +1,5 @@ +--- +'@powersync/react-native': minor +--- + +Fixed issue where iOS WebSockets could fail to reconnect after a connection issue. diff --git a/.changeset/empty-pants-give.md b/.changeset/empty-pants-give.md new file mode 100644 index 000000000..ccf2b5c00 --- /dev/null +++ b/.changeset/empty-pants-give.md @@ -0,0 +1,8 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +'@powersync/node': minor +'@powersync/web': minor +--- + +Improved behaviour when connect is called multiple times in quick succession. Updating client parameters should now be more responsive. diff --git a/.changeset/fuzzy-beers-occur.md b/.changeset/fuzzy-beers-occur.md new file mode 100644 index 000000000..295b64f23 --- /dev/null +++ b/.changeset/fuzzy-beers-occur.md @@ -0,0 +1,7 @@ +--- +'@powersync/diagnostics-app': minor +--- + +- Added Sync error alert banner to all views. +- Fix bug where clicking signOut would not disconnect from the PowerSync service. +- Updated implementation to fetch sync errors from the SyncStatus. diff --git a/.changeset/fuzzy-ties-double.md b/.changeset/fuzzy-ties-double.md new file mode 100644 index 000000000..3a0b27c1d --- /dev/null +++ b/.changeset/fuzzy-ties-double.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fixed bug where changes in SyncStatus downloadError and uploadError might not be reported. diff --git a/demos/react-native-supabase-todolist/library/powersync/system.ts b/demos/react-native-supabase-todolist/library/powersync/system.ts index 6ca46a861..8f1c2713e 100644 --- a/demos/react-native-supabase-todolist/library/powersync/system.ts +++ b/demos/react-native-supabase-todolist/library/powersync/system.ts @@ -5,16 +5,16 @@ import React from 'react'; import { SupabaseStorageAdapter } from '../storage/SupabaseStorageAdapter'; import { type AttachmentRecord } from '@powersync/attachments'; +import { configureFts } from '../fts/fts_setup'; import { KVStorage } from '../storage/KVStorage'; import { AppConfig } from '../supabase/AppConfig'; import { SupabaseConnector } from '../supabase/SupabaseConnector'; import { AppSchema } from './AppSchema'; import { PhotoAttachmentQueue } from './PhotoAttachmentQueue'; -import { configureFts } from '../fts/fts_setup'; const logger = createBaseLogger(); logger.useDefaults(); -logger.setLevel(LogLevel.INFO); +logger.setLevel(LogLevel.DEBUG); export class System { kvStorage: KVStorage; @@ -31,19 +31,22 @@ export class System { schema: AppSchema, database: { dbFilename: 'sqlite.db' - } + }, + logger }); /** * The snippet below uses OP-SQLite as the default database adapter. * You will have to uninstall `@journeyapps/react-native-quick-sqlite` and * install both `@powersync/op-sqlite` and `@op-engineering/op-sqlite` to use this. * + * ```typescript * import { OPSqliteOpenFactory } from '@powersync/op-sqlite'; // Add this import * * const factory = new OPSqliteOpenFactory({ - * dbFilename: 'sqlite.db' + * dbFilename: 'sqlite.db' * }); * this.powersync = new PowerSyncDatabase({ database: factory, schema: AppSchema }); + * ``` */ if (AppConfig.supabaseBucket) { diff --git a/packages/common/rollup.config.mjs b/packages/common/rollup.config.mjs index ad05faf6a..1fc8fedd9 100644 --- a/packages/common/rollup.config.mjs +++ b/packages/common/rollup.config.mjs @@ -4,8 +4,11 @@ import json from '@rollup/plugin-json'; import nodeResolve from '@rollup/plugin-node-resolve'; import terser from '@rollup/plugin-terser'; +/** + * @returns {import('rollup').RollupOptions} + */ export default (commandLineArgs) => { - const sourcemap = (commandLineArgs.sourceMap || 'true') == 'true'; + const sourceMap = (commandLineArgs.sourceMap || 'true') == 'true'; // Clears rollup CLI warning https://github.com/rollup/rollup/issues/2694 delete commandLineArgs.sourceMap; @@ -15,7 +18,7 @@ export default (commandLineArgs) => { output: { file: 'dist/bundle.mjs', format: 'esm', - sourcemap: sourcemap + sourcemap: sourceMap }, plugins: [ json(), @@ -27,7 +30,7 @@ export default (commandLineArgs) => { // Used by can-ndjson-stream TextDecoder: ['text-encoding', 'TextDecoder'] }), - terser() + terser({ sourceMap }) ], // This makes life easier external: [ diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index c78912c99..590747969 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -9,13 +9,15 @@ import { UpdateNotification, isBatchedUpdateNotification } from '../db/DBAdapter.js'; +import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js'; import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; -import { mutexRunExclusive } from '../utils/mutex.js'; import { throttleTrailing } from '../utils/async.js'; +import { mutexRunExclusive } from '../utils/mutex.js'; +import { ConnectionManager } from './ConnectionManager.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; import { runOnSchemaChange } from './runOnSchemaChange.js'; @@ -32,7 +34,6 @@ import { type PowerSyncConnectionOptions, type RequiredAdditionalConnectionOptions } from './sync/stream/AbstractStreamingSyncImplementation.js'; -import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -165,17 +166,22 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void; protected _isReadyPromise: Promise; + protected connectionManager: ConnectionManager; + + get syncStreamImplementation() { + return this.connectionManager.syncStreamImplementation; + } protected _schema: Schema; private _database: DBAdapter; + protected runExclusiveMutex: Mutex; + constructor(options: PowerSyncDatabaseOptionsWithDBAdapter); constructor(options: PowerSyncDatabaseOptionsWithOpenFactory); constructor(options: PowerSyncDatabaseOptionsWithSettings); @@ -206,7 +212,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { + await this.waitForReady(); + + return this.runExclusive(async () => { + const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options)); + const onDispose = sync.registerListener({ + statusChanged: (status) => { + this.currentStatus = new SyncStatus({ + ...status.toJSON(), + hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt + }); + this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus)); + } + }); + await sync.waitForReady(); + + return { + sync, + onDispose + }; + }); + }, + logger: this.logger + }); this._isReadyPromise = this.initialize(); } @@ -425,34 +457,19 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver(callback: () => Promise): Promise { + return this.runExclusiveMutex.runExclusive(callback); + } + /** * Connects to stream of events from the PowerSync instance. */ async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) { - await this.waitForReady(); - - // close connection if one is open - await this.disconnect(); - if (this.closed) { - throw new Error('Cannot connect using a closed client'); - } - - const resolvedConnectOptions = this.resolvedConnectionOptions(options); - - this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions); - this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({ - statusChanged: (status) => { - this.currentStatus = new SyncStatus({ - ...status.toJSON(), - hasSynced: this.currentStatus?.hasSynced || !!status.lastSyncedAt - }); - this.iterateListeners((cb) => cb.statusChanged?.(this.currentStatus)); - } - }); - - await this.syncStreamImplementation.waitForReady(); - this.syncStreamImplementation.triggerCrudUpload(); - await this.syncStreamImplementation.connect(options); + return this.connectionManager.connect(connector, options); } /** @@ -461,11 +478,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver Promise | void; +} + +/** + * @internal + */ +export interface ConnectionManagerOptions { + createSyncImplementation( + connector: PowerSyncBackendConnector, + options: PowerSyncConnectionOptions + ): Promise; + logger: ILogger; +} + +type StoredConnectionOptions = { + connector: PowerSyncBackendConnector; + options: PowerSyncConnectionOptions; +}; + +/** + * @internal + */ +export interface ConnectionManagerListener extends BaseListener { + syncStreamCreated: (sync: StreamingSyncImplementation) => void; +} + +/** + * @internal + */ +export class ConnectionManager extends BaseObserver { + /** + * Tracks active connection attempts + */ + protected connectingPromise: Promise | null; + /** + * Tracks actively instantiating a streaming sync implementation. + */ + protected syncStreamInitPromise: Promise | null; + /** + * Active disconnect operation. Calling disconnect multiple times + * will resolve to the same operation. + */ + protected disconnectingPromise: Promise | null; + /** + * Tracks the last parameters supplied to `connect` calls. + * Calling `connect` multiple times in succession will result in: + * - 1 pending connection operation which will be aborted. + * - updating the last set of parameters while waiting for the pending + * attempt to be aborted + * - internally connecting with the last set of parameters + */ + protected pendingConnectionOptions: StoredConnectionOptions | null; + + syncStreamImplementation: StreamingSyncImplementation | null; + + /** + * Additional cleanup function which is called after the sync stream implementation + * is disposed. + */ + protected syncDisposer: (() => Promise | void) | null; + + constructor(protected options: ConnectionManagerOptions) { + super(); + this.connectingPromise = null; + this.syncStreamInitPromise = null; + this.disconnectingPromise = null; + this.pendingConnectionOptions = null; + this.syncStreamImplementation = null; + this.syncDisposer = null; + } + + get logger() { + return this.options.logger; + } + + async close() { + await this.syncStreamImplementation?.dispose(); + await this.syncDisposer?.(); + } + + async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) { + // Keep track if there were pending operations before this call + const hadPendingOptions = !!this.pendingConnectionOptions; + + // Update pending options to the latest values + this.pendingConnectionOptions = { + connector, + options: options ?? {} + }; + + // Disconnecting here provides aborting in progress connection attempts. + // The connectInternal method will clear pending options once it starts connecting (with the options). + // We only need to trigger a disconnect here if we have already reached the point of connecting. + // If we do already have pending options, a disconnect has already been performed. + // The connectInternal method also does a sanity disconnect to prevent straggler connections. + // We should also disconnect if we have already completed a connection attempt. + if (!hadPendingOptions || this.syncStreamImplementation) { + await this.disconnectInternal(); + } + + // Triggers a connect which checks if pending options are available after the connect completes. + // The completion can be for a successful, unsuccessful or aborted connection attempt. + // If pending options are available another connection will be triggered. + const checkConnection = async (): Promise => { + if (this.pendingConnectionOptions) { + // Pending options have been placed while connecting. + // Need to reconnect. + this.connectingPromise = this.connectInternal() + .catch(() => {}) + .finally(checkConnection); + return this.connectingPromise; + } else { + // Clear the connecting promise, done. + this.connectingPromise = null; + return; + } + }; + + this.connectingPromise ??= this.connectInternal() + .catch(() => {}) + .finally(checkConnection); + return this.connectingPromise; + } + + protected async connectInternal() { + let appliedOptions: PowerSyncConnectionOptions | null = null; + + // This method ensures a disconnect before any connection attempt + await this.disconnectInternal(); + + /** + * This portion creates a sync implementation which can be racy when disconnecting or + * if multiple tabs on web are in use. + * This is protected in an exclusive lock. + * The promise tracks the creation which is used to synchronize disconnect attempts. + */ + this.syncStreamInitPromise = new Promise(async (resolve, reject) => { + try { + if (!this.pendingConnectionOptions) { + this.logger.debug('No pending connection options found, not creating sync stream implementation'); + // A disconnect could have cleared this. + resolve(); + return; + } + + if (this.disconnectingPromise) { + resolve(); + return; + } + + const { connector, options } = this.pendingConnectionOptions; + appliedOptions = options; + + this.pendingConnectionOptions = null; + const { sync, onDispose } = await this.options.createSyncImplementation(connector, options); + this.iterateListeners((l) => l.syncStreamCreated?.(sync)); + this.syncStreamImplementation = sync; + this.syncDisposer = onDispose; + await this.syncStreamImplementation.waitForReady(); + resolve(); + } catch (error) { + reject(error); + } + }); + + await this.syncStreamInitPromise; + this.syncStreamInitPromise = null; + + if (!appliedOptions) { + // A disconnect could have cleared the options which did not create a syncStreamImplementation + return; + } + + // It might be possible that a disconnect triggered between the last check + // and this point. Awaiting here allows the sync stream to be cleared if disconnected. + await this.disconnectingPromise; + + this.logger.debug('Attempting to connect to PowerSync instance'); + await this.syncStreamImplementation?.connect(appliedOptions!); + this.syncStreamImplementation?.triggerCrudUpload(); + } + + /** + * Close the sync connection. + * + * Use {@link connect} to connect again. + */ + async disconnect() { + // This will help abort pending connects + this.pendingConnectionOptions = null; + await this.disconnectInternal(); + } + + protected async disconnectInternal(): Promise { + if (this.disconnectingPromise) { + // A disconnect is already in progress + return this.disconnectingPromise; + } + + this.disconnectingPromise = this.performDisconnect(); + + await this.disconnectingPromise; + this.disconnectingPromise = null; + } + + protected async performDisconnect() { + // Wait if a sync stream implementation is being created before closing it + // (syncStreamImplementation must be assigned before we can properly dispose it) + await this.syncStreamInitPromise; + + // Keep reference to the sync stream implementation and disposer + // The class members will be cleared before we trigger the disconnect + // to prevent any further calls to the sync stream implementation. + const sync = this.syncStreamImplementation; + this.syncStreamImplementation = null; + const disposer = this.syncDisposer; + this.syncDisposer = null; + + await sync?.disconnect(); + await sync?.dispose(); + await disposer?.(); + } +} diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 1d180e6fe..0ccc8503d 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -4,12 +4,12 @@ import ndjsonStream from 'can-ndjson-stream'; import { type fetch } from 'cross-fetch'; import Logger, { ILogger } from 'js-logger'; import { RSocket, RSocketConnector, Requestable } from 'rsocket-core'; -import { WebsocketClientTransport } from 'rsocket-websocket-client'; import PACKAGE from '../../../../package.json' with { type: 'json' }; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { DataStream } from '../../../utils/DataStream.js'; import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js'; import { StreamingSyncLine, StreamingSyncRequest } from './streaming-sync-types.js'; +import { WebsocketClientTransport } from './WebsocketClientTransport.js'; export type BSONImplementation = typeof BSON; diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index ca0370f21..9997da2f9 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1,5 +1,7 @@ import Logger, { ILogger } from 'js-logger'; +import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js'; +import { DataStream } from 'src/utils/DataStream.js'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; @@ -7,7 +9,7 @@ import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; -import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js'; +import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; import { BucketRequest, StreamingSyncLine, @@ -19,8 +21,6 @@ import { isStreamingSyncCheckpointPartiallyComplete, isStreamingSyncData } from './streaming-sync-types.js'; -import { DataStream } from 'src/utils/DataStream.js'; -import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js'; export enum LockType { CRUD = 'crud', @@ -341,12 +341,13 @@ The next upload iteration will be delayed.`); await this.disconnect(); } - this.abortController = new AbortController(); + const controller = new AbortController(); + this.abortController = controller; this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options); // Return a promise that resolves when the connection status is updated return new Promise((resolve) => { - const l = this.registerListener({ + const disposer = this.registerListener({ statusUpdated: (update) => { // This is triggered as soon as a connection is read from if (typeof update.connected == 'undefined') { @@ -356,13 +357,15 @@ The next upload iteration will be delayed.`); if (update.connected == false) { /** - * This function does not reject if initial connect attempt failed + * This function does not reject if initial connect attempt failed. + * Connected can be false if the connection attempt was aborted or if the initial connection + * attempt failed. */ this.logger.warn('Initial connect attempt did not successfully connect to server'); } + disposer(); resolve(); - l(); } }); }); @@ -439,6 +442,7 @@ The next upload iteration will be delayed.`); */ while (true) { this.updateSyncStatus({ connecting: true }); + let shouldDelayRetry = true; try { if (signal?.aborted) { break; @@ -450,12 +454,16 @@ The next upload iteration will be delayed.`); * Either: * - A network request failed with a failed connection or not OKAY response code. * - There was a sync processing error. - * This loop will retry. + * - The connection was aborted. + * This loop will retry after a delay if the connection was not aborted. * The nested abort controller will cleanup any open network requests and streams. * The WebRemote should only abort pending fetch requests or close active Readable streams. */ + if (ex instanceof AbortOperation) { this.logger.warn(ex); + shouldDelayRetry = false; + // A disconnect was requested, we should not delay since there is no explicit retry } else { this.logger.error(ex); } @@ -465,9 +473,6 @@ The next upload iteration will be delayed.`); downloadError: ex } }); - - // On error, wait a little before retrying - await this.delayRetry(); } finally { if (!signal.aborted) { nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.')); @@ -478,6 +483,11 @@ The next upload iteration will be delayed.`); connected: false, connecting: true // May be unnecessary }); + + // On error, wait a little before retrying + if (shouldDelayRetry) { + await this.delayRetry(nestedAbortController.signal); + } } } @@ -520,6 +530,10 @@ The next upload iteration will be delayed.`); const clientId = await this.options.adapter.getClientId(); + if (signal.aborted) { + return; + } + this.logger.debug('Requesting stream from server'); const syncOptions: SyncStreamOptions = { @@ -841,7 +855,29 @@ The next upload iteration will be delayed.`); this.iterateListeners((cb) => cb.statusUpdated?.(options)); } - private async delayRetry() { - return new Promise((resolve) => setTimeout(resolve, this.options.retryDelayMs)); + private async delayRetry(signal?: AbortSignal): Promise { + return new Promise((resolve) => { + if (signal?.aborted) { + // If the signal is already aborted, resolve immediately + resolve(); + return; + } + + const { retryDelayMs } = this.options; + + let timeoutId: ReturnType | undefined; + + const endDelay = () => { + resolve(); + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = undefined; + } + signal?.removeEventListener('abort', endDelay); + }; + + signal?.addEventListener('abort', endDelay, { once: true }); + timeoutId = setTimeout(endDelay, retryDelayMs); + }); } } diff --git a/packages/common/src/client/sync/stream/WebsocketClientTransport.ts b/packages/common/src/client/sync/stream/WebsocketClientTransport.ts new file mode 100644 index 000000000..55a09c863 --- /dev/null +++ b/packages/common/src/client/sync/stream/WebsocketClientTransport.ts @@ -0,0 +1,70 @@ +/** + * Adapted from rsocket-websocket-client + * https://github.com/rsocket/rsocket-js/blob/e224cf379e747c4f1ddc4f2fa111854626cc8575/packages/rsocket-websocket-client/src/WebsocketClientTransport.ts#L17 + * This adds additional error handling for React Native iOS. + * This particularly adds a close listener to handle cases where the WebSocket + * connection closes immediately after opening without emitting an error. + */ +import { + ClientTransport, + Closeable, + Demultiplexer, + Deserializer, + DuplexConnection, + FrameHandler, + Multiplexer, + Outbound +} from 'rsocket-core'; +import { ClientOptions } from 'rsocket-websocket-client'; +import { WebsocketDuplexConnection } from 'rsocket-websocket-client/dist/WebsocketDuplexConnection.js'; + +export class WebsocketClientTransport implements ClientTransport { + private readonly url: string; + private readonly factory: (url: string) => WebSocket; + + constructor(options: ClientOptions) { + this.url = options.url; + this.factory = options.wsCreator ?? ((url: string) => new WebSocket(url)); + } + + connect( + multiplexerDemultiplexerFactory: (outbound: Outbound & Closeable) => Multiplexer & Demultiplexer & FrameHandler + ): Promise { + return new Promise((resolve, reject) => { + const websocket = this.factory(this.url); + + websocket.binaryType = 'arraybuffer'; + + let removeListeners: () => void; + + const openListener = () => { + removeListeners(); + resolve(new WebsocketDuplexConnection(websocket, new Deserializer(), multiplexerDemultiplexerFactory)); + }; + + const errorListener = (ev: ErrorEvent) => { + removeListeners(); + reject(ev.error); + }; + + /** + * In some cases, such as React Native iOS, the WebSocket connection may close immediately after opening + * without and error. In such cases, we need to handle the close event to reject the promise. + */ + const closeListener = () => { + removeListeners(); + reject(new Error('WebSocket connection closed while opening')); + }; + + removeListeners = () => { + websocket.removeEventListener('open', openListener); + websocket.removeEventListener('error', errorListener); + websocket.removeEventListener('close', closeListener); + }; + + websocket.addEventListener('open', openListener); + websocket.addEventListener('error', errorListener); + websocket.addEventListener('close', closeListener); + }); + } +} diff --git a/packages/common/src/db/crud/SyncStatus.ts b/packages/common/src/db/crud/SyncStatus.ts index f01211eaa..775c9fa36 100644 --- a/packages/common/src/db/crud/SyncStatus.ts +++ b/packages/common/src/db/crud/SyncStatus.ts @@ -171,7 +171,22 @@ export class SyncStatus { * @returns {boolean} True if the instances are considered equal, false otherwise */ isEqual(status: SyncStatus) { - return JSON.stringify(this.options) == JSON.stringify(status.options); + /** + * By default Error object are serialized to an empty object. + * This replaces Errors with more useful information before serialization. + */ + const replacer = (_: string, value: any) => { + if (value instanceof Error) { + return { + name: value.name, + message: value.message, + stack: value.stack + }; + } + return value; + }; + + return JSON.stringify(this.options, replacer) == JSON.stringify(status.options, replacer); } /** diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 96fdb08c1..44203a2b9 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -1,36 +1,35 @@ export * from './client/AbstractPowerSyncDatabase.js'; export * from './client/AbstractPowerSyncOpenFactory.js'; -export * from './client/SQLOpenFactory.js'; +export { compilableQueryWatch, CompilableQueryWatchHandler } from './client/compilableQueryWatch.js'; export * from './client/connection/PowerSyncBackendConnector.js'; export * from './client/connection/PowerSyncCredentials.js'; -export * from './client/sync/bucket/BucketStorageAdapter.js'; +export { MAX_OP_ID } from './client/constants.js'; export { runOnSchemaChange } from './client/runOnSchemaChange.js'; -export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js'; -export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js'; -export * from './client/sync/bucket/SqliteBucketStorage.js'; +export * from './client/SQLOpenFactory.js'; +export * from './client/sync/bucket/BucketStorageAdapter.js'; export * from './client/sync/bucket/CrudBatch.js'; +export { CrudEntry, OpId, UpdateType } from './client/sync/bucket/CrudEntry.js'; export * from './client/sync/bucket/CrudTransaction.js'; +export * from './client/sync/bucket/OplogEntry.js'; +export * from './client/sync/bucket/OpType.js'; +export * from './client/sync/bucket/SqliteBucketStorage.js'; export * from './client/sync/bucket/SyncDataBatch.js'; export * from './client/sync/bucket/SyncDataBucket.js'; -export * from './client/sync/bucket/OpType.js'; -export * from './client/sync/bucket/OplogEntry.js'; export * from './client/sync/stream/AbstractRemote.js'; export * from './client/sync/stream/AbstractStreamingSyncImplementation.js'; export * from './client/sync/stream/streaming-sync-types.js'; -export { MAX_OP_ID } from './client/constants.js'; +export * from './client/ConnectionManager.js'; export { ProgressWithOperations, SyncProgress } from './db/crud/SyncProgress.js'; export * from './db/crud/SyncStatus.js'; export * from './db/crud/UploadQueueStatus.js'; -export * from './db/schema/Schema.js'; -export * from './db/schema/Table.js'; +export * from './db/DBAdapter.js'; +export * from './db/schema/Column.js'; export * from './db/schema/Index.js'; export * from './db/schema/IndexedColumn.js'; -export * from './db/schema/Column.js'; +export * from './db/schema/Schema.js'; +export * from './db/schema/Table.js'; export * from './db/schema/TableV2.js'; -export * from './db/crud/SyncStatus.js'; -export * from './db/crud/UploadQueueStatus.js'; -export * from './db/DBAdapter.js'; export * from './utils/AbortOperation.js'; export * from './utils/BaseObserver.js'; diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index bbabad46e..0c4501678 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -5,12 +5,10 @@ import { AdditionalConnectionOptions, BucketStorageAdapter, DBAdapter, - DEFAULT_REMOTE_LOGGER, PowerSyncBackendConnector, PowerSyncConnectionOptions, PowerSyncDatabaseOptions, PowerSyncDatabaseOptionsWithSettings, - RequiredAdditionalConnectionOptions, SqliteBucketStorage, SQLOpenFactory } from '@powersync/common'; @@ -18,9 +16,9 @@ import { import { NodeRemote } from '../sync/stream/NodeRemote.js'; import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js'; +import { Dispatcher } from 'undici'; import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js'; import { NodeSQLOpenOptions } from './options.js'; -import { Dispatcher } from 'undici'; export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & { database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions; diff --git a/packages/react-native/rollup.config.mjs b/packages/react-native/rollup.config.mjs index 9a180747c..d2ee7e7fa 100644 --- a/packages/react-native/rollup.config.mjs +++ b/packages/react-native/rollup.config.mjs @@ -54,7 +54,7 @@ export default (commandLineArgs) => { } ] }), - terser() + terser({ sourceMap: sourcemap }) ], external: [ '@journeyapps/react-native-quick-sqlite', diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 16001712e..8351b4f1b 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -2,7 +2,6 @@ import { type BucketStorageAdapter, type PowerSyncBackendConnector, type PowerSyncCloseOptions, - type PowerSyncConnectionOptions, type RequiredAdditionalConnectionOptions, AbstractPowerSyncDatabase, DBAdapter, @@ -172,23 +171,11 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { }); } - connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions): Promise { - /** - * Using React strict mode might cause calls to connect to fire multiple times - * Connect is wrapped inside a lock in order to prevent race conditions internally between multiple - * connection attempts. - */ - return this.runExclusive(() => { - this.options.logger?.debug('Attempting to connect to PowerSync instance'); - return super.connect(connector, options); - }); - } - protected generateBucketStorageAdapter(): BucketStorageAdapter { return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex); } - protected runExclusive(cb: () => Promise) { + protected async runExclusive(cb: () => Promise) { if (this.resolvedFlags.ssrMode) { return PowerSyncDatabase.SHARED_MUTEX.runExclusive(cb); } diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 3dad3192b..dd059139b 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -187,9 +187,6 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem */ async connect(options?: PowerSyncConnectionOptions): Promise { await this.waitForReady(); - // This is needed since a new tab won't have any reference to the - // shared worker sync implementation since that is only created on the first call to `connect`. - await this.disconnect(); return this.syncManager.connect(options); } @@ -209,13 +206,24 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem async dispose(): Promise { await this.waitForReady(); - // Signal the shared worker that this client is closing its connection to the worker - const closeMessagePayload: ManualSharedSyncPayload = { - event: SharedSyncClientEvent.CLOSE_CLIENT, - data: {} - }; - this.messagePort.postMessage(closeMessagePayload); + await new Promise((resolve) => { + // Listen for the close acknowledgment from the worker + this.messagePort.addEventListener('message', (event) => { + const payload = event.data as ManualSharedSyncPayload; + if (payload?.event === SharedSyncClientEvent.CLOSE_ACK) { + resolve(); + } + }); + + // Signal the shared worker that this client is closing its connection to the worker + const closeMessagePayload: ManualSharedSyncPayload = { + event: SharedSyncClientEvent.CLOSE_CLIENT, + data: {} + }; + this.messagePort.postMessage(closeMessagePayload); + }); + // Release the proxy this.syncManager[Comlink.releaseProxy](); this.messagePort.close(); diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 60e2e1c0f..0af122ead 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -1,16 +1,16 @@ import { - type AbstractStreamingSyncImplementation, type ILogger, type ILogLevel, - type LockOptions, type PowerSyncConnectionOptions, type StreamingSyncImplementation, type StreamingSyncImplementationListener, type SyncStatusOptions, AbortOperation, BaseObserver, + ConnectionManager, createLogger, DBAdapter, + PowerSyncBackendConnector, SqliteBucketStorage, SyncStatus } from '@powersync/common'; @@ -26,11 +26,11 @@ import { OpenAsyncDatabaseConnection } from '../../db/adapters/AsyncDatabaseConn import { LockedAsyncDatabaseAdapter } from '../../db/adapters/LockedAsyncDatabaseAdapter'; import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags'; import { WorkerWrappedAsyncDatabaseConnection } from '../../db/adapters/WorkerWrappedAsyncDatabaseConnection'; -import { getNavigatorLocks } from '../../shared/navigator'; import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider'; import { BroadcastLogger } from './BroadcastLogger'; /** + * @internal * Manual message events for shared sync clients */ export enum SharedSyncClientEvent { @@ -38,9 +38,14 @@ export enum SharedSyncClientEvent { * This client requests the shared sync manager should * close it's connection to the client. */ - CLOSE_CLIENT = 'close-client' + CLOSE_CLIENT = 'close-client', + + CLOSE_ACK = 'close-ack' } +/** + * @internal + */ export type ManualSharedSyncPayload = { event: SharedSyncClientEvent; data: any; // TODO update in future @@ -78,6 +83,13 @@ export type RemoteOperationAbortController = { activePort: WrappedSyncPort; }; +/** + * HACK: The shared implementation wraps and provides its own + * PowerSyncBackendConnector when generating the streaming sync implementation. + * We provide this unused placeholder when connecting with the ConnectionManager. + */ +const CONNECTOR_PLACEHOLDER = {} as PowerSyncBackendConnector; + /** * @internal * Shared sync implementation which runs inside a shared webworker @@ -87,7 +99,6 @@ export class SharedSyncImplementation implements StreamingSyncImplementation { protected ports: WrappedSyncPort[]; - protected syncStreamClient: AbstractStreamingSyncImplementation | null; protected isInitialized: Promise; protected statusListener?: () => void; @@ -99,7 +110,9 @@ export class SharedSyncImplementation protected syncParams: SharedSyncInitOptions | null; protected logger: ILogger; protected lastConnectOptions: PowerSyncConnectionOptions | undefined; + protected portMutex: Mutex; + protected connectionManager: ConnectionManager; syncStatus: SyncStatus; broadCastLogger: ILogger; @@ -108,9 +121,9 @@ export class SharedSyncImplementation this.ports = []; this.dbAdapter = null; this.syncParams = null; - this.syncStreamClient = null; this.logger = createLogger('shared-sync'); this.lastConnectOptions = undefined; + this.portMutex = new Mutex(); this.isInitialized = new Promise((resolve) => { const callback = this.registerListener({ @@ -123,24 +136,50 @@ export class SharedSyncImplementation this.syncStatus = new SyncStatus({}); this.broadCastLogger = new BroadcastLogger(this.ports); - } - async waitForStatus(status: SyncStatusOptions): Promise { - await this.waitForReady(); - return this.syncStreamClient!.waitForStatus(status); - } + this.connectionManager = new ConnectionManager({ + createSyncImplementation: async () => { + return this.portMutex.runExclusive(async () => { + await this.waitForReady(); + if (!this.dbAdapter) { + await this.openInternalDB(); + } - async waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise { - await this.waitForReady(); - return this.syncStreamClient!.waitUntilStatusMatches(predicate); + const sync = this.generateStreamingImplementation(); + const onDispose = sync.registerListener({ + statusChanged: (status) => { + this.updateAllStatuses(status.toJSON()); + } + }); + + return { + sync, + onDispose + }; + }); + }, + logger: this.logger + }); } get lastSyncedAt(): Date | undefined { - return this.syncStreamClient?.lastSyncedAt; + return this.connectionManager.syncStreamImplementation?.lastSyncedAt; } get isConnected(): boolean { - return this.syncStreamClient?.isConnected ?? false; + return this.connectionManager.syncStreamImplementation?.isConnected ?? false; + } + + async waitForStatus(status: SyncStatusOptions): Promise { + return this.withSyncImplementation(async (sync) => { + return sync.waitForStatus(status); + }); + } + + async waitUntilStatusMatches(predicate: (status: SyncStatus) => boolean): Promise { + return this.withSyncImplementation(async (sync) => { + return sync.waitUntilStatusMatches(predicate); + }); } async waitForReady() { @@ -156,30 +195,40 @@ export class SharedSyncImplementation * Configures the DBAdapter connection and a streaming sync client. */ async setParams(params: SharedSyncInitOptions) { - if (this.syncParams) { - // Cannot modify already existing sync implementation - return; - } + await this.portMutex.runExclusive(async () => { + if (this.syncParams) { + // Cannot modify already existing sync implementation params + // But we can ask for a DB adapter, if required, at this point. - this.syncParams = params; + if (!this.dbAdapter) { + await this.openInternalDB(); + } + return; + } - if (params.streamOptions?.flags?.broadcastLogs) { - this.logger = this.broadCastLogger; - } + // First time setting params + this.syncParams = params; + if (params.streamOptions?.flags?.broadcastLogs) { + this.logger = this.broadCastLogger; + } - self.onerror = (event) => { - // Share any uncaught events on the broadcast logger - this.logger.error('Uncaught exception in PowerSync shared sync worker', event); - }; + self.onerror = (event) => { + // Share any uncaught events on the broadcast logger + this.logger.error('Uncaught exception in PowerSync shared sync worker', event); + }; - await this.openInternalDB(); - this.iterateListeners((l) => l.initialized?.()); + if (!this.dbAdapter) { + await this.openInternalDB(); + } + + this.iterateListeners((l) => l.initialized?.()); + }); } async dispose() { await this.waitForReady(); this.statusListener?.(); - return this.syncStreamClient?.dispose(); + return this.connectionManager.close(); } /** @@ -189,49 +238,31 @@ export class SharedSyncImplementation * connects. */ async connect(options?: PowerSyncConnectionOptions) { - await this.waitForReady(); - // This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized - return getNavigatorLocks().request('shared-sync-connect', async () => { - if (!this.dbAdapter) { - await this.openInternalDB(); - } - this.syncStreamClient = this.generateStreamingImplementation(); - this.lastConnectOptions = options; - this.syncStreamClient.registerListener({ - statusChanged: (status) => { - this.updateAllStatuses(status.toJSON()); - } - }); - - await this.syncStreamClient.connect(options); - }); + this.lastConnectOptions = options; + return this.connectionManager.connect(CONNECTOR_PLACEHOLDER, options); } async disconnect() { - await this.waitForReady(); - // This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized - return getNavigatorLocks().request('shared-sync-connect', async () => { - await this.syncStreamClient?.disconnect(); - await this.syncStreamClient?.dispose(); - this.syncStreamClient = null; - }); + return this.connectionManager.disconnect(); } /** * Adds a new client tab's message port to the list of connected ports */ - addPort(port: MessagePort) { - const portProvider = { - port, - clientProvider: Comlink.wrap(port) - }; - this.ports.push(portProvider); - - // Give the newly connected client the latest status - const status = this.syncStreamClient?.syncStatus; - if (status) { - portProvider.clientProvider.statusChanged(status.toJSON()); - } + async addPort(port: MessagePort) { + await this.portMutex.runExclusive(() => { + const portProvider = { + port, + clientProvider: Comlink.wrap(port) + }; + this.ports.push(portProvider); + + // Give the newly connected client the latest status + const status = this.connectionManager.syncStreamImplementation?.syncStatus; + if (status) { + portProvider.clientProvider.statusChanged(status.toJSON()); + } + }); } /** @@ -239,70 +270,105 @@ export class SharedSyncImplementation * clients. */ async removePort(port: MessagePort) { - const index = this.ports.findIndex((p) => p.port == port); - if (index < 0) { - this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`); - return; - } - - const trackedPort = this.ports[index]; - // Remove from the list of active ports - this.ports.splice(index, 1); - - /** - * The port might currently be in use. Any active functions might - * not resolve. Abort them here. - */ - [this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => { - if (abortController?.activePort.port == port) { - abortController!.controller.abort(new AbortOperation('Closing pending requests after client port is removed')); + // Remove the port within a mutex context. + // Warns if the port is not found. This should not happen in practice. + // We return early if the port is not found. + const { trackedPort, shouldReconnect } = await this.portMutex.runExclusive(async () => { + const index = this.ports.findIndex((p) => p.port == port); + if (index < 0) { + this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`); + return {}; } + + const trackedPort = this.ports[index]; + // Remove from the list of active ports + this.ports.splice(index, 1); + + /** + * The port might currently be in use. Any active functions might + * not resolve. Abort them here. + */ + [this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => { + if (abortController?.activePort.port == port) { + abortController!.controller.abort( + new AbortOperation('Closing pending requests after client port is removed') + ); + } + }); + + const shouldReconnect = !!this.connectionManager.syncStreamImplementation && this.ports.length > 0; + + return { + shouldReconnect, + trackedPort + }; }); - const shouldReconnect = !!this.syncStreamClient; + if (!trackedPort) { + // We could not find the port to remove + return () => {}; + } + if (this.dbAdapter && this.dbAdapter == trackedPort.db) { if (shouldReconnect) { - await this.disconnect(); + await this.connectionManager.disconnect(); } // Clearing the adapter will result in a new one being opened in connect this.dbAdapter = null; if (shouldReconnect) { - await this.connect(this.lastConnectOptions); + await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions); } } if (trackedPort.db) { - trackedPort.db.close(); + await trackedPort.db.close(); } // Release proxy - trackedPort.clientProvider[Comlink.releaseProxy](); + return () => trackedPort.clientProvider[Comlink.releaseProxy](); } triggerCrudUpload() { - this.waitForReady().then(() => this.syncStreamClient?.triggerCrudUpload()); - } - - async obtainLock(lockOptions: LockOptions): Promise { - await this.waitForReady(); - return this.syncStreamClient!.obtainLock(lockOptions); + this.withSyncImplementation(async (sync) => { + sync.triggerCrudUpload(); + }); } async hasCompletedSync(): Promise { - await this.waitForReady(); - return this.syncStreamClient!.hasCompletedSync(); + return this.withSyncImplementation(async (sync) => { + return sync.hasCompletedSync(); + }); } async getWriteCheckpoint(): Promise { + return this.withSyncImplementation(async (sync) => { + return sync.getWriteCheckpoint(); + }); + } + + protected async withSyncImplementation(callback: (sync: StreamingSyncImplementation) => Promise): Promise { await this.waitForReady(); - return this.syncStreamClient!.getWriteCheckpoint(); + + if (this.connectionManager.syncStreamImplementation) { + return callback(this.connectionManager.syncStreamImplementation); + } + + const sync = await new Promise((resolve) => { + const dispose = this.connectionManager.registerListener({ + syncStreamCreated: (sync) => { + resolve(sync); + dispose?.(); + } + }); + }); + + return callback(sync); } protected generateStreamingImplementation() { // This should only be called after initialization has completed const syncParams = this.syncParams!; - // Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs. return new WebStreamingSyncImplementation({ adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger), @@ -405,15 +471,14 @@ export class SharedSyncImplementation * A function only used for unit tests which updates the internal * sync stream client and all tab client's sync status */ - private _testUpdateAllStatuses(status: SyncStatusOptions) { - if (!this.syncStreamClient) { + private async _testUpdateAllStatuses(status: SyncStatusOptions) { + if (!this.connectionManager.syncStreamImplementation) { // This is just for testing purposes - this.syncStreamClient = this.generateStreamingImplementation(); + this.connectionManager.syncStreamImplementation = this.generateStreamingImplementation(); } // Only assigning, don't call listeners for this test - this.syncStreamClient!.syncStatus = new SyncStatus(status); - + this.connectionManager.syncStreamImplementation!.syncStatus = new SyncStatus(status); this.updateAllStatuses(status); } } diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts index ad7644456..11a14fd9b 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.worker.ts @@ -1,10 +1,10 @@ +import { createBaseLogger } from '@powersync/common'; import * as Comlink from 'comlink'; import { - SharedSyncImplementation, SharedSyncClientEvent, + SharedSyncImplementation, type ManualSharedSyncPayload } from './SharedSyncImplementation'; -import { createBaseLogger } from '@powersync/common'; const _self: SharedWorkerGlobalScope = self as any; const logger = createBaseLogger(); @@ -12,20 +12,25 @@ logger.useDefaults(); const sharedSyncImplementation = new SharedSyncImplementation(); -_self.onconnect = function (event: MessageEvent) { +_self.onconnect = async function (event: MessageEvent) { const port = event.ports[0]; /** * Adds an extra listener which can remove this port * from the list of monitored ports. */ - port.addEventListener('message', (event) => { + port.addEventListener('message', async (event) => { const payload = event.data as ManualSharedSyncPayload; if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) { - sharedSyncImplementation.removePort(port); + const release = await sharedSyncImplementation.removePort(port); + port.postMessage({ + event: SharedSyncClientEvent.CLOSE_ACK, + data: {} + } satisfies ManualSharedSyncPayload); + release?.(); } }); + await sharedSyncImplementation.addPort(port); Comlink.expose(sharedSyncImplementation, port); - sharedSyncImplementation.addPort(port); }; diff --git a/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts index 9885af547..581d94722 100644 --- a/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts +++ b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts @@ -1,15 +1,15 @@ -import { describe, it, expect, vi } from 'vitest'; import { AbstractPowerSyncDatabase, - DEFAULT_RETRY_DELAY_MS, - DEFAULT_CRUD_UPLOAD_THROTTLE_MS, BucketStorageAdapter, DBAdapter, + DEFAULT_CRUD_UPLOAD_THROTTLE_MS, + DEFAULT_RETRY_DELAY_MS, PowerSyncBackendConnector, PowerSyncDatabaseOptionsWithSettings, RequiredAdditionalConnectionOptions, StreamingSyncImplementation } from '@powersync/common'; +import { describe, expect, it, vi } from 'vitest'; import { testSchema } from '../../utils/testDb'; class TestPowerSyncDatabase extends AbstractPowerSyncDatabase { diff --git a/packages/web/tests/src/db/PowersyncDatabase.test.ts b/packages/web/tests/src/db/PowersyncDatabase.test.ts index ad44d3b9e..a7487ece1 100644 --- a/packages/web/tests/src/db/PowersyncDatabase.test.ts +++ b/packages/web/tests/src/db/PowersyncDatabase.test.ts @@ -1,5 +1,5 @@ -import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; -import { AbstractPowerSyncDatabase, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/web'; +import { createBaseLogger, PowerSyncDatabase } from '@powersync/web'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { testSchema } from '../../utils/testDb'; describe('PowerSyncDatabase', () => { @@ -9,9 +9,10 @@ describe('PowerSyncDatabase', () => { let mockSyncImplementation: any; beforeEach(() => { + const logger = createBaseLogger(); mockLogger = { - debug: vi.fn(), - warn: vi.fn() + debug: vi.spyOn(logger, 'debug'), + warn: vi.spyOn(logger, 'warn') }; // Initialize with minimal required options @@ -20,12 +21,8 @@ describe('PowerSyncDatabase', () => { database: { dbFilename: 'test.db' }, - logger: mockLogger + logger }); - - vi.spyOn(db as any, 'runExclusive').mockImplementation((cb: any) => cb()); - - vi.spyOn(AbstractPowerSyncDatabase.prototype, 'connect').mockResolvedValue(undefined); }); afterEach(() => { @@ -36,27 +33,6 @@ describe('PowerSyncDatabase', () => { it('should log debug message when attempting to connect', async () => { await db.connect(mockConnector); expect(mockLogger.debug).toHaveBeenCalledWith('Attempting to connect to PowerSync instance'); - expect(db['runExclusive']).toHaveBeenCalled(); - }); - - it('should use connect with correct options', async () => { - await db.connect(mockConnector, { - retryDelayMs: 1000, - crudUploadThrottleMs: 2000, - params: { - param1: 1 - }, - connectionMethod: SyncStreamConnectionMethod.HTTP - }); - - expect(AbstractPowerSyncDatabase.prototype.connect).toHaveBeenCalledWith(mockConnector, { - retryDelayMs: 1000, - crudUploadThrottleMs: 2000, - connectionMethod: 'http', - params: { - param1: 1 - } - }); }); }); }); diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index 2fc8c75a4..fec4a70da 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -1,17 +1,33 @@ -import { BucketChecksum, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; +import { + BucketChecksum, + createBaseLogger, + DataStream, + PowerSyncConnectionOptions, + WASQLiteOpenFactory, + WASQLiteVFS +} from '@powersync/web'; import { describe, expect, it, onTestFinished, vi } from 'vitest'; import { TestConnector } from './utils/MockStreamOpenFactory'; import { ConnectedDatabaseUtils, generateConnectedDatabase } from './utils/generateConnectedDatabase'; const UPLOAD_TIMEOUT_MS = 3000; +const logger = createBaseLogger(); +logger.useDefaults(); + describe('Streaming', { sequential: true }, () => { describe( 'Streaming - With Web Workers', { sequential: true }, - describeStreamingTests(() => generateConnectedDatabase()) + describeStreamingTests(() => + generateConnectedDatabase({ + powerSyncOptions: { + logger + } + }) + ) ); describe( @@ -24,7 +40,8 @@ describe('Streaming', { sequential: true }, () => { powerSyncOptions: { flags: { useWebWorker: false - } + }, + logger } }) ) @@ -41,7 +58,8 @@ describe('Streaming', { sequential: true }, () => { database: new WASQLiteOpenFactory({ dbFilename: 'streaming-opfs.sqlite', vfs: WASQLiteVFS.OPFSCoopSyncVFS - }) + }), + logger } }) ) @@ -160,15 +178,79 @@ function describeStreamingTests(createConnectedDatabase: () => Promise { // This initially performs a connect call - const { powersync, waitForStream } = await createConnectedDatabase(); + const { powersync, remote } = await createConnectedDatabase(); expect(powersync.connected).toBe(true); - // Call connect again, a new stream should be requested - const newStream = waitForStream(); - powersync.connect(new TestConnector()); + const spy = vi.spyOn(powersync as any, 'generateSyncStreamImplementation'); - // A new stream should be requested - await newStream; + // Keep track of all connection streams to check if they are correctly closed later + const generatedStreams: DataStream[] = []; + + // This method is used for all mocked connections + const basePostStream = remote.postStream; + const postSpy = vi.spyOn(remote, 'postStream').mockImplementation(async (...options) => { + // Simulate a connection delay + await new Promise((r) => setTimeout(r, 100)); + const stream = await basePostStream.call(remote, ...options); + generatedStreams.push(stream); + return stream; + }); + + // Connect many times. The calls here are not awaited and have no async calls in between. + const connectionAttempts = 10; + for (let i = 1; i <= connectionAttempts; i++) { + powersync.connect(new TestConnector(), { params: { count: i } }); + } + + await vi.waitFor( + () => { + const call = spy.mock.lastCall![1] as PowerSyncConnectionOptions; + expect(call.params!['count']).eq(connectionAttempts); + }, + { timeout: 2000, interval: 100 } + ); + + // In this case it should most likely be 1 attempt since all the calls + // are in the same for loop + expect(spy.mock.calls.length).lessThan(connectionAttempts); + + // Now with random awaited delays between unawaited calls + for (let i = connectionAttempts; i >= 0; i--) { + await new Promise((r) => setTimeout(r, Math.random() * 10)); + powersync.connect(new TestConnector(), { params: { count: i } }); + } + + await vi.waitFor( + () => { + const call = spy.mock.lastCall![1] as PowerSyncConnectionOptions; + expect(call.params!['count']).eq(0); + }, + { timeout: 8000, interval: 100 } + ); + + expect( + spy.mock.calls.length, + `Expected generated streams to be less than or equal to ${2 * connectionAttempts}, but got ${spy.mock.calls.length}` + ).lessThanOrEqual(2 * connectionAttempts); + + // The last request should make a network request with the client params + await vi.waitFor( + () => { + expect(postSpy.mock.lastCall?.[0].data.parameters!['count']).equals(0); + // The async postStream call's invocation is added to the count of calls + // before the generated stream is added (there is a delay) + // expect that the stream has been generated and tracked. + expect(postSpy.mock.calls.length).equals(generatedStreams.length); + }, + { timeout: 1000, interval: 100 } + ); + + const lastConnectionStream = generatedStreams.pop(); + expect(lastConnectionStream).toBeDefined(); + expect(lastConnectionStream?.closed).false; + + // All streams except the last one (which has been popped off already) should be closed + expect(generatedStreams.every((i) => i.closed)).true; }); it('Should trigger upload connector when connected', async () => { diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index e29cf313b..84dfad789 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -81,7 +81,13 @@ export class MockRemote extends AbstractRemote { if (this.errorOnStreamStart) { controller.error(new Error('Mock error on stream start')); } - + // The request could be aborted at any time. + // This checks if the signal is already aborted and closes the stream if so. + // If not, it adds an event listener to close the stream when the signal is aborted. + if (signal?.aborted) { + controller.close(); + return; + } signal?.addEventListener('abort', () => { try { controller.close(); @@ -106,6 +112,11 @@ export class MockRemote extends AbstractRemote { logger: this.logger }); + if (options.abortSignal?.aborted) { + stream.close(); + } + options.abortSignal?.addEventListener('abort', () => stream.close()); + const l = stream.registerListener({ lowWater: async () => { try { diff --git a/tools/diagnostics-app/src/app/views/layout.tsx b/tools/diagnostics-app/src/app/views/layout.tsx index 567f0c687..64531a11e 100644 --- a/tools/diagnostics-app/src/app/views/layout.tsx +++ b/tools/diagnostics-app/src/app/views/layout.tsx @@ -6,10 +6,11 @@ import SouthIcon from '@mui/icons-material/South'; import StorageIcon from '@mui/icons-material/Storage'; import TableChartIcon from '@mui/icons-material/TableChart'; import TerminalIcon from '@mui/icons-material/Terminal'; -import WifiIcon from '@mui/icons-material/Wifi'; import UserIcon from '@mui/icons-material/VerifiedUser'; +import WifiIcon from '@mui/icons-material/Wifi'; import { + Alert, AppBar, Box, Divider, @@ -34,7 +35,7 @@ import { SYNC_DIAGNOSTICS_ROUTE } from '@/app/router'; import { useNavigationPanel } from '@/components/navigation/NavigationPanelContext'; -import { signOut, sync, syncErrorTracker } from '@/library/powersync/ConnectionManager'; +import { signOut, sync } from '@/library/powersync/ConnectionManager'; import { usePowerSync } from '@powersync/react'; import { useNavigate } from 'react-router-dom'; @@ -103,19 +104,12 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) const l = sync.registerListener({ statusChanged: (status) => { setSyncStatus(status); + setSyncError(status.dataFlowStatus.downloadError ?? null); } }); return () => l(); }, []); - React.useEffect(() => { - const l = syncErrorTracker.registerListener({ - lastErrorUpdated(error) { - setSyncError(error); - } - }); - return () => l(); - }, []); const drawerWidth = 320; const drawer = ( @@ -199,6 +193,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode }) + {syncError ? Sync error detected: {syncError.message} : null} {children} @@ -220,4 +215,4 @@ namespace S { object-fit: contain; padding: 20px; `; -} \ No newline at end of file +} diff --git a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx index 1aa3f90d5..7772f5336 100644 --- a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx +++ b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx @@ -1,5 +1,5 @@ import { NavigationPage } from '@/components/navigation/NavigationPage'; -import { clearData, db, sync, syncErrorTracker } from '@/library/powersync/ConnectionManager'; +import { clearData, db, sync } from '@/library/powersync/ConnectionManager'; import { Box, Button, @@ -73,7 +73,6 @@ FROM local_bucket_data local`; export default function SyncDiagnosticsPage() { const [bucketRows, setBucketRows] = React.useState(null); const [tableRows, setTableRows] = React.useState(null); - const [syncError, setSyncError] = React.useState(syncErrorTracker.lastSyncError); const [lastSyncedAt, setlastSyncedAt] = React.useState(null); const bucketRowsLoading = bucketRows == null; @@ -125,15 +124,6 @@ export default function SyncDiagnosticsPage() { }; }, []); - React.useEffect(() => { - const l = syncErrorTracker.registerListener({ - lastErrorUpdated(error) { - setSyncError(error); - } - }); - return () => l(); - }, []); - const columns: GridColDef[] = [ { field: 'name', headerName: 'Name', flex: 2 }, { field: 'tables', headerName: 'Table(s)', flex: 1, type: 'text' }, diff --git a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts index 61053db92..8a9a4a0ae 100644 --- a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts +++ b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts @@ -1,20 +1,19 @@ import { BaseListener, - BaseObserver, + createBaseLogger, + LogLevel, PowerSyncDatabase, - WebRemote, - WebStreamingSyncImplementation, - WebStreamingSyncImplementationOptions, - WASQLiteOpenFactory, TemporaryStorageOption, + WASQLiteOpenFactory, WASQLiteVFS, - createBaseLogger, - LogLevel + WebRemote, + WebStreamingSyncImplementation, + WebStreamingSyncImplementationOptions } from '@powersync/web'; +import { safeParse } from '../safeParse/safeParse'; import { DynamicSchemaManager } from './DynamicSchemaManager'; import { RecordingStorageAdapter } from './RecordingStorageAdapter'; import { TokenConnector } from './TokenConnector'; -import { safeParse } from '../safeParse/safeParse'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); @@ -62,31 +61,6 @@ export interface SyncErrorListener extends BaseListener { lastErrorUpdated?: ((error: Error) => void) | undefined; } -class SyncErrorTracker extends BaseObserver { - public lastSyncError: Error | null = null; - - constructor() { - super(); - // Big hack: Use the logger to get access to connection errors - const defaultHandler = baseLogger.createDefaultHandler(); - baseLogger.setHandler((messages, context) => { - defaultHandler(messages, context); - if (context.name == 'PowerSyncStream' && context.level.name == 'ERROR') { - if (messages[0] instanceof Error) { - this.lastSyncError = messages[0]; - } else { - this.lastSyncError = new Error('' + messages[0]); - } - this.iterateListeners((listener) => { - listener.lastErrorUpdated?.(this.lastSyncError!); - }); - } - }); - } -} - -export const syncErrorTracker = new SyncErrorTracker(); - if (connector.hasCredentials()) { connect(); } @@ -95,11 +69,10 @@ export async function connect() { const params = getParams(); await sync.connect({ params }); if (!sync.syncStatus.connected) { + const error = sync.syncStatus.dataFlowStatus.downloadError ?? new Error('Failed to connect'); // Disconnect but don't wait for it sync.disconnect(); - throw syncErrorTracker.lastSyncError ?? new Error('Failed to connect'); - } else { - syncErrorTracker.lastSyncError = null; + throw error; } } @@ -120,6 +93,7 @@ export async function disconnect() { export async function signOut() { connector.clearCredentials(); + await disconnect(); await db.disconnectAndClear(); await schemaManager.clear(); }