Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit fc7eddc

Browse files
committed
Catch loopback request body abort errors
When `workerd` sends a request to Miniflare's loopback server, we convert the incoming request to an `undici` `Request` object. Previously, we passed the incoming request object directly to the `new Request()` constructor. Internally, `undici` created an async iterator from this request body stream. Unfortunately, if `workerd` ever aborted one of these requests (e.g. disposing runtime with a pending `waitUntil`ed `fetch`), this abort error would be propagated to the async iterator, and cause an unhandled rejection. We now convert this incoming request to a `ReadableStream` ourselves, catching errors from this async iterator, gracefully closing the stream.
1 parent d850639 commit fc7eddc

File tree

1 file changed

+38
-1
lines changed

1 file changed

+38
-1
lines changed

packages/tre/src/index.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import net from "net";
66
import os from "os";
77
import path from "path";
88
import { Duplex } from "stream";
9+
import { ReadableStream } from "stream/web";
910
import type {
1011
IncomingRequestCfProperties,
1112
RequestInitCfProperties,
@@ -221,6 +222,40 @@ async function writeResponse(response: Response, res: http.ServerResponse) {
221222
res.end();
222223
}
223224

225+
function safeReadableStreamFrom(iterable: AsyncIterable<Uint8Array>) {
226+
// Adapted from `undici`, catches errors from `next()` to avoid unhandled
227+
// rejections from aborted request body streams:
228+
// https://github.com/nodejs/undici/blob/dfaec78f7a29f07bb043f9006ed0ceb0d5220b55/lib/core/util.js#L369-L392
229+
let iterator: AsyncIterator<Uint8Array>;
230+
return new ReadableStream<Uint8Array>(
231+
{
232+
async start() {
233+
iterator = iterable[Symbol.asyncIterator]();
234+
},
235+
// @ts-expect-error `pull` may return anything
236+
async pull(controller): Promise<boolean> {
237+
try {
238+
const { done, value } = await iterator.next();
239+
if (done) {
240+
queueMicrotask(() => controller.close());
241+
} else {
242+
const buf = Buffer.isBuffer(value) ? value : Buffer.from(value);
243+
controller.enqueue(new Uint8Array(buf));
244+
}
245+
} catch {
246+
queueMicrotask(() => controller.close());
247+
}
248+
// @ts-expect-error `pull` may return anything
249+
return controller.desiredSize > 0;
250+
},
251+
async cancel() {
252+
await iterator.return?.();
253+
},
254+
},
255+
0
256+
);
257+
}
258+
224259
export class Miniflare {
225260
readonly #gatewayFactories: PluginGatewayFactories;
226261
readonly #routers: PluginRouters;
@@ -479,10 +514,12 @@ export class Miniflare {
479514
);
480515
headers.delete(HEADER_ORIGINAL_URL);
481516

517+
const noBody = req.method === "GET" || req.method === "HEAD";
518+
const body = noBody ? undefined : safeReadableStreamFrom(req);
482519
const request = new Request(url, {
483520
method: req.method,
484521
headers,
485-
body: req.method === "GET" || req.method === "HEAD" ? undefined : req,
522+
body,
486523
duplex: "half",
487524
cf,
488525
});

0 commit comments

Comments
 (0)