Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions packages/react-on-rails-pro-node-renderer/src/shared/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import cluster from 'cluster';
import path from 'path';
import { MultipartFile } from '@fastify/multipart';
import { createWriteStream, ensureDir, move, MoveOptions, copy, CopyOptions } from 'fs-extra';
import { Readable, pipeline, PassThrough } from 'stream';
import { Readable, Writable, pipeline, PassThrough } from 'stream';
import { promisify } from 'util';
import * as errorReporter from './errorReporter.js';
import { getConfig } from './configBuilder.js';
Expand Down Expand Up @@ -128,26 +128,43 @@ export const isReadableStream = (stream: unknown): stream is Readable =>
typeof (stream as Readable).pipe === 'function' &&
typeof (stream as Readable).read === 'function';

export const handleStreamError = (stream: Readable, onError: (error: Error) => void) => {
const newStreamAfterHandlingError = new PassThrough();
// Propagate errors for logging/reporting, but don't terminate — error is not the end of the stream.
// Non-fatal errors (e.g., emitError for throwJsErrors) emit 'error' without destroying
// the stream, and React may continue rendering.
stream.on('error', (error) => {
onError(error);
});
/**
* Pipes source to destination with proper 'close' event handling.
*
* Node.js `pipe()` does NOT end the destination when the source is destroyed —
* it silently unpipes, leaving the destination open forever. This function fills
* that gap by listening for the 'close' event (which fires after both normal
* 'end' and `destroy()`) and ending the destination if needed.
*
* An optional `onError` callback provides observability for source stream errors
* without forwarding them to the destination (which would break the pipe).
*/
export const safePipe = <T extends Writable>(
source: Readable,
destination: T,
onError?: (err: Error) => void,
): T => {
if (onError) {
// Propagate errors for logging/reporting, but don't terminate — error is not the
// end of the stream. Non-fatal errors (e.g., emitError for throwJsErrors) emit
// 'error' without destroying the stream, and React may continue rendering.
source.on('error', onError);
}
// 'close' fires after both normal 'end' and destroy().
// On normal end, pipe() already forwards 'end' to the PassThrough — this is a no-op.
// On destroy, pipe() unpipes but does NOT end the PassThrough — we do it here.
stream.on('close', () => {
if (!newStreamAfterHandlingError.writableEnded) {
newStreamAfterHandlingError.end();
// On normal end, pipe() already forwards 'end' to the destination — this is a no-op.
// On destroy, pipe() unpipes but does NOT end the destination — we do it here.
source.on('close', () => {
if (!destination.writableEnded) {
destination.end();
}
});
stream.pipe(newStreamAfterHandlingError);
return newStreamAfterHandlingError;
source.pipe(destination);
return destination;
};

export const handleStreamError = (stream: Readable, onError: (error: Error) => void) =>
safePipe(stream, new PassThrough(), onError);

export const isErrorRenderResult = (result: RenderResult): result is { exceptionMessage: string } =>
typeof result === 'object' && !isReadableStream(result) && 'exceptionMessage' in result;

Expand Down
39 changes: 23 additions & 16 deletions packages/react-on-rails-pro/src/injectRSCPayload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
*/

import { PassThrough, Readable } from 'stream';
import { PassThrough } from 'stream';
import { finished } from 'stream/promises';
import { PipeableOrReadableStream } from 'react-on-rails/types';
import { createRSCPayloadKey } from './utils.ts';
import RSCRequestTracker from './RSCRequestTracker.ts';
import safePipe from './safePipe.ts';

// In JavaScript, when an escape sequence with a backslash (\) is followed by a character
// that isn't a recognized escape character, the backslash is ignored, and the character
Expand Down Expand Up @@ -78,16 +79,7 @@ export default function injectRSCPayload(
domNodeId: string | undefined,
) {
const htmlStream = new PassThrough();
pipeableHtmlStream.pipe(htmlStream);
// When the source is destroyed, pipe() unpipes but does NOT end htmlStream.
// Listen for 'close' to ensure htmlStream ends, which triggers the cleanup chain.
if (typeof (pipeableHtmlStream as Readable).on === 'function') {
(pipeableHtmlStream as Readable).on('close', () => {
if (!htmlStream.writableEnded) {
htmlStream.end();
}
});
}
safePipe(pipeableHtmlStream, htmlStream);
const decoder = new TextDecoder();
let rscPromise: Promise<void> | null = null;

Expand Down Expand Up @@ -267,7 +259,8 @@ export default function injectRSCPayload(

// Wait for HTML stream to complete, then wait for all RSC promises
await finished(htmlStream).then(() => Promise.all(rscPromises));
} catch {
} catch (e) {
resultStream.emit('error', e instanceof Error ? e : new Error(String(e)));
endResultStream();
}
};
Expand Down Expand Up @@ -297,10 +290,21 @@ export default function injectRSCPayload(
});

/**
* Prevent unhandled error crash. Error alone is not the end of the stream —
* termination is handled by the 'close' event below.
* Report errors on htmlStream by emitting them on resultStream, where they
* propagate to handleStreamError → errorReporter in the node renderer.
* Error alone is not the end of the stream — termination is handled by the
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double error emission risk

When htmlStream emits an error, finished(htmlStream) (called inside startRSC) will also reject with that same error. This means resultStream.emit('error', ...) gets called twice for a single htmlStream error — once here and once in the startRSC catch block — resulting in duplicate Sentry/errorReporter reports.

Consider removing this handler and relying solely on the startRSC catch block, or deduplicate using a seen-error guard:

let lastReportedErr: Error | undefined;
htmlStream.on('error', (err) => {
  const error = err instanceof Error ? err : new Error(String(err));
  if (error !== lastReportedErr) {
    lastReportedErr = error;
    resultStream.emit('error', error);
  }
});

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — this was a real double-emission bug. Fixed in eb254a4.

The htmlStream.on('error') handler now only emits on resultStream when startRSC hasn't been called yet (!rscPromise). Once startRSC is running, finished(htmlStream) in the startRSC catch block handles reporting — the handler still exists to prevent Node.js from throwing on unhandled 'error' event but no longer double-reports.

This correctly covers both edge cases:

  • Error before first data chunk (startRSC not running): htmlStream.on('error') reports it
  • Error after data starts flowing (startRSC running): finished() rejection → catch block reports it

* 'close' event below.
*
* We only emit here when startRSC hasn't been called yet. Once startRSC is
Comment on lines +294 to +298
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The !rscPromise guard correctly prevents double-emission in the common case, but there is a subtle window: if htmlStream emits error in the same microtask flush as the finished(htmlStream) rejection (both triggered by the same underlying stream error), the error event fires synchronously while rscPromise is already set, so this handler is skipped — good. However if the ordering is reversed (error event fires before finished() settles), the event handler emits here and then the startRSC catch block also emits. This is unlikely in practice but possible.

A boolean flag shared between the two code paths would be more robust:

let errorForwarded = false;
htmlStream.on('error', (err) => {
  if (!rscPromise && !errorForwarded) {
    errorForwarded = true;
    resultStream.emit('error', err instanceof Error ? err : new Error(String(err)));
  }
});

And in the startRSC catch block:

} catch (e) {
  if (!errorForwarded) {
    errorForwarded = true;
    resultStream.emit('error', e instanceof Error ? e : new Error(String(e)));
  }
  endResultStream();
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "subtle window" described here cannot occur due to Node.js's synchronous event handling guarantees:

  1. `rscPromise = startRSC()` is set synchronously when the first data chunk arrives (line 286)
  2. Inside `startRSC`, `finished(htmlStream)` registers its internal event listeners synchronously before the first `await` (line 261). There is no async gap between `rscPromise` being set and `finished()` listening.
  3. When `htmlStream` emits `'error'`, all registered listeners fire synchronously in registration order. Both our handler and `finished()`'s internal handler execute in the same synchronous call stack.

So the invariant holds: if `rscPromise` is set (our handler skips), `finished()` is guaranteed to be listening and will handle the error in the catch block. If `rscPromise` is not set, `startRSC` hasn't been called, so `finished()` isn't listening, and only our handler reports.

There is no ordering reversal possible — both the check (`!rscPromise`) and `finished()`'s listener registration happen synchronously before any event loop tick.

* running, finished(htmlStream) inside startRSC rejects on error, and the
* catch block there handles reporting — emitting here too would cause the
* same error to be reported twice.
*/
htmlStream.on('error', () => {});
htmlStream.on('error', (err) => {
if (!rscPromise) {
resultStream.emit('error', err instanceof Error ? err : new Error(String(err)));
}
});

/**
* 'close' fires after both normal 'end' and destroy().
Expand Down Expand Up @@ -339,7 +343,10 @@ export default function injectRSCPayload(
.finally(() => {
rscRequestTracker.clear();
})
.catch(() => endResultStream());
.catch((e: unknown) => {
resultStream.emit('error', e instanceof Error ? e : new Error(String(e)));
endResultStream();
});
});

return resultStream;
Expand Down
65 changes: 65 additions & 0 deletions packages/react-on-rails-pro/src/safePipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2025 Shakacode LLC
*
* This file is NOT licensed under the MIT (open source) license.
* It is part of the React on Rails Pro offering and is licensed separately.
*
* Unauthorized copying, modification, distribution, or use of this file,
* via any medium, is strictly prohibited without a valid license agreement
* from Shakacode LLC.
*
* For licensing terms, please see:
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
*/

import { Readable, Writable } from 'stream';
import { PipeableOrReadableStream } from 'react-on-rails/types';

/**
* Pipes source to destination with proper 'close' event handling.
*
* Node.js `pipe()` does NOT end the destination when the source is destroyed —
* it silently unpipes, leaving the destination open forever. This function fills
* that gap by listening for the 'close' event (which fires after both normal
* 'end' and `destroy()`) and ending the destination if the source didn't end
* normally.
*
* An optional `onError` callback provides observability for source stream errors
* without forwarding them to the destination (which would break the pipe).
*
* NOTE: `PipeableOrReadableStream` can be either a React `PipeableStream`
* (which only has `.pipe()` and `.abort()`, no `.on()`) or a Node.js
* `ReadableStream`. The 'close' and 'error' listeners are only attached when
* the source supports `.on()`.
*
* @param source - The source stream to pipe from
* @param destination - The destination stream to pipe into
* @param onError - Optional callback for source stream errors (for reporting, not forwarding)
* @returns The destination stream (for chaining)
*/
export default function safePipe<T extends Writable>(
source: PipeableOrReadableStream,
destination: T,
onError?: (err: Error) => void,
): T {
source.pipe(destination);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Listeners are attached after pipe(), whereas utils.ts's safePipe attaches them before calling pipe(). Prefer the before-pipe order to eliminate any theoretical race with a synchronous 'error' emission during piping:

Suggested change
source.pipe(destination);
if (typeof (source as Readable).on === 'function') {
const readableSource = source as Readable;
if (onError) {
readableSource.on('error', onError);
}
// 'close' fires after both normal 'end' and destroy().
// On normal end, pipe() already forwards 'end' to the destination — this is a no-op.
// On destroy, pipe() unpipes but does NOT end the destination — we do it here.
readableSource.on('close', () => {
if (!destination.writableEnded) {
destination.end();
}
});
}
source.pipe(destination);


if (typeof (source as Readable).on === 'function') {
const readableSource = source as Readable;

if (onError) {
readableSource.on('error', onError);
}

// 'close' fires after both normal 'end' and destroy().
// On normal end, pipe() already forwards 'end' to the destination — this is a no-op.
// On destroy, pipe() unpipes but does NOT end the destination — we do it here.
readableSource.on('close', () => {
if (!destination.writableEnded) {
destination.end();
}
});
}

return destination;
}
29 changes: 14 additions & 15 deletions packages/react-on-rails-pro/src/streamingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
import * as ComponentRegistry from './ComponentRegistry.ts';
import PostSSRHookTracker from './PostSSRHookTracker.ts';
import RSCRequestTracker from './RSCRequestTracker.ts';
import safePipe from './safePipe.ts';

type BufferedEvent = {
event: 'data' | 'error' | 'end';
Expand Down Expand Up @@ -133,28 +134,26 @@ export const transformRenderStreamChunksToResultObject = (renderState: StreamRen
},
});

let pipedStream: PipeableOrReadableStream | null = null;
const pipeToTransform = (pipeableStream: PipeableOrReadableStream) => {
pipeableStream.pipe(transformStream);
// 'close' fires after both normal 'end' and destroy().
// On normal end, pipe() already forwards 'end' to transformStream — this is a no-op.
// On destroy, pipe() unpipes but does NOT end transformStream — we do it here.
if (typeof (pipeableStream as Readable).on === 'function') {
(pipeableStream as Readable).on('close', () => {
if (!transformStream.writableEnded) {
transformStream.end();
}
});
}
pipedStream = pipeableStream;
};
// We need to wrap the transformStream in a Readable stream to properly handle errors:
// 1. If we returned transformStream directly, we couldn't emit errors into it externally
// 2. If an error is emitted into the transformStream, it would cause the render to fail
// 3. By wrapping in Readable.from(), we can explicitly emit errors into the readableStream without affecting the transformStream
// Note: Readable.from can merge multiple chunks into a single chunk, so we need to ensure that we can separate them later
const { stream: readableStream, emitError } = bufferStream(transformStream);

let pipedStream: PipeableOrReadableStream | null = null;
const pipeToTransform = (pipeableStream: PipeableOrReadableStream) => {
// safePipe handles the 'close' event to end transformStream when the source is destroyed.
// The onError callback forwards source errors to readableStream (via emitError), which
// propagates them to handleStreamError → errorReporter in the node renderer. Emitting on
// the source (not the destination) keeps the pipe intact so data continues flowing for
// non-fatal errors.
safePipe(pipeableStream, transformStream, (err) => {
emitError(err);
});
pipedStream = pipeableStream;
};

const writeChunk = (chunk: string) => transformStream.write(chunk);
const endStream = () => {
transformStream.end();
Expand Down
5 changes: 3 additions & 2 deletions packages/react-on-rails-pro/src/transformRSCNodeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
* https://github.com/shakacode/react_on_rails/blob/master/REACT-ON-RAILS-PRO-LICENSE.md
*/

import { Transform } from 'stream';
import { Readable, Transform } from 'stream';
import safePipe from './safePipe.ts';

/**
* Transforms an RSC Node.js stream for server-side processing.
Expand Down Expand Up @@ -55,5 +56,5 @@ export default function transformRSCStream(stream: NodeJS.ReadableStream): NodeJ
},
});

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This correctly adds the 'close'-event fix (the missed pipe site from PR #2407), but without an onError callback, errors emitted by the RSC node stream will still be silently dropped — they won't reach errorReporter.

If error visibility is desired here (consistent with the goal of this PR), consider:

return safePipe(stream as Readable, htmlExtractor, (err) => {
  // forward to caller or log — RSC stream errors should be observable
  htmlExtractor.emit('error', err);
});

Or at minimum document why silent error suppression is intentional for this specific pipe.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The safePipe change here was specifically to fix the close event gap (the pipe() destination-not-ended-on-destroy issue that PR #2407 missed at this site). Error reporting for the RSC source stream is a pre-existing concern that this PR doesn't make worse.

Regarding the suggested fix — emitting on the destination (htmlExtractor) would actually break the pipe. In Node.js v20+, pipe() attaches an error handler on the destination that calls source.unpipe(destination), so destination.emit('error', err) would disconnect the pipe and stop data flow.

The proper way to add error reporting here would require changes to the caller (getReactServerComponent.server.ts) to provide an error callback, which is outside this PR's scope. The Transform's own try/catch in its transform callback already handles parse errors (line 53-54). If the source stream is destroyed, safePipe ensures htmlExtractor is properly ended via the close handler — React's createFromNodeStream sees the stream end and handles the incomplete payload via its error boundary.

Happy to open a follow-up issue for RSC source stream error reporting if desired.

return stream.pipe(htmlExtractor);
return safePipe(stream as Readable, htmlExtractor);
}
Loading