Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-baboons-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': patch
---

Fix issues with multiple tabs when the Rust client and OPFS is used.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export type SharedConnectionWorker = {
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
baseConnection: AsyncDatabaseConnection;
identifier: string;
remoteCanCloseUnexpectedly: boolean;
/**
* Need a remote in order to keep a reference to the Proxied worker
*/
Expand All @@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
implements AsyncDatabaseConnection
{
protected lockAbortController: AbortController;
protected lockAbortController = new AbortController();
protected notifyRemoteClosed: AbortController | undefined;

constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
this.lockAbortController = new AbortController();
if (options.remoteCanCloseUnexpectedly) {
this.notifyRemoteClosed = new AbortController();
}
}

protected get baseConnection() {
Expand All @@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
return this.baseConnection.init();
}

/**
* Marks the remote as closed.
*
* This can sometimes happen outside of our control, e.g. when a shared worker requests a connection from a tab. When
* it happens, all methods on the {@link baseConnection} would never resolve. To avoid livelocks in this scenario, we
* throw on all outstanding promises and forbid new calls.
*/
markRemoteClosed() {
// Can non-null assert here because this function is only supposed to be called when remoteCanCloseUnexpectedly was
// set.
this.notifyRemoteClosed!.abort();
}

private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
const controller = this.notifyRemoteClosed;
if (controller) {
return new Promise((resolve, reject) => {
if (controller.signal.aborted) {
reject(new Error('Called operation on closed remote'));
}

function handleAbort() {
reject(new Error('Remote peer closed with request in flight'));
}

function completePromise(action: () => void) {
controller!.signal.removeEventListener('abort', handleAbort);
action();
}

controller.signal.addEventListener('abort', handleAbort);

workerPromise()
.then((data) => completePromise(() => resolve(data)))
.catch((e) => completePromise(() => reject(e)));
});
} else {
// Can't close, so just return the inner worker promise unguarded.
return workerPromise();
}
}

/**
* Get a MessagePort which can be used to share the internals of this connection.
*/
Expand Down Expand Up @@ -103,24 +149,27 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
async close(): Promise<void> {
// Abort any pending lock requests.
this.lockAbortController.abort();
await this.baseConnection.close();
this.options.remote[Comlink.releaseProxy]();
this.options.onClose?.();
try {
await this.withRemote(() => this.baseConnection.close());
} finally {
this.options.remote[Comlink.releaseProxy]();
this.options.onClose?.();
}
}

execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.execute(sql, params);
return this.withRemote(() => this.baseConnection.execute(sql, params));
}

executeRaw(sql: string, params?: any[]): Promise<any[][]> {
return this.baseConnection.executeRaw(sql, params);
return this.withRemote(() => this.baseConnection.executeRaw(sql, params));
}

executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.executeBatch(sql, params);
return this.withRemote(() => this.baseConnection.executeBatch(sql, params));
}

getConfig(): Promise<ResolvedWebSQLOpenOptions> {
return this.baseConnection.getConfig();
return this.withRemote(() => this.baseConnection.getConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter {
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
return new WorkerWrappedAsyncDatabaseConnection({
remote,
remoteCanCloseUnexpectedly: false,
identifier: options.dbFilename,
baseConnection: await remote({
...options,
Expand Down
2 changes: 2 additions & 0 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory {

return new WorkerWrappedAsyncDatabaseConnection({
remote: workerDBOpener,
// This tab owns the worker, so we're guaranteed to outlive it.
remoteCanCloseUnexpectedly: false,
baseConnection: await workerDBOpener({
dbFilename: this.options.dbFilename,
vfs,
Expand Down
31 changes: 21 additions & 10 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export type WrappedSyncPort = {
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
db?: DBAdapter;
currentSubscriptions: SubscribedStream[];
closeListeners: (() => void)[];
};

/**
Expand Down Expand Up @@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
const portProvider = {
port,
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
currentSubscriptions: []
currentSubscriptions: [],
closeListeners: []
} satisfies WrappedSyncPort;
this.ports.push(portProvider);

Expand Down Expand Up @@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
return () => {};
}

for (const closeListener of trackedPort.closeListeners) {
closeListener();
}

if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
if (shouldReconnect) {
await this.connectionManager.disconnect();
}
// Unconditionally close the connection because the database it's writing to has just been closed.
await this.connectionManager.disconnect();

// Clearing the adapter will result in a new one being opened in connect
this.dbAdapter = null;
Expand All @@ -344,10 +349,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
}
}

if (trackedPort.db) {
await trackedPort.db.close();
}

// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
this.collectActiveSubscriptions();

Expand Down Expand Up @@ -473,11 +474,21 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
const locked = new LockedAsyncDatabaseAdapter({
name: identifier,
openConnection: async () => {
return new WorkerWrappedAsyncDatabaseConnection({
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
remote,
baseConnection: db,
identifier
identifier,
// It's possible for this worker to outlive the client hosting the database for us. We need to be prepared for
// that and ensure pending requests are aborted when the tab is closed.
remoteCanCloseUnexpectedly: true
});
lastClient.closeListeners.push(() => {
this.logger.info('Aborting open connection because associated tab closed.');
wrapped.close();
wrapped.markRemoteClosed();
});

return wrapped;
},
logger: this.logger
});
Expand Down
4 changes: 3 additions & 1 deletion packages/web/src/worker/sync/WorkerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ export class WorkerClient {

private async removePort() {
if (this.resolvedPort) {
const release = await this.sync.removePort(this.resolvedPort);
const resolved = this.resolvedPort;
this.resolvedPort = null;
const release = await this.sync.removePort(resolved);
this.resolvedPort = null;
this.port.postMessage({
event: SharedSyncClientEvent.CLOSE_ACK,
Expand Down