Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit a915982

Browse files
committed
Allow host/port to be updated in Miniflare#setOptions()
If `port: 0` is passed to `Miniflare#setOptions()`, a new random port will be allocated. Previously, Miniflare would always try to reuse the existing port (even if a non-zero port was passed). That behaviour can be retained by passing `port: (await mf.ready).port` to `setOptions()`. We'd like this for Wrangler, so we can guarantee we always reload on a fresh port.
1 parent 92715ed commit a915982

File tree

3 files changed

+134
-125
lines changed

3 files changed

+134
-125
lines changed

packages/miniflare/src/index.ts

Lines changed: 68 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import assert from "assert";
22
import crypto from "crypto";
3+
import { Abortable } from "events";
34
import fs from "fs";
45
import http from "http";
56
import net from "net";
@@ -98,15 +99,24 @@ import {
9899
LogLevel,
99100
Mutex,
100101
SharedHeaders,
101-
maybeApply,
102102
} from "./workers";
103103
import { _formatZodError } from "./zod-format";
104104

105105
const DEFAULT_HOST = "127.0.0.1";
106+
function getURLSafeHost(host: string) {
107+
return net.isIPv6(host) ? `[${host}]` : host;
108+
}
106109
function getAccessibleHost(host: string) {
107-
return host === "*" || host === "0.0.0.0" || host === "::"
108-
? "127.0.0.1"
109-
: host;
110+
const accessibleHost =
111+
host === "*" || host === "0.0.0.0" || host === "::" ? "127.0.0.1" : host;
112+
return getURLSafeHost(accessibleHost);
113+
}
114+
115+
function getServerPort(server: http.Server) {
116+
const address = server.address();
117+
// Note address would be string with unix socket
118+
assert(address !== null && typeof address === "object");
119+
return address.port;
110120
}
111121

112122
// ===== `Miniflare` User Options =====
@@ -539,11 +549,9 @@ export class Miniflare {
539549
#sharedOpts: PluginSharedOptions;
540550
#workerOpts: PluginWorkerOptions[];
541551
#log: Log;
542-
readonly #host: string;
543-
readonly #accessibleHost: string;
544552

545-
#runtime?: Runtime;
546-
#removeRuntimeExitHook?: () => void;
553+
readonly #runtime?: Runtime;
554+
readonly #removeRuntimeExitHook?: () => void;
547555
#runtimeEntryURL?: URL;
548556
#socketPorts?: Map<SocketIdentifier, number>;
549557
#runtimeClient?: Client;
@@ -567,7 +575,7 @@ export class Miniflare {
567575
// Aborted when dispose() is called
568576
readonly #disposeController: AbortController;
569577
#loopbackServer?: StoppableServer;
570-
#loopbackPort?: number;
578+
#loopbackHost?: string;
571579
readonly #liveReloadServer: WebSocketServer;
572580
readonly #webSocketServer: WebSocketServer;
573581
readonly #webSocketExtraHeaders: WeakMap<http.IncomingMessage, Headers>;
@@ -587,15 +595,6 @@ export class Miniflare {
587595
}
588596

589597
this.#log = this.#sharedOpts.core.log ?? new NoOpLog();
590-
this.#host = this.#sharedOpts.core.host ?? DEFAULT_HOST;
591-
// TODO: maybe remove `#accessibleHost` field, and just get whenever
592-
// constructing entry URL, then extract constructing entry URL into
593-
// function used `getUnsafeGetDirectURL()` too?
594-
this.#accessibleHost = getAccessibleHost(this.#host);
595-
596-
if (net.isIPv6(this.#accessibleHost)) {
597-
this.#accessibleHost = `[${this.#accessibleHost}]`;
598-
}
599598

600599
this.#liveReloadServer = new WebSocketServer({ noServer: true });
601600
this.#webSocketServer = new WebSocketServer({
@@ -630,10 +629,14 @@ export class Miniflare {
630629
fs.rmSync(this.#tmpPath, { force: true, recursive: true });
631630
});
632631

632+
// Setup runtime
633+
this.#runtime = new Runtime();
634+
this.#removeRuntimeExitHook = exitHook(() => void this.#runtime?.dispose());
635+
633636
this.#disposeController = new AbortController();
634637
this.#runtimeMutex = new Mutex();
635638
this.#initPromise = this.#runtimeMutex
636-
.runWith(() => this.#init())
639+
.runWith(() => this.#assembleAndUpdateConfig())
637640
.catch((e) => {
638641
// If initialisation failed, attempting to `dispose()` this instance
639642
// will too. Therefore, remove from the instance registry now, so we
@@ -655,35 +658,6 @@ export class Miniflare {
655658
}
656659
}
657660

658-
async #init() {
659-
// This function must be run with `#runtimeMutex` held
660-
661-
// Start loopback server (how the runtime accesses with Miniflare's storage)
662-
// using the same host as the main runtime server. This means we can use the
663-
// loopback server for live reload updates too.
664-
this.#loopbackServer = await this.#startLoopbackServer(0, this.#host);
665-
const address = this.#loopbackServer.address();
666-
// Note address would be string with unix socket
667-
assert(address !== null && typeof address === "object");
668-
// noinspection JSObjectNullOrUndefined
669-
this.#loopbackPort = address.port;
670-
671-
// Start runtime
672-
const port = this.#sharedOpts.core.port ?? 0;
673-
const opts: RuntimeOptions = {
674-
entryHost: net.isIPv6(this.#host) ? `[${this.#host}]` : this.#host,
675-
entryPort: port,
676-
loopbackPort: this.#loopbackPort,
677-
inspectorPort: this.#sharedOpts.core.inspectorPort,
678-
verbose: this.#sharedOpts.core.verbose,
679-
};
680-
this.#runtime = new Runtime(opts);
681-
this.#removeRuntimeExitHook = exitHook(() => void this.#runtime?.dispose());
682-
683-
// Update config and wait for runtime to start
684-
await this.#assembleAndUpdateConfig();
685-
}
686-
687661
async #handleLoopbackCustomService(
688662
request: Request,
689663
customService: string
@@ -862,21 +836,37 @@ export class Miniflare {
862836
await writeResponse(response, res);
863837
};
864838

865-
#startLoopbackServer(
866-
port: number,
867-
hostname: string
868-
): Promise<StoppableServer> {
869-
if (hostname === "*") {
870-
hostname = "::";
839+
async #getLoopbackPort(): Promise<number> {
840+
// This function must be run with `#runtimeMutex` held
841+
842+
// Start loopback server (how the runtime accesses Node.js) using the same
843+
// host as the main runtime server. This means we can use the loopback
844+
// server for live reload updates too.
845+
const loopbackHost = this.#sharedOpts.core.host ?? DEFAULT_HOST;
846+
// If we've already started the loopback server...
847+
if (this.#loopbackServer !== undefined) {
848+
// ...and it's using the correct host, reuse it
849+
if (this.#loopbackHost === loopbackHost) {
850+
return getServerPort(this.#loopbackServer);
851+
}
852+
// Otherwise, stop it, and create a new one
853+
await this.#stopLoopbackServer();
871854
}
855+
this.#loopbackServer = await this.#startLoopbackServer(loopbackHost);
856+
this.#loopbackHost = loopbackHost;
857+
return getServerPort(this.#loopbackServer);
858+
}
859+
860+
#startLoopbackServer(hostname: string): Promise<StoppableServer> {
861+
if (hostname === "*") hostname = "::";
872862

873863
return new Promise((resolve) => {
874864
const server = stoppable(
875865
http.createServer(this.#handleLoopback),
876866
/* grace */ 0
877867
);
878868
server.on("upgrade", this.#handleLoopbackUpgrade);
879-
server.listen(port, hostname, () => resolve(server));
869+
server.listen(0, hostname, () => resolve(server));
880870
});
881871
}
882872

@@ -887,12 +877,9 @@ export class Miniflare {
887877
});
888878
}
889879

890-
async #assembleConfig(): Promise<Config> {
880+
async #assembleConfig(loopbackPort: number): Promise<Config> {
891881
const allWorkerOpts = this.#workerOpts;
892882
const sharedOpts = this.#sharedOpts;
893-
const loopbackPort = this.#loopbackPort;
894-
// #assembleConfig is always called after the loopback server is created
895-
assert(loopbackPort !== undefined);
896883

897884
sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);
898885

@@ -1049,9 +1036,11 @@ export class Miniflare {
10491036
}
10501037

10511038
async #assembleAndUpdateConfig() {
1039+
// This function must be run with `#runtimeMutex` held
10521040
const initial = !this.#runtimeEntryURL;
10531041
assert(this.#runtime !== undefined);
1054-
const config = await this.#assembleConfig();
1042+
const loopbackPort = await this.#getLoopbackPort();
1043+
const config = await this.#assembleConfig(loopbackPort);
10551044
const configBuffer = serializeConfig(config);
10561045

10571046
// Get all socket names we expect to get ports for
@@ -1062,18 +1051,26 @@ export class Miniflare {
10621051
return name;
10631052
}
10641053
);
1065-
// TODO(now): there's a bug here if the inspector was not enabled initially,
1066-
// fixed by a later commit in this PR
10671054
if (this.#sharedOpts.core.inspectorPort !== undefined) {
10681055
requiredSockets.push(kInspectorSocket);
10691056
}
1057+
1058+
// Reload runtime
1059+
const host = this.#sharedOpts.core.host ?? DEFAULT_HOST;
1060+
const urlSafeHost = getURLSafeHost(host);
1061+
const accessibleHost = getAccessibleHost(host);
1062+
const runtimeOpts: Abortable & RuntimeOptions = {
1063+
signal: this.#disposeController.signal,
1064+
entryHost: urlSafeHost,
1065+
entryPort: this.#sharedOpts.core.port ?? 0,
1066+
loopbackPort,
1067+
requiredSockets,
1068+
inspectorPort: this.#sharedOpts.core.inspectorPort,
1069+
verbose: this.#sharedOpts.core.verbose,
1070+
};
10701071
const maybeSocketPorts = await this.#runtime.updateConfig(
10711072
configBuffer,
1072-
requiredSockets,
1073-
{
1074-
signal: this.#disposeController.signal,
1075-
entryPort: maybeApply(parseInt, this.#runtimeEntryURL?.port),
1076-
}
1073+
runtimeOpts
10771074
);
10781075
if (this.#disposeController.signal.aborted) return;
10791076
if (maybeSocketPorts === undefined) {
@@ -1094,7 +1091,7 @@ export class Miniflare {
10941091
const entryPort = maybeSocketPorts.get(SOCKET_ENTRY);
10951092
assert(entryPort !== undefined);
10961093
this.#runtimeEntryURL = new URL(
1097-
`${secure ? "https" : "http"}://${this.#accessibleHost}:${entryPort}`
1094+
`${secure ? "https" : "http"}://${accessibleHost}:${entryPort}`
10981095
);
10991096
if (previousEntryURL?.toString() !== this.#runtimeEntryURL.toString()) {
11001097
this.#runtimeClient = new Client(this.#runtimeEntryURL, {
@@ -1118,16 +1115,15 @@ export class Miniflare {
11181115
// Only log and trigger reload if there aren't pending updates
11191116
const ready = initial ? "Ready" : "Updated and ready";
11201117

1121-
const host = net.isIPv6(this.#host) ? `[${this.#host}]` : this.#host;
11221118
this.#log.info(
1123-
`${ready} on ${secure ? "https" : "http"}://${host}:${entryPort} `
1119+
`${ready} on ${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`
11241120
);
11251121

11261122
if (initial) {
11271123
let hosts: string[];
1128-
if (this.#host === "::" || this.#host === "*") {
1124+
if (host === "::" || host === "*") {
11291125
hosts = getAccessibleHosts(false);
1130-
} else if (this.#host === "0.0.0.0") {
1126+
} else if (host === "0.0.0.0") {
11311127
hosts = getAccessibleHosts(true);
11321128
} else {
11331129
hosts = [];

packages/miniflare/src/runtime/index.ts

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,26 @@ const ControlMessageSchema = z.discriminatedUnion("event", [
2626
export const kInspectorSocket = Symbol("kInspectorSocket");
2727
export type SocketIdentifier = string | typeof kInspectorSocket;
2828

29+
export interface RuntimeOptions {
30+
entryHost: string;
31+
entryPort: number;
32+
loopbackPort: number;
33+
requiredSockets: SocketIdentifier[];
34+
inspectorPort?: number;
35+
verbose?: boolean;
36+
}
37+
2938
async function waitForPorts(
30-
requiredSockets: SocketIdentifier[],
3139
stream: Readable,
32-
options?: Abortable
40+
options: Abortable & Pick<RuntimeOptions, "requiredSockets">
3341
): Promise<Map<SocketIdentifier, number> | undefined> {
3442
if (options?.signal?.aborted) return;
3543
const lines = rl.createInterface(stream);
3644
// Calling `close()` will end the async iterator below and return undefined
3745
const abortListener = () => lines.close();
3846
options?.signal?.addEventListener("abort", abortListener, { once: true });
3947
// We're going to be mutating `sockets`, so shallow copy it
40-
requiredSockets = Array.from(requiredSockets);
48+
const requiredSockets = Array.from(options.requiredSockets);
4149
const socketPorts = new Map<SocketIdentifier, number>();
4250
try {
4351
for await (const line of lines) {
@@ -82,65 +90,52 @@ function pipeOutput(runtime: childProcess.ChildProcessWithoutNullStreams) {
8290
// runtime.stderr.pipe(process.stderr);
8391
}
8492

85-
export interface RuntimeOptions {
86-
entryHost: string;
87-
entryPort: number;
88-
loopbackPort: number;
89-
inspectorPort?: number;
90-
verbose?: boolean;
93+
function getRuntimeCommand() {
94+
return process.env.MINIFLARE_WORKERD_PATH ?? workerdPath;
9195
}
9296

93-
export class Runtime {
94-
readonly #command: string;
95-
96-
#process?: childProcess.ChildProcess;
97-
#processExitPromise?: Promise<void>;
98-
99-
constructor(private opts: RuntimeOptions) {
100-
this.#command = process.env.MINIFLARE_WORKERD_PATH ?? workerdPath;
97+
function getRuntimeArgs(options: RuntimeOptions) {
98+
const args: string[] = [
99+
"serve",
100+
// Required to use binary capnp config
101+
"--binary",
102+
// Required to use compatibility flags without a default-on date,
103+
// (e.g. "streams_enable_constructors"), see https://github.com/cloudflare/workerd/pull/21
104+
"--experimental",
105+
`--socket-addr=${SOCKET_ENTRY}=${options.entryHost}:${options.entryPort}`,
106+
`--external-addr=${SERVICE_LOOPBACK}=localhost:${options.loopbackPort}`,
107+
// Configure extra pipe for receiving control messages (e.g. when ready)
108+
"--control-fd=3",
109+
// Read config from stdin
110+
"-",
111+
];
112+
if (options.inspectorPort !== undefined) {
113+
// Required to enable the V8 inspector
114+
args.push(`--inspector-addr=localhost:${options.inspectorPort}`);
115+
}
116+
if (options.verbose) {
117+
args.push("--verbose");
101118
}
102119

103-
get #args() {
104-
const args: string[] = [
105-
"serve",
106-
// Required to use binary capnp config
107-
"--binary",
108-
// Required to use compatibility flags without a default-on date,
109-
// (e.g. "streams_enable_constructors"), see https://github.com/cloudflare/workerd/pull/21
110-
"--experimental",
111-
`--socket-addr=${SOCKET_ENTRY}=${this.opts.entryHost}:${this.opts.entryPort}`,
112-
`--external-addr=${SERVICE_LOOPBACK}=localhost:${this.opts.loopbackPort}`,
113-
// Configure extra pipe for receiving control messages (e.g. when ready)
114-
"--control-fd=3",
115-
// Read config from stdin
116-
"-",
117-
];
118-
if (this.opts.inspectorPort !== undefined) {
119-
// Required to enable the V8 inspector
120-
args.push(`--inspector-addr=localhost:${this.opts.inspectorPort}`);
121-
}
122-
if (this.opts.verbose) {
123-
args.push("--verbose");
124-
}
120+
return args;
121+
}
125122

126-
return args;
127-
}
123+
export class Runtime {
124+
#process?: childProcess.ChildProcess;
125+
#processExitPromise?: Promise<void>;
128126

129127
async updateConfig(
130128
configBuffer: Buffer,
131-
requiredSockets: SocketIdentifier[],
132-
options?: Abortable & Partial<Pick<RuntimeOptions, "entryPort">>
129+
options: Abortable & RuntimeOptions
133130
): Promise<Map<SocketIdentifier, number /* port */> | undefined> {
134131
// 1. Stop existing process (if any) and wait for exit
135132
await this.dispose();
136133
// TODO: what happens if runtime crashes?
137134

138-
if (options?.entryPort !== undefined) {
139-
this.opts.entryPort = options.entryPort;
140-
}
141-
142135
// 2. Start new process
143-
const runtimeProcess = childProcess.spawn(this.#command, this.#args, {
136+
const command = getRuntimeCommand();
137+
const args = getRuntimeArgs(options);
138+
const runtimeProcess = childProcess.spawn(command, args, {
144139
stdio: ["pipe", "pipe", "pipe", "pipe"],
145140
env: process.env,
146141
});
@@ -156,7 +151,7 @@ export class Runtime {
156151
runtimeProcess.stdin.end();
157152

158153
// 4. Wait for sockets to start listening
159-
return waitForPorts(requiredSockets, controlPipe, options);
154+
return waitForPorts(controlPipe, options);
160155
}
161156

162157
dispose(): Awaitable<void> {

0 commit comments

Comments
 (0)