From 3ae39e53c08ef80d2ea97d2c031b112f51a682d0 Mon Sep 17 00:00:00 2001 From: ihabadham Date: Wed, 26 Nov 2025 21:51:50 +0200 Subject: [PATCH 1/5] Add client disconnect handling for concurrent component streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client disconnects during streaming (browser closed, network drop), the response stream write raises IOError or Errno::EPIPE. Without handling, this crashes the request and wastes CPU on continued processing. Changes: 1. Writer error handling (stream.rb): - Catch IOError/EPIPE in writing task - Stop barrier to cancel producer tasks on disconnect - Log disconnect for debugging (when logging_on_server enabled) 2. Producer early termination (react_on_rails_pro_helper.rb): - Add stream.closed? check before processing each chunk - Prevents deadlock when producer blocks on full queue - Prevents wasted CPU when producer runs ahead of failed writer 3. Configuration validation (configuration.rb): - Add setter validation for concurrent_component_streaming_buffer_size - Must be a positive integer 4. Tests: - Add "stops writing when client disconnects" test - Add buffer size validation tests - Add closed? stub to stream test setup Based on error handling from PRs #2017 and #2026, adapted for the new Async::Barrier architecture introduced in PR #2111. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../app/helpers/react_on_rails_pro_helper.rb | 43 +++++++++++-------- .../lib/react_on_rails_pro/concerns/stream.rb | 28 ++++++++++++ .../lib/react_on_rails_pro/configuration.rb | 21 ++++++++- .../react_on_rails_pro/configuration_spec.rb | 37 ++++++++++++++++ .../spec/react_on_rails_pro/stream_spec.rb | 37 ++++++++++++++++ 5 files changed, 146 insertions(+), 20 deletions(-) diff --git a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb index 69c20af010..4c48941df8 100644 --- a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb +++ b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb @@ -313,25 +313,7 @@ def consumer_stream_async(on_complete:) # Start an async task on the barrier to stream all chunks @async_barrier.async do stream = yield - is_first = true - - stream.each_chunk do |chunk| - all_chunks << chunk if on_complete # Collect for callback - - if is_first - # Store first chunk in variable for synchronous access - first_chunk_var.value = chunk - is_first = false - else - # Enqueue remaining chunks to main output queue - @main_output_queue.enqueue(chunk) - end - end - - # Handle case where stream has no chunks - first_chunk_var.value = nil if is_first - - # Call callback with all chunks when streaming completes + process_stream_chunks(stream, first_chunk_var, all_chunks) on_complete&.call(all_chunks) end @@ -340,6 +322,29 @@ def consumer_stream_async(on_complete:) first_chunk_var.value end + def process_stream_chunks(stream, first_chunk_var, all_chunks) + is_first = true + + stream.each_chunk do |chunk| + # Check if client disconnected before processing chunk + break if response.stream.closed? + + all_chunks&.push(chunk) + + if is_first + # Store first chunk in variable for synchronous return + first_chunk_var.value = chunk + is_first = false + else + # Enqueue remaining chunks to main output queue + @main_output_queue.enqueue(chunk) + end + end + + # Handle case where stream has no chunks + first_chunk_var.value = nil if is_first + end + def internal_stream_react_component(component_name, options = {}) options = options.merge(render_mode: :html_streaming) result = internal_react_component(component_name, options) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index 5735248828..78f7c6998e 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -60,12 +60,29 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true private + # Drains all streaming tasks concurrently using a producer-consumer pattern. + # + # Producer tasks: Created by consumer_stream_async in the helper, each streams + # chunks from the renderer and enqueues them to @main_output_queue. + # + # Consumer task: Single writer dequeues chunks and writes to response stream. + # + # Client disconnect handling: + # - If client disconnects (IOError/Errno::EPIPE), writer stops gracefully + # - Barrier is stopped to cancel all producer tasks, preventing wasted work + # - No exception propagates to the controller for client disconnects def drain_streams_concurrently(parent_task) + client_disconnected = false + writing_task = parent_task.async do # Drain all remaining chunks from the queue to the response stream while (chunk = @main_output_queue.dequeue) response.stream.write(chunk) end + rescue IOError, Errno::EPIPE => e + # Client disconnected - stop writing gracefully + client_disconnected = true + log_client_disconnect("writer", e) end # Wait for all component streaming tasks to complete @@ -76,9 +93,20 @@ def drain_streams_concurrently(parent_task) raise e end ensure + # If client disconnected, stop all producer tasks to avoid wasted work + @async_barrier.stop if client_disconnected + # Close the queue to signal end of streaming @main_output_queue.close writing_task.wait end + + def log_client_disconnect(context, exception) + return unless ReactOnRails.configuration.logging_on_server + + Rails.logger.debug do + "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}" + end + end end end diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb index b2b9e30cb8..d9aba12936 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb @@ -70,7 +70,26 @@ class Configuration # rubocop:disable Metrics/ClassLength :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout, :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support, :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file, - :react_server_client_manifest_file, :concurrent_component_streaming_buffer_size + :react_server_client_manifest_file + + attr_reader :concurrent_component_streaming_buffer_size + + # Sets the buffer size for concurrent component streaming. + # + # This value controls how many chunks can be buffered in memory during + # concurrent streaming operations. When producers generate chunks faster + # than they can be written to the client, this buffer prevents unbounded + # memory growth by blocking producers when the buffer is full. + # + # @param value [Integer] A positive integer specifying the buffer size + # @raise [ReactOnRailsPro::Error] if value is not a positive integer + def concurrent_component_streaming_buffer_size=(value) + unless value.is_a?(Integer) && value.positive? + raise ReactOnRailsPro::Error, + "config.concurrent_component_streaming_buffer_size must be a positive integer" + end + @concurrent_component_streaming_buffer_size = value + end def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize renderer_use_fallback_exec_js: nil, prerender_caching: nil, diff --git a/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb index d2bbe6802c..138e422092 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/configuration_spec.rb @@ -260,5 +260,42 @@ def self.fetch(*) expect(ReactOnRailsPro.configuration.react_server_client_manifest_file).to eq("server-client-manifest.json") end end + + describe ".concurrent_component_streaming_buffer_size" do + it "accepts positive integers" do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = 128 + end + + expect(ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size).to eq(128) + end + + it "raises error for non-positive integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = 0 + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + + it "raises error for negative integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = -1 + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + + it "raises error for non-integers" do + expect do + ReactOnRailsPro.configure do |config| + config.concurrent_component_streaming_buffer_size = "64" + end + end.to raise_error(ReactOnRailsPro::Error, + /must be a positive integer/) + end + end end end diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index 8ee1eed235..c43c9b714a 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -392,6 +392,7 @@ def setup_stream_test(component_count: 2) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(mocked_stream).to receive(:write) allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) allow(controller).to receive(:response).and_return(mocked_response) [component_queues, controller, mocked_stream] @@ -489,5 +490,41 @@ def setup_stream_test(component_count: 2) gaps = write_timestamps.each_cons(2).map { |a, b| b - a } expect(gaps.all? { |gap| gap >= 0.04 }).to be true end + + it "stops writing when client disconnects" do + queues, controller, stream = setup_stream_test(component_count: 1) + + written_chunks = [] + write_count = 0 + + # Simulate client disconnect: IOError on third write + allow(stream).to receive(:write) do |chunk| + write_count += 1 + raise IOError, "client disconnected" if write_count == 3 + + written_chunks << chunk + end + + # closed? stub required by setup but not used by StreamController's simple enqueue logic + allow(stream).to receive(:closed?).and_return(false) + + run_stream(controller) do |_parent| + queues[0].enqueue("Chunk1") + sleep 0.05 + queues[0].enqueue("Chunk2") + sleep 0.05 + queues[0].enqueue("Chunk3") # This write will raise IOError + sleep 0.05 + queues[0].enqueue("Chunk4") # Should not be written + queues[0].close + sleep 0.1 + end + + # Writer catches IOError and stops - only successful writes recorded + # Write 1: TEMPLATE (success), Write 2: Chunk1 (success), Write 3: Chunk2 (IOError) + expect(written_chunks.length).to eq(2) + expect(written_chunks).to include("TEMPLATE") + expect(written_chunks).to include("Chunk1") + end end end From 47ca8b1059a6ac8db87693b270d6f6a241f48c90 Mon Sep 17 00:00:00 2001 From: ihabadham Date: Wed, 26 Nov 2025 22:26:30 +0200 Subject: [PATCH 2/5] Improve client disconnect tests: add Errno::EPIPE coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reorganize disconnect tests under "client disconnect handling" describe block - Add test for Errno::EPIPE (broken pipe) in addition to IOError - Simplify test names: "stops writing on IOError/Errno::EPIPE" - Clean up comments and remove redundant assertions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../spec/react_on_rails_pro/stream_spec.rb | 75 ++++++++++++------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index c43c9b714a..3b3ad5921a 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -491,40 +491,61 @@ def setup_stream_test(component_count: 2) expect(gaps.all? { |gap| gap >= 0.04 }).to be true end - it "stops writing when client disconnects" do - queues, controller, stream = setup_stream_test(component_count: 1) + describe "client disconnect handling" do + it "stops writing on IOError" do + queues, controller, stream = setup_stream_test(component_count: 1) - written_chunks = [] - write_count = 0 + written_chunks = [] + write_count = 0 - # Simulate client disconnect: IOError on third write - allow(stream).to receive(:write) do |chunk| - write_count += 1 - raise IOError, "client disconnected" if write_count == 3 + allow(stream).to receive(:write) do |chunk| + write_count += 1 + raise IOError, "client disconnected" if write_count == 3 - written_chunks << chunk - end + written_chunks << chunk + end - # closed? stub required by setup but not used by StreamController's simple enqueue logic - allow(stream).to receive(:closed?).and_return(false) + run_stream(controller) do |_parent| + queues[0].enqueue("Chunk1") + sleep 0.05 + queues[0].enqueue("Chunk2") + sleep 0.05 + queues[0].enqueue("Chunk3") + sleep 0.05 + queues[0].enqueue("Chunk4") + queues[0].close + sleep 0.1 + end - run_stream(controller) do |_parent| - queues[0].enqueue("Chunk1") - sleep 0.05 - queues[0].enqueue("Chunk2") - sleep 0.05 - queues[0].enqueue("Chunk3") # This write will raise IOError - sleep 0.05 - queues[0].enqueue("Chunk4") # Should not be written - queues[0].close - sleep 0.1 + # Write 1: TEMPLATE, Write 2: Chunk1, Write 3: Chunk2 (raises IOError) + expect(written_chunks).to eq(%w[TEMPLATE Chunk1]) end - # Writer catches IOError and stops - only successful writes recorded - # Write 1: TEMPLATE (success), Write 2: Chunk1 (success), Write 3: Chunk2 (IOError) - expect(written_chunks.length).to eq(2) - expect(written_chunks).to include("TEMPLATE") - expect(written_chunks).to include("Chunk1") + it "stops writing on Errno::EPIPE" do + queues, controller, stream = setup_stream_test(component_count: 1) + + written_chunks = [] + write_count = 0 + + allow(stream).to receive(:write) do |chunk| + write_count += 1 + raise Errno::EPIPE, "broken pipe" if write_count == 3 + + written_chunks << chunk + end + + run_stream(controller) do |_parent| + queues[0].enqueue("Chunk1") + sleep 0.05 + queues[0].enqueue("Chunk2") + sleep 0.05 + queues[0].enqueue("Chunk3") + queues[0].close + sleep 0.1 + end + + expect(written_chunks).to eq(%w[TEMPLATE Chunk1]) + end end end end From b3dcc58c4bf10e6dbbe3ad1da94772288e3469a6 Mon Sep 17 00:00:00 2001 From: ihabadham Date: Wed, 26 Nov 2025 23:49:05 +0200 Subject: [PATCH 3/5] Add test coverage for producer early termination on client disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test that response.stream.closed? check in process_stream_chunks actually stops processing when client disconnects early. Also adds required response mocks to existing streaming tests. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../helpers/react_on_rails_pro_helper_spec.rb | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index e8b90fd01f..6297c025b7 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -362,7 +362,9 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end end - describe "#stream_react_component" do + describe "#stream_react_component" do # rubocop:disable RSpec/MultipleMemoizedHelpers + let(:mocked_rails_stream) { instance_double(ActionController::Live::Buffer) } + around do |example| # Wrap each test in Sync block to provide async context Sync do @@ -378,6 +380,14 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end end + before do + # Mock response.stream.closed? for client disconnect detection + allow(mocked_rails_stream).to receive(:closed?).and_return(false) + mocked_rails_response = instance_double(ActionDispatch::Response) + allow(mocked_rails_response).to receive(:stream).and_return(mocked_rails_stream) + allow(self).to receive(:response).and_return(mocked_rails_response) + end + it "returns the component shell that exist in the initial chunk with the consoleReplayScript" do mock_request_and_response initial_result = stream_react_component(component_name, props: props, **component_options) @@ -452,6 +462,38 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) expect(collected_chunks[1]).to include(chunks_with_whitespaces[2][:html]) expect(collected_chunks[2]).to include(chunks_with_whitespaces[3][:html]) end + + it "stops processing chunks when client disconnects" do + many_chunks = Array.new(10) do |i| + { html: "
Chunk #{i}
", consoleReplayScript: "" } + end + mock_request_and_response(many_chunks) + + # Simulate client disconnect after first chunk + call_count = 0 + allow(mocked_rails_stream).to receive(:closed?) do + call_count += 1 + call_count > 1 # false for first call, true after + end + + # Start streaming - first chunk returned synchronously + initial_result = stream_react_component(component_name, props: props, **component_options) + expect(initial_result).to include("
Chunk 0
") + + # Wait for async task to complete + @async_barrier.wait + @main_output_queue.close + + # Collect chunks that were enqueued to output + collected_chunks = [] + while (chunk = @main_output_queue.dequeue) + collected_chunks << chunk + end + + # Should have stopped early - not all chunks processed + # The exact count depends on timing, but should be less than 9 (all remaining) + expect(collected_chunks.length).to be < 9 + end end describe "stream_view_containing_react_components" do # rubocop:disable RSpec/MultipleMemoizedHelpers @@ -476,6 +518,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) written_chunks << chunk end allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -565,6 +608,7 @@ def execute_stream_view_containing_react_components written_chunks.clear allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk } allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -709,7 +753,9 @@ def run_stream end end - describe "cached_stream_react_component integration with RandomValue", :caching do + describe "cached_stream_react_component integration with RandomValue", :caching do # rubocop:disable RSpec/MultipleMemoizedHelpers + let(:mocked_stream) { instance_double(ActionController::Live::Buffer) } + around do |example| original_prerender_caching = ReactOnRailsPro.configuration.prerender_caching ReactOnRailsPro.configuration.prerender_caching = true @@ -720,6 +766,13 @@ def run_stream Rails.cache.clear end + before do + allow(mocked_stream).to receive(:closed?).and_return(false) + mocked_response = instance_double(ActionDispatch::Response) + allow(mocked_response).to receive(:stream).and_return(mocked_stream) + allow(self).to receive(:response).and_return(mocked_response) + end + # we need this setup because we can't use the helper outside of stream_view_containing_react_components def render_cached_random_value(cache_key) # Streaming helpers require this context normally provided by stream_view_containing_react_components @@ -780,6 +833,7 @@ def render_cached_random_value(cache_key) { html: "
Test Content
", consoleReplayScript: "" } ] end + let(:mocked_stream) { instance_double(ActionController::Live::Buffer) } around do |example| Sync do @@ -790,6 +844,12 @@ def render_cached_random_value(cache_key) end before do + # Mock response.stream.closed? for client disconnect detection + allow(mocked_stream).to receive(:closed?).and_return(false) + mocked_response = instance_double(ActionDispatch::Response) + allow(mocked_response).to receive(:stream).and_return(mocked_stream) + allow(self).to receive(:response).and_return(mocked_response) + ReactOnRailsPro::Request.instance_variable_set(:@connection, nil) original_httpx_plugin = HTTPX.method(:plugin) allow(HTTPX).to receive(:plugin) do |*args| From b0c86d010c0b6aa9a5afa8cea715158a53668cc0 Mon Sep 17 00:00:00 2001 From: ihabadham Date: Thu, 27 Nov 2025 00:03:24 +0200 Subject: [PATCH 4/5] Remove redundant buffer size validation method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The concurrent_component_streaming_buffer_size setter already validates the value when assigned (lines 86-92), making the separate validation method and its call in setup_config_values unnecessary. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../lib/react_on_rails_pro/configuration.rb | 9 --------- 1 file changed, 9 deletions(-) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb index d9aba12936..3342943fd8 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb @@ -137,7 +137,6 @@ def setup_config_values validate_remote_bundle_cache_adapter setup_renderer_password setup_assets_to_copy - validate_concurrent_component_streaming_buffer_size setup_execjs_profiler_if_needed check_react_on_rails_support_for_rsc end @@ -229,14 +228,6 @@ def validate_remote_bundle_cache_adapter end end - def validate_concurrent_component_streaming_buffer_size - return if concurrent_component_streaming_buffer_size.is_a?(Integer) && - concurrent_component_streaming_buffer_size.positive? - - raise ReactOnRailsPro::Error, - "config.concurrent_component_streaming_buffer_size must be a positive integer" - end - def setup_renderer_password return if renderer_password.present? From c023053b2f650758b0088b2f5e13adf3a4b6e75b Mon Sep 17 00:00:00 2001 From: ihabadham Date: Thu, 27 Nov 2025 00:19:50 +0200 Subject: [PATCH 5/5] Fix race condition in drain_streams_concurrently ensure block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ensure block was checking client_disconnected before waiting for writing_task to complete. This caused a race where the flag might still be false when checked, even though writing_task would set it to true moments later. Fixed by reordering the ensure block: 1. Close queue (unblocks writing_task if waiting on dequeue) 2. Wait for writing_task (ensures flag is set if disconnect occurred) 3. Check flag and stop barrier if needed This ensures producers are properly stopped when clients disconnect, preventing wasted CPU cycles. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../lib/react_on_rails_pro/concerns/stream.rb | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index 78f7c6998e..78c6de721c 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -93,12 +93,16 @@ def drain_streams_concurrently(parent_task) raise e end ensure - # If client disconnected, stop all producer tasks to avoid wasted work - @async_barrier.stop if client_disconnected - - # Close the queue to signal end of streaming + # Close the queue first to unblock writing_task (it may be waiting on dequeue) @main_output_queue.close + + # Wait for writing_task to ensure client_disconnected flag is set + # before we check it (fixes race condition where ensure runs before + # writing_task's rescue block sets the flag) writing_task.wait + + # If client disconnected, stop all producer tasks to avoid wasted work + @async_barrier.stop if client_disconnected end def log_client_disconnect(context, exception)