diff --git a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb index 69c20af010..4c48941df8 100644 --- a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb +++ b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb @@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:) # Start an async task on the barrier to stream all chunks @async_barrier.async do stream = yield - is_first = true - - stream.each_chunk do |chunk| - all_chunks << chunk if on_complete # Collect for callback - - if is_first - # Store first chunk in variable for synchronous access - first_chunk_var.value = chunk - is_first = false - else - # Enqueue remaining chunks to main output queue - @main_output_queue.enqueue(chunk) - end - end - - # Handle case where stream has no chunks - first_chunk_var.value = nil if is_first - - # Call callback with all chunks when streaming completes + process_stream_chunks(stream, first_chunk_var, all_chunks) on_complete&.call(all_chunks) end @@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:) first_chunk_var.value end + def process_stream_chunks(stream, first_chunk_var, all_chunks) + is_first = true + + stream.each_chunk do |chunk| + # Check if client disconnected before processing chunk + break if response.stream.closed? + + all_chunks&.push(chunk) + + if is_first + # Store first chunk in variable for synchronous return + first_chunk_var.value = chunk + is_first = false + else + # Enqueue remaining chunks to main output queue + @main_output_queue.enqueue(chunk) + end + end + + # Handle case where stream has no chunks + first_chunk_var.value = nil if is_first + end + def internal_stream_react_component(component_name, options = {}) options = options.merge(render_mode: :html_streaming) result = internal_react_component(component_name, options) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index 5735248828..78c6de721c 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true private + # Drains all streaming tasks concurrently using a producer-consumer pattern. + # + # Producer tasks: Created by consumer_stream_async in the helper, each streams + # chunks from the renderer and enqueues them to @main_output_queue. + # + # Consumer task: Single writer dequeues chunks and writes to response stream. + # + # Client disconnect handling: + # - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully + # - Barrier is stopped to cancel all producer tasks, preventing wasted work + # - No exception propagates to the controller for client disconnects def drain_streams_concurrently(parent_task) + client_disconnected = false + writing_task = parent_task.async do # Drain all remaining chunks from the queue to the response stream while (chunk = @main_output_queue.dequeue) response.stream.write(chunk) end + rescue IOError, Errno::EPIPE => e + # Client disconnected - stop writing gracefully + client_disconnected = true + log_client_disconnect("writer", e) end # Wait for all component streaming tasks to complete @@ -76,9 +93,24 @@ def drain_streams_concurrently(parent_task) raise e end ensure - # Close the queue to signal end of streaming + # Close the queue first to unblock writing_task (it may be waiting on dequeue) @main_output_queue.close + + # Wait for writing_task to ensure client_disconnected flag is set + # before we check it (fixes race condition where ensure runs before + # writing_task's rescue block sets the flag) writing_task.wait + + # If client disconnected, stop all producer tasks to avoid wasted work + @async_barrier.stop if client_disconnected + end + + def log_client_disconnect(context, exception) + return unless ReactOnRails.configuration.logging_on_server + + Rails.logger.debug do + "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}" + end end end end diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb index b2b9e30cb8..3342943fd8 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb @@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout, :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support, :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file, - :react_server_client_manifest_file, :concurrent_component_streaming_buffer_size + :react_server_client_manifest_file + + attr_reader :concurrent_component_streaming_buffer_size + + # Sets the buffer size for concurrent component streaming. + # + # This value controls how many chunks can be buffered in memory during + # concurrent streaming operations. When producers generate chunks faster + # than they can be written to the client, this buffer prevents unbounded + # memory growth by blocking producers when the buffer is full. + # + # @param value [Integer] A positive integer specifying the buffer size + # @raise [ReactOnRailsPro::Error] if value is not a positive integer + def concurrent_component_streaming_buffer_size=(value) + unless value.is_a?(Integer) && value.positive? + raise ReactOnRailsPro::Error, + "config.concurrent_component_streaming_buffer_size must be a positive integer" + end + @concurrent_component_streaming_buffer_size = value + end def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize renderer_use_fallback_exec_js: nil, prerender_caching: nil, @@ -118,7 +137,6 @@ def setup_config_values validate_remote_bundle_cache_adapter setup_renderer_password setup_assets_to_copy - validate_concurrent_component_streaming_buffer_size setup_execjs_profiler_if_needed check_react_on_rails_support_for_rsc end @@ -210,14 +228,6 @@ def validate_remote_bundle_cache_adapter end end - def validate_concurrent_component_streaming_buffer_size - return if concurrent_component_streaming_buffer_size.is_a?(Integer) && - concurrent_component_streaming_buffer_size.positive? - - raise ReactOnRailsPro::Error, - "config.concurrent_component_streaming_buffer_size must be a positive integer" - end - def setup_renderer_password return if renderer_password.present? diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index e8b90fd01f..6297c025b7 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -362,7 +362,9 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end end - describe "#stream_react_component" do + describe "#stream_react_component" do # rubocop:disable RSpec/MultipleMemoizedHelpers + let(:mocked_rails_stream) { instance_double(ActionController::Live::Buffer) } + around do |example| # Wrap each test in Sync block to provide async context Sync do @@ -378,6 +380,14 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end end + before do + # Mock response.stream.closed? for client disconnect detection + allow(mocked_rails_stream).to receive(:closed?).and_return(false) + mocked_rails_response = instance_double(ActionDispatch::Response) + allow(mocked_rails_response).to receive(:stream).and_return(mocked_rails_stream) + allow(self).to receive(:response).and_return(mocked_rails_response) + end + it "returns the component shell that exist in the initial chunk with the consoleReplayScript" do mock_request_and_response initial_result = stream_react_component(component_name, props: props, **component_options) @@ -452,6 +462,38 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) expect(collected_chunks[1]).to include(chunks_with_whitespaces[2][:html]) expect(collected_chunks[2]).to include(chunks_with_whitespaces[3][:html]) end + + it "stops processing chunks when client disconnects" do + many_chunks = Array.new(10) do |i| + { html: "
Chunk #{i}
", consoleReplayScript: "" } + end + mock_request_and_response(many_chunks) + + # Simulate client disconnect after first chunk + call_count = 0 + allow(mocked_rails_stream).to receive(:closed?) do + call_count += 1 + call_count > 1 # false for first call, true after + end + + # Start streaming - first chunk returned synchronously + initial_result = stream_react_component(component_name, props: props, **component_options) + expect(initial_result).to include("
Chunk 0
") + + # Wait for async task to complete + @async_barrier.wait + @main_output_queue.close + + # Collect chunks that were enqueued to output + collected_chunks = [] + while (chunk = @main_output_queue.dequeue) + collected_chunks << chunk + end + + # Should have stopped early - not all chunks processed + # The exact count depends on timing, but should be less than 9 (all remaining) + expect(collected_chunks.length).to be < 9 + end end describe "stream_view_containing_react_components" do # rubocop:disable RSpec/MultipleMemoizedHelpers @@ -476,6 +518,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) written_chunks << chunk end allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -565,6 +608,7 @@ def execute_stream_view_containing_react_components written_chunks.clear allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk } allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -709,7 +753,9 @@ def run_stream end end - describe "cached_stream_react_component integration with RandomValue", :caching do + describe "cached_stream_react_component integration with RandomValue", :caching do # rubocop:disable RSpec/MultipleMemoizedHelpers + let(:mocked_stream) { instance_double(ActionController::Live::Buffer) } + around do |example| original_prerender_caching = ReactOnRailsPro.configuration.prerender_caching ReactOnRailsPro.configuration.prerender_caching = true @@ -720,6 +766,13 @@ def run_stream Rails.cache.clear end + before do + allow(mocked_stream).to receive(:closed?).and_return(false) + mocked_response = instance_double(ActionDispatch::Response) + allow(mocked_response).to receive(:stream).and_return(mocked_stream) + allow(self).to receive(:response).and_return(mocked_response) + end + # we need this setup because we can't use the helper outside of stream_view_containing_react_components def render_cached_random_value(cache_key) # Streaming helpers require this context normally provided by stream_view_containing_react_components @@ -780,6 +833,7 @@ def render_cached_random_value(cache_key) { html: "
Test Content
", consoleReplayScript: "" } ] end + let(:mocked_stream) { instance_double(ActionController::Live::Buffer) } around do |example| Sync do @@ -790,6 +844,12 @@ def render_cached_random_value(cache_key) end before do + # Mock response.stream.closed? for client disconnect detection + allow(mocked_stream).to receive(:closed?).and_return(false) + mocked_response = instance_double(ActionDispatch::Response) + allow(mocked_response).to receive(:stream).and_return(mocked_stream) + allow(self).to receive(:response).and_return(mocked_response) + ReactOnRailsPro::Request.instance_variable_set(:@connection, nil) original_httpx_plugin = HTTPX.method(:plugin) allow(HTTPX).to receive(:plugin) do |*args| diff --git a/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb index d2bbe6802c..138e422092 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb @@ -260,5 +260,42 @@ def self.fetch(*) expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json") end end + + describe ".concurrent_component_streaming_buffer_size" do + it "accepts positive integers" do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = 128 + end + + expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128) + end + + it "raises error for non-positive integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = 0 + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + + it "raises error for negative integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = -1 + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + + it "raises error for non-integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = "64" + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + end end end diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index 8ee1eed235..3b3ad5921a 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -392,6 +392,7 @@ def setup_stream_test(component_count: 2) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(mocked_stream).to receive(:write) allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) allow(controller).to receive(:response).and_return(mocked_response) [component_queues, controller, mocked_stream] @@ -489,5 +490,62 @@ def setup_stream_test(component_count: 2) gaps = write_timestamps.each_cons(2).map { |a, b| b - a } expect(gaps.all? { |gap| gap >= 0.04 }).to be true end + + describe "client disconnect handling" do + it "stops writing on IOError" do + queues, controller, stream = setup_stream_test(component_count: 1) + + written_chunks = [] + write_count = 0 + + allow(stream).to receive(:write) do |chunk| + write_count += 1 + raise IOError, "client disconnected" if write_count == 3 + + written_chunks << chunk + end + + run_stream(controller) do |_parent| + queues[0].enqueue("Chunk1") + sleep 0.05 + queues[0].enqueue("Chunk2") + sleep 0.05 + queues[0].enqueue("Chunk3") + sleep 0.05 + queues[0].enqueue("Chunk4") + queues[0].close + sleep 0.1 + end + + # Write 1: TEMPLATE, Write 2: Chunk1, Write 3: Chunk2 (raises IOError) + expect(written_chunks).to eq(%w[TEMPLATE Chunk1]) + end + + it "stops writing on Errno::EPIPE" do + queues, controller, stream = setup_stream_test(component_count: 1) + + written_chunks = [] + write_count = 0 + + allow(stream).to receive(:write) do |chunk| + write_count += 1 + raise Errno::EPIPE, "broken pipe" if write_count == 3 + + written_chunks << chunk + end + + run_stream(controller) do |_parent| + queues[0].enqueue("Chunk1") + sleep 0.05 + queues[0].enqueue("Chunk2") + sleep 0.05 + queues[0].enqueue("Chunk3") + queues[0].close + sleep 0.1 + end + + expect(written_chunks).to eq(%w[TEMPLATE Chunk1]) + end + end end end