43 changes: 24 additions & 19 deletions react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb
@@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:)
# Start an async task on the barrier to stream all chunks
@async_barrier.async do
stream = yield
is_first = true

stream.each_chunk do |chunk|
all_chunks << chunk if on_complete # Collect for callback

if is_first
# Store first chunk in variable for synchronous access
first_chunk_var.value = chunk
is_first = false
else
# Enqueue remaining chunks to main output queue
@main_output_queue.enqueue(chunk)
end
end

# Handle case where stream has no chunks
first_chunk_var.value = nil if is_first

# Call callback with all chunks when streaming completes
process_stream_chunks(stream, first_chunk_var, all_chunks)
on_complete&.call(all_chunks)
end

@@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:)
first_chunk_var.value
end

def process_stream_chunks(stream, first_chunk_var, all_chunks)
is_first = true

stream.each_chunk do |chunk|
# Check if client disconnected before processing chunk
break if response.stream.closed?

all_chunks&.push(chunk)

if is_first
# Store first chunk in variable for synchronous return
first_chunk_var.value = chunk
is_first = false
else
# Enqueue remaining chunks to main output queue
@main_output_queue.enqueue(chunk)
end
end

# Handle case where stream has no chunks
first_chunk_var.value = nil if is_first
end

def internal_stream_react_component(component_name, options = {})
options = options.merge(render_mode: :html_streaming)
result = internal_react_component(component_name, options)
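
For context, here is a minimal standalone sketch of the first-chunk-synchronous pattern that consumer_stream_async and process_stream_chunks implement above. It assumes a recent version of the socketry async gem (Async::Barrier, Async::Variable, Async::Queue); FakeStream and the chunk values are illustrative stand-ins, and the real helper additionally checks response.stream.closed? and feeds an on_complete callback.

# Minimal sketch (not from the PR): the first chunk is exposed synchronously via
# an Async::Variable, remaining chunks are enqueued for a separate drain loop.
require "async"
require "async/barrier"
require "async/queue"
require "async/variable"

# Illustrative stand-in for the renderer stream consumed by the helper.
class FakeStream
  def initialize(chunks)
    @chunks = chunks
  end

  def each_chunk(&block)
    @chunks.each(&block)
  end
end

Async do
  barrier = Async::Barrier.new
  output_queue = Async::Queue.new
  first_chunk_var = Async::Variable.new

  # Producer task: resolve the variable with the first chunk, queue the rest.
  barrier.async do
    is_first = true
    FakeStream.new(%w[shell chunk-2 chunk-3]).each_chunk do |chunk|
      if is_first
        first_chunk_var.value = chunk
        is_first = false
      else
        output_queue.enqueue(chunk)
      end
    end
    first_chunk_var.value = nil if is_first # stream produced no chunks
  end

  # Synchronous access to the first chunk (blocks only until it is resolved).
  puts "first chunk: #{first_chunk_var.value.inspect}"

  barrier.wait
  output_queue.enqueue(nil) # sentinel to end the drain loop (the PR closes the queue instead)
  while (chunk = output_queue.dequeue)
    puts "later chunk: #{chunk}"
  end
end
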
28 changes: 28 additions & 0 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true

private

# Drains all streaming tasks concurrently using a producer-consumer pattern.
#
# Producer tasks: Created by consumer_stream_async in the helper, each streams
# chunks from the renderer and enqueues them to @main_output_queue.
#
# Consumer task: Single writer dequeues chunks and writes to response stream.
#
# Client disconnect handling:
# - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully
# - Barrier is stopped to cancel all producer tasks, preventing wasted work
# - No exception propagates to the controller for client disconnects
def drain_streams_concurrently(parent_task)
client_disconnected = false

writing_task = parent_task.async do
# Drain all remaining chunks from the queue to the response stream
while (chunk = @main_output_queue.dequeue)
response.stream.write(chunk)
end
rescue IOError, Errno::EPIPE => e
# Client disconnected - stop writing gracefully
client_disconnected = true
log_client_disconnect("writer", e)
end

# Wait for all component streaming tasks to complete
@@ -76,9 +93,20 @@ def drain_streams_concurrently(parent_task)
raise e
end
ensure
# If client disconnected, stop all producer tasks to avoid wasted work
@async_barrier.stop if client_disconnected

# Close the queue to signal end of streaming
@main_output_queue.close
writing_task.wait
end

def log_client_disconnect(context, exception)
return unless ReactOnRails.configuration.logging_on_server

Rails.logger.debug do
"[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}"
end
end
end
end
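
A hedged sketch of how a controller would consume this concern; everything except stream_view_containing_react_components is an assumption (the module name ReactOnRailsPro::Stream is inferred from the concerns/stream.rb path, and the controller and template names are illustrative). With the IOError/Errno::EPIPE handling above, a client disconnect ends the response quietly instead of raising into the action.

# Illustrative only: names other than stream_view_containing_react_components
# are assumptions, not taken from this PR.
class PagesController < ApplicationController
  include ActionController::Live  # provides response.stream for writing chunks
  include ReactOnRailsPro::Stream # module name inferred from concerns/stream.rb

  def show
    # Renders the template, then drains all component streams concurrently;
    # a client disconnect is logged at debug level rather than raised here.
    stream_view_containing_react_components(template: "pages/show")
  end
end
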
21 changes: 20 additions & 1 deletion react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
@@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength
:renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
:profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
:rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
:react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
:react_server_client_manifest_file

attr_reader :concurrent_component_streaming_buffer_size

# Sets the buffer size for concurrent component streaming.
#
# This value controls how many chunks can be buffered in memory during
# concurrent streaming operations. When producers generate chunks faster
# than they can be written to the client, this buffer prevents unbounded
# memory growth by blocking producers when the buffer is full.
#
# @param value [Integer] A positive integer specifying the buffer size
# @raise [ReactOnRailsPro::Error] if value is not a positive integer
def concurrent_component_streaming_buffer_size=(value)
unless value.is_a?(Integer) && value.positive?
raise ReactOnRailsPro::Error,
"config.concurrent_component_streaming_buffer_size must be a positive integer"
end
@concurrent_component_streaming_buffer_size = value
end

def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
renderer_use_fallback_exec_js: nil, prerender_caching: nil,
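
A sketch of setting the new option from an application initializer; the file path and the value 64 are illustrative. Anything other than a positive Integer (for example 0, -1, or "64") raises ReactOnRailsPro::Error, per the setter above.

# config/initializers/react_on_rails_pro.rb (illustrative location)
ReactOnRailsPro.configure do |config|
  # Upper bound on buffered chunks during concurrent component streaming;
  # producers block once the buffer is full, keeping memory use bounded.
  config.concurrent_component_streaming_buffer_size = 64
end
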
37 changes: 37 additions & 0 deletions react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb
@@ -260,5 +260,42 @@ def self.fetch(*)
expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json")
end
end

describe ".concurrent_component_streaming_buffer_size" do
it "accepts positive integers" do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = 128
end

expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128)
end

it "raises error for non-positive integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = 0
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end

it "raises error for negative integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = -1
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end

it "raises error for non-integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = "64"
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end
end
end
end
58 changes: 58 additions & 0 deletions react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
@@ -392,6 +392,7 @@ def setup_stream_test(component_count: 2)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(mocked_stream).to receive(:write)
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
allow(controller).to receive(:response).and_return(mocked_response)

[component_queues, controller, mocked_stream]
@@ -489,5 +490,62 @@ def setup_stream_test(component_count: 2)
gaps = write_timestamps.each_cons(2).map { |a, b| b - a }
expect(gaps.all? { |gap| gap >= 0.04 }).to be true
end

describe "client disconnect handling" do
it "stops writing on IOError" do
queues, controller, stream = setup_stream_test(component_count: 1)

written_chunks = []
write_count = 0

allow(stream).to receive(:write) do |chunk|
write_count += 1
raise IOError, "client disconnected" if write_count == 3

written_chunks << chunk
end

run_stream(controller) do |_parent|
queues[0].enqueue("Chunk1")
sleep 0.05
queues[0].enqueue("Chunk2")
sleep 0.05
queues[0].enqueue("Chunk3")
sleep 0.05
queues[0].enqueue("Chunk4")
queues[0].close
sleep 0.1
end

# Write 1: TEMPLATE, Write 2: Chunk1, Write 3: Chunk2 (raises IOError)
expect(written_chunks).to eq(%w[TEMPLATE Chunk1])
end

it "stops writing on Errno::EPIPE" do
queues, controller, stream = setup_stream_test(component_count: 1)

written_chunks = []
write_count = 0

allow(stream).to receive(:write) do |chunk|
write_count += 1
raise Errno::EPIPE, "broken pipe" if write_count == 3

written_chunks << chunk
end

run_stream(controller) do |_parent|
queues[0].enqueue("Chunk1")
sleep 0.05
queues[0].enqueue("Chunk2")
sleep 0.05
queues[0].enqueue("Chunk3")
queues[0].close
sleep 0.1
end

expect(written_chunks).to eq(%w[TEMPLATE Chunk1])
end
end
end
end