
Commit de3977d

ihabadham and claude authored
Add client disconnect handling for concurrent component streaming (#2137)
## Summary

Adds error handling for client disconnects during streaming, which was missing after the architecture change in #2111.

## Background

- **#2015** introduced concurrent component streaming with `Async::Semaphore`
- **#2017** and **#2026** added client disconnect handling for that architecture (both closed, not merged)
- **#2111** rewrote the streaming architecture using `Async::Barrier` + `Async::Variable` + `Async::LimitedQueue`
- The error handling from #2017/#2026 was never adapted for the new architecture

## Problem

When a client disconnects mid-stream (browser closed, network drop), `response.stream.write` raises `IOError` or `Errno::EPIPE`. Without handling:

1. The exception crashes the request
2. Producer tasks continue processing and enqueueing chunks unnecessarily, wasting CPU

## Changes

### 1. Writer error handling (`stream.rb`)

- Catch `IOError`/`Errno::EPIPE` in writing task
- Set `client_disconnected` flag and stop barrier to cancel producer tasks
- Log disconnect for debugging (when `logging_on_server` enabled)

### 2. Producer early termination (`react_on_rails_pro_helper.rb`)

- Add `stream.closed?` check before processing each chunk
- Prevents deadlock when producer blocks on full queue after writer dies
- Prevents wasted CPU when producer runs ahead of failed writer
- Extract `process_stream_chunks` method for clarity

### 3. Configuration validation (`configuration.rb`)

- Add setter validation for `concurrent_component_streaming_buffer_size`
- Must be a positive integer

### 4. Tests

- Add `client disconnect handling` describe block with IOError and EPIPE tests
- Add buffer size validation tests
- Add `closed?` stub to stream test setup

## Test Plan

- [x] `bundle exec rspec spec/react_on_rails_pro/` - all 152 tests pass
- [x] `bundle exec rubocop` - no offenses
- [x] Verified tests fail on master, pass on this branch

🤖 Generated with [Claude Code](https://claude.com/claude-code)

## Summary by CodeRabbit

* **Bug Fixes**
  * Improved streaming stability: better client-disconnect detection, logging, and shutdown to avoid wasted work while preserving previous chunk delivery semantics.
* **Configuration**
  * Added a validated setting for concurrent streaming buffer size; only positive integers are accepted.
* **Enhancements**
  * Centralized stream chunk processing for more consistent handling of first and subsequent chunks.
* **Tests**
  * Added tests covering disconnect scenarios and buffer-size validation.

---

Co-authored-by: Claude <[email protected]>
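
The following is a minimal, self-contained sketch of the producer/consumer shutdown pattern described above, using the async gem primitives named in the background (`Async::Barrier`, `Async::LimitedQueue`); the queue and barrier calls mirror the ones visible in the diffs below. `FakeStream`, the chunk count, and the buffer size are illustrative stand-ins, not code from this PR.

```ruby
require "async"
require "async/barrier"
require "async/queue"

# Stand-in for response.stream: starts failing like a disconnected client.
class FakeStream
  def initialize(fail_after:)
    @writes = 0
    @fail_after = fail_after
  end

  def closed?
    @writes >= @fail_after
  end

  def write(_chunk)
    raise Errno::EPIPE, "client went away" if closed?

    @writes += 1
  end
end

Sync do |parent|
  barrier = Async::Barrier.new
  queue   = Async::LimitedQueue.new(8) # bounded buffer = backpressure on producers
  stream  = FakeStream.new(fail_after: 3)
  client_disconnected = false

  # Producer task (the helper's role): renders chunks and enqueues them,
  # bailing out early once the client is gone.
  barrier.async do
    6.times do |i|
      break if stream.closed? # avoid wasted work after a disconnect

      queue.enqueue("chunk #{i}")
    end
  end

  # Consumer task (the concern's writing task): single writer to the client.
  writer = parent.async do
    while (chunk = queue.dequeue)
      stream.write(chunk)
    end
  rescue IOError, Errno::EPIPE
    client_disconnected = true # remember the disconnect instead of crashing
  end

  barrier.wait # wait for all producer tasks
ensure
  queue.close                         # unblock the writer if it is waiting on dequeue
  writer.wait                         # make sure the rescue above has run
  barrier.stop if client_disconnected # cancel any remaining producer work
end
```

Depending on scheduling, the producer either finishes or bails out at its next `closed?` check; either way the writer rescues the `EPIPE` after the third write, and the `ensure` block closes the queue and stops the barrier, which is the same shutdown order the `stream.rb` diff below implements.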
1 parent 16b3908 commit de3977d

File tree: 6 files changed (+234, -32 lines)

react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb

Lines changed: 24 additions & 19 deletions

@@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:)
       # Start an async task on the barrier to stream all chunks
       @async_barrier.async do
         stream = yield
-        is_first = true
-
-        stream.each_chunk do |chunk|
-          all_chunks << chunk if on_complete # Collect for callback
-
-          if is_first
-            # Store first chunk in variable for synchronous access
-            first_chunk_var.value = chunk
-            is_first = false
-          else
-            # Enqueue remaining chunks to main output queue
-            @main_output_queue.enqueue(chunk)
-          end
-        end
-
-        # Handle case where stream has no chunks
-        first_chunk_var.value = nil if is_first
-
-        # Call callback with all chunks when streaming completes
+        process_stream_chunks(stream, first_chunk_var, all_chunks)
         on_complete&.call(all_chunks)
       end

@@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:)
       first_chunk_var.value
     end

+    def process_stream_chunks(stream, first_chunk_var, all_chunks)
+      is_first = true
+
+      stream.each_chunk do |chunk|
+        # Check if client disconnected before processing chunk
+        break if response.stream.closed?
+
+        all_chunks&.push(chunk)
+
+        if is_first
+          # Store first chunk in variable for synchronous return
+          first_chunk_var.value = chunk
+          is_first = false
+        else
+          # Enqueue remaining chunks to main output queue
+          @main_output_queue.enqueue(chunk)
+        end
+      end
+
+      # Handle case where stream has no chunks
+      first_chunk_var.value = nil if is_first
+    end
+
     def internal_stream_react_component(component_name, options = {})
       options = options.merge(render_mode: :html_streaming)
       result = internal_react_component(component_name, options)

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 33 additions & 1 deletion

@@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true

     private

+    # Drains all streaming tasks concurrently using a producer-consumer pattern.
+    #
+    # Producer tasks: Created by consumer_stream_async in the helper, each streams
+    # chunks from the renderer and enqueues them to @main_output_queue.
+    #
+    # Consumer task: Single writer dequeues chunks and writes to response stream.
+    #
+    # Client disconnect handling:
+    # - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully
+    # - Barrier is stopped to cancel all producer tasks, preventing wasted work
+    # - No exception propagates to the controller for client disconnects
     def drain_streams_concurrently(parent_task)
+      client_disconnected = false
+
       writing_task = parent_task.async do
         # Drain all remaining chunks from the queue to the response stream
         while (chunk = @main_output_queue.dequeue)
           response.stream.write(chunk)
         end
+      rescue IOError, Errno::EPIPE => e
+        # Client disconnected - stop writing gracefully
+        client_disconnected = true
+        log_client_disconnect("writer", e)
       end

       # Wait for all component streaming tasks to complete
@@ -76,9 +93,24 @@ def drain_streams_concurrently(parent_task)
         raise e
       end
     ensure
-      # Close the queue to signal end of streaming
+      # Close the queue first to unblock writing_task (it may be waiting on dequeue)
       @main_output_queue.close
+
+      # Wait for writing_task to ensure client_disconnected flag is set
+      # before we check it (fixes race condition where ensure runs before
+      # writing_task's rescue block sets the flag)
       writing_task.wait
+
+      # If client disconnected, stop all producer tasks to avoid wasted work
+      @async_barrier.stop if client_disconnected
+    end
+
+    def log_client_disconnect(context, exception)
+      return unless ReactOnRails.configuration.logging_on_server
+
+      Rails.logger.debug do
+        "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}"
+      end
     end
   end
 end
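
For orientation, a hypothetical controller using this concern might look like the sketch below; the controller, template, and component names are made up, and the `ActionController::Live` / `ReactOnRailsPro::Stream` includes follow the Pro streaming docs rather than anything added in this commit. The point of the change above is that a mid-stream disconnect now ends such an action quietly instead of raising `IOError`/`Errno::EPIPE`.

```ruby
# Hypothetical usage sketch - not part of this diff.
class StreamedPostsController < ApplicationController
  include ActionController::Live       # provides response.stream
  include ReactOnRailsPro::Stream      # provides stream_view_containing_react_components

  def show
    # The template calls stream_react_component for each streamed component;
    # drain_streams_concurrently (above) then writes their chunks to response.stream.
    stream_view_containing_react_components(template: "streamed_posts/show")
  end
end
```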

react_on_rails_pro/lib/react_on_rails_pro/configuration.rb

Lines changed: 20 additions & 10 deletions

@@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength
                   :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
                   :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
                   :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
-                  :react_server_client_manifest_file, :concurrent_component_streaming_buffer_size
+                  :react_server_client_manifest_file
+
+    attr_reader :concurrent_component_streaming_buffer_size
+
+    # Sets the buffer size for concurrent component streaming.
+    #
+    # This value controls how many chunks can be buffered in memory during
+    # concurrent streaming operations. When producers generate chunks faster
+    # than they can be written to the client, this buffer prevents unbounded
+    # memory growth by blocking producers when the buffer is full.
+    #
+    # @param value [Integer] A positive integer specifying the buffer size
+    # @raise [ReactOnRailsPro::Error] if value is not a positive integer
+    def concurrent_component_streaming_buffer_size=(value)
+      unless value.is_a?(Integer) && value.positive?
+        raise ReactOnRailsPro::Error,
+              "config.concurrent_component_streaming_buffer_size must be a positive integer"
+      end
+      @concurrent_component_streaming_buffer_size = value
+    end

     def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
                    renderer_use_fallback_exec_js: nil, prerender_caching: nil,
@@ -118,7 +137,6 @@ def setup_config_values
       validate_remote_bundle_cache_adapter
       setup_renderer_password
       setup_assets_to_copy
-      validate_concurrent_component_streaming_buffer_size
       setup_execjs_profiler_if_needed
       check_react_on_rails_support_for_rsc
     end
@@ -210,14 +228,6 @@ def validate_remote_bundle_cache_adapter
       end
     end

-    def validate_concurrent_component_streaming_buffer_size
-      return if concurrent_component_streaming_buffer_size.is_a?(Integer) &&
-                concurrent_component_streaming_buffer_size.positive?
-
-      raise ReactOnRailsPro::Error,
-            "config.concurrent_component_streaming_buffer_size must be a positive integer"
-    end
-
     def setup_renderer_password
       return if renderer_password.present?
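
For reference, the validated setter above is the one an application would call from its configuration; a minimal initializer sketch follows (the file path and the value 64 are illustrative, not defaults introduced by this PR).

```ruby
# config/initializers/react_on_rails_pro.rb (illustrative)
ReactOnRailsPro.configure do |config|
  # Anything other than a positive Integer now raises ReactOnRailsPro::Error
  # at configuration time instead of failing later during streaming.
  config.concurrent_component_streaming_buffer_size = 64
end
```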

react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb

Lines changed: 62 additions & 2 deletions

@@ -362,7 +362,9 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
     end
   end

-  describe "#stream_react_component" do
+  describe "#stream_react_component" do # rubocop:disable RSpec/MultipleMemoizedHelpers
+    let(:mocked_rails_stream) { instance_double(ActionController::Live::Buffer) }
+
     around do |example|
       # Wrap each test in Sync block to provide async context
       Sync do
@@ -378,6 +380,14 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
       end
     end

+    before do
+      # Mock response.stream.closed? for client disconnect detection
+      allow(mocked_rails_stream).to receive(:closed?).and_return(false)
+      mocked_rails_response = instance_double(ActionDispatch::Response)
+      allow(mocked_rails_response).to receive(:stream).and_return(mocked_rails_stream)
+      allow(self).to receive(:response).and_return(mocked_rails_response)
+    end
+
     it "returns the component shell that exist in the initial chunk with the consoleReplayScript" do
       mock_request_and_response
       initial_result = stream_react_component(component_name, props: props, **component_options)
@@ -452,6 +462,38 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
       expect(collected_chunks[1]).to include(chunks_with_whitespaces[2][:html])
       expect(collected_chunks[2]).to include(chunks_with_whitespaces[3][:html])
     end
+
+    it "stops processing chunks when client disconnects" do
+      many_chunks = Array.new(10) do |i|
+        { html: "<div>Chunk #{i}</div>", consoleReplayScript: "" }
+      end
+      mock_request_and_response(many_chunks)
+
+      # Simulate client disconnect after first chunk
+      call_count = 0
+      allow(mocked_rails_stream).to receive(:closed?) do
+        call_count += 1
+        call_count > 1 # false for first call, true after
+      end
+
+      # Start streaming - first chunk returned synchronously
+      initial_result = stream_react_component(component_name, props: props, **component_options)
+      expect(initial_result).to include("<div>Chunk 0</div>")
+
+      # Wait for async task to complete
+      @async_barrier.wait
+      @main_output_queue.close
+
+      # Collect chunks that were enqueued to output
+      collected_chunks = []
+      while (chunk = @main_output_queue.dequeue)
+        collected_chunks << chunk
+      end
+
+      # Should have stopped early - not all chunks processed
+      # The exact count depends on timing, but should be less than 9 (all remaining)
+      expect(collected_chunks.length).to be < 9
+    end
   end

   describe "stream_view_containing_react_components" do # rubocop:disable RSpec/MultipleMemoizedHelpers
@@ -476,6 +518,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
        written_chunks << chunk
      end
      allow(mocked_stream).to receive(:close)
+     allow(mocked_stream).to receive(:closed?).and_return(false)
      mocked_response = instance_double(ActionDispatch::Response)
      allow(mocked_response).to receive(:stream).and_return(mocked_stream)
      allow(self).to receive(:response).and_return(mocked_response)
@@ -565,6 +608,7 @@ def execute_stream_view_containing_react_components
      written_chunks.clear
      allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk }
      allow(mocked_stream).to receive(:close)
+     allow(mocked_stream).to receive(:closed?).and_return(false)
      mocked_response = instance_double(ActionDispatch::Response)
      allow(mocked_response).to receive(:stream).and_return(mocked_stream)
      allow(self).to receive(:response).and_return(mocked_response)
@@ -709,7 +753,9 @@ def run_stream
     end
   end

-  describe "cached_stream_react_component integration with RandomValue", :caching do
+  describe "cached_stream_react_component integration with RandomValue", :caching do # rubocop:disable RSpec/MultipleMemoizedHelpers
+    let(:mocked_stream) { instance_double(ActionController::Live::Buffer) }
+
     around do |example|
       original_prerender_caching = ReactOnRailsPro.configuration.prerender_caching
       ReactOnRailsPro.configuration.prerender_caching = true
@@ -720,6 +766,13 @@ def run_stream
       Rails.cache.clear
     end

+    before do
+      allow(mocked_stream).to receive(:closed?).and_return(false)
+      mocked_response = instance_double(ActionDispatch::Response)
+      allow(mocked_response).to receive(:stream).and_return(mocked_stream)
+      allow(self).to receive(:response).and_return(mocked_response)
+    end
+
     # we need this setup because we can't use the helper outside of stream_view_containing_react_components
     def render_cached_random_value(cache_key)
       # Streaming helpers require this context normally provided by stream_view_containing_react_components
@@ -780,6 +833,7 @@ def render_cached_random_value(cache_key)
        { html: "<div>Test Content</div>", consoleReplayScript: "" }
      ]
    end
+   let(:mocked_stream) { instance_double(ActionController::Live::Buffer) }

    around do |example|
      Sync do
@@ -790,6 +844,12 @@ def render_cached_random_value(cache_key)
     end

     before do
+      # Mock response.stream.closed? for client disconnect detection
+      allow(mocked_stream).to receive(:closed?).and_return(false)
+      mocked_response = instance_double(ActionDispatch::Response)
+      allow(mocked_response).to receive(:stream).and_return(mocked_stream)
+      allow(self).to receive(:response).and_return(mocked_response)
+
       ReactOnRailsPro::Request.instance_variable_set(:@connection, nil)
       original_httpx_plugin = HTTPX.method(:plugin)
       allow(HTTPX).to receive(:plugin) do |*args|

react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb

Lines changed: 37 additions & 0 deletions

@@ -260,5 +260,42 @@ def self.fetch(*)
        expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json")
      end
    end
+
+    describe ".concurrent_component_streaming_buffer_size" do
+      it "accepts positive integers" do
+        ReactOnRailsPro.configure do |config|
+          config.concurrent_component_streaming_buffer_size = 128
+        end
+
+        expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128)
+      end
+
+      it "raises error for non-positive integers" do
+        expect do
+          ReactOnRailsPro.configure do |config|
+            config.concurrent_component_streaming_buffer_size = 0
+          end
+        end.to raise_error(ReactOnRailsPro::Error,
+                           /must be a positive integer/)
+      end
+
+      it "raises error for negative integers" do
+        expect do
+          ReactOnRailsPro.configure do |config|
+            config.concurrent_component_streaming_buffer_size = -1
+          end
+        end.to raise_error(ReactOnRailsPro::Error,
+                           /must be a positive integer/)
+      end
+
+      it "raises error for non-integers" do
+        expect do
+          ReactOnRailsPro.configure do |config|
+            config.concurrent_component_streaming_buffer_size = "64"
+          end
+        end.to raise_error(ReactOnRailsPro::Error,
+                           /must be a positive integer/)
+      end
+    end
   end
 end
