Skip to content

Commit 3ae39e5

Browse files
ihabadham authored and claude committed
Add client disconnect handling for concurrent component streaming
When a client disconnects during streaming (browser closed, network drop), the response stream write raises IOError or Errno::EPIPE. Without handling, this crashes the request and wastes CPU on continued processing. Changes: 1. Writer error handling (stream.rb): - Catch IOError/EPIPE in writing task - Stop barrier to cancel producer tasks on disconnect - Log disconnect for debugging (when logging_on_server enabled) 2. Producer early termination (react_on_rails_pro_helper.rb): - Add stream.closed? check before processing each chunk - Prevents deadlock when producer blocks on full queue - Prevents wasted CPU when producer runs ahead of failed writer 3. Configuration validation (configuration.rb): - Add setter validation for concurrent_component_streaming_buffer_size - Must be a positive integer 4. Tests: - Add "stops writing when client disconnects" test - Add buffer size validation tests - Add closed? stub to stream test setup Based on error handling from PRs #2017 and #2026, adapted for the new Async::Barrier architecture introduced in PR #2111. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 16b3908 commit 3ae39e5

File tree

5 files changed

+146
-20
lines changed

5 files changed

+146
-20
lines changed

react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:)
313313
# Start an async task on the barrier to stream all chunks
314314
@async_barrier.async do
315315
stream = yield
316-
is_first = true
317-
318-
stream.each_chunk do |chunk|
319-
all_chunks << chunk if on_complete # Collect for callback
320-
321-
if is_first
322-
# Store first chunk in variable for synchronous access
323-
first_chunk_var.value = chunk
324-
is_first = false
325-
else
326-
# Enqueue remaining chunks to main output queue
327-
@main_output_queue.enqueue(chunk)
328-
end
329-
end
330-
331-
# Handle case where stream has no chunks
332-
first_chunk_var.value = nil if is_first
333-
334-
# Call callback with all chunks when streaming completes
316+
process_stream_chunks(stream, first_chunk_var, all_chunks)
335317
on_complete&.call(all_chunks)
336318
end
337319

@@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:)
340322
first_chunk_var.value
341323
end
342324

325+
def process_stream_chunks(stream, first_chunk_var, all_chunks)
326+
is_first = true
327+
328+
stream.each_chunk do |chunk|
329+
# Check if client disconnected before processing chunk
330+
break if response.stream.closed?
331+
332+
all_chunks&.push(chunk)
333+
334+
if is_first
335+
# Store first chunk in variable for synchronous return
336+
first_chunk_var.value = chunk
337+
is_first = false
338+
else
339+
# Enqueue remaining chunks to main output queue
340+
@main_output_queue.enqueue(chunk)
341+
end
342+
end
343+
344+
# Handle case where stream has no chunks
345+
first_chunk_var.value = nil if is_first
346+
end
347+
343348
def internal_stream_react_component(component_name, options = {})
344349
options = options.merge(render_mode: :html_streaming)
345350
result = internal_react_component(component_name, options)

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
6060

6161
private
6262

63+
# Drains all streaming tasks concurrently using a producer-consumer pattern.
64+
#
65+
# Producer tasks: Created by consumer_stream_async in the helper, each streams
66+
# chunks from the renderer and enqueues them to @main_output_queue.
67+
#
68+
# Consumer task: Single writer dequeues chunks and writes to response stream.
69+
#
70+
# Client disconnect handling:
71+
# - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully
72+
# - Barrier is stopped to cancel all producer tasks, preventing wasted work
73+
# - No exception propagates to the controller for client disconnects
6374
def drain_streams_concurrently(parent_task)
75+
client_disconnected = false
76+
6477
writing_task = parent_task.async do
6578
# Drain all remaining chunks from the queue to the response stream
6679
while (chunk = @main_output_queue.dequeue)
6780
response.stream.write(chunk)
6881
end
82+
rescue IOError, Errno::EPIPE => e
83+
# Client disconnected - stop writing gracefully
84+
client_disconnected = true
85+
log_client_disconnect("writer", e)
6986
end
7087

7188
# Wait for all component streaming tasks to complete
@@ -76,9 +93,20 @@ def drain_streams_concurrently(parent_task)
7693
raise e
7794
end
7895
ensure
96+
# If client disconnected, stop all producer tasks to avoid wasted work
97+
@async_barrier.stop if client_disconnected
98+
7999
# Close the queue to signal end of streaming
80100
@main_output_queue.close
81101
writing_task.wait
82102
end
103+
104+
def log_client_disconnect(context, exception)
105+
return unless ReactOnRails.configuration.logging_on_server
106+
107+
Rails.logger.debug do
108+
"[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}"
109+
end
110+
end
83111
end
84112
end

react_on_rails_pro/lib/react_on_rails_pro/configuration.rb

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength
7070
:renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
7171
:profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
7272
:rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
73-
:react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
73+
:react_server_client_manifest_file
74+
75+
attr_reader :concurrent_component_streaming_buffer_size
76+
77+
# Sets the buffer size for concurrent component streaming.
78+
#
79+
# This value controls how many chunks can be buffered in memory during
80+
# concurrent streaming operations. When producers generate chunks faster
81+
# than they can be written to the client, this buffer prevents unbounded
82+
# memory growth by blocking producers when the buffer is full.
83+
#
84+
# @param value [Integer] A positive integer specifying the buffer size
85+
# @raise [ReactOnRailsPro::Error] if value is not a positive integer
86+
def concurrent_component_streaming_buffer_size=(value)
87+
unless value.is_a?(Integer) && value.positive?
88+
raise ReactOnRailsPro::Error,
89+
"config.concurrent_component_streaming_buffer_size must be a positive integer"
90+
end
91+
@concurrent_component_streaming_buffer_size = value
92+
end
7493

7594
def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
7695
renderer_use_fallback_exec_js: nil, prerender_caching: nil,

react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,5 +260,42 @@ def self.fetch(*)
260260
expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json")
261261
end
262262
end
263+
264+
describe ".concurrent_component_streaming_buffer_size" do
265+
it "accepts positive integers" do
266+
ReactOnRailsPro.configure do |config|
267+
config.concurrent_component_streaming_buffer_size = 128
268+
end
269+
270+
expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128)
271+
end
272+
273+
it "raises error for non-positive integers" do
274+
expect do
275+
ReactOnRailsPro.configure do |config|
276+
config.concurrent_component_streaming_buffer_size = 0
277+
end
278+
end.to raise_error(ReactOnRailsPro::Error,
279+
/must be a positive integer/)
280+
end
281+
282+
it "raises error for negative integers" do
283+
expect do
284+
ReactOnRailsPro.configure do |config|
285+
config.concurrent_component_streaming_buffer_size = -1
286+
end
287+
end.to raise_error(ReactOnRailsPro::Error,
288+
/must be a positive integer/)
289+
end
290+
291+
it "raises error for non-integers" do
292+
expect do
293+
ReactOnRailsPro.configure do |config|
294+
config.concurrent_component_streaming_buffer_size = "64"
295+
end
296+
end.to raise_error(ReactOnRailsPro::Error,
297+
/must be a positive integer/)
298+
end
299+
end
263300
end
264301
end

react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ def setup_stream_test(component_count: 2)
392392
allow(mocked_response).to receive(:stream).and_return(mocked_stream)
393393
allow(mocked_stream).to receive(:write)
394394
allow(mocked_stream).to receive(:close)
395+
allow(mocked_stream).to receive(:closed?).and_return(false)
395396
allow(controller).to receive(:response).and_return(mocked_response)
396397

397398
[component_queues, controller, mocked_stream]
@@ -489,5 +490,41 @@ def setup_stream_test(component_count: 2)
489490
gaps = write_timestamps.each_cons(2).map { |a, b| b - a }
490491
expect(gaps.all? { |gap| gap >= 0.04 }).to be true
491492
end
493+
494+
it "stops writing when client disconnects" do
495+
queues, controller, stream = setup_stream_test(component_count: 1)
496+
497+
written_chunks = []
498+
write_count = 0
499+
500+
# Simulate client disconnect: IOError on third write
501+
allow(stream).to receive(:write) do |chunk|
502+
write_count += 1
503+
raise IOError, "client disconnected" if write_count == 3
504+
505+
written_chunks << chunk
506+
end
507+
508+
# closed? stub required by setup but not used by StreamController's simple enqueue logic
509+
allow(stream).to receive(:closed?).and_return(false)
510+
511+
run_stream(controller) do |_parent|
512+
queues[0].enqueue("Chunk1")
513+
sleep 0.05
514+
queues[0].enqueue("Chunk2")
515+
sleep 0.05
516+
queues[0].enqueue("Chunk3") # This write will raise IOError
517+
sleep 0.05
518+
queues[0].enqueue("Chunk4") # Should not be written
519+
queues[0].close
520+
sleep 0.1
521+
end
522+
523+
# Writer catches IOError and stops - only successful writes recorded
524+
# Write 1: TEMPLATE (success), Write 2: Chunk1 (success), Write 3: Chunk2 (IOError)
525+
expect(written_chunks.length).to eq(2)
526+
expect(written_chunks).to include("TEMPLATE")
527+
expect(written_chunks).to include("Chunk1")
528+
end
492529
end
493530
end

0 commit comments

Comments
 (0)