Skip to content

Commit 949dd23

Browse files
committed
Handle backpressure correctly in response stream
Fixes #76
1 parent 2d5932d commit 949dd23

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed

packages/node-fetch-server/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
This is the changelog for [`node-fetch-server`](https://github.com/mjackson/remix-the-web/tree/main/packages/node-fetch-server). It follows [semantic versioning](https://semver.org/).
44

5+
## HEAD
6+
7+
- Handle backpressure correctly in response streaming
8+
59
## v0.7.0 (2025-06-06)
610

711
- Add `/src` to npm package, so "go to definition" goes to the actual source

packages/node-fetch-server/src/lib/request-listener.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,74 @@ describe('createRequestListener', () => {
205205
listener(req, res);
206206
});
207207
});
208+
209+
it('handles backpressure when writing response chunks', async () => {
210+
await new Promise<void>((resolve) => {
211+
let handler: FetchHandler = async () => {
212+
// Create a response with multiple chunks
213+
let chunks = ['chunk1', 'chunk2', 'chunk3', 'chunk4', 'chunk5'];
214+
let body = new ReadableStream({
215+
async start(controller) {
216+
for (let chunk of chunks) {
217+
controller.enqueue(new TextEncoder().encode(chunk));
218+
}
219+
controller.close();
220+
},
221+
});
222+
return new Response(body);
223+
};
224+
225+
let listener = createRequestListener(handler);
226+
assert.ok(listener);
227+
228+
let req = createMockRequest();
229+
230+
// Create a custom mock response that properly extends EventEmitter
231+
let writtenChunks: Uint8Array[] = [];
232+
let writeCallCount = 0;
233+
let drainListenerCalled = false;
234+
235+
let writable = new stream.Writable();
236+
let res = Object.assign(writable, {
237+
req,
238+
writeHead() {},
239+
write(chunk: Uint8Array) {
240+
writtenChunks.push(chunk);
241+
writeCallCount++;
242+
243+
// Simulate backpressure on chunks 2 and 4
244+
if (writeCallCount === 2 || writeCallCount === 4) {
245+
setTimeout(() => {
246+
writable.emit('drain');
247+
}, 0);
248+
return false; // Indicate buffer is full
249+
}
250+
return true; // Indicate buffer has space
251+
},
252+
end() {
253+
assert.equal(writtenChunks.length, 5);
254+
assert.equal(writeCallCount, 5);
255+
256+
// Verify backpressure was handled
257+
assert.ok(drainListenerCalled, 'drain listener should have been registered');
258+
259+
// Verify chunk contents
260+
let receivedText = writtenChunks.map((chunk) => new TextDecoder().decode(chunk)).join('');
261+
assert.equal(receivedText, 'chunk1chunk2chunk3chunk4chunk5');
262+
263+
resolve();
264+
},
265+
once(event: string, callback: () => void) {
266+
if (event === 'drain') {
267+
drainListenerCalled = true;
268+
}
269+
stream.Writable.prototype.once.call(writable, event, callback);
270+
},
271+
}) as unknown as http.ServerResponse;
272+
273+
listener(req, res);
274+
});
275+
});
208276
});
209277

210278
function createMockRequest({

packages/node-fetch-server/src/lib/request-listener.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,11 @@ export async function sendResponse(
217217
if (response.body != null && res.req.method !== 'HEAD') {
218218
for await (let chunk of readStream(response.body)) {
219219
// @ts-expect-error - Node typings for http2 require a 2nd parameter to write but it's optional
220-
res.write(chunk);
220+
if (res.write(chunk) === false) {
221+
await new Promise<void>((resolve) => {
222+
res.once('drain', resolve);
223+
});
224+
}
221225
}
222226
}
223227

0 commit comments

Comments
 (0)