Commit e5e7457

Implement stream buffering to safely handle stream events and errors
1 parent db44411 commit e5e7457

File tree: 1 file changed

node_package/src/streamServerRenderedReactComponent.ts

Lines changed: 74 additions & 2 deletions
@@ -17,6 +17,79 @@ const stringToStream = (str: string): Readable => {
   return stream;
 };
 
+type BufferedEvent = {
+  event: 'data' | 'error' | 'end';
+  data: unknown;
+};
+
+/**
+ * Creates a new Readable stream that safely buffers all events from the input stream until reading begins.
+ *
+ * This function solves two important problems:
+ * 1. Error handling: If an error occurs on the source stream before error listeners are attached,
+ *    it would normally crash the process. This wrapper buffers error events until reading begins,
+ *    ensuring errors are properly handled once listeners are ready.
+ * 2. Event ordering: All events (data, error, end) are buffered and replayed in the exact order
+ *    they were received, maintaining the correct sequence even if events occur before reading starts.
+ *
+ * @param stream - The source Readable stream to buffer
+ * @returns {Object} An object containing:
+ *   - stream: A new Readable stream that will buffer and replay all events
+ *   - emitError: A function to manually emit errors into the stream
+ */
+const bufferStream = (stream: Readable) => {
+  const bufferedEvents: BufferedEvent[] = [];
+  let startedReading = false;
+
+  const listeners = (['data', 'error', 'end'] as const).map(event => {
+    const listener = (data: unknown) => {
+      if (!startedReading) {
+        bufferedEvents.push({ event, data });
+      }
+    };
+    stream.on(event, listener);
+    return { event, listener };
+  });
+
+  const bufferedStream = new Readable({
+    read() {
+      if (startedReading) return;
+      startedReading = true;
+
+      // Remove the initial buffering listeners
+      listeners.forEach(({ event, listener }) => stream.off(event, listener));
+      const handleEvent = ({ event, data }: BufferedEvent) => {
+        if (event === 'data') {
+          this.push(data);
+        } else if (event === 'error') {
+          this.emit('error', data);
+        } else {
+          this.push(null);
+        }
+      };
+
+      // Replay buffered events in the order they arrived
+      bufferedEvents.forEach(handleEvent);
+
+      // Attach new listeners that forward future events directly
+      (['data', 'error', 'end'] as const).forEach(event => {
+        stream.on(event, (data: unknown) => handleEvent({ event, data }));
+      });
+    },
+  });
+
+  return {
+    stream: bufferedStream,
+    emitError: (error: unknown) => {
+      if (startedReading) {
+        stream.emit('error', error);
+      } else {
+        bufferedEvents.push({ event: 'error', data: error });
+      }
+    },
+  };
+};
+
 export const transformRenderStreamChunksToResultObject = (renderState: StreamRenderState) => {
   const consoleHistory = console.history;
   let previouslyReplayedConsoleMessages = 0;
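
Aside (not part of the diff): a minimal sketch of how the bufferStream helper above is meant to be used. The PassThrough source and the error values are made up for illustration; the point is that events emitted on the source, or errors injected via emitError, before any consumer starts reading are buffered instead of crashing the process, and are delivered once listeners are attached and reading begins.

import { PassThrough } from 'stream';

// `source` stands in for the render transform stream; names and values are illustrative only.
const source = new PassThrough();
const { stream, emitError } = bufferStream(source);

source.write('first chunk');            // data is preserved for the wrapped stream's consumer
emitError(new Error('render failed'));  // buffered, not thrown: reading has not started yet

// Attaching listeners starts the flow; bufferStream then replays the buffered
// events instead of letting the early error go unhandled.
stream.on('data', (chunk) => console.log('chunk:', String(chunk)));
stream.on('error', (err) => console.error('handled:', err.message));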
@@ -44,10 +117,9 @@ export const transformRenderStreamChunksToResultObject = (renderState: StreamRen
   // 2. If an error is emitted into the transformStream, it would cause the render to fail
   // 3. By wrapping in Readable.from(), we can explicitly emit errors into the readableStream without affecting the transformStream
   // Note: Readable.from can merge multiple chunks into a single chunk, so we need to ensure that we can separate them later
-  const readableStream = Readable.from(transformStream);
+  const { stream: readableStream, emitError } = bufferStream(transformStream);
 
   const writeChunk = (chunk: string) => transformStream.write(chunk);
-  const emitError = (error: unknown) => readableStream.emit('error', error);
   const endStream = () => {
     transformStream.end();
     pipedStream?.abort();
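
Aside (not part of the diff): why the removed emitError was risky. Calling emit('error', ...) on a Readable that has no 'error' listener yet makes Node throw the error immediately, and in an async render path with no surrounding try/catch that surfaces as an uncaught exception. An illustrative snippet with made-up values:

import { Readable } from 'stream';

// Illustrative only: mirrors the behaviour of the removed `emitError` line.
const readableStream = Readable.from(['chunk']);

try {
  // No 'error' listener is attached yet, so emit() throws the error synchronously;
  // in the streaming-render path nothing catches it, so the process would crash.
  readableStream.emit('error', new Error('early failure'));
} catch (err) {
  console.error('unhandled emit would have crashed:', (err as Error).message);
}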
