
Commit e4a552e

justin808 and claude committed
Add client disconnect handling and documentation to concurrent streaming
Improves error handling and documentation for concurrent component streaming:

1. Client Disconnect Handling
   - Added IOError and Errno::EPIPE exception handling in producer tasks
   - Added response.stream.closed? check before expensive operations
   - Added exception handling in writer task to stop gracefully on disconnect
   - Prevents wasted resources when clients disconnect mid-stream

2. Documentation Enhancements
   - Added detailed comments explaining the producer-consumer pattern
   - Documented ordering guarantees for concurrent streaming
   - Clarified that chunks from the same component maintain order
   - Clarified that chunks from different components may interleave
   - Added memory management documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent dca30af

File tree

  • react_on_rails_pro/lib/react_on_rails_pro/concerns

1 file changed: +25 −0 lines changed

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 25 additions & 0 deletions
```diff
@@ -47,6 +47,20 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
 
     private
 
+    # Drains all streaming fibers concurrently using a producer-consumer pattern.
+    #
+    # Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue.
+    # Consumer task: Single writer dequeues chunks and writes them to the response stream.
+    #
+    # Ordering guarantees:
+    # - Chunks from the same component maintain their order
+    # - Chunks from different components may interleave based on production timing
+    # - The first component to produce a chunk will have it written first
+    #
+    # Memory management:
+    # - Uses a limited queue (configured via concurrent_component_streaming_buffer_size)
+    # - Producers block when the queue is full, providing backpressure
+    # - This prevents unbounded memory growth from fast producers
     def drain_streams_concurrently
       require "async"
       require "async/limited_queue"
```
```diff
@@ -58,7 +72,9 @@ def drain_streams_concurrently
       buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
       queue = Async::LimitedQueue.new(buffer_size)
 
+      # Consumer task: Single writer dequeues and writes to response stream
       writer = build_writer_task(parent: parent, queue: queue)
+      # Producer tasks: Each fiber drains its stream and enqueues chunks
       tasks = build_producer_tasks(parent: parent, queue: queue)
 
       # This structure ensures that even if a producer task fails, we always
```
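The buffer size read here comes from the Pro configuration. A hypothetical initializer might set it like this; the configure block is the gem's standard pattern, but the value 64 is an arbitrary example, not a recommendation from this commit.

```ruby
# config/initializers/react_on_rails_pro.rb
ReactOnRailsPro.configure do |config|
  # Upper bound on buffered chunks across all producers; once the queue
  # is full, producers block on enqueue until the writer drains a chunk.
  config.concurrent_component_streaming_buffer_size = 64
end
```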
```diff
@@ -78,12 +94,18 @@ def build_producer_tasks(parent:, queue:)
       @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
         parent.async do
           loop do
+            # Check if client disconnected before expensive operations
+            break if response.stream.closed?
+
             chunk = fiber.resume
             break unless chunk
 
             # Will be blocked if the queue is full until a chunk is dequeued
             queue.enqueue([idx, chunk])
           end
+        rescue IOError, Errno::EPIPE
+          # Client disconnected - stop producing
+          break
         end
       end
     end
```
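The disconnect-handling idea here is the same one used in plain ActionController::Live streaming: check closed? before doing more work, and treat IOError / Errno::EPIPE as "client went away" rather than as a server error. A standalone sketch with a hypothetical controller (not part of this commit):

```ruby
class TickerController < ApplicationController
  include ActionController::Live

  def index
    response.headers["Content-Type"] = "text/event-stream"
    10.times do |i|
      break if response.stream.closed? # cheap check before expensive work
      response.stream.write("data: tick #{i}\n\n")
      sleep 1
    end
  rescue IOError, Errno::EPIPE
    # Client disconnected mid-stream; treat it as normal termination.
  ensure
    response.stream.close
  end
end
```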
```diff
@@ -97,6 +119,9 @@ def build_writer_task(parent:, queue:)
           _idx_from_queue, item = pair
           response.stream.write(item)
         end
+      rescue IOError, Errno::EPIPE
+        # Client disconnected - stop writing
+        nil
       end
     end
   end
```
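One Ruby detail worth noting: in both tasks, the rescue clause is attached directly to the do...end block passed to parent.async, a form Ruby supports since 2.6, so the exception is handled inside the task itself. A standalone sketch (names are illustrative only):

```ruby
require "async"

Async do |parent|
  task = parent.async do
    raise IOError, "simulated client disconnect"
  rescue IOError, Errno::EPIPE
    :stopped # handled inside the block, so the task finishes cleanly
  end

  puts task.wait # => stopped
end
```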
