Skip to content

Commit 7cf90a5

Browse files
committed
refactor(@angular/ssr): drain node stream
1 parent 8fb9f96 commit 7cf90a5

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

packages/angular/ssr/node/src/response.ts

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import type { ServerResponse } from 'node:http';
1010
import type { Http2ServerResponse } from 'node:http2';
11+
import type { ReadableStreamReadResult } from 'node:stream/web';
1112

1213
/**
1314
* Streams a web-standard `Response` into a Node.js `ServerResponse`
@@ -52,27 +53,49 @@ export async function writeResponseToNodeResponse(
5253

5354
try {
5455
const reader = body.getReader();
55-
56-
destination.on('close', () => {
57-
reader.cancel().catch((error) => {
56+
function cancelStream(error?: Error) {
57+
reader.cancel(error).catch(() => {
5858
// eslint-disable-next-line no-console
5959
console.error(
6060
`An error occurred while writing the response body for: ${destination.req.url}.`,
6161
error,
6262
);
6363
});
64-
});
65-
66-
// eslint-disable-next-line no-constant-condition
67-
while (true) {
68-
const { done, value } = await reader.read();
69-
if (done) {
70-
destination.end();
71-
break;
64+
if (error) {
65+
destination.destroy(error);
7266
}
67+
}
7368

74-
(destination as ServerResponse).write(value);
69+
function handleStream({
70+
done,
71+
value,
72+
}: ReadableStreamReadResult<Uint8Array>): void | Promise<void> {
73+
try {
74+
if (done) {
75+
// End the response
76+
destination.end();
77+
} else if ((destination as ServerResponse).write(value)) {
78+
// Continue reading recursively
79+
reader.read().then(handleStream, cancelStream);
80+
} else {
81+
// Wait for the drain event to continue reading
82+
destination.once('drain', () => reader.read().then(handleStream, cancelStream));
83+
}
84+
} catch (error) {
85+
cancelStream(error instanceof Error ? error : undefined);
86+
}
7587
}
88+
89+
destination.on('close', cancelStream);
90+
destination.on('error', cancelStream);
91+
reader.read().then(handleStream, cancelStream);
92+
93+
// Return a promise that resolves when the stream is closed
94+
return reader.closed.finally(() => {
95+
// cleanup listeners
96+
(destination as ServerResponse).off('close', cancelStream);
97+
(destination as ServerResponse).off('error', cancelStream);
98+
});
7699
} catch {
77100
destination.end('Internal server error.');
78101
}

0 commit comments

Comments
 (0)