Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit c575036

Browse files
committed
Allow workers to be accessed directly via dedicated sockets
We'd like this for Wrangler, so we can use the same Miniflare instance for both the regular proxy worker and the inspector proxy worker. The inspector proxy worker needs to run on its own port as `/json` and `/json/version` are well-known path names used by `chrome://inspect`'s service discovery. Otherwise, we'd be able to use `routes`. Note this is an unsafe option as it bypasses Miniflare's entry worker, so won't include any request logging/ pretty errors.
1 parent 5310694 commit c575036

File tree

5 files changed

+180
-37
lines changed

5 files changed

+180
-37
lines changed

packages/miniflare/src/index.ts

Lines changed: 105 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ import {
5353
QueuesError,
5454
R2_PLUGIN_NAME,
5555
ReplaceWorkersTypes,
56+
SOCKET_ENTRY,
5657
SharedOptions,
5758
WorkerOptions,
59+
getDirectSocketName,
5860
getGlobalServices,
5961
kProxyNodeBinding,
6062
maybeGetSitesManifestModule,
@@ -77,6 +79,7 @@ import {
7779
RuntimeOptions,
7880
Service,
7981
Socket,
82+
SocketIdentifier,
8083
Worker_Binding,
8184
Worker_Module,
8285
serializeConfig,
@@ -98,6 +101,13 @@ import {
98101
} from "./workers";
99102
import { _formatZodError } from "./zod-format";
100103

104+
const DEFAULT_HOST = "127.0.0.1";
105+
function getAccessibleHost(host: string) {
106+
return host === "*" || host === "0.0.0.0" || host === "::"
107+
? "127.0.0.1"
108+
: host;
109+
}
110+
101111
// ===== `Miniflare` User Options =====
102112
export type MiniflareOptions = SharedOptions &
103113
(WorkerOptions | { workers: WorkerOptions[] });
@@ -534,6 +544,7 @@ export class Miniflare {
534544
#runtime?: Runtime;
535545
#removeRuntimeExitHook?: () => void;
536546
#runtimeEntryURL?: URL;
547+
#socketPorts?: Map<SocketIdentifier, number>;
537548
#runtimeClient?: Client;
538549
#proxyClient?: ProxyClient;
539550

@@ -575,11 +586,11 @@ export class Miniflare {
575586
}
576587

577588
this.#log = this.#sharedOpts.core.log ?? new NoOpLog();
578-
this.#host = this.#sharedOpts.core.host ?? "127.0.0.1";
579-
this.#accessibleHost =
580-
this.#host === "*" || this.#host === "0.0.0.0" || this.#host === "::"
581-
? "127.0.0.1"
582-
: this.#host;
589+
this.#host = this.#sharedOpts.core.host ?? DEFAULT_HOST;
590+
// TODO: maybe remove `#accessibleHost` field, and just get whenever
591+
// constructing entry URL, then extract constructing entry URL into
592+
// function used `getUnsafeGetDirectURL()` too?
593+
this.#accessibleHost = getAccessibleHost(this.#host);
583594

584595
if (net.isIPv6(this.#accessibleHost)) {
585596
this.#accessibleHost = `[${this.#accessibleHost}]`;
@@ -965,6 +976,20 @@ export class Miniflare {
965976
}
966977
}
967978
}
979+
980+
// Allow additional sockets to be opened directly to specific workers,
981+
// bypassing Miniflare's entry worker.
982+
let { unsafeDirectHost, unsafeDirectPort } = workerOpts.core;
983+
if (unsafeDirectHost !== undefined || unsafeDirectPort !== undefined) {
984+
unsafeDirectHost ??= DEFAULT_HOST;
985+
unsafeDirectPort ??= 0;
986+
sockets.push({
987+
name: getDirectSocketName(i),
988+
address: `${unsafeDirectHost}:${unsafeDirectPort}`,
989+
service: { name: getUserServiceName(workerName) },
990+
http: {},
991+
});
992+
}
968993
}
969994

970995
// For testing proxy client serialisation, add an API that just returns its
@@ -1027,28 +1052,46 @@ export class Miniflare {
10271052
assert(this.#runtime !== undefined);
10281053
const config = await this.#assembleConfig();
10291054
const configBuffer = serializeConfig(config);
1030-
const maybePort = await this.#runtime.updateConfig(configBuffer, {
1031-
signal: this.#disposeController.signal,
1032-
entryPort: maybeApply(parseInt, this.#runtimeEntryURL?.port),
1033-
});
1055+
1056+
// Get all socket names we expect to get ports for
1057+
assert(config.sockets !== undefined);
1058+
const requiredSockets: SocketIdentifier[] = config.sockets.map(
1059+
({ name }) => {
1060+
assert(name !== undefined);
1061+
return name;
1062+
}
1063+
);
1064+
1065+
const maybeSocketPorts = await this.#runtime.updateConfig(
1066+
configBuffer,
1067+
requiredSockets,
1068+
{
1069+
signal: this.#disposeController.signal,
1070+
entryPort: maybeApply(parseInt, this.#runtimeEntryURL?.port),
1071+
}
1072+
);
10341073
if (this.#disposeController.signal.aborted) return;
1035-
if (maybePort === undefined) {
1074+
if (maybeSocketPorts === undefined) {
10361075
throw new MiniflareCoreError(
10371076
"ERR_RUNTIME_FAILURE",
10381077
"The Workers runtime failed to start. " +
10391078
"There is likely additional logging output above."
10401079
);
10411080
}
1081+
// Note: `updateConfig()` doesn't resolve until ports for all required
1082+
// sockets have been recorded. At this point, `maybeSocketPorts` contains
1083+
// all of `requiredSockets` as keys.
1084+
this.#socketPorts = maybeSocketPorts;
10421085

10431086
const entrySocket = config.sockets?.[0];
10441087
const secure = entrySocket !== undefined && "https" in entrySocket;
1045-
1046-
// noinspection HttpUrlsUsage
1047-
const previousEntry = this.#runtimeEntryURL;
1088+
const previousEntryURL = this.#runtimeEntryURL;
1089+
const entryPort = maybeSocketPorts.get(SOCKET_ENTRY);
1090+
assert(entryPort !== undefined);
10481091
this.#runtimeEntryURL = new URL(
1049-
`${secure ? "https" : "http"}://${this.#accessibleHost}:${maybePort}`
1092+
`${secure ? "https" : "http"}://${this.#accessibleHost}:${entryPort}`
10501093
);
1051-
if (previousEntry?.toString() !== this.#runtimeEntryURL.toString()) {
1094+
if (previousEntryURL?.toString() !== this.#runtimeEntryURL.toString()) {
10521095
this.#runtimeClient = new Client(this.#runtimeEntryURL, {
10531096
connect: { rejectUnauthorized: false },
10541097
});
@@ -1072,7 +1115,7 @@ export class Miniflare {
10721115

10731116
const host = net.isIPv6(this.#host) ? `[${this.#host}]` : this.#host;
10741117
this.#log.info(
1075-
`${ready} on ${secure ? "https" : "http"}://${host}:${maybePort} `
1118+
`${ready} on ${secure ? "https" : "http"}://${host}:${entryPort} `
10761119
);
10771120

10781121
if (initial) {
@@ -1086,7 +1129,7 @@ export class Miniflare {
10861129
}
10871130

10881131
for (const h of hosts) {
1089-
this.#log.info(`- ${secure ? "https" : "http"}://${h}:${maybePort}`);
1132+
this.#log.info(`- ${secure ? "https" : "http"}://${h}:${entryPort}`);
10901133
}
10911134
}
10921135

@@ -1116,6 +1159,35 @@ export class Miniflare {
11161159
return this.#waitForReady();
11171160
}
11181161

1162+
async unsafeGetDirectURL(workerName?: string) {
1163+
this.#checkDisposed();
1164+
await this.ready;
1165+
1166+
// Get worker index and options from name, defaulting to entrypoint
1167+
const workerIndex = this.#findAndAssertWorkerIndex(workerName);
1168+
const workerOpts = this.#workerOpts[workerIndex];
1169+
1170+
// Try to get direct access port for worker
1171+
const socketName = getDirectSocketName(workerIndex);
1172+
// `#socketPorts` is assigned in `#assembleAndUpdateConfig()`, which is
1173+
// called by `#init()`, and `ready` doesn't resolve until `#init()` returns.
1174+
assert(this.#socketPorts !== undefined);
1175+
const maybePort = this.#socketPorts.get(socketName);
1176+
if (maybePort === undefined) {
1177+
const friendlyWorkerName =
1178+
workerName === undefined ? "entrypoint" : JSON.stringify(workerName);
1179+
throw new TypeError(
1180+
`Direct access disabled in ${friendlyWorkerName} worker`
1181+
);
1182+
}
1183+
1184+
// Construct accessible URL from configured host and port
1185+
const host = workerOpts.core.unsafeDirectHost ?? DEFAULT_HOST;
1186+
const accessibleHost = getAccessibleHost(host);
1187+
// noinspection HttpUrlsUsage
1188+
return new URL(`http://${accessibleHost}:${maybePort}`);
1189+
}
1190+
11191191
#checkDisposed() {
11201192
if (this.#disposeController.signal.aborted) {
11211193
throw new MiniflareCoreError(
@@ -1212,24 +1284,29 @@ export class Miniflare {
12121284
return this.#proxyClient;
12131285
}
12141286

1215-
async getBindings<Env = Record<string, unknown>>(
1216-
workerName?: string
1217-
): Promise<Env> {
1218-
const bindings: Record<string, unknown> = {};
1219-
const proxyClient = await this._getProxyClient();
1220-
1221-
// Find worker by name, defaulting to entrypoint worker if none specified
1222-
let workerOpts: PluginWorkerOptions | undefined;
1287+
#findAndAssertWorkerIndex(workerName?: string): number {
12231288
if (workerName === undefined) {
1224-
workerOpts = this.#workerOpts[0];
1289+
return 0;
12251290
} else {
1226-
workerOpts = this.#workerOpts.find(
1291+
const index = this.#workerOpts.findIndex(
12271292
({ core }) => (core.name ?? "") === workerName
12281293
);
1229-
if (workerOpts === undefined) {
1294+
if (index === -1) {
12301295
throw new TypeError(`${JSON.stringify(workerName)} worker not found`);
12311296
}
1297+
return index;
12321298
}
1299+
}
1300+
1301+
async getBindings<Env = Record<string, unknown>>(
1302+
workerName?: string
1303+
): Promise<Env> {
1304+
const bindings: Record<string, unknown> = {};
1305+
const proxyClient = await this._getProxyClient();
1306+
1307+
// Find worker by name, defaulting to entrypoint worker if none specified
1308+
const workerIndex = this.#findAndAssertWorkerIndex(workerName);
1309+
const workerOpts = this.#workerOpts[workerIndex];
12331310
workerName = workerOpts.core.name ?? "";
12341311

12351312
// Populate bindings from each plugin

packages/miniflare/src/plugins/core/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ const CoreOptionsSchemaInput = z.intersection(
103103
fetchMock: z.instanceof(MockAgent).optional(),
104104

105105
unsafeEphemeralDurableObjects: z.boolean().optional(),
106+
unsafeDirectHost: z.string().optional(),
107+
unsafeDirectPort: z.number().optional(),
106108
})
107109
);
108110
export const CoreOptionsSchema = CoreOptionsSchemaInput.transform((value) => {

packages/miniflare/src/plugins/shared/constants.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import {
77
import { CoreBindings, SharedBindings } from "../../workers";
88

99
export const SOCKET_ENTRY = "entry";
10+
const SOCKET_DIRECT_PREFIX = "direct";
11+
12+
export function getDirectSocketName(workerIndex: number) {
13+
return `${SOCKET_DIRECT_PREFIX}:${workerIndex}`;
14+
}
1015

1116
// Service looping back to Miniflare's Node.js process (for storage, etc)
1217
export const SERVICE_LOOPBACK = "loopback";

packages/miniflare/src/runtime/index.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,35 @@ const ControlMessageSchema = z.object({
1717
port: z.number(),
1818
});
1919

20-
async function waitForPort(
21-
socket: string,
20+
export type SocketIdentifier = string;
21+
22+
async function waitForPorts(
23+
requiredSockets: SocketIdentifier[],
2224
stream: Readable,
2325
options?: Abortable
24-
): Promise<number | undefined> {
26+
): Promise<Map<SocketIdentifier, number> | undefined> {
2527
if (options?.signal?.aborted) return;
2628
const lines = rl.createInterface(stream);
2729
// Calling `close()` will end the async iterator below and return undefined
2830
const abortListener = () => lines.close();
2931
options?.signal?.addEventListener("abort", abortListener, { once: true });
32+
// We're going to be mutating `sockets`, so shallow copy it
33+
requiredSockets = Array.from(requiredSockets);
34+
const socketPorts = new Map<SocketIdentifier, number>();
3035
try {
3136
for await (const line of lines) {
3237
const message = ControlMessageSchema.safeParse(JSON.parse(line));
33-
if (message.success && message.data.socket === socket) {
34-
return message.data.port;
35-
}
38+
// If this was an unrecognised control message, ignore it
39+
if (!message.success) continue;
40+
const socket = message.data.socket;
41+
const index = requiredSockets.indexOf(socket);
42+
// If this wasn't a required socket, ignore it
43+
if (index === -1) continue;
44+
// Record the port of this socket
45+
socketPorts.set(socket, message.data.port);
46+
// Satisfy the requirement, if there are no more, return the ports map
47+
requiredSockets.splice(index, 1);
48+
if (requiredSockets.length === 0) return socketPorts;
3649
}
3750
} finally {
3851
options?.signal?.removeEventListener("abort", abortListener);
@@ -106,8 +119,9 @@ export class Runtime {
106119

107120
async updateConfig(
108121
configBuffer: Buffer,
122+
requiredSockets: SocketIdentifier[],
109123
options?: Abortable & Partial<Pick<RuntimeOptions, "entryPort">>
110-
): Promise<number | undefined> {
124+
): Promise<Map<SocketIdentifier, number /* port */> | undefined> {
111125
// 1. Stop existing process (if any) and wait for exit
112126
await this.dispose();
113127
// TODO: what happens if runtime crashes?
@@ -132,8 +146,8 @@ export class Runtime {
132146
runtimeProcess.stdin.write(configBuffer);
133147
runtimeProcess.stdin.end();
134148

135-
// 4. Wait for socket to start listening
136-
return waitForPort(SOCKET_ENTRY, controlPipe, options);
149+
// 4. Wait for sockets to start listening
150+
return waitForPorts(requiredSockets, controlPipe, options);
137151
}
138152

139153
dispose(): Awaitable<void> {

packages/miniflare/test/index.spec.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,51 @@ test("Miniflare: getBindings() and friends return bindings for different workers
837837
await t.throwsAsync(() => mf.getQueueProducer("BUCKET", "c"), expectations);
838838
});
839839

840+
test("Miniflare: allows direct access to workers", async (t) => {
841+
const mf = new Miniflare({
842+
workers: [
843+
{
844+
name: "a",
845+
script: `addEventListener("fetch", (e) => e.respondWith(new Response("a")))`,
846+
unsafeDirectPort: 0,
847+
},
848+
{
849+
routes: ["*/*"],
850+
script: `addEventListener("fetch", (e) => e.respondWith(new Response("b")))`,
851+
},
852+
{
853+
name: "c",
854+
script: `addEventListener("fetch", (e) => e.respondWith(new Response("c")))`,
855+
unsafeDirectHost: "127.0.0.1",
856+
},
857+
],
858+
});
859+
t.teardown(() => mf.dispose());
860+
861+
// Check can access workers as usual
862+
let res = await mf.dispatchFetch("http://localhost/");
863+
t.is(await res.text(), "b");
864+
865+
// Check can access workers directly
866+
// (`undefined` worker name should default to entrypoint, not unnamed worker)
867+
const aURL = await mf.unsafeGetDirectURL();
868+
const cURL = await mf.unsafeGetDirectURL("c");
869+
res = await fetch(aURL);
870+
t.is(await res.text(), "a");
871+
res = await fetch(cURL);
872+
t.is(await res.text(), "c");
873+
874+
// Can can only access configured for direct access
875+
await t.throwsAsync(mf.unsafeGetDirectURL("d"), {
876+
instanceOf: TypeError,
877+
message: '"d" worker not found',
878+
});
879+
await t.throwsAsync(mf.unsafeGetDirectURL(""), {
880+
instanceOf: TypeError,
881+
message: 'Direct access disabled in "" worker',
882+
});
883+
});
884+
840885
// Only test `MINIFLARE_WORKERD_PATH` on Unix. The test uses a Node.js script
841886
// with a shebang, directly as the replacement `workerd` binary, which won't
842887
// work on Windows.

0 commit comments

Comments
 (0)