Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 2ded669

Browse files
committed
Assert bodies consumed
This change throws an uncaught exception if we forget to consume a response body in a test. Unconsumed bodies may cause `undici` to hang or throw socket errors.
1 parent 5fb7544 commit 2ded669

File tree

7 files changed

+37
-7
lines changed

7 files changed

+37
-7
lines changed

ava.config.mjs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,7 @@ export default {
1616
compile: false,
1717
rewritePaths,
1818
},
19+
environmentVariables: {
20+
MINIFLARE_ASSERT_BODIES_CONSUMED: "true",
21+
},
1922
};

packages/miniflare/src/index.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,25 @@ export class Miniflare {
10911091
throw reviveError(this.#workerSrcOpts, caught);
10921092
}
10931093

1094+
if (
1095+
process.env.MINIFLARE_ASSERT_BODIES_CONSUMED !== undefined &&
1096+
response.body !== null
1097+
) {
1098+
// Throw an uncaught exception if the body from this response isn't
1099+
// consumed "immediately". `undici` may hang or throw socket errors if we
1100+
// don't remember to do this:
1101+
// https://github.com/nodejs/undici/issues/583#issuecomment-1577468249
1102+
const originalLimit = Error.stackTraceLimit;
1103+
Error.stackTraceLimit = Infinity;
1104+
const error = new Error(
1105+
"`body` returned from `Miniflare#dispatchFetch()` not consumed immediately"
1106+
);
1107+
Error.stackTraceLimit = originalLimit;
1108+
setImmediate(() => {
1109+
if (!response.bodyUsed) throw error;
1110+
});
1111+
}
1112+
10941113
return response;
10951114
};
10961115

@@ -1109,7 +1128,8 @@ export class Miniflare {
11091128
async _getProxyClient(): Promise<ProxyClient> {
11101129
this.#checkDisposed();
11111130
await this.ready;
1112-
return this.#proxyClient!;
1131+
assert(this.#proxyClient !== undefined);
1132+
return this.#proxyClient;
11131133
}
11141134

11151135
async getBindings<Env = Record<string, unknown>>(

packages/miniflare/src/plugins/core/proxy/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ class ProxyStubHandler<T extends object> implements ProxyHandler<T> {
285285
// Need to `.pipeThrough()` here otherwise we'll get
286286
// `TypeError: Response body object should not be disturbed or locked`
287287
// when trying to construct a `Response` with the stream.
288+
// TODO(soon): add support for MINIFLARE_ASSERT_BODIES_CONSUMED here
288289
unbufferedStream = rest.pipeThrough(new TransformStream());
289290
}
290291

packages/miniflare/src/plugins/core/proxy/fetch-sync.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ port.addEventListener("message", async (event) => {
6060
try {
6161
port.postMessage({ id, error });
6262
} catch {
63+
// If error failed to serialise, post simplified version
6364
port.postMessage({ id, error: new Error(String(error)) });
6465
}
6566
}
@@ -130,6 +131,7 @@ export class SynchronousFetcher {
130131
// synchronously fetch from internal Miniflare code (e.g. proxy server)
131132
throw reviveError([], caught);
132133
}
134+
// TODO(soon): add support for MINIFLARE_ASSERT_BODIES_CONSUMED here
133135
return { status, headers, body };
134136
} else {
135137
throw message.error;

packages/miniflare/test/index.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ test("Miniflare: HTTPS fetches using browser CA certificates", async (t) => {
503503
t.teardown(() => mf.dispose());
504504
const res = await mf.dispatchFetch("http://localhost");
505505
t.true(res.ok);
506+
await res.arrayBuffer(); // (drain)
506507
});
507508

508509
test("Miniflare: accepts https requests", async (t) => {
@@ -522,6 +523,7 @@ test("Miniflare: accepts https requests", async (t) => {
522523

523524
const res = await mf.dispatchFetch("https://localhost");
524525
t.true(res.ok);
526+
await res.arrayBuffer(); // (drain)
525527

526528
t.assert(log.logs[0][1].startsWith("Ready on https://"));
527529
});

packages/miniflare/test/plugins/queues/index.spec.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ test("flushes partial and full batches", async (t) => {
4141
} else if (url.pathname === "/batch") {
4242
await env.QUEUE.sendBatch(body);
4343
}
44-
return new Response();
44+
return new Response(null, { status: 204 });
4545
}
4646
}`,
4747
},
@@ -232,7 +232,7 @@ test("sends all structured cloneable types", async (t) => {
232232
await env.QUEUE.sendBatch(Object.entries(VALUES).map(
233233
([key, value]) => ({ body: { name: key, value } })
234234
));
235-
return new Response();
235+
return new Response(null, { status: 204 });
236236
},
237237
async queue(batch, env, ctx) {
238238
let error;
@@ -305,7 +305,7 @@ test("retries messages", async (t) => {
305305
const url = new URL(request.url);
306306
const body = await request.json();
307307
await env.QUEUE.sendBatch(body);
308-
return new Response();
308+
return new Response(null, { status: 204 });
309309
},
310310
async queue(batch, env, ctx) {
311311
const res = await env.RETRY_FILTER.fetch("http://localhost", {
@@ -532,7 +532,7 @@ test("moves to dead letter queue", async (t) => {
532532
const url = new URL(request.url);
533533
const body = await request.json();
534534
await env.BAD_QUEUE.sendBatch(body);
535-
return new Response();
535+
return new Response(null, { status: 204 });
536536
},
537537
async queue(batch, env, ctx) {
538538
const res = await env.RETRY_FILTER.fetch("http://localhost", {
@@ -638,7 +638,7 @@ test("operations permit strange queue names", async (t) => {
638638
async fetch(request, env, ctx) {
639639
await env.QUEUE.send("msg1");
640640
await env.QUEUE.sendBatch([{ body: "msg2" }]);
641-
return new Response();
641+
return new Response(null, { status: 204 });
642642
},
643643
async queue(batch, env, ctx) {
644644
await env.REPORTER.fetch("http://localhost", {
@@ -718,7 +718,8 @@ test("supports message contentTypes", async (t) => {
718718
},
719719
};`,
720720
});
721-
await mf.dispatchFetch("http://localhost");
721+
const res = await mf.dispatchFetch("http://localhost");
722+
await res.arrayBuffer(); // (drain)
722723
timers.timestamp += 1000;
723724
await timers.waitForTasks();
724725
const batch = await promise;

types/env.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ declare namespace NodeJS {
22
export interface ProcessEnv {
33
NODE_ENV?: string;
44
NODE_EXTRA_CA_CERTS?: string;
5+
MINIFLARE_ASSERT_BODIES_CONSUMED?: string;
56
}
67
}

0 commit comments

Comments
 (0)