Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 3e17af7

Browse files
authored
Protect #init() and setOptions() with mutex (#425)
This will ensure multiple calls to `setOptions()` are applied in invocation order. We now only resolve the `ready` `Promise` when all previous calls to `setOptions()` have completed. Finally, we also now only log a ready/updated message if there are no pending options updates to apply.
1 parent c40fe41 commit 3e17af7

File tree

3 files changed

+109
-40
lines changed

3 files changed

+109
-40
lines changed

packages/tre/src/index.ts

Lines changed: 68 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ import {
3939
serializeConfig,
4040
} from "./runtime";
4141
import {
42-
DeferredPromise,
4342
HttpError,
4443
MiniflareCoreError,
44+
Mutex,
4545
OptionalZodTypeOf,
4646
UnionToIntersection,
4747
ValueOf,
@@ -153,11 +153,30 @@ export class Miniflare {
153153
#removeRuntimeExitHook?: () => void;
154154
#runtimeEntryURL?: URL;
155155

156-
readonly #disposeController: AbortController;
156+
// Mutual exclusion lock for runtime operations (i.e. initialisation and
157+
// updating config). This essentially puts initialisation and future updates
158+
// in a queue, ensuring they're performed in calling order.
159+
readonly #runtimeMutex: Mutex;
160+
161+
// Additionally, store `Promise`s for the call to `#init()` and the last call
162+
// to `setOptions()`. We need the `#init()` `Promise`, so we can propagate
163+
// initialisation errors in `ready`. We would have no way of catching these
164+
// otherwise.
165+
//
166+
// We store the last `setOptions()` `Promise` as well, so we can avoid
167+
// disposing or resolving `ready` until all pending `setOptions()` have
168+
// completed. Note we only need to store the latest one, as the mutex queue
169+
// will ensure all previous calls complete before starting the latest.
170+
//
171+
// We could just wait on the mutex when disposing/resolving `ready`, but we
172+
// use the presence of waiters on the mutex to avoid logging ready/updated
173+
// messages to the console if there are future updates.
157174
readonly #initPromise: Promise<void>;
158-
#loopbackServer?: StoppableServer;
175+
#lastUpdatePromise?: Promise<void>;
159176

160-
#updatePromise?: Promise<void>;
177+
// Aborted when dispose() is called
178+
readonly #disposeController: AbortController;
179+
#loopbackServer?: StoppableServer;
161180

162181
constructor(opts: MiniflareOptions) {
163182
// Initialise plugin gateway factories and routers
@@ -181,7 +200,8 @@ export class Miniflare {
181200
);
182201

183202
this.#disposeController = new AbortController();
184-
this.#initPromise = this.#init();
203+
this.#runtimeMutex = new Mutex();
204+
this.#initPromise = this.#runtimeMutex.runWith(() => this.#init());
185205
}
186206

187207
#initPlugins() {
@@ -197,18 +217,9 @@ export class Miniflare {
197217
}
198218
}
199219

200-
async dispatchFetch(
201-
input: RequestInfo,
202-
init?: RequestInit
203-
): Promise<Response> {
204-
await this.ready;
205-
const forward = new Request(input, init);
206-
const url = new URL(forward.url);
207-
url.host = this.#runtimeEntryURL!.host;
208-
return fetch(url, forward as RequestInit);
209-
}
210-
211220
async #init() {
221+
// This function must be run with `#runtimeMutex` held
222+
212223
// Start loopback server (how the runtime accesses with Miniflare's storage)
213224
this.#loopbackServer = await this.#startLoopbackServer(0, "127.0.0.1");
214225
const address = this.#loopbackServer.address();
@@ -230,7 +241,8 @@ export class Miniflare {
230241
await this.#runtime.updateConfig(configBuffer);
231242

232243
// Wait for runtime to start
233-
if (await this.#waitForRuntime()) {
244+
if ((await this.#waitForRuntime()) && !this.#runtimeMutex.hasWaiting) {
245+
// Only log if there aren't pending updates
234246
console.log(bold(green(`Ready on ${this.#runtimeEntryURL} 🎉`)));
235247
}
236248
}
@@ -380,7 +392,6 @@ export class Miniflare {
380392
}
381393

382394
async #assembleConfig(): Promise<Config> {
383-
// Copy options in case `setOptions` called whilst assembling config
384395
const optionsVersion = this.#optionsVersion;
385396
const allWorkerOpts = this.#workerOpts;
386397
const sharedOpts = this.#sharedOpts;
@@ -446,10 +457,15 @@ export class Miniflare {
446457
}
447458

448459
get ready(): Promise<URL> {
449-
// Cannot use async/await with getters.
450-
// Safety: `#runtimeEntryURL` is assigned in `#init()`. `#initPromise`
451-
// doesn't resolve until `#init()` returns.
452-
return this.#initPromise.then(() => this.#runtimeEntryURL!);
460+
// If `#init()` threw, we'd like to propagate the error here, so `await` it.
461+
// Note we can't use `async`/`await` with getters. We'd also like to wait
462+
// for `setOptions` calls to complete before resolving.
463+
//
464+
// Safety of `!`: `#runtimeEntryURL` is assigned in `#init()`.
465+
// `#initPromise` doesn't resolve until `#init()` returns.
466+
return this.#initPromise
467+
.then(() => this.#lastUpdatePromise)
468+
.then(() => this.#runtimeEntryURL!);
453469
}
454470

455471
#checkDisposed() {
@@ -461,49 +477,61 @@ export class Miniflare {
461477
}
462478
}
463479

464-
async setOptions(opts: MiniflareOptions) {
465-
this.#checkDisposed();
466-
467-
const updatePromise = new DeferredPromise<void>();
468-
this.#updatePromise = updatePromise;
469-
470-
// Wait for initial initialisation before changing options
471-
await this.#initPromise;
480+
async #setOptions(opts: MiniflareOptions) {
481+
// This function must be run with `#runtimeMutex` held
472482

473483
// Split and validate options
474484
// TODO: merge with previous config
475485
const [sharedOpts, workerOpts] = validateOptions(opts);
476-
// Increment version, so we know when runtime has processed updates
477-
this.#optionsVersion++;
478486
this.#sharedOpts = sharedOpts;
479487
this.#workerOpts = workerOpts;
480488

481-
// Assemble and serialize config
482-
const currentOptionsVersion = this.#optionsVersion;
489+
// Increment version, so we know when the runtime has processed updates
490+
this.#optionsVersion++;
491+
// Assemble and serialize config using new version
483492
const config = await this.#assembleConfig();
484-
// If `setOptions` called again, discard our now outdated config
485-
if (currentOptionsVersion !== this.#optionsVersion) return;
486493
const configBuffer = serializeConfig(config);
487494

488495
// Send to runtime and wait for updates to process
489496
assert(this.#runtime !== undefined);
490497
await this.#runtime.updateConfig(configBuffer);
491498

492-
if (await this.#waitForRuntime()) {
499+
if ((await this.#waitForRuntime()) && !this.#runtimeMutex.hasWaiting) {
500+
// Only log if this was the last pending update
493501
console.log(
494502
bold(green(`Updated and ready on ${this.#runtimeEntryURL} 🎉`))
495503
);
496504
}
497-
updatePromise.resolve();
505+
}
506+
507+
setOptions(opts: MiniflareOptions): Promise<void> {
508+
this.#checkDisposed();
509+
// Wait for initial initialisation and other setOptions to complete before
510+
// changing options
511+
const promise = this.#runtimeMutex.runWith(() => this.#setOptions(opts));
512+
this.#lastUpdatePromise = promise;
513+
return promise;
514+
}
515+
516+
async dispatchFetch(
517+
input: RequestInfo,
518+
init?: RequestInit
519+
): Promise<Response> {
520+
this.#checkDisposed();
521+
await this.ready;
522+
const forward = new Request(input, init);
523+
const url = new URL(forward.url);
524+
url.host = this.#runtimeEntryURL!.host;
525+
return fetch(url, forward as RequestInit);
498526
}
499527

500528
async dispose() {
501529
this.#disposeController.abort();
502530
try {
503531
await this.#initPromise;
504-
await this.#updatePromise;
532+
await this.#lastUpdatePromise;
505533
} finally {
506-
// Cleanup as much as possible even if #init() threw
534+
// Cleanup as much as possible even if `#init()` threw
507535
this.#removeRuntimeExitHook?.();
508536
this.#runtime?.dispose();
509537
await this.#stopLoopbackServer();

packages/tre/src/shared/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export * from "./data";
33
export * from "./deferred";
44
export * from "./error";
55
export * from "./matcher";
6+
export * from "./mutex";
67
export * from "./types";

packages/tre/src/shared/mutex.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import assert from "assert";
2+
import { Awaitable } from "./types";
3+
4+
export class Mutex {
5+
private locked = false;
6+
private resolveQueue: (() => void)[] = [];
7+
8+
private lock(): Awaitable<void> {
9+
if (!this.locked) {
10+
this.locked = true;
11+
return;
12+
}
13+
return new Promise((resolve) => this.resolveQueue.push(resolve));
14+
}
15+
16+
private unlock(): void {
17+
assert(this.locked);
18+
if (this.resolveQueue.length > 0) {
19+
this.resolveQueue.shift()?.();
20+
} else {
21+
this.locked = false;
22+
}
23+
}
24+
25+
get hasWaiting(): boolean {
26+
return this.resolveQueue.length > 0;
27+
}
28+
29+
async runWith<T>(closure: () => Awaitable<T>): Promise<T> {
30+
const acquireAwaitable = this.lock();
31+
if (acquireAwaitable instanceof Promise) await acquireAwaitable;
32+
try {
33+
const awaitable = closure();
34+
if (awaitable instanceof Promise) return await awaitable;
35+
return awaitable;
36+
} finally {
37+
this.unlock();
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)