diff --git a/.changeset/funny-baboons-wonder.md b/.changeset/funny-baboons-wonder.md new file mode 100644 index 000000000..91e7b9f89 --- /dev/null +++ b/.changeset/funny-baboons-wonder.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': patch +--- + +Fix issues with multiple tabs when the Rust client and OPFS is used. diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 57808e3ce..5c4c6902e 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -15,6 +15,7 @@ export type SharedConnectionWorker = { export type WrappedWorkerConnectionOptions = { baseConnection: AsyncDatabaseConnection; identifier: string; + remoteCanCloseUnexpectedly: boolean; /** * Need a remote in order to keep a reference to the Proxied worker */ @@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions implements AsyncDatabaseConnection { - protected lockAbortController: AbortController; + protected lockAbortController = new AbortController(); + protected notifyRemoteClosed: AbortController | undefined; constructor(protected options: WrappedWorkerConnectionOptions) { - this.lockAbortController = new AbortController(); + if (options.remoteCanCloseUnexpectedly) { + this.notifyRemoteClosed = new AbortController(); + } } protected get baseConnection() { @@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection(workerPromise: () => Promise): Promise { + const controller = this.notifyRemoteClosed; + if (controller) { + return new Promise((resolve, reject) => { + if (controller.signal.aborted) { + reject(new Error('Called operation on closed remote')); + } + + function handleAbort() { + reject(new Error('Remote peer closed with request in flight')); + } + + function completePromise(action: () => void) { + controller!.signal.removeEventListener('abort', handleAbort); + action(); + } + + controller.signal.addEventListener('abort', handleAbort); + + workerPromise() + .then((data) => completePromise(() => resolve(data))) + .catch((e) => completePromise(() => reject(e))); + }); + } else { + // Can't close, so just return the inner worker promise unguarded. + return workerPromise(); + } + } + /** * Get a MessagePort which can be used to share the internals of this connection. */ @@ -103,24 +149,27 @@ export class WorkerWrappedAsyncDatabaseConnection { // Abort any pending lock requests. this.lockAbortController.abort(); - await this.baseConnection.close(); - this.options.remote[Comlink.releaseProxy](); - this.options.onClose?.(); + try { + await this.withRemote(() => this.baseConnection.close()); + } finally { + this.options.remote[Comlink.releaseProxy](); + this.options.onClose?.(); + } } execute(sql: string, params?: any[]): Promise { - return this.baseConnection.execute(sql, params); + return this.withRemote(() => this.baseConnection.execute(sql, params)); } executeRaw(sql: string, params?: any[]): Promise { - return this.baseConnection.executeRaw(sql, params); + return this.withRemote(() => this.baseConnection.executeRaw(sql, params)); } executeBatch(sql: string, params?: any[]): Promise { - return this.baseConnection.executeBatch(sql, params); + return this.withRemote(() => this.baseConnection.executeBatch(sql, params)); } getConfig(): Promise { - return this.baseConnection.getConfig(); + return this.withRemote(() => this.baseConnection.getConfig()); } } diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index 01ffef3b0..1367ed811 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -54,6 +54,7 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter { const remote = Comlink.wrap(workerPort); return new WorkerWrappedAsyncDatabaseConnection({ remote, + remoteCanCloseUnexpectedly: false, identifier: options.dbFilename, baseConnection: await remote({ ...options, diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index 129a3b130..487d121aa 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -82,6 +82,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory { return new WorkerWrappedAsyncDatabaseConnection({ remote: workerDBOpener, + // This tab owns the worker, so we're guaranteed to outlive it. + remoteCanCloseUnexpectedly: false, baseConnection: await workerDBOpener({ dbFilename: this.options.dbFilename, vfs, diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 7db98c3d0..f8923c69f 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -75,6 +75,7 @@ export type WrappedSyncPort = { clientProvider: Comlink.Remote; db?: DBAdapter; currentSubscriptions: SubscribedStream[]; + closeListeners: (() => void)[]; }; /** @@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver(port), - currentSubscriptions: [] + currentSubscriptions: [], + closeListeners: [] } satisfies WrappedSyncPort; this.ports.push(portProvider); @@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver {}; } + for (const closeListener of trackedPort.closeListeners) { + closeListener(); + } + if (this.dbAdapter && this.dbAdapter == trackedPort.db) { - if (shouldReconnect) { - await this.connectionManager.disconnect(); - } + // Unconditionally close the connection because the database it's writing to has just been closed. + await this.connectionManager.disconnect(); // Clearing the adapter will result in a new one being opened in connect this.dbAdapter = null; @@ -344,10 +349,6 @@ export class SharedSyncImplementation extends BaseObserver { - return new WorkerWrappedAsyncDatabaseConnection({ + const wrapped = new WorkerWrappedAsyncDatabaseConnection({ remote, baseConnection: db, - identifier + identifier, + // It's possible for this worker to outlive the client hosting the database for us. We need to be prepared for + // that and ensure pending requests are aborted when the tab is closed. + remoteCanCloseUnexpectedly: true }); + lastClient.closeListeners.push(() => { + this.logger.info('Aborting open connection because associated tab closed.'); + wrapped.close(); + wrapped.markRemoteClosed(); + }); + + return wrapped; }, logger: this.logger }); diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts index 22c44f4ba..1dfffe9b2 100644 --- a/packages/web/src/worker/sync/WorkerClient.ts +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -41,7 +41,9 @@ export class WorkerClient { private async removePort() { if (this.resolvedPort) { - const release = await this.sync.removePort(this.resolvedPort); + const resolved = this.resolvedPort; + this.resolvedPort = null; + const release = await this.sync.removePort(resolved); this.resolvedPort = null; this.port.postMessage({ event: SharedSyncClientEvent.CLOSE_ACK,