diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index db55ba2c1e..d12666bdc8 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -47,6 +47,20 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true private + # Drains all streaming fibers concurrently using a producer-consumer pattern. + # + # Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue. + # Consumer task: Single writer dequeues chunks and writes them to the response stream. + # + # Ordering guarantees: + # - Chunks from the same component maintain their order + # - Chunks from different components may interleave based on production timing + # - The first component to produce a chunk will have it written first + # + # Memory management: + # - Uses a limited queue (configured via concurrent_component_streaming_buffer_size) + # - Producers block when the queue is full, providing backpressure + # - This prevents unbounded memory growth from fast producers def drain_streams_concurrently require "async" require "async/limited_queue" @@ -58,7 +72,9 @@ def drain_streams_concurrently buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size queue = Async::LimitedQueue.new(buffer_size) + # Consumer task: Single writer dequeues and writes to response stream writer = build_writer_task(parent: parent, queue: queue) + # Producer tasks: Each fiber drains its stream and enqueues chunks tasks = build_producer_tasks(parent: parent, queue: queue) # This structure ensures that even if a producer task fails, we always @@ -78,12 +94,19 @@ def build_producer_tasks(parent:, queue:) @rorp_rendering_fibers.each_with_index.map do |fiber, idx| parent.async do loop do + # Check if client disconnected before expensive operations + break if response.stream.closed? + chunk = fiber.resume break unless chunk # Will be blocked if the queue is full until a chunk is dequeued queue.enqueue([idx, chunk]) end + rescue IOError, Errno::EPIPE => e + # Client disconnected - stop producing + log_client_disconnect("producer", e) + break end end end @@ -97,6 +120,17 @@ def build_writer_task(parent:, queue:) _idx_from_queue, item = pair response.stream.write(item) end + rescue IOError, Errno::EPIPE => e + # Client disconnected - stop writing + log_client_disconnect("consumer", e) + end + end + + def log_client_disconnect(context, exception) + return unless ReactOnRails.configuration.logging_on_server + + ReactOnRails.configuration.logger.debug do + "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}" end end end diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index ac3debe934..50a544270c 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -443,6 +443,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) written_chunks << chunk end allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -532,6 +533,7 @@ def execute_stream_view_containing_react_components written_chunks.clear allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk } allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index a97f69ffde..12335496d0 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -388,6 +388,7 @@ def setup_stream_test(component_count: 2) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(mocked_stream).to receive(:write) allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) allow(controller).to receive(:response).and_return(mocked_response) [component_queues, controller, mocked_stream]