Skip to content

Commit c7ec7ff

Browse files
Fix silent error absorption in RSC payload injection and streaming pipes
Errors in injectRSCPayload (htmlStream errors, startRSC catch, rscPromise catch) were silently swallowed, preventing them from reaching errorReporter and Sentry. This change emits those errors on resultStream so they propagate through handleStreamError to the error reporting infrastructure without breaking the pipe or causing Fastify to return error responses. Also introduces a shared safePipe utility that replaces the scattered pipe+close boilerplate from PR #2407 with a single function that handles the Node.js pipe() gap (destination not ended on source destroy) and provides an optional onError callback for non-fatal error reporting. Closes #2450 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a1de77e commit c7ec7ff

File tree

5 files changed

+131
-49
lines changed

5 files changed

+131
-49
lines changed

packages/react-on-rails-pro-node-renderer/src/shared/utils.ts

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import cluster from 'cluster';
22
import path from 'path';
33
import { MultipartFile } from '@fastify/multipart';
44
import { createWriteStream, ensureDir, move, MoveOptions, copy, CopyOptions } from 'fs-extra';
5-
import { Readable, pipeline, PassThrough } from 'stream';
5+
import { Readable, Writable, pipeline, PassThrough } from 'stream';
66
import { promisify } from 'util';
77
import * as errorReporter from './errorReporter.js';
88
import { getConfig } from './configBuilder.js';
@@ -128,26 +128,43 @@ export const isReadableStream = (stream: unknown): stream is Readable =>
128128
typeof (stream as Readable).pipe === 'function' &&
129129
typeof (stream as Readable).read === 'function';
130130

131-
export const handleStreamError = (stream: Readable, onError: (error: Error) => void) => {
132-
const newStreamAfterHandlingError = new PassThrough();
133-
// Propagate errors for logging/reporting, but don't terminate — error is not the end of the stream.
134-
// Non-fatal errors (e.g., emitError for throwJsErrors) emit 'error' without destroying
135-
// the stream, and React may continue rendering.
136-
stream.on('error', (error) => {
137-
onError(error);
138-
});
131+
/**
132+
* Pipes source to destination with proper 'close' event handling.
133+
*
134+
* Node.js `pipe()` does NOT end the destination when the source is destroyed —
135+
* it silently unpipes, leaving the destination open forever. This function fills
136+
* that gap by listening for the 'close' event (which fires after both normal
137+
* 'end' and `destroy()`) and ending the destination if needed.
138+
*
139+
* An optional `onError` callback provides observability for source stream errors
140+
* without forwarding them to the destination (which would break the pipe).
141+
*/
142+
export const safePipe = <T extends Writable>(
143+
source: Readable,
144+
destination: T,
145+
onError?: (err: Error) => void,
146+
): T => {
147+
if (onError) {
148+
// Propagate errors for logging/reporting, but don't terminate — error is not the
149+
// end of the stream. Non-fatal errors (e.g., emitError for throwJsErrors) emit
150+
// 'error' without destroying the stream, and React may continue rendering.
151+
source.on('error', onError);
152+
}
139153
// 'close' fires after both normal 'end' and destroy().
140-
// On normal end, pipe() already forwards 'end' to the PassThrough — this is a no-op.
141-
// On destroy, pipe() unpipes but does NOT end the PassThrough — we do it here.
142-
stream.on('close', () => {
143-
if (!newStreamAfterHandlingError.writableEnded) {
144-
newStreamAfterHandlingError.end();
154+
// On normal end, pipe() already forwards 'end' to the destination — this is a no-op.
155+
// On destroy, pipe() unpipes but does NOT end the destination — we do it here.
156+
source.on('close', () => {
157+
if (!destination.writableEnded) {
158+
destination.end();
145159
}
146160
});
147-
stream.pipe(newStreamAfterHandlingError);
148-
return newStreamAfterHandlingError;
161+
source.pipe(destination);
162+
return destination;
149163
};
150164

165+
export const handleStreamError = (stream: Readable, onError: (error: Error) => void) =>
166+
safePipe(stream, new PassThrough(), onError);
167+
151168
export const isErrorRenderResult = (result: RenderResult): result is { exceptionMessage: string } =>
152169
typeof result === 'object' && !isReadableStream(result) && 'exceptionMessage' in result;
153170

packages/react-on-rails-pro/src/injectRSCPayload.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
1313
*/
1414

15-
import { PassThrough, Readable } from 'stream';
15+
import { PassThrough } from 'stream';
1616
import { finished } from 'stream/promises';
1717
import { PipeableOrReadableStream } from 'react-on-rails/types';
1818
import { createRSCPayloadKey } from './utils.ts';
1919
import RSCRequestTracker from './RSCRequestTracker.ts';
20+
import safePipe from './safePipe.ts';
2021

2122
// In JavaScript, when an escape sequence with a backslash (\) is followed by a character
2223
// that isn't a recognized escape character, the backslash is ignored, and the character
@@ -78,16 +79,7 @@ export default function injectRSCPayload(
7879
domNodeId: string | undefined,
7980
) {
8081
const htmlStream = new PassThrough();
81-
pipeableHtmlStream.pipe(htmlStream);
82-
// When the source is destroyed, pipe() unpipes but does NOT end htmlStream.
83-
// Listen for 'close' to ensure htmlStream ends, which triggers the cleanup chain.
84-
if (typeof (pipeableHtmlStream as Readable).on === 'function') {
85-
(pipeableHtmlStream as Readable).on('close', () => {
86-
if (!htmlStream.writableEnded) {
87-
htmlStream.end();
88-
}
89-
});
90-
}
82+
safePipe(pipeableHtmlStream, htmlStream);
9183
const decoder = new TextDecoder();
9284
let rscPromise: Promise<void> | null = null;
9385

@@ -267,7 +259,8 @@ export default function injectRSCPayload(
267259

268260
// Wait for HTML stream to complete, then wait for all RSC promises
269261
await finished(htmlStream).then(() => Promise.all(rscPromises));
270-
} catch {
262+
} catch (e) {
263+
resultStream.emit('error', e instanceof Error ? e : new Error(String(e)));
271264
endResultStream();
272265
}
273266
};
@@ -297,10 +290,14 @@ export default function injectRSCPayload(
297290
});
298291

299292
/**
300-
* Prevent unhandled error crash. Error alone is not the end of the stream —
301-
* termination is handled by the 'close' event below.
293+
* Report errors on htmlStream by emitting them on resultStream, where they
294+
* propagate to handleStreamError → errorReporter in the node renderer.
295+
* Error alone is not the end of the stream — termination is handled by the
296+
* 'close' event below.
302297
*/
303-
htmlStream.on('error', () => {});
298+
htmlStream.on('error', (err) => {
299+
resultStream.emit('error', err instanceof Error ? err : new Error(String(err)));
300+
});
304301

305302
/**
306303
* 'close' fires after both normal 'end' and destroy().
@@ -339,7 +336,10 @@ export default function injectRSCPayload(
339336
.finally(() => {
340337
rscRequestTracker.clear();
341338
})
342-
.catch(() => endResultStream());
339+
.catch((e: unknown) => {
340+
resultStream.emit('error', e instanceof Error ? e : new Error(String(e)));
341+
endResultStream();
342+
});
343343
});
344344

345345
return resultStream;
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2025 Shakacode LLC
3+
*
4+
* This file is NOT licensed under the MIT (open source) license.
5+
* It is part of the React on Rails Pro offering and is licensed separately.
6+
*
7+
* Unauthorized copying, modification, distribution, or use of this file,
8+
* via any medium, is strictly prohibited without a valid license agreement
9+
* from Shakacode LLC.
10+
*
11+
* For licensing terms, please see:
12+
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
13+
*/
14+
15+
import { Readable, Writable } from 'stream';
16+
import { PipeableOrReadableStream } from 'react-on-rails/types';
17+
18+
/**
19+
* Pipes source to destination with proper 'close' event handling.
20+
*
21+
* Node.js `pipe()` does NOT end the destination when the source is destroyed —
22+
* it silently unpipes, leaving the destination open forever. This function fills
23+
* that gap by listening for the 'close' event (which fires after both normal
24+
* 'end' and `destroy()`) and ending the destination if the source didn't end
25+
* normally.
26+
*
27+
* An optional `onError` callback provides observability for source stream errors
28+
* without forwarding them to the destination (which would break the pipe).
29+
*
30+
* NOTE: `PipeableOrReadableStream` can be either a React `PipeableStream`
31+
* (which only has `.pipe()` and `.abort()`, no `.on()`) or a Node.js
32+
* `ReadableStream`. The 'close' and 'error' listeners are only attached when
33+
* the source supports `.on()`.
34+
*
35+
* @param source - The source stream to pipe from
36+
* @param destination - The destination stream to pipe into
37+
* @param onError - Optional callback for source stream errors (for reporting, not forwarding)
38+
* @returns The destination stream (for chaining)
39+
*/
40+
export default function safePipe<T extends Writable>(
41+
source: PipeableOrReadableStream,
42+
destination: T,
43+
onError?: (err: Error) => void,
44+
): T {
45+
source.pipe(destination);
46+
47+
if (typeof (source as Readable).on === 'function') {
48+
const readableSource = source as Readable;
49+
50+
if (onError) {
51+
readableSource.on('error', onError);
52+
}
53+
54+
// 'close' fires after both normal 'end' and destroy().
55+
// On normal end, pipe() already forwards 'end' to the destination — this is a no-op.
56+
// On destroy, pipe() unpipes but does NOT end the destination — we do it here.
57+
readableSource.on('close', () => {
58+
if (!destination.writableEnded) {
59+
destination.end();
60+
}
61+
});
62+
}
63+
64+
return destination;
65+
}

packages/react-on-rails-pro/src/streamingUtils.ts

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
import * as ComponentRegistry from './ComponentRegistry.ts';
3232
import PostSSRHookTracker from './PostSSRHookTracker.ts';
3333
import RSCRequestTracker from './RSCRequestTracker.ts';
34+
import safePipe from './safePipe.ts';
3435

3536
type BufferedEvent = {
3637
event: 'data' | 'error' | 'end';
@@ -133,28 +134,26 @@ export const transformRenderStreamChunksToResultObject = (renderState: StreamRen
133134
},
134135
});
135136

136-
let pipedStream: PipeableOrReadableStream | null = null;
137-
const pipeToTransform = (pipeableStream: PipeableOrReadableStream) => {
138-
pipeableStream.pipe(transformStream);
139-
// 'close' fires after both normal 'end' and destroy().
140-
// On normal end, pipe() already forwards 'end' to transformStream — this is a no-op.
141-
// On destroy, pipe() unpipes but does NOT end transformStream — we do it here.
142-
if (typeof (pipeableStream as Readable).on === 'function') {
143-
(pipeableStream as Readable).on('close', () => {
144-
if (!transformStream.writableEnded) {
145-
transformStream.end();
146-
}
147-
});
148-
}
149-
pipedStream = pipeableStream;
150-
};
151137
// We need to wrap the transformStream in a Readable stream to properly handle errors:
152138
// 1. If we returned transformStream directly, we couldn't emit errors into it externally
153139
// 2. If an error is emitted into the transformStream, it would cause the render to fail
154140
// 3. By wrapping in Readable.from(), we can explicitly emit errors into the readableStream without affecting the transformStream
155141
// Note: Readable.from can merge multiple chunks into a single chunk, so we need to ensure that we can separate them later
156142
const { stream: readableStream, emitError } = bufferStream(transformStream);
157143

144+
let pipedStream: PipeableOrReadableStream | null = null;
145+
const pipeToTransform = (pipeableStream: PipeableOrReadableStream) => {
146+
// safePipe handles the 'close' event to end transformStream when the source is destroyed.
147+
// The onError callback forwards source errors to readableStream (via emitError), which
148+
// propagates them to handleStreamError → errorReporter in the node renderer. Emitting on
149+
// the source (not the destination) keeps the pipe intact so data continues flowing for
150+
// non-fatal errors.
151+
safePipe(pipeableStream, transformStream, (err) => {
152+
emitError(err);
153+
});
154+
pipedStream = pipeableStream;
155+
};
156+
158157
const writeChunk = (chunk: string) => transformStream.write(chunk);
159158
const endStream = () => {
160159
transformStream.end();

packages/react-on-rails-pro/src/transformRSCNodeStream.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
1313
*/
1414

15-
import { Transform } from 'stream';
15+
import { Readable, Transform } from 'stream';
16+
import safePipe from './safePipe.ts';
1617

1718
/**
1819
* Transforms an RSC Node.js stream for server-side processing.
@@ -55,5 +56,5 @@ export default function transformRSCStream(stream: NodeJS.ReadableStream): NodeJ
5556
},
5657
});
5758

58-
return stream.pipe(htmlExtractor);
59+
return safePipe(stream as Readable, htmlExtractor);
5960
}

0 commit comments

Comments
 (0)