Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:)
# Start an async task on the barrier to stream all chunks
@async_barrier.async do
stream = yield
is_first = true

stream.each_chunk do |chunk|
all_chunks << chunk if on_complete # Collect for callback

if is_first
# Store first chunk in variable for synchronous access
first_chunk_var.value = chunk
is_first = false
else
# Enqueue remaining chunks to main output queue
@main_output_queue.enqueue(chunk)
end
end

# Handle case where stream has no chunks
first_chunk_var.value = nil if is_first

# Call callback with all chunks when streaming completes
process_stream_chunks(stream, first_chunk_var, all_chunks)
on_complete&.call(all_chunks)
end

Expand All @@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:)
first_chunk_var.value
end

# Streams chunks from `stream` into the streaming pipeline.
#
# The first chunk is published through +first_chunk_var+ so the caller can
# return it synchronously; every later chunk is enqueued on
# @main_output_queue for the single writer task. When +all_chunks+ is given,
# each chunk is also appended to it so a completion callback can inspect the
# full stream. Processing stops early once the client connection is closed.
def process_stream_chunks(stream, first_chunk_var, all_chunks)
  seen_first_chunk = false

  stream.each_chunk do |chunk|
    # Bail out as soon as the client has gone away; rendering more is wasted work.
    break if response.stream.closed?

    all_chunks&.push(chunk)

    if seen_first_chunk
      # Everything after the first chunk flows through the shared output queue.
      @main_output_queue.enqueue(chunk)
    else
      # Publish the first chunk for the synchronous caller.
      first_chunk_var.value = chunk
      seen_first_chunk = true
    end
  end

  # An empty stream (or a disconnect before the first chunk) must still
  # unblock anyone waiting on first_chunk_var.
  first_chunk_var.value = nil unless seen_first_chunk
end

def internal_stream_react_component(component_name, options = {})
options = options.merge(render_mode: :html_streaming)
result = internal_react_component(component_name, options)
Expand Down
28 changes: 28 additions & 0 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true

private

# Drains all streaming tasks concurrently using a producer-consumer pattern.
#
# Producer tasks: Created by consumer_stream_async in the helper, each streams
# chunks from the renderer and enqueues them to @main_output_queue.
#
# Consumer task: Single writer dequeues chunks and writes to response stream.
#
# Client disconnect handling:
# - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully
# - Barrier is stopped to cancel all producer tasks, preventing wasted work
# - No exception propagates to the controller for client disconnects
def drain_streams_concurrently(parent_task)
client_disconnected = false

writing_task = parent_task.async do
# Drain all remaining chunks from the queue to the response stream
while (chunk = @main_output_queue.dequeue)
response.stream.write(chunk)
end
rescue IOError, Errno::EPIPE => e
# Client disconnected - stop writing gracefully
client_disconnected = true
log_client_disconnect("writer", e)
end

# Wait for all component streaming tasks to complete
Expand All @@ -76,9 +93,20 @@ def drain_streams_concurrently(parent_task)
raise e
end
ensure
# If client disconnected, stop all producer tasks to avoid wasted work
@async_barrier.stop if client_disconnected

# Close the queue to signal end of streaming
@main_output_queue.close
writing_task.wait
end

# Emits a debug log entry when the client drops the connection mid-stream.
#
# @param context [String] where the disconnect was observed (e.g. "writer")
# @param exception [Exception] the IOError/Errno::EPIPE that signalled it
#
# No-op unless server logging is enabled in the React on Rails configuration.
# The message is built inside the debug block so no string is allocated when
# the logger level is above debug.
def log_client_disconnect(context, exception)
  if ReactOnRails.configuration.logging_on_server
    Rails.logger.debug do
      "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}"
    end
  end
end
end
end
30 changes: 20 additions & 10 deletions react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength
:renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
:profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
:rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
:react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
:react_server_client_manifest_file

attr_reader :concurrent_component_streaming_buffer_size

# Sets the buffer size for concurrent component streaming.
#
# This value controls how many chunks can be buffered in memory during
# concurrent streaming operations. When producers generate chunks faster
# than they can be written to the client, this buffer prevents unbounded
# memory growth by blocking producers when the buffer is full.
#
# @param value [Integer] A positive integer specifying the buffer size
# @raise [ReactOnRailsPro::Error] if value is not a positive integer
def concurrent_component_streaming_buffer_size=(value)
unless value.is_a?(Integer) && value.positive?
raise ReactOnRailsPro::Error,
"config.concurrent_component_streaming_buffer_size must be a positive integer"
end
@concurrent_component_streaming_buffer_size = value
end

def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
renderer_use_fallback_exec_js: nil, prerender_caching: nil,
Expand Down Expand Up @@ -118,7 +137,6 @@ def setup_config_values
validate_remote_bundle_cache_adapter
setup_renderer_password
setup_assets_to_copy
validate_concurrent_component_streaming_buffer_size
setup_execjs_profiler_if_needed
check_react_on_rails_support_for_rsc
end
Expand Down Expand Up @@ -210,14 +228,6 @@ def validate_remote_bundle_cache_adapter
end
end

def validate_concurrent_component_streaming_buffer_size
return if concurrent_component_streaming_buffer_size.is_a?(Integer) &&
concurrent_component_streaming_buffer_size.positive?

raise ReactOnRailsPro::Error,
"config.concurrent_component_streaming_buffer_size must be a positive integer"
end

def setup_renderer_password
return if renderer_password.present?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
end
end

describe "#stream_react_component" do
describe "#stream_react_component" do # rubocop:disable RSpec/MultipleMemoizedHelpers
let(:mocked_rails_stream) { instance_double(ActionController::Live::Buffer) }

around do |example|
# Wrap each test in Sync block to provide async context
Sync do
Expand All @@ -378,6 +380,14 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
end
end

before do
# Mock response.stream.closed? for client disconnect detection
allow(mocked_rails_stream).to receive(:closed?).and_return(false)
mocked_rails_response = instance_double(ActionDispatch::Response)
allow(mocked_rails_response).to receive(:stream).and_return(mocked_rails_stream)
allow(self).to receive(:response).and_return(mocked_rails_response)
end

it "returns the component shell that exist in the initial chunk with the consoleReplayScript" do
mock_request_and_response
initial_result = stream_react_component(component_name, props: props, **component_options)
Expand Down Expand Up @@ -452,6 +462,38 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
expect(collected_chunks[1]).to include(chunks_with_whitespaces[2][:html])
expect(collected_chunks[2]).to include(chunks_with_whitespaces[3][:html])
end

# Verifies the helper's disconnect check (response.stream.closed?) aborts
# chunk processing: once closed? flips to true, no further chunks reach
# @main_output_queue, even though the renderer produced 10 of them.
it "stops processing chunks when client disconnects" do
many_chunks = Array.new(10) do |i|
{ html: "<div>Chunk #{i}</div>", consoleReplayScript: "" }
end
mock_request_and_response(many_chunks)

# Simulate client disconnect after first chunk
call_count = 0
allow(mocked_rails_stream).to receive(:closed?) do
call_count += 1
call_count > 1 # false for first call, true after
end

# Start streaming - first chunk returned synchronously
initial_result = stream_react_component(component_name, props: props, **component_options)
expect(initial_result).to include("<div>Chunk 0</div>")

# Wait for async task to complete
@async_barrier.wait
# Close so the dequeue loop below terminates when the queue is drained.
@main_output_queue.close

# Collect chunks that were enqueued to output
collected_chunks = []
while (chunk = @main_output_queue.dequeue)
collected_chunks << chunk
end

# Should have stopped early - not all chunks processed
# The exact count depends on timing, but should be less than 9 (all remaining)
expect(collected_chunks.length).to be < 9
end
end

describe "stream_view_containing_react_components" do # rubocop:disable RSpec/MultipleMemoizedHelpers
Expand All @@ -476,6 +518,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
written_chunks << chunk
end
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)
Expand Down Expand Up @@ -565,6 +608,7 @@ def execute_stream_view_containing_react_components
written_chunks.clear
allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk }
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)
Expand Down Expand Up @@ -709,7 +753,9 @@ def run_stream
end
end

describe "cached_stream_react_component integration with RandomValue", :caching do
describe "cached_stream_react_component integration with RandomValue", :caching do # rubocop:disable RSpec/MultipleMemoizedHelpers
let(:mocked_stream) { instance_double(ActionController::Live::Buffer) }

around do |example|
original_prerender_caching = ReactOnRailsPro.configuration.prerender_caching
ReactOnRailsPro.configuration.prerender_caching = true
Expand All @@ -720,6 +766,13 @@ def run_stream
Rails.cache.clear
end

before do
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)
end

# we need this setup because we can't use the helper outside of stream_view_containing_react_components
def render_cached_random_value(cache_key)
# Streaming helpers require this context normally provided by stream_view_containing_react_components
Expand Down Expand Up @@ -780,6 +833,7 @@ def render_cached_random_value(cache_key)
{ html: "<div>Test Content</div>", consoleReplayScript: "" }
]
end
let(:mocked_stream) { instance_double(ActionController::Live::Buffer) }

around do |example|
Sync do
Expand All @@ -790,6 +844,12 @@ def render_cached_random_value(cache_key)
end

before do
# Mock response.stream.closed? for client disconnect detection
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)

ReactOnRailsPro::Request.instance_variable_set(:@connection, nil)
original_httpx_plugin = HTTPX.method(:plugin)
allow(HTTPX).to receive(:plugin) do |*args|
Expand Down
37 changes: 37 additions & 0 deletions react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,42 @@ def self.fetch(*)
expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json")
end
end

# Covers the validating writer on ReactOnRailsPro::Configuration: the
# setter itself raises for anything other than a positive Integer, so bad
# values are rejected at assignment time rather than during setup.
describe ".concurrent_component_streaming_buffer_size" do
it "accepts positive integers" do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = 128
end

expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128)
end

# Zero is excluded: the buffer must be able to hold at least one chunk.
it "raises error for non-positive integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = 0
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end

it "raises error for negative integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = -1
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end

# Numeric strings are rejected too - no implicit coercion is performed.
it "raises error for non-integers" do
expect do
ReactOnRailsPro.configure do |config|
config.concurrent_component_streaming_buffer_size = "64"
end
end.to raise_error(ReactOnRailsPro::Error,
/must be a positive integer/)
end
end
end
end
58 changes: 58 additions & 0 deletions react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def setup_stream_test(component_count: 2)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(mocked_stream).to receive(:write)
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
allow(controller).to receive(:response).and_return(mocked_response)

[component_queues, controller, mocked_stream]
Expand Down Expand Up @@ -489,5 +490,62 @@ def setup_stream_test(component_count: 2)
gaps = write_timestamps.each_cons(2).map { |a, b| b - a }
expect(gaps.all? { |gap| gap >= 0.04 }).to be true
end

# Exercises the writer task's rescue of IOError/Errno::EPIPE in
# drain_streams_concurrently: when a write raises, the writer must stop
# gracefully (no exception escapes, no further chunks are written).
describe "client disconnect handling" do
it "stops writing on IOError" do
queues, controller, stream = setup_stream_test(component_count: 1)

written_chunks = []
write_count = 0

# Third write simulates the client dropping the connection mid-stream.
allow(stream).to receive(:write) do |chunk|
write_count += 1
raise IOError, "client disconnected" if write_count == 3

written_chunks << chunk
end

# NOTE(review): sleeps give the writer task time to drain between
# enqueues - timing-sensitive; consider a synchronization primitive.
run_stream(controller) do |_parent|
queues[0].enqueue("Chunk1")
sleep 0.05
queues[0].enqueue("Chunk2")
sleep 0.05
queues[0].enqueue("Chunk3")
sleep 0.05
queues[0].enqueue("Chunk4")
queues[0].close
sleep 0.1
end

# Write 1: TEMPLATE, Write 2: Chunk1, Write 3: Chunk2 (raises IOError)
expect(written_chunks).to eq(%w[TEMPLATE Chunk1])
end

it "stops writing on Errno::EPIPE" do
queues, controller, stream = setup_stream_test(component_count: 1)

written_chunks = []
write_count = 0

# Same shape as above, but with the broken-pipe errno variant.
allow(stream).to receive(:write) do |chunk|
write_count += 1
raise Errno::EPIPE, "broken pipe" if write_count == 3

written_chunks << chunk
end

run_stream(controller) do |_parent|
queues[0].enqueue("Chunk1")
sleep 0.05
queues[0].enqueue("Chunk2")
sleep 0.05
queues[0].enqueue("Chunk3")
queues[0].close
sleep 0.1
end

# Only the writes before the simulated disconnect succeed.
expect(written_chunks).to eq(%w[TEMPLATE Chunk1])
end
end
end
end
Loading