Skip to content
Open
90 changes: 59 additions & 31 deletions react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ def stream_react_component(component_name, options = {})
# Because setting prerender to false is equivalent to calling react_component with prerender: false
options[:prerender] = true
options = options.merge(immediate_hydration: true) unless options.key?(:immediate_hydration)
run_stream_inside_fiber do

# Extract streaming-specific callback
on_complete = options.delete(:on_complete)

run_stream_inside_fiber(on_complete: on_complete) do
internal_stream_react_component(component_name, options)
end
end
Expand Down Expand Up @@ -185,7 +189,11 @@ def rsc_payload_react_component(component_name, options = {})
# rsc_payload_react_component doesn't have the prerender option
# Because setting prerender to false will not do anything
options[:prerender] = true
run_stream_inside_fiber do

# Extract streaming-specific callback
on_complete = options.delete(:on_complete)

run_stream_inside_fiber(on_complete: on_complete) do
internal_rsc_payload_react_component(component_name, options)
end
end
Expand Down Expand Up @@ -246,30 +254,29 @@ def handle_stream_cache_hit(component_name, raw_options, auto_load_bundle, cache
load_pack_for_generated_component(component_name, render_options)

initial_result, *rest_chunks = cached_chunks
hit_fiber = Fiber.new do
rest_chunks.each { |chunk| Fiber.yield(chunk) }
nil

# Enqueue remaining chunks asynchronously
@async_barrier.async do
rest_chunks.each { |chunk| @main_output_queue.enqueue(chunk) }
end
@rorp_rendering_fibers << hit_fiber

# Return first chunk directly
initial_result
end

def handle_stream_cache_miss(component_name, raw_options, auto_load_bundle, view_cache_key, &block)
# Kick off the normal streaming helper to get the initial result and the original fiber
initial_result = render_stream_component_with_props(component_name, raw_options, auto_load_bundle, &block)
original_fiber = @rorp_rendering_fibers.pop

buffered_chunks = [initial_result]
wrapper_fiber = Fiber.new do
while (chunk = original_fiber.resume)
buffered_chunks << chunk
Fiber.yield(chunk)
end
Rails.cache.write(view_cache_key, buffered_chunks, raw_options[:cache_options] || {})
nil
end
@rorp_rendering_fibers << wrapper_fiber
initial_result
cache_aware_options = raw_options.merge(
on_complete: lambda { |chunks|
Rails.cache.write(view_cache_key, chunks, raw_options[:cache_options] || {})
}
)

render_stream_component_with_props(
component_name,
cache_aware_options,
auto_load_bundle,
&block
)
end

def render_stream_component_with_props(component_name, raw_options, auto_load_bundle)
Expand All @@ -291,25 +298,46 @@ def check_caching_options!(raw_options, block)
raise ReactOnRailsPro::Error, "Option 'cache_key' is required for React on Rails caching"
end

def run_stream_inside_fiber
if @rorp_rendering_fibers.nil?
def run_stream_inside_fiber(on_complete:)
require "async/variable"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to do it only here and not at the beginning of the file?


if @async_barrier.nil?
raise ReactOnRails::Error,
"You must call stream_view_containing_react_components to render the view containing the react component"
end

rendering_fiber = Fiber.new do
# Create a variable to hold the first chunk for synchronous return
first_chunk_var = Async::Variable.new
all_chunks = [] if on_complete # Only collect if callback provided

# Start an async task on the barrier to stream all chunks
@async_barrier.async do
stream = yield
is_first = true

stream.each_chunk do |chunk|
Fiber.yield chunk
all_chunks << chunk if on_complete # Collect for callback

if is_first
# Store first chunk in variable for synchronous access
first_chunk_var.value = chunk
is_first = false
else
# Enqueue remaining chunks to main output queue
@main_output_queue.enqueue(chunk)
end
end
end

@rorp_rendering_fibers << rendering_fiber
# Handle case where stream has no chunks
first_chunk_var.value = nil if is_first

# Call callback with all chunks when streaming completes
on_complete&.call(all_chunks)
end

# return the first chunk of the fiber
# It contains the initial html of the component
# all updates will be appended to the stream sent to browser
rendering_fiber.resume
# Wait for and return the first chunk (blocking)
first_chunk_var.wait
first_chunk_var.value
end

def internal_stream_react_component(component_name, options = {})
Expand Down
74 changes: 24 additions & 50 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,73 +31,47 @@ module Stream
#
# @see ReactOnRails::Helper#stream_react_component
def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options)
@rorp_rendering_fibers = []
template_string = render_to_string(template: template, **render_options)
# View may contain extra newlines, chunk already contains a newline
# Having multiple newlines between chunks causes hydration errors
# So we strip extra newlines from the template string and add a single newline
response.stream.write(template_string)

begin
drain_streams_concurrently
ensure
response.stream.close if close_stream_at_end
end
end

private

def drain_streams_concurrently
require "async"
require "async/barrier"
require "async/limited_queue"

return if @rorp_rendering_fibers.empty?

Sync do |parent|
# To avoid memory bloat, we use a limited queue to buffer chunks in memory.
Sync do |parent_task|
# Initialize async primitives for concurrent component streaming
@async_barrier = Async::Barrier.new
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
queue = Async::LimitedQueue.new(buffer_size)
@main_output_queue = Async::LimitedQueue.new(buffer_size)

writer = build_writer_task(parent: parent, queue: queue)
tasks = build_producer_tasks(parent: parent, queue: queue)
# Render template - components will start streaming immediately
template_string = render_to_string(template: template, **render_options)
# View may contain extra newlines, chunk already contains a newline
# Having multiple newlines between chunks causes hydration errors
# So we strip extra newlines from the template string and add a single newline
response.stream.write(template_string)

# This structure ensures that even if a producer task fails, we always
# signal the writer to stop and then wait for it to finish draining
# any remaining items from the queue before propagating the error.
begin
tasks.each(&:wait)
drain_streams_concurrently(parent_task)
ensure
# `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
queue.close
writer.wait
response.stream.close if close_stream_at_end
end
end
end

def build_producer_tasks(parent:, queue:)
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
parent.async do
loop do
chunk = fiber.resume
break unless chunk
private

# Will be blocked if the queue is full until a chunk is dequeued
queue.enqueue([idx, chunk])
end
def drain_streams_concurrently(parent_task)
writing_task = parent_task.async do
# Drain all remaining chunks from the queue to the response stream
while (chunk = @main_output_queue.dequeue)
response.stream.write(chunk)
end
end
end

def build_writer_task(parent:, queue:)
parent.async do
loop do
pair = queue.dequeue
break if pair.nil?
# Wait for all component streaming tasks to complete
@async_barrier.wait

_idx_from_queue, item = pair
response.stream.write(item)
end
end
# Close the queue to signal end of streaming
@main_output_queue.close
writing_task.wait
end
Comment on lines +61 to 75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Ensure queues and writer task are cleaned up even if a component task fails

drain_streams_concurrently assumes @async_barrier.wait completes normally before closing @main_output_queue and waiting on writing_task. If any async component task raises, @async_barrier.wait can raise as well, which means:

  • @main_output_queue.close won’t run, leaving the queue open.
  • writing_task may remain blocked in dequeue on an open, now-idle queue.

Depending on how Async::Barrier and task failure propagation behave in your exact async version, this could manifest as a hung streaming response or leaked task rather than a clean failure.

Consider wrapping the wait/close sequence so the queue is always closed and the writer task joined, even when @async_barrier.wait raises, e.g.:

def drain_streams_concurrently(parent_task)
  writing_task = parent_task.async do
    while (chunk = @main_output_queue.dequeue)
      response.stream.write(chunk)
    end
  end

  begin
    @async_barrier.wait
  ensure
    @main_output_queue.close
    writing_task.wait
  end
end

This keeps the steady-state behavior but guarantees termination of the writer loop in error paths.

To validate this, you can inject a failure into one component task (e.g., raise inside stream.each_chunk) and confirm the request doesn’t hang and all tasks complete.


I'll verify the review comment by examining the actual implementation and understanding the async behavior.Good! I found critical information. According to the documentation, Async::Barrier#wait will invoke Async::Task#wait on each waiting task, which may raise an error. This confirms the review concern. Now let me search the codebase to verify the actual implementation.

[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Find the file and verify it exists
find . -name "stream.rb" -path "*/react_on_rails_pro/*" 2>/dev/null | head -5

Length of output: 62


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# View the exact method and surrounding context
sed -n '50,85p' ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Length of output: 666


🏁 Script executed:

#!/bin/bash
# Search for error handling or rescue blocks in this file
grep -n -B2 -A2 "rescue\|ensure\|raise" ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Length of output: 160


🏁 Script executed:

#!/bin/bash
# Search for tests related to drain_streams_concurrently or error handling
find . -name "*.rb" -path "*/test/*" -o -path "*/spec/*" | xargs grep -l "drain_streams_concurrently\|stream.*fail\|async.*barrier" 2>/dev/null

Length of output: 59


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Examine the test file to see if error scenarios are covered
cat ./react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb

Length of output: 15492


🏁 Script executed:

#!/bin/bash
# Check if there are any other usages or dependencies on @async_barrier or @main_output_queue
grep -n "@async_barrier\|@main_output_queue" ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Length of output: 232


Ensure queues and writer task are cleaned up even if a component task fails

The concern is valid. If @async_barrier.wait raises an exception (propagated from a failed component task), the subsequent lines closing @main_output_queue and joining writing_task won't execute. This leaves the queue open and the writer task blocked on an idle queue, causing potential resource leaks and hung streaming responses. The outer begin/ensure block (lines 51–55) only closes response.stream and doesn't address internal state cleanup.

The proposed fix is correct and necessary:

def drain_streams_concurrently(parent_task)
  writing_task = parent_task.async do
    while (chunk = @main_output_queue.dequeue)
      response.stream.write(chunk)
    end
  end

  begin
    @async_barrier.wait
  ensure
    @main_output_queue.close
    writing_task.wait
  end
end

This ensures termination of the writer loop and proper task cleanup even in error paths.

🤖 Prompt for AI Agents
In react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb around lines 61
to 75, the current flow calls @async_barrier.wait and then closes
@main_output_queue and waits on writing_task, but if @async_barrier.wait raises
an exception the queue and writer are left open/blocked; wrap the
@async_barrier.wait call in a begin/ensure block so that in the ensure you
always call @main_output_queue.close and writing_task.wait to guarantee the
writer loop terminates and resources are cleaned up even when a component task
fails.

end
end
Loading
Loading