Skip to content

Commit 6630097

Browse files
Wrap entire open operation init in writeLock.
1 parent 445ec69 commit 6630097

File tree

3 files changed

+73
-64
lines changed

3 files changed

+73
-64
lines changed

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 52 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,28 @@ export class LockedAsyncDatabaseAdapter
113113
}
114114

115115
protected async openInternalDB() {
116-
// Dispose any previous table change listener.
117-
this._disposeTableChangeListener?.();
118-
this._disposeTableChangeListener = null;
119-
120-
const isReOpen = !!this._db;
121-
122-
this._db = await this.options.openConnection();
123-
await this._db.init();
124-
this._config = await this._db.getConfig();
125-
await this.registerOnChangeListener(this._db);
126-
if (isReOpen) {
127-
this.iterateListeners((cb) => cb.databaseReOpened?.());
128-
}
129116
/**
130-
* This is only required for the long-lived shared IndexedDB connections.
117+
* Execute opening of the db in a lock in order not to interfere with other operations.
131118
*/
132-
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;
119+
return this._acquireLock(async () => {
120+
// Dispose any previous table change listener.
121+
this._disposeTableChangeListener?.();
122+
this._disposeTableChangeListener = null;
123+
124+
const isReOpen = !!this._db;
125+
126+
this._db = await this.options.openConnection();
127+
await this._db.init();
128+
this._config = await this._db.getConfig();
129+
await this.registerOnChangeListener(this._db);
130+
if (isReOpen) {
131+
this.iterateListeners((cb) => cb.databaseReOpened?.());
132+
}
133+
/**
134+
* This is only required for the long-lived shared IndexedDB connections.
135+
*/
136+
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;
137+
});
133138
}
134139

135140
protected _reOpen() {
@@ -252,13 +257,10 @@ export class LockedAsyncDatabaseAdapter
252257
);
253258
}
254259

255-
protected async acquireLock(callback: () => Promise<any>, options?: { timeoutMs?: number }): Promise<any> {
256-
await this.waitForInitialized();
257-
260+
protected async _acquireLock(callback: () => Promise<any>, options?: { timeoutMs?: number }): Promise<any> {
258261
if (this.closing) {
259262
throw new Error(`Cannot acquire lock, the database is closing`);
260263
}
261-
262264
const abortController = new AbortController();
263265
this.pendingAbortControllers.add(abortController);
264266
const { timeoutMs } = options ?? {};
@@ -278,36 +280,41 @@ export class LockedAsyncDatabaseAdapter
278280
if (timeoutId) {
279281
clearTimeout(timeoutId);
280282
}
281-
let holdId: string | null = null;
282-
try {
283-
// The database is being opened in the background. Wait for it here.
284-
if (this.databaseOpenPromise) {
285-
/**
286-
* We can't await this since it uses the same lock as we're in now.
287-
*/
288-
throw new ConnectionClosedError('Connection is busy re-opening');
289-
}
283+
return await callback();
284+
}
285+
);
286+
}
290287

291-
holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
292-
return await callback();
293-
} catch (ex) {
294-
if (
295-
ex instanceof ConnectionClosedError ||
296-
(ex instanceof Error && ex.name === 'NoModificationAllowedError')
297-
) {
298-
if (this.options.reOpenOnConnectionClosed && !this.databaseOpenPromise && !this.closing) {
299-
// Immediately re-open the database. We need to miss as little table updates as possible.
300-
this.reOpenInternalDB();
301-
}
302-
}
303-
throw ex;
304-
} finally {
305-
if (holdId) {
306-
await this.baseDB.releaseHold(holdId);
288+
protected async acquireLock(callback: () => Promise<any>, options?: { timeoutMs?: number }): Promise<any> {
289+
await this.waitForInitialized();
290+
291+
return this._acquireLock(async () => {
292+
let holdId: string | null = null;
293+
try {
294+
// The database is being opened in the background. Wait for it here.
295+
if (this.databaseOpenPromise) {
296+
/**
297+
* We can't await this since it uses the same lock as we're in now.
298+
*/
299+
throw new ConnectionClosedError('Connection is busy re-opening');
300+
}
301+
302+
holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
303+
return await callback();
304+
} catch (ex) {
305+
if (ex instanceof ConnectionClosedError || (ex instanceof Error && ex.name === 'NoModificationAllowedError')) {
306+
if (this.options.reOpenOnConnectionClosed && !this.databaseOpenPromise && !this.closing) {
307+
// Immediately re-open the database. We need to miss as little table updates as possible.
308+
this.reOpenInternalDB();
307309
}
308310
}
311+
throw ex;
312+
} finally {
313+
if (holdId) {
314+
await this.baseDB.releaseHold(holdId);
315+
}
309316
}
310-
);
317+
}, options);
311318
}
312319

313320
async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {

packages/web/src/worker/db/WASQLiteDB.worker.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ baseLogger.useDefaults();
1616
const logger = createLogger('db-worker');
1717

1818
const DBMap = new Map<string, SharedDBWorkerConnection>();
19-
19+
const OPEN_DB_LOCK = 'open-wasqlite-db';
2020
let nextClientId = 1;
2121

2222
const openDBShared = async (options: WorkerDBOpenerOptions): Promise<AsyncDatabaseConnection> => {
23-
// Prevent multiple simultaneous opens from causing race conditions
24-
// Use the same lock as a write lock to prevent concurrent writes to the database.
25-
return getNavigatorLocks().request(`db-lock-${options.dbFilename}`, async () => {
23+
return getNavigatorLocks().request(OPEN_DB_LOCK, async () => {
2624
const clientId = nextClientId++;
2725
const { dbFilename, logLevel } = options;
2826

packages/web/src/worker/db/opfs.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,21 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
9393
});
9494
});
9595

96-
finalizationRegistry.register(this, async () => {
97-
for (const file of this.persistentFiles.values()) {
98-
const release = this.#releaseAccessHandle(file);
99-
try {
100-
await this.#releaseAccessHandle(file);
101-
} catch (e) {
102-
this.log?.('error releasing access handle', e);
103-
} finally {
104-
release();
105-
}
106-
}
107-
});
96+
const releaseHandle = async () => {
97+
await Promise.all(
98+
this.persistentFiles.values().map(async (file) => {
99+
try {
100+
await this.#releaseAccessHandle(file);
101+
} catch (e) {
102+
this.log?.('error releasing access handle', e);
103+
} finally {
104+
release();
105+
}
106+
})
107+
);
108+
};
109+
110+
finalizationRegistry.register(this, releaseHandle);
108111
finalizationRegistry.register(this, this.releaser);
109112
const tmpDir = await root.getDirectoryHandle(tmpDirName, { create: true });
110113

@@ -272,9 +275,10 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
272275
this.mapIdToFile.delete(fileId);
273276

274277
if (file?.flags & VFS.SQLITE_OPEN_MAIN_DB) {
275-
if (file.persistentFile?.handleLockReleaser) {
276-
this.#releaseAccessHandle(file);
277-
}
278+
// Release this either way
279+
// if (file.persistentFile?.handleLockReleaser) {
280+
this.#releaseAccessHandle(file);
281+
// }
278282
} else if (file?.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) {
279283
file.accessHandle.truncate(0);
280284
this.accessiblePaths.delete(file.path);

0 commit comments

Comments
 (0)