Skip to content

Commit 1479fd0

Browse files
authored
fix(miniflare): support WebSocket proxying to workerd (#10273)
1 parent 911fd8d commit 1479fd0

File tree

3 files changed

+156
-7
lines changed

3 files changed

+156
-7
lines changed

.changeset/light-garlics-count.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"miniflare": patch
3+
---
4+
5+
fix: support WebSocket proxying to workerd
6+
7+
The dev registry proxy server now correctly handles WebSocket upgrade requests and
8+
tunnels bidirectional frames between the workerd processes. Previously,
9+
handshakes would fail due to missing upgrade logic.

packages/miniflare/src/shared/dev-registry.worker.ts

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ class ProxyServer {
103103
serviceProxyTarget.entrypoint
104104
);
105105

106-
this.handleProxy(req, res, address);
106+
this.handleProxy(req, res, address, (callback) =>
107+
this.subscribe(serviceProxyTarget.service, callback)
108+
);
107109
return;
108110
}
109111

@@ -120,7 +122,9 @@ class ProxyServer {
120122
return;
121123
}
122124

123-
this.handleProxy(req, res, address);
125+
this.handleProxy(req, res, address, (callback) =>
126+
this.subscribe(doProxyTarget.scriptName, callback)
127+
);
124128
return;
125129
}
126130

@@ -242,7 +246,8 @@ class ProxyServer {
242246
private handleProxy(
243247
req: http.IncomingMessage,
244248
res: http.ServerResponse,
245-
target: ProxyAddress
249+
target: ProxyAddress,
250+
onTargetUpdated: (callback: () => void) => void
246251
) {
247252
const headers = { ...req.headers };
248253
let path = target.path;
@@ -271,17 +276,15 @@ class ProxyServer {
271276
path,
272277
headers,
273278
};
279+
const upstream = http.request(options);
274280

275-
const upstream = http.request(options, (upRes) => {
281+
upstream.on("response", (upRes) => {
276282
// Relay status and headers back to the original client
277283
res.writeHead(upRes.statusCode ?? 500, upRes.headers);
278284
// Pipe the response body
279285
upRes.pipe(res);
280286
});
281287

282-
// Pipe the client request body to the upstream
283-
req.pipe(upstream);
284-
285288
upstream.on("error", (error) => {
286289
log.error(
287290
new Error(
@@ -294,6 +297,50 @@ class ProxyServer {
294297
if (!res.headersSent) res.writeHead(502);
295298
res.end("Bad Gateway");
296299
});
300+
301+
if (req.headers.upgrade?.toLowerCase() === "websocket") {
302+
upstream.on("upgrade", (upRes, socket, head) => {
303+
// For WebSocket upgrades, we need to respond directly on the original request socket
304+
if (req.socket && req.socket.writable) {
305+
// Build and write complete HTTP response header
306+
const statusLine = `HTTP/1.1 ${upRes.statusCode ?? 101} ${upRes.statusMessage ?? "Switching Protocols"}\r\n`;
307+
let headersString = "";
308+
for (let i = 0; i < upRes.rawHeaders.length; i += 2) {
309+
headersString += `${upRes.rawHeaders[i]}: ${upRes.rawHeaders[i + 1]}\r\n`;
310+
}
311+
312+
req.socket.write(statusLine + headersString + "\r\n");
313+
314+
// Write any buffered data
315+
if (head && head.length > 0) {
316+
req.socket.write(head);
317+
}
318+
319+
// Pipe bidirectional WebSocket data
320+
socket.pipe(req.socket, { end: false });
321+
req.socket.pipe(socket, { end: false });
322+
323+
// Handle connection cleanup
324+
socket.on("error", () => req.socket.destroy());
325+
req.socket.on("error", () => socket.destroy());
326+
socket.on("close", () => req.socket.destroy());
327+
req.socket.on("close", () => socket.destroy());
328+
329+
// Close the socket when the target address is updated
330+
onTargetUpdated(() => {
331+
socket.end();
332+
});
333+
} else {
334+
socket.end();
335+
}
336+
});
337+
338+
// End the request to trigger the upgrade
339+
upstream.end();
340+
} else {
341+
// Pipe the client request body to the upstream for regular HTTP requests
342+
req.pipe(upstream);
343+
}
297344
}
298345

299346
/**

packages/miniflare/test/dev-registry.spec.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,99 @@ test("DevRegistry: fetch to module worker", async (t) => {
126126
});
127127
});
128128

129+
test("DevRegistry: WebSocket upgrade to module worker", async (t) => {
130+
const unsafeDevRegistryPath = await useTmp(t);
131+
const local = new Miniflare({
132+
name: "local-worker",
133+
unsafeDevRegistryPath,
134+
serviceBindings: {
135+
SERVICE: {
136+
name: "remote-worker",
137+
},
138+
},
139+
compatibilityFlags: ["experimental"],
140+
modules: true,
141+
script: `
142+
export default {
143+
async fetch(request, env, ctx) {
144+
const wsResponse = await env.SERVICE.fetch(request.url, {
145+
headers: { Upgrade: "websocket" }
146+
});
147+
148+
if (wsResponse.webSocket) {
149+
wsResponse.webSocket.accept();
150+
151+
// Test bidirectional communication
152+
wsResponse.webSocket.send("ping");
153+
154+
const messagePromise = new Promise((resolve) => {
155+
wsResponse.webSocket.addEventListener("message", (event) => {
156+
resolve(event.data);
157+
});
158+
});
159+
160+
const response = await messagePromise;
161+
162+
return new Response(\`WebSocket communication successful: \${response}\`, {
163+
status: 200,
164+
});
165+
}
166+
167+
return new Response("WebSocket upgrade failed", {
168+
status: 500,
169+
});
170+
}
171+
}
172+
`,
173+
});
174+
t.teardown(() => local.dispose());
175+
176+
const remote = new Miniflare({
177+
name: "remote-worker",
178+
unsafeDevRegistryPath,
179+
compatibilityFlags: ["experimental"],
180+
modules: true,
181+
script: `
182+
export default {
183+
async fetch(request, env, ctx) {
184+
// Handle WebSocket upgrade requests
185+
if (request.headers.get("Upgrade") === "websocket") {
186+
const [server, client] = Object.values(new WebSocketPair());
187+
server.accept();
188+
189+
server.addEventListener("message", (event) => {
190+
// Echo back with a response to test bidirectional communication
191+
if (event.data === "ping") {
192+
server.send("pong");
193+
}
194+
});
195+
196+
return new Response(null, { status: 101, webSocket: client });
197+
}
198+
199+
// This test only focuses on WebSocket, no HTTP handling needed
200+
return new Response("Not a WebSocket request", { status: 400 });
201+
}
202+
}
203+
`,
204+
unsafeDirectSockets: [
205+
{
206+
entrypoint: undefined,
207+
proxy: true,
208+
},
209+
],
210+
});
211+
t.teardown(() => remote.dispose());
212+
213+
await remote.ready;
214+
await waitUntil(t, async (t) => {
215+
const res = await local.dispatchFetch("http://example.com");
216+
const result = await res.text();
217+
t.is(result, "WebSocket communication successful: pong");
218+
t.is(res.status, 200);
219+
});
220+
});
221+
129222
test("DevRegistry: RPC to default entrypoint", async (t) => {
130223
const unsafeDevRegistryPath = await useTmp(t);
131224
const local = new Miniflare({

0 commit comments

Comments
 (0)