Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true

private

# Drains all streaming fibers concurrently using a producer-consumer pattern.
#
# Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue.
# Consumer task: Single writer dequeues chunks and writes them to the response stream.
#
# Ordering guarantees:
# - Chunks from the same component maintain their order
# - Chunks from different components may interleave based on production timing
# - The first component to produce a chunk will have it written first
#
# Memory management:
# - Uses a limited queue (configured via concurrent_component_streaming_buffer_size)
# - Producers block when the queue is full, providing backpressure
# - This prevents unbounded memory growth from fast producers
def drain_streams_concurrently
require "async"
require "async/limited_queue"
Expand All @@ -58,7 +72,9 @@ def drain_streams_concurrently
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
queue = Async::LimitedQueue.new(buffer_size)

# Consumer task: Single writer dequeues and writes to response stream
writer = build_writer_task(parent: parent, queue: queue)
# Producer tasks: Each fiber drains its stream and enqueues chunks
tasks = build_producer_tasks(parent: parent, queue: queue)

# This structure ensures that even if a producer task fails, we always
Expand All @@ -78,12 +94,19 @@ def build_producer_tasks(parent:, queue:)
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
parent.async do
loop do
# Check if client disconnected before expensive operations
break if response.stream.closed?

chunk = fiber.resume
break unless chunk

# Will be blocked if the queue is full until a chunk is dequeued
queue.enqueue([idx, chunk])
end
rescue IOError, Errno::EPIPE => e
# Client disconnected - stop producing
log_client_disconnect("producer", e)
break
end
end
end
Expand All @@ -97,6 +120,17 @@ def build_writer_task(parent:, queue:)
_idx_from_queue, item = pair
response.stream.write(item)
end
rescue IOError, Errno::EPIPE => e
# Client disconnected - stop writing
log_client_disconnect("consumer", e)
end
end

def log_client_disconnect(context, exception)
return unless ReactOnRails.configuration.logging_on_server

ReactOnRails.configuration.logger.debug do
"[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}"
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
written_chunks << chunk
end
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)
Expand Down Expand Up @@ -532,6 +533,7 @@ def execute_stream_view_containing_react_components
written_chunks.clear
allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk }
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
mocked_response = instance_double(ActionDispatch::Response)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(self).to receive(:response).and_return(mocked_response)
Expand Down
1 change: 1 addition & 0 deletions react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def setup_stream_test(component_count: 2)
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
allow(mocked_stream).to receive(:write)
allow(mocked_stream).to receive(:close)
allow(mocked_stream).to receive(:closed?).and_return(false)
allow(controller).to receive(:response).and_return(mocked_response)

[component_queues, controller, mocked_stream]
Expand Down
Loading