Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-baboons-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': patch
---

Fix issues with multiple tabs when the Rust client and OPFS is used.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export type SharedConnectionWorker = {
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
baseConnection: AsyncDatabaseConnection;
identifier: string;
remoteCanCloseUnexpectedly: boolean;
/**
* Need a remote in order to keep a reference to the Proxied worker
*/
Expand All @@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
implements AsyncDatabaseConnection
{
protected lockAbortController: AbortController;
protected lockAbortController = new AbortController();
protected notifyRemoteClosed: AbortController | undefined;

constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
this.lockAbortController = new AbortController();
if (options.remoteCanCloseUnexpectedly) {
this.notifyRemoteClosed = new AbortController();
}
}

protected get baseConnection() {
Expand All @@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
return this.baseConnection.init();
}

/**
* Marks the remote as closed.
*
* This can sometimes happen outside of our control, e.g. when a shared worker requests a connection from a tab. When
* it happens, all methods on the {@link baseConnection} would never resolve. To avoid livelocks in this scenario, we
* throw on all outstanding promises and forbid new calls.
*/
markRemoteClosed() {
// Can non-null assert here because this function is only supposed to be called when remoteCanCloseUnexpectedly was
// set.
this.notifyRemoteClosed!.abort();
}

private withRemote<T>(inner: () => Promise<T>): Promise<T> {
const controller = this.notifyRemoteClosed;
if (controller) {
return new Promise((resolve, reject) => {
if (controller.signal.aborted) {
reject(new Error('Called operation on closed remote'));
}

function handleAbort() {
reject(new Error('Remote peer closed with request in flight'));
}

function markResolved(inner: () => void) {
controller!.signal.removeEventListener('abort', handleAbort);
inner();
}

controller.signal.addEventListener('abort', handleAbort);

inner()
.then((data) => markResolved(() => resolve(data)))
.catch((e) => markResolved(() => reject(e)));
});
} else {
// Can't close, so just return the inner promise unguarded.
return inner();
}
}

/**
* Get a MessagePort which can be used to share the internals of this connection.
*/
Expand Down Expand Up @@ -103,24 +149,27 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
async close(): Promise<void> {
// Abort any pending lock requests.
this.lockAbortController.abort();
await this.baseConnection.close();
this.options.remote[Comlink.releaseProxy]();
this.options.onClose?.();
try {
await this.withRemote(() => this.baseConnection.close());
} finally {
this.options.remote[Comlink.releaseProxy]();
this.options.onClose?.();
}
}

execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.execute(sql, params);
return this.withRemote(() => this.baseConnection.execute(sql, params));
}

executeRaw(sql: string, params?: any[]): Promise<any[][]> {
return this.baseConnection.executeRaw(sql, params);
return this.withRemote(() => this.baseConnection.executeRaw(sql, params));
}

executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.executeBatch(sql, params);
return this.withRemote(() => this.baseConnection.executeBatch(sql, params));
}

getConfig(): Promise<ResolvedWebSQLOpenOptions> {
return this.baseConnection.getConfig();
return this.withRemote(() => this.baseConnection.getConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter {
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
return new WorkerWrappedAsyncDatabaseConnection({
remote,
remoteCanCloseUnexpectedly: false,
identifier: options.dbFilename,
baseConnection: await remote({
...options,
Expand Down
2 changes: 2 additions & 0 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory {

return new WorkerWrappedAsyncDatabaseConnection({
remote: workerDBOpener,
// This tab owns the worker, so we're guaranteed to outlive it.
remoteCanCloseUnexpectedly: false,
baseConnection: await workerDBOpener({
dbFilename: this.options.dbFilename,
vfs,
Expand Down
31 changes: 21 additions & 10 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export type WrappedSyncPort = {
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
db?: DBAdapter;
currentSubscriptions: SubscribedStream[];
closeListeners: (() => void)[];
};

/**
Expand Down Expand Up @@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
const portProvider = {
port,
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
currentSubscriptions: []
currentSubscriptions: [],
closeListeners: []
} satisfies WrappedSyncPort;
this.ports.push(portProvider);

Expand Down Expand Up @@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
return () => {};
}

for (const closeListener of trackedPort.closeListeners) {
closeListener();
}

if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
if (shouldReconnect) {
await this.connectionManager.disconnect();
}
// Unconditionally close the connection because the database it's writing to has just been closed.
await this.connectionManager.disconnect();

// Clearing the adapter will result in a new one being opened in connect
this.dbAdapter = null;
Expand All @@ -344,10 +349,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
}
}

if (trackedPort.db) {
await trackedPort.db.close();
}

// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
this.collectActiveSubscriptions();

Expand Down Expand Up @@ -473,11 +474,21 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
const locked = new LockedAsyncDatabaseAdapter({
name: identifier,
openConnection: async () => {
return new WorkerWrappedAsyncDatabaseConnection({
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
remote,
baseConnection: db,
identifier
identifier,
// It's possible for this worker to outlive the client hosting the database for us. We need to be prepared for
// that and ensure pending requests are aborted when the tab is closed.
remoteCanCloseUnexpectedly: true
});
lastClient.closeListeners.push(() => {
this.logger.info('Aborting open connection because associated tab closed.');
wrapped.close();
wrapped.markRemoteClosed();
});

return wrapped;
},
logger: this.logger
});
Expand Down
4 changes: 3 additions & 1 deletion packages/web/src/worker/sync/WorkerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ export class WorkerClient {

private async removePort() {
if (this.resolvedPort) {
const release = await this.sync.removePort(this.resolvedPort);
const resolved = this.resolvedPort;
this.resolvedPort = null;
const release = await this.sync.removePort(resolved);
this.resolvedPort = null;
this.port.postMessage({
event: SharedSyncClientEvent.CLOSE_ACK,
Expand Down