33 changes: 24 additions & 9 deletions react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb
@@ -292,24 +292,39 @@ def check_caching_options!(raw_options, block)
end

def run_stream_inside_fiber
if @rorp_rendering_fibers.nil?
require "async/variable"
[Collaborator review comment on this line] Any reason to do it only here and not at the beginning of the file?

if @async_barrier.nil?
raise ReactOnRails::Error,
"You must call stream_view_containing_react_components to render the view containing the react component"
end

rendering_fiber = Fiber.new do
# Create a variable to hold the first chunk for synchronous return
first_chunk_var = Async::Variable.new

# Start an async task on the barrier to stream all chunks
@async_barrier.async do
stream = yield
is_first = true

stream.each_chunk do |chunk|
Fiber.yield chunk
if is_first
# Store first chunk in variable for synchronous access
first_chunk_var.value = chunk
is_first = false
else
# Enqueue remaining chunks to main output queue
@main_output_queue.enqueue(chunk)
end
end
end

@rorp_rendering_fibers << rendering_fiber
# Handle case where stream has no chunks
first_chunk_var.value = nil if is_first
end

# return the first chunk of the fiber
# It contains the initial html of the component
# all updates will be appended to the stream sent to browser
rendering_fiber.resume
# Wait for and return the first chunk (blocking)
first_chunk_var.wait
first_chunk_var.value
end

def internal_stream_react_component(component_name, options = {})
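Note: the hunk above hands the first rendered chunk back to the caller synchronously through an Async::Variable, while every later chunk is pushed onto a shared queue that gets drained elsewhere. Below is a minimal standalone sketch of that hand-off pattern, assuming the async gem primitives used in the diff (Async::Variable, Async::Barrier, Async::LimitedQueue); fake_chunks, the queue size, and the puts calls are illustrative stand-ins, not project code.

# Standalone sketch: first chunk via Async::Variable, later chunks via a shared queue.
require "async"
require "async/variable"
require "async/barrier"
require "async/limited_queue"

def fake_chunks # hypothetical stand-in for stream.each_chunk
  ["<div>initial html</div>", "<template>update 1</template>", "<template>update 2</template>"]
end

Sync do
  barrier = Async::Barrier.new
  main_output_queue = Async::LimitedQueue.new(4)
  first_chunk_var = Async::Variable.new

  # Producer task: publish the first chunk via the variable, the rest via the queue.
  barrier.async do
    is_first = true
    fake_chunks.each do |chunk|
      if is_first
        first_chunk_var.value = chunk # resolves the variable; the waiting caller resumes
        is_first = false
      else
        main_output_queue.enqueue(chunk) # blocks when the buffer is full (backpressure)
      end
    end
    first_chunk_var.value = nil if is_first # stream produced no chunks at all
  end

  # Caller side: block until the first chunk is available, return it synchronously.
  first_chunk_var.wait
  puts first_chunk_var.value # => "<div>initial html</div>"

  # Later, drain the remaining chunks once all producers have finished.
  barrier.wait
  main_output_queue.close # dequeue returns nil once the buffer is empty
  while (chunk = main_output_queue.dequeue)
    puts chunk
  end
end
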
73 changes: 22 additions & 51 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -31,72 +31,43 @@ module Stream
#
# @see ReactOnRails::Helper#stream_react_component
def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options)
@rorp_rendering_fibers = []
template_string = render_to_string(template: template, **render_options)
# View may contain extra newlines, chunk already contains a newline
# Having multiple newlines between chunks causes hydration errors
# So we strip extra newlines from the template string and add a single newline
response.stream.write(template_string)

begin
drain_streams_concurrently
ensure
response.stream.close if close_stream_at_end
end
end

private

def drain_streams_concurrently
require "async"
require "async/barrier"
require "async/limited_queue"

return if @rorp_rendering_fibers.empty?

Sync do |parent|
# To avoid memory bloat, we use a limited queue to buffer chunks in memory.
Sync do
# Initialize async primitives for concurrent component streaming
@async_barrier = Async::Barrier.new
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
queue = Async::LimitedQueue.new(buffer_size)
@main_output_queue = Async::LimitedQueue.new(buffer_size)

writer = build_writer_task(parent: parent, queue: queue)
tasks = build_producer_tasks(parent: parent, queue: queue)
# Render template - components will start streaming immediately
template_string = render_to_string(template: template, **render_options)
# View may contain extra newlines, chunk already contains a newline
# Having multiple newlines between chunks causes hydration errors
# So we strip extra newlines from the template string and add a single newline
response.stream.write(template_string)

# This structure ensures that even if a producer task fails, we always
# signal the writer to stop and then wait for it to finish draining
# any remaining items from the queue before propagating the error.
begin
tasks.each(&:wait)
drain_streams_concurrently
ensure
# `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
queue.close
writer.wait
response.stream.close if close_stream_at_end
end
end
end

def build_producer_tasks(parent:, queue:)
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
parent.async do
loop do
chunk = fiber.resume
break unless chunk
private

# Will be blocked if the queue is full until a chunk is dequeued
queue.enqueue([idx, chunk])
end
end
end
end
def drain_streams_concurrently
# Wait for all component streaming tasks to complete
@async_barrier.wait

def build_writer_task(parent:, queue:)
parent.async do
loop do
pair = queue.dequeue
break if pair.nil?
# Close the queue to signal end of streaming
@main_output_queue.close

_idx_from_queue, item = pair
response.stream.write(item)
end
# Drain all remaining chunks from the queue to the response stream
while (chunk = @main_output_queue.dequeue)
response.stream.write(chunk)
end
end
end
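
Note: the rewritten streaming flow leans on Async::LimitedQueue for backpressure: enqueue blocks once concurrent_component_streaming_buffer_size chunks are buffered, so a slow response stream bounds how much sits in memory. A small sketch of just that behaviour, with made-up chunk names, an arbitrary limit of 2, and an artificial slow consumer (it assumes sleep is non-blocking under the async scheduler inside Sync):

# Standalone sketch: backpressure from Async::LimitedQueue.
require "async"
require "async/limited_queue"

Sync do |task|
  queue = Async::LimitedQueue.new(2) # stand-in for the configured buffer size

  producer = task.async do
    5.times do |i|
      queue.enqueue("chunk-#{i}") # blocks while two items are already buffered
    end
    queue.close # end-of-stream: dequeue returns nil once the queue is drained
  end

  while (chunk = queue.dequeue)
    sleep(0.01) # simulate a slow response stream
    puts "wrote #{chunk}"
  end

  producer.wait
end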