Skip to content

Commit 150a4da

Browse files
committed
Start looking at OPFS tab close issue
1 parent eff8cbf commit 150a4da

File tree

4 files changed

+79
-15
lines changed

4 files changed

+79
-15
lines changed

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ export class LockedAsyncDatabaseAdapter
203203
);
204204
}
205205

206+
static dbLockRequestId = 0;
207+
206208
protected async acquireLock(callback: () => Promise<any>, options?: { timeoutMs?: number }): Promise<any> {
207209
await this.waitForInitialized();
208210

@@ -221,13 +223,17 @@ export class LockedAsyncDatabaseAdapter
221223
}, timeoutMs)
222224
: null;
223225

224-
return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => {
225-
this.pendingAbortControllers.delete(abortController);
226-
if (timoutId) {
227-
clearTimeout(timoutId);
228-
}
229-
return callback();
230-
});
226+
const id = LockedAsyncDatabaseAdapter.dbLockRequestId++;
227+
console.trace('Requesting database lock', this._dbIdentifier, id);
228+
return getNavigatorLocks()
229+
.request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => {
230+
this.pendingAbortControllers.delete(abortController);
231+
if (timoutId) {
232+
clearTimeout(timoutId);
233+
}
234+
return callback();
235+
})
236+
.finally(() => console.log('returning database lock', id));
231237
}
232238

233239
async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export type SharedConnectionWorker = {
1515
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
1616
baseConnection: AsyncDatabaseConnection;
1717
identifier: string;
18+
remoteCanCloseUnexpectedly: boolean;
1819
/**
1920
* Need a remote in order to keep a reference to the Proxied worker
2021
*/
@@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
2930
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
3031
implements AsyncDatabaseConnection
3132
{
32-
protected lockAbortController: AbortController;
33+
protected lockAbortController = new AbortController();
34+
protected notifyRemoteClosed: AbortController | undefined;
3335

3436
constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
35-
this.lockAbortController = new AbortController();
37+
if (options.remoteCanCloseUnexpectedly) {
38+
this.notifyRemoteClosed = new AbortController();
39+
}
3640
}
3741

3842
protected get baseConnection() {
@@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
4347
return this.baseConnection.init();
4448
}
4549

50+
/**
51+
* Marks the remote as closed.
52+
*
53+
* This can sometimes happen outside of our control, e.g. when a shared worker requests a connection from a tab. When
54+
* it happens, all methods on the {@link baseConnection} would never resolve. To avoid livelocks in this scenario, we
55+
* throw on all outstanding promises and forbid new calls.
56+
*/
57+
markRemoteClosed() {
58+
// Can non-null assert here because this function is only supposed to be called when remoteCanCloseUnexpectedly was
59+
// set.
60+
this.notifyRemoteClosed!.abort();
61+
}
62+
63+
private withRemote<T>(inner: () => Promise<T>): Promise<T> {
64+
const controller = this.notifyRemoteClosed;
65+
if (controller) {
66+
return new Promise((resolve, reject) => {
67+
if (controller.signal.aborted) {
68+
reject(new Error('Called operation on closed remote'));
69+
}
70+
71+
function handleAbort() {
72+
reject(new Error('Remote peer closed with request in flight'));
73+
}
74+
75+
function markResolved(inner: () => void) {
76+
controller!.signal.removeEventListener('abort', handleAbort);
77+
inner();
78+
}
79+
80+
controller.signal.addEventListener('abort', handleAbort);
81+
82+
inner()
83+
.then((data) => markResolved(() => resolve(data)))
84+
.catch((e) => markResolved(() => reject(e)));
85+
});
86+
} else {
87+
// Can't close, so just return the inner promise unguarded.
88+
return inner();
89+
}
90+
}
91+
4692
/**
4793
* Get a MessagePort which can be used to share the internals of this connection.
4894
*/

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export type WrappedSyncPort = {
7575
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
7676
db?: DBAdapter;
7777
currentSubscriptions: SubscribedStream[];
78+
closeListeners: (() => void)[];
7879
};
7980

8081
/**
@@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
274275
const portProvider = {
275276
port,
276277
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
277-
currentSubscriptions: []
278+
currentSubscriptions: [],
279+
closeListeners: []
278280
} satisfies WrappedSyncPort;
279281
this.ports.push(portProvider);
280282

@@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
331333
return () => {};
332334
}
333335

336+
for (const closeListener of trackedPort.closeListeners) {
337+
closeListener();
338+
}
339+
334340
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
335-
if (shouldReconnect) {
336-
await this.connectionManager.disconnect();
337-
}
341+
// Unconditionally close the connection because the database it's writing to has just been closed.
342+
await this.connectionManager.disconnect();
338343

339344
// Clearing the adapter will result in a new one being opened in connect
340345
this.dbAdapter = null;
@@ -473,11 +478,16 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
473478
const locked = new LockedAsyncDatabaseAdapter({
474479
name: identifier,
475480
openConnection: async () => {
476-
return new WorkerWrappedAsyncDatabaseConnection({
481+
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
477482
remote,
478483
baseConnection: db,
479484
identifier
480485
});
486+
lastClient.closeListeners.push(() => {
487+
wrapped.markRemoteClosed();
488+
});
489+
490+
return wrapped;
481491
},
482492
logger: this.logger
483493
});

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ export class WorkerClient {
4141

4242
private async removePort() {
4343
if (this.resolvedPort) {
44-
const release = await this.sync.removePort(this.resolvedPort);
44+
const resolved = this.resolvedPort;
45+
this.resolvedPort = null;
46+
const release = await this.sync.removePort(resolved);
4547
this.resolvedPort = null;
4648
this.port.postMessage({
4749
event: SharedSyncClientEvent.CLOSE_ACK,

0 commit comments

Comments
 (0)