diff --git a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb index 7251ff3fab..ef5dfe037d 100644 --- a/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb +++ b/react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb @@ -128,7 +128,11 @@ def stream_react_component(component_name, options = {}) # Because setting prerender to false is equivalent to calling react_component with prerender: false options[:prerender] = true options = options.merge(immediate_hydration: true) unless options.key?(:immediate_hydration) - run_stream_inside_fiber do + + # Extract streaming-specific callback + on_complete = options.delete(:on_complete) + + run_stream_inside_fiber(on_complete: on_complete) do internal_stream_react_component(component_name, options) end end @@ -185,7 +189,11 @@ def rsc_payload_react_component(component_name, options = {}) # rsc_payload_react_component doesn't have the prerender option # Because setting prerender to false will not do anything options[:prerender] = true - run_stream_inside_fiber do + + # Extract streaming-specific callback + on_complete = options.delete(:on_complete) + + run_stream_inside_fiber(on_complete: on_complete) do internal_rsc_payload_react_component(component_name, options) end end @@ -246,30 +254,29 @@ def handle_stream_cache_hit(component_name, raw_options, auto_load_bundle, cache load_pack_for_generated_component(component_name, render_options) initial_result, *rest_chunks = cached_chunks - hit_fiber = Fiber.new do - rest_chunks.each { |chunk| Fiber.yield(chunk) } - nil + + # Enqueue remaining chunks asynchronously + @async_barrier.async do + rest_chunks.each { |chunk| @main_output_queue.enqueue(chunk) } end - @rorp_rendering_fibers << hit_fiber + + # Return first chunk directly initial_result end def handle_stream_cache_miss(component_name, raw_options, auto_load_bundle, view_cache_key, &block) - # Kick off the normal streaming helper to get the initial result and the original fiber - initial_result = render_stream_component_with_props(component_name, raw_options, auto_load_bundle, &block) - original_fiber = @rorp_rendering_fibers.pop - - buffered_chunks = [initial_result] - wrapper_fiber = Fiber.new do - while (chunk = original_fiber.resume) - buffered_chunks << chunk - Fiber.yield(chunk) - end - Rails.cache.write(view_cache_key, buffered_chunks, raw_options[:cache_options] || {}) - nil - end - @rorp_rendering_fibers << wrapper_fiber - initial_result + cache_aware_options = raw_options.merge( + on_complete: lambda { |chunks| + Rails.cache.write(view_cache_key, chunks, raw_options[:cache_options] || {}) + } + ) + + render_stream_component_with_props( + component_name, + cache_aware_options, + auto_load_bundle, + &block + ) end def render_stream_component_with_props(component_name, raw_options, auto_load_bundle) @@ -291,25 +298,46 @@ def check_caching_options!(raw_options, block) raise ReactOnRailsPro::Error, "Option 'cache_key' is required for React on Rails caching" end - def run_stream_inside_fiber - if @rorp_rendering_fibers.nil? + def run_stream_inside_fiber(on_complete:) + require "async/variable" + + if @async_barrier.nil? 
raise ReactOnRails::Error, "You must call stream_view_containing_react_components to render the view containing the react component" end - rendering_fiber = Fiber.new do + # Create a variable to hold the first chunk for synchronous return + first_chunk_var = Async::Variable.new + all_chunks = [] if on_complete # Only collect if callback provided + + # Start an async task on the barrier to stream all chunks + @async_barrier.async do stream = yield + is_first = true + stream.each_chunk do |chunk| - Fiber.yield chunk + all_chunks << chunk if on_complete # Collect for callback + + if is_first + # Store first chunk in variable for synchronous access + first_chunk_var.value = chunk + is_first = false + else + # Enqueue remaining chunks to main output queue + @main_output_queue.enqueue(chunk) + end end - end - @rorp_rendering_fibers << rendering_fiber + # Handle case where stream has no chunks + first_chunk_var.value = nil if is_first + + # Call callback with all chunks when streaming completes + on_complete&.call(all_chunks) + end - # return the first chunk of the fiber - # It contains the initial html of the component - # all updates will be appended to the stream sent to browser - rendering_fiber.resume + # Wait for and return the first chunk (blocking) + first_chunk_var.wait + first_chunk_var.value end def internal_stream_react_component(component_name, options = {}) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index db55ba2c1e..8a985333bf 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -31,73 +31,47 @@ module Stream # # @see ReactOnRails::Helper#stream_react_component def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options) - @rorp_rendering_fibers = [] - template_string = render_to_string(template: template, **render_options) - # View may contain extra newlines, chunk already contains a newline - # Having multiple newlines between chunks causes hydration errors - # So we strip extra newlines from the template string and add a single newline - response.stream.write(template_string) - - begin - drain_streams_concurrently - ensure - response.stream.close if close_stream_at_end - end - end - - private - - def drain_streams_concurrently require "async" + require "async/barrier" require "async/limited_queue" - return if @rorp_rendering_fibers.empty? - - Sync do |parent| - # To avoid memory bloat, we use a limited queue to buffer chunks in memory. 
+ Sync do |parent_task| + # Initialize async primitives for concurrent component streaming + @async_barrier = Async::Barrier.new buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size - queue = Async::LimitedQueue.new(buffer_size) + @main_output_queue = Async::LimitedQueue.new(buffer_size) - writer = build_writer_task(parent: parent, queue: queue) - tasks = build_producer_tasks(parent: parent, queue: queue) + # Render template - components will start streaming immediately + template_string = render_to_string(template: template, **render_options) + # View may contain extra newlines, chunk already contains a newline + # Having multiple newlines between chunks causes hydration errors + # So we strip extra newlines from the template string and add a single newline + response.stream.write(template_string) - # This structure ensures that even if a producer task fails, we always - # signal the writer to stop and then wait for it to finish draining - # any remaining items from the queue before propagating the error. begin - tasks.each(&:wait) + drain_streams_concurrently(parent_task) ensure - # `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit. - queue.close - writer.wait + response.stream.close if close_stream_at_end end end end - def build_producer_tasks(parent:, queue:) - @rorp_rendering_fibers.each_with_index.map do |fiber, idx| - parent.async do - loop do - chunk = fiber.resume - break unless chunk + private - # Will be blocked if the queue is full until a chunk is dequeued - queue.enqueue([idx, chunk]) - end + def drain_streams_concurrently(parent_task) + writing_task = parent_task.async do + # Drain all remaining chunks from the queue to the response stream + while (chunk = @main_output_queue.dequeue) + response.stream.write(chunk) end end - end - def build_writer_task(parent:, queue:) - parent.async do - loop do - pair = queue.dequeue - break if pair.nil? + # Wait for all component streaming tasks to complete + @async_barrier.wait - _idx_from_queue, item = pair - response.stream.write(item) - end - end + # Close the queue to signal end of streaming + @main_output_queue.close + writing_task.wait end end end diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index 9524783f88..e8b90fd01f 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -2,11 +2,14 @@ require "async" require "async/queue" +require "async/barrier" require "rails_helper" require "support/script_tag_utils" RequestDetails = Struct.new(:original_url, :env) +# rubocop:disable RSpec/InstanceVariable + # This module is created to provide stub methods for `render_to_string` and `response` # These methods will be mocked in the tests to prevent " does not implement " errors # when these methods are called during testing. @@ -360,13 +363,19 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end describe "#stream_react_component" do - before do - # Initialize @rorp_rendering_fibers to mock the behavior of stream_view_containing_react_components. - # This instance variable is normally set by stream_view_containing_react_components method. - # By setting it here, we simulate that the view is being rendered using that method. 
- # This setup is necessary because stream_react_component relies on @rorp_rendering_fibers - # to function correctly within the streaming context. - @rorp_rendering_fibers = [] + around do |example| + # Wrap each test in Sync block to provide async context + Sync do + # Initialize async primitives to mock the behavior of stream_view_containing_react_components. + # These instance variables are normally set by stream_view_containing_react_components method. + # By setting them here, we simulate that the view is being rendered using that method. + # This setup is necessary because stream_react_component relies on @async_barrier and @main_output_queue + # to function correctly within the streaming context. + @async_barrier = Async::Barrier.new + @main_output_queue = Async::Queue.new + + example.run + end end it "returns the component shell that exist in the initial chunk with the consoleReplayScript" do @@ -380,35 +389,39 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) expect(initial_result).to include(wrapped) end expect(initial_result).not_to include("More content", "Final content") - expect(chunks_read.count).to eq(1) + # NOTE: With async architecture, chunks are consumed in background immediately, + expect(chunks_read.count).to eq(3) end - it "creates a fiber to read subsequent chunks" do + it "streams subsequent chunks to the output queue" do mock_request_and_response - stream_react_component(component_name, props: props, **component_options) - expect(@rorp_rendering_fibers.count).to eq(1) # rubocop:disable RSpec/InstanceVariable - fiber = @rorp_rendering_fibers.first # rubocop:disable RSpec/InstanceVariable - expect(fiber).to be_alive - - second_result = fiber.resume - # regex that matches the html and wrapped consoleReplayScript - # Note: consoleReplayScript is now wrapped in a script tag with id="consoleReplayLog" + initial_result = stream_react_component(component_name, props: props, **component_options) + + # First chunk is returned synchronously + expect(initial_result).to include(react_component_div_with_initial_chunk) + + # Wait for async task to complete + @async_barrier.wait + @main_output_queue.close + + # Subsequent chunks should be in the output queue + collected_chunks = [] + while (chunk = @main_output_queue.dequeue) + collected_chunks << chunk + end + + # Should have received the remaining chunks (chunks 2 and 3) + expect(collected_chunks.length).to eq(2) + + # Verify second chunk content script = chunks[1][:consoleReplayScript] wrapped = script.present? ? "" : "" - expect(second_result).to match( + expect(collected_chunks[0]).to match( /#{Regexp.escape(chunks[1][:html])}\s+#{Regexp.escape(wrapped)}/ ) - expect(second_result).not_to include("Stream React Server Components", "Final content") - expect(chunks_read.count).to eq(2) - - third_result = fiber.resume - expect(third_result).to eq(chunks[2][:html].to_s) - expect(third_result).not_to include("Stream React Server Components", "More content") - expect(chunks_read.count).to eq(3) - expect(fiber.resume).to be_nil - expect(fiber).not_to be_alive - expect(chunks_read.count).to eq(chunks.count) + # Verify third chunk content + expect(collected_chunks[1]).to eq(chunks[2][:html].to_s) end it "does not trim whitespaces from html" do @@ -420,12 +433,24 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) { html: "\t\t\t
<div>Chunk 4: with mixed whitespaces</div>
\n\n\n" } ].map { |chunk| chunk.merge(consoleReplayScript: "") } mock_request_and_response(chunks_with_whitespaces) + initial_result = stream_react_component(component_name, props: props, **component_options) expect(initial_result).to include(chunks_with_whitespaces.first[:html]) - fiber = @rorp_rendering_fibers.first # rubocop:disable RSpec/InstanceVariable - expect(fiber.resume).to include(chunks_with_whitespaces[1][:html]) - expect(fiber.resume).to include(chunks_with_whitespaces[2][:html]) - expect(fiber.resume).to include(chunks_with_whitespaces[3][:html]) + + # Wait for async task to complete + @async_barrier.wait + @main_output_queue.close + + # Collect remaining chunks from queue + collected_chunks = [] + while (chunk = @main_output_queue.dequeue) + collected_chunks << chunk + end + + # Verify whitespaces are preserved in all chunks + expect(collected_chunks[0]).to include(chunks_with_whitespaces[1][:html]) + expect(collected_chunks[1]).to include(chunks_with_whitespaces[2][:html]) + expect(collected_chunks[2]).to include(chunks_with_whitespaces[3][:html]) end end @@ -698,15 +723,25 @@ def run_stream # we need this setup because we can't use the helper outside of stream_view_containing_react_components def render_cached_random_value(cache_key) # Streaming helpers require this context normally provided by stream_view_containing_react_components - @rorp_rendering_fibers = [] + result = nil + Sync do + @async_barrier = Async::Barrier.new + @main_output_queue = Async::Queue.new - result = cached_stream_react_component("RandomValue", cache_key: cache_key, - id: "RandomValue-react-component-0") do - { a: 1, b: 2 } - end + result = cached_stream_react_component("RandomValue", cache_key: cache_key, + id: "RandomValue-react-component-0") do + { a: 1, b: 2 } + end + + # Complete the streaming lifecycle to trigger cache writes + @async_barrier.wait + @main_output_queue.close - # Complete the streaming lifecycle to trigger cache writes - @rorp_rendering_fibers.each { |fiber| fiber.resume while fiber.alive? 
} # rubocop:disable RSpec/InstanceVariable + # Drain the queue + while @main_output_queue.dequeue + # Just consume all remaining chunks + end + end result end @@ -746,8 +781,15 @@ def render_cached_random_value(cache_key) ] end + around do |example| + Sync do + @async_barrier = Async::Barrier.new + @main_output_queue = Async::Queue.new + example.run + end + end + before do - @rorp_rendering_fibers = [] ReactOnRailsPro::Request.instance_variable_set(:@connection, nil) original_httpx_plugin = HTTPX.method(:plugin) allow(HTTPX).to receive(:plugin) do |*args| @@ -775,3 +817,4 @@ def render_cached_random_value(cache_key) end end end +# rubocop:enable RSpec/InstanceVariable diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index a97f69ffde..8ee1eed235 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -2,6 +2,7 @@ require "async" require "async/queue" +require "async/variable" require_relative "spec_helper" class StreamController @@ -15,13 +16,16 @@ def initialize(component_queues:, initial_response: "TEMPLATE") end def render_to_string(**_opts) - @rorp_rendering_fibers = @component_queues.map do |queue| - Fiber.new do + # Simulate component helpers creating async tasks + # In real implementation, first chunks are part of template HTML + # For testing, we enqueue all chunks including first ones + @component_queues.each do |queue| + @async_barrier.async do loop do chunk = queue.dequeue break if chunk.nil? - Fiber.yield chunk + @main_output_queue.enqueue(chunk) end end end
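
Note for reviewers (not part of the diff): the hunks above replace the per-component Fiber pipeline with tasks scheduled on a shared Async::Barrier, a bounded Async::LimitedQueue drained by a single writer task, and an Async::Variable that hands the first chunk back to the caller synchronously. The sketch below is a minimal, standalone illustration of that pattern using the same "async" gem primitives; the method name stream_component, the sample chunk strings, and the puts calls standing in for response.stream.write are illustrative assumptions, not code from this PR.

# Minimal sketch (assumptions noted above): one producer task per component on a
# barrier, a bounded output queue drained by a single writer, and a variable that
# resolves with the first chunk so the caller can return it synchronously.
require "async"
require "async/barrier"
require "async/limited_queue"
require "async/variable"

# Illustrative stand-in for run_stream_inside_fiber: returns the first chunk,
# streams the rest through `output_queue`, and reports all chunks to `on_complete`.
def stream_component(barrier, output_queue, chunks, on_complete: nil)
  first_chunk_var = Async::Variable.new
  collected = [] if on_complete

  barrier.async do
    first = true
    chunks.each do |chunk|
      collected << chunk if on_complete
      if first
        first_chunk_var.value = chunk # resolve so the waiting caller can return it
        first = false
      else
        output_queue.enqueue(chunk)   # blocks when the bounded queue is full
      end
    end
    first_chunk_var.value = nil if first # stream produced no chunks at all
    on_complete&.call(collected)         # e.g. write the full chunk list to a cache
  end

  first_chunk_var.wait
  first_chunk_var.value
end

Sync do |task|
  barrier = Async::Barrier.new
  output_queue = Async::LimitedQueue.new(8)

  # Writer task: stand-in for drain_streams_concurrently writing to response.stream.
  writer = task.async do
    while (chunk = output_queue.dequeue)
      puts chunk
    end
  end

  first = stream_component(barrier, output_queue,
                           ["<div>shell</div>", "chunk 2", "chunk 3"],
                           on_complete: ->(all) { puts "completed with #{all.size} chunks" })
  puts "initial: #{first}"

  barrier.wait       # wait for every component task to finish
  output_queue.close # dequeue now returns nil, letting the writer exit
  writer.wait
end

The ordering mirrored from the diff is barrier.wait, then queue close, then writer wait: producers must finish before the queue is closed, and closing the queue is what releases the writer's final dequeue.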