diff --git a/.gitignore b/.gitignore
index edfb0b3f9..f25cb5d2e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -72,3 +72,5 @@ yalc.lock
 
 # File Generated by ROR FS-based Registry
 **/generated
+
+.claude/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ece4dc3f..33406024a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,11 +16,16 @@ You can find the **package** version numbers from this repo's tags and below in
 ## [Unreleased]
 
 *Add changes in master not yet tagged.*
 
+### Improved
+- Significantly improved streaming performance by processing React components concurrently instead of sequentially. This reduces latency and improves responsiveness when using `stream_view_containing_react_components`.
+
 ### Added
+- Added the `config.concurrent_component_streaming_buffer_size` configuration option to control the in-memory buffer size for concurrent component streaming (defaults to 64). This allows fine-tuning the memory-usage/performance trade-off for streaming applications.
 - Added `cached_stream_react_component` helper method, similar to `cached_react_component` but for streamed components.
 
 ### Changed (Breaking)
 - `config.prerender_caching`, which controls caching for non-streaming components, now also controls caching for streamed components. To disable caching for an individual render, pass `internal_option(:skip_prerender_cache)`.
+- Added the `async` gem (>= 2.6) as a runtime dependency to support concurrent streaming.
 
 ## [4.0.0-rc.15] - 2025-08-11
diff --git a/Gemfile.lock b/Gemfile.lock
index 471e8be60..6582c9ebe 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -11,6 +11,7 @@ PATH
   specs:
     react_on_rails_pro (4.0.0)
       addressable
+      async (>= 2.6)
      connection_pool
       execjs (~> 2.9)
       httpx (~> 1.5)
@@ -95,6 +96,12 @@ GEM
       public_suffix (>= 2.0.2, < 7.0)
     amazing_print (1.6.0)
     ast (2.4.2)
+    async (2.27.4)
+      console (~> 1.29)
+      fiber-annotation
+      io-event (~> 1.11)
+      metrics (~> 0.12)
+      traces (~> 0.15)
     base64 (0.2.0)
     benchmark (0.4.0)
     bigdecimal (3.1.9)
@@ -122,6 +129,10 @@ GEM
     commonmarker (1.1.4-x86_64-linux)
     concurrent-ruby (1.3.5)
     connection_pool (2.5.0)
+    console (1.33.0)
+      fiber-annotation
+      fiber-local (~> 1.1)
+      json
     coveralls (0.8.23)
       json (>= 1.8, < 3)
       simplecov (~> 0.16.1)
@@ -146,6 +157,10 @@ GEM
     ffi (1.17.0-arm64-darwin)
     ffi (1.17.0-x86_64-darwin)
     ffi (1.17.0-x86_64-linux-gnu)
+    fiber-annotation (0.2.0)
+    fiber-local (1.1.0)
+      fiber-storage
+    fiber-storage (1.0.1)
     gem-release (2.2.2)
     generator_spec (0.10.0)
       activesupport (>= 3.0.0)
@@ -161,6 +176,7 @@ GEM
     i18n (1.14.7)
       concurrent-ruby (~> 1.0)
     io-console (0.8.0)
+    io-event (1.12.1)
     irb (1.15.1)
       pp (>= 0.6.0)
       rdoc (>= 4.0.0)
@@ -191,6 +207,7 @@ GEM
     marcel (1.0.4)
     matrix (0.4.2)
     method_source (1.1.0)
+    metrics (0.14.0)
     mini_mime (1.1.5)
     minitest (5.25.4)
     mize (0.4.1)
@@ -402,6 +419,7 @@ GEM
     tins (1.33.0)
       bigdecimal
       sync
+    traces (0.18.1)
     turbolinks (5.2.1)
       turbolinks-source (~> 5.2)
     turbolinks-source (5.2.0)
diff --git a/lib/react_on_rails_pro/concerns/stream.rb b/lib/react_on_rails_pro/concerns/stream.rb
index d1cecb14c..db55ba2c1 100644
--- a/lib/react_on_rails_pro/concerns/stream.rb
+++ b/lib/react_on_rails_pro/concerns/stream.rb
@@ -38,12 +38,66 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
       # So we strip extra newlines from the template string and add a single newline
       response.stream.write(template_string)
 
-      @rorp_rendering_fibers.each do |fiber|
-        while (chunk = fiber.resume)
-          response.stream.write(chunk)
+      begin
+        drain_streams_concurrently
+      ensure
+        response.stream.close if close_stream_at_end
+      end
+    end
+
+    private
+
+    def drain_streams_concurrently
+      require "async"
+      require "async/limited_queue"
+
+      return if @rorp_rendering_fibers.empty?
+
+      Sync do |parent|
+        # To avoid memory bloat, buffer chunks in a bounded (limited) queue.
+        buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
+        queue = Async::LimitedQueue.new(buffer_size)
+
+        writer = build_writer_task(parent: parent, queue: queue)
+        tasks = build_producer_tasks(parent: parent, queue: queue)
+
+        # This structure ensures that even if a producer task fails, we always
+        # signal the writer to stop and then wait for it to finish draining
+        # any remaining items from the queue before propagating the error.
+        begin
+          tasks.each(&:wait)
+        ensure
+          # `close` signals end-of-stream; the writer exits once `dequeue` returns nil.
+          queue.close
+          writer.wait
+        end
+      end
+    end
+
+    def build_producer_tasks(parent:, queue:)
+      @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
+        parent.async do
+          loop do
+            chunk = fiber.resume
+            break unless chunk
+
+            # Blocks while the queue is full, until the writer dequeues a chunk.
+            queue.enqueue([idx, chunk])
+          end
+        end
+      end
+    end
+
+    def build_writer_task(parent:, queue:)
+      parent.async do
+        loop do
+          pair = queue.dequeue
+          break if pair.nil?
+
+          _idx_from_queue, item = pair
+          response.stream.write(item)
         end
       end
-      response.stream.close if close_stream_at_end
     end
   end
 end
diff --git a/lib/react_on_rails_pro/configuration.rb b/lib/react_on_rails_pro/configuration.rb
index 3f4b541fe..564b95e1f 100644
--- a/lib/react_on_rails_pro/configuration.rb
+++ b/lib/react_on_rails_pro/configuration.rb
@@ -53,6 +53,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
     DEFAULT_RAISE_NON_SHELL_SERVER_RENDERING_ERRORS = false
     DEFAULT_ENABLE_RSC_SUPPORT = false
     DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH = "rsc_payload/"
+    DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE = 64
 
     attr_accessor :renderer_url, :renderer_password, :tracing, :server_renderer,
                   :renderer_use_fallback_exec_js, :prerender_caching,
@@ -61,7 +62,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
                   :remote_bundle_cache_adapter, :ssr_pre_hook_js, :assets_to_copy,
                   :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
                   :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
-                  :rsc_payload_generation_url_path
+                  :rsc_payload_generation_url_path, :concurrent_component_streaming_buffer_size
 
     def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
                    renderer_use_fallback_exec_js: nil, prerender_caching: nil,
@@ -71,7 +72,8 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
                    remote_bundle_cache_adapter: nil, ssr_pre_hook_js: nil, assets_to_copy: nil,
                    renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
                    profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
-                   enable_rsc_support: nil, rsc_payload_generation_url_path: nil)
+                   enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
+                   concurrent_component_streaming_buffer_size: DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE)
       self.renderer_url = renderer_url
       self.renderer_password = renderer_password
       self.server_renderer = server_renderer
@@ -94,6 +96,7 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
       self.raise_non_shell_server_rendering_errors = raise_non_shell_server_rendering_errors
       self.enable_rsc_support = enable_rsc_support
       self.rsc_payload_generation_url_path = rsc_payload_generation_url_path
+      self.concurrent_component_streaming_buffer_size = concurrent_component_streaming_buffer_size
     end
 
     def setup_config_values
@@ -193,6 +196,14 @@ def validate_remote_bundle_cache_adapter
       end
     end
 
+    def validate_concurrent_component_streaming_buffer_size
+      return if concurrent_component_streaming_buffer_size.is_a?(Numeric) &&
+                concurrent_component_streaming_buffer_size.positive?
+
+      raise ReactOnRailsPro::Error,
+            "config.concurrent_component_streaming_buffer_size must be set and must be a positive number"
+    end
+
     def setup_renderer_password
       return if renderer_password.present?
 
diff --git a/lib/react_on_rails_pro/utils.rb b/lib/react_on_rails_pro/utils.rb
index 58480472e..18175c517 100644
--- a/lib/react_on_rails_pro/utils.rb
+++ b/lib/react_on_rails_pro/utils.rb
@@ -65,7 +65,7 @@ def self.rsc_bundle_hash
       @rsc_bundle_hash = calc_bundle_hash(server_rsc_bundle_js_file_path)
     end
 
-    # Returns the hashed file name when using webpacker. Useful for creating cache keys.
+    # Returns the hashed file name when using Shakapacker. Useful for creating cache keys.
     def self.bundle_file_name(bundle_name)
       # bundle_js_uri_from_packer can return a file path or a HTTP URL (for files served from the dev server)
       # Pathname can handle both cases
@@ -74,8 +74,8 @@ def self.bundle_file_name(bundle_name)
       pathname.basename.to_s
     end
 
-    # Returns the hashed file name of the server bundle when using webpacker.
-    # Necessary fragment-caching keys.
+    # Returns the hashed file name of the server bundle when using Shakapacker.
+    # Necessary for fragment-caching keys.
     def self.server_bundle_file_name
       return @server_bundle_hash if @server_bundle_hash && !Rails.env.development?
 
diff --git a/react_on_rails_pro.gemspec b/react_on_rails_pro.gemspec
index 03faec243..7bfe40560 100644
--- a/react_on_rails_pro.gemspec
+++ b/react_on_rails_pro.gemspec
@@ -32,6 +32,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency "connection_pool"
   s.add_runtime_dependency "execjs", "~> 2.9"
   s.add_runtime_dependency "httpx", "~> 1.5"
+  s.add_runtime_dependency "async", ">= 2.6"
   s.add_runtime_dependency "rainbow"
   s.add_runtime_dependency "react_on_rails", ">= 16.0.0"
   s.add_development_dependency "bundler"
diff --git a/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb
index 1caad9f14..af0711e5a 100644
--- a/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb
+++ b/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb
@@ -1,5 +1,7 @@
 # frozen_string_literal: true
 
+require "async"
+require "async/queue"
 require "rails_helper"
 require "support/script_tag_utils"
 
@@ -327,6 +329,7 @@ def response; end
     HTML
   end
 
+  # mock_chunks can be an Async::Queue or an Array
   def mock_request_and_response(mock_chunks = chunks, count: 1)
     # Reset connection instance variables to ensure clean state for tests
     ReactOnRailsPro::Request.instance_variable_set(:@connection, nil)
@@ -339,9 +342,19 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
     chunks_read.clear
 
     mock_streaming_response(%r{http://localhost:3800/bundles/[a-f0-9]{32}-test/render/[a-f0-9]{32}}, 200, count: count) do |yielder|
-      mock_chunks.each do |chunk|
-        chunks_read << chunk
-        yielder.call("#{chunk.to_json}\n")
+      if mock_chunks.is_a?(Async::Queue)
+        loop do
+          chunk = mock_chunks.dequeue
+          break if chunk.nil?
+
+          chunks_read << chunk
+          yielder.call("#{chunk.to_json}\n")
+        end
+      else
+        mock_chunks.each do |chunk|
+          chunks_read << chunk
+          yielder.call("#{chunk.to_json}\n")
+        end
       end
     end
   end
@@ -428,18 +441,35 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
     allow(mocked_stream).to receive(:write) do |chunk|
       written_chunks << chunk
-      # Ensures that any chunk received is written immediately to the stream
-      expect(written_chunks.count).to eq(chunks_read.count) # rubocop:disable RSpec/ExpectInHook
     end
     allow(mocked_stream).to receive(:close)
     mocked_response = instance_double(ActionDispatch::Response)
     allow(mocked_response).to receive(:stream).and_return(mocked_stream)
     allow(self).to receive(:response).and_return(mocked_response)
-    mock_request_and_response
+  end
+
+  def execute_stream_view_containing_react_components
+    queue = Async::Queue.new
+    mock_request_and_response(queue)
+
+    Sync do |parent|
+      parent.async { stream_view_containing_react_components(template: template_path) }
+
+      chunks_to_write = chunks.dup
+      while (chunk = chunks_to_write.shift)
+        queue.enqueue(chunk)
+        sleep 0.05
+
+        # Ensures that any chunk received is written immediately to the stream
+        expect(written_chunks.count).to eq(chunks_read.count)
+      end
+      queue.close
+      sleep 0.05
+    end
   end
 
   it "writes the chunk to stream as soon as it is received" do
-    stream_view_containing_react_components(template: template_path)
+    execute_stream_view_containing_react_components
     expect(self).to have_received(:render_to_string).once.with(template: template_path)
     expect(chunks_read.count).to eq(chunks.count)
     expect(written_chunks.count).to eq(chunks.count)
   end
@@ -448,7 +478,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
   end
 
   it "prepends the rails context to the first chunk only" do
-    stream_view_containing_react_components(template: template_path)
+    execute_stream_view_containing_react_components
     initial_result = written_chunks.first
 
     expect(initial_result).to script_tag_be_included(rails_context_tag)
@@ -464,7 +494,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
   end
 
   it "prepends the component specification tag to the first chunk only" do
-    stream_view_containing_react_components(template: template_path)
+    execute_stream_view_containing_react_components
     initial_result = written_chunks.first
 
     expect(initial_result).to script_tag_be_included(react_component_specification_tag)
@@ -475,7 +505,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1)
   end
 
   it "renders the rails view content in the first chunk" do
-    stream_view_containing_react_components(template: template_path)
+    execute_stream_view_containing_react_components
     initial_result = written_chunks.first
     expect(initial_result).to include("Header Rendered In View")
     written_chunks[1..].each do |chunk|
diff --git a/spec/execjs-compatible-dummy/package.json b/spec/execjs-compatible-dummy/package.json
index 24b71d621..1affc7435 100644
--- a/spec/execjs-compatible-dummy/package.json
+++ b/spec/execjs-compatible-dummy/package.json
@@ -29,7 +29,7 @@
     "prop-types": "^15.8.1",
     "react": "^18.3.1",
     "react-dom": "^18.3.1",
-    "react-on-rails": "15.0.0",
+    "react-on-rails": "16.0.1-rc.4",
     "shakapacker": "8.0.0",
     "style-loader": "^4.0.0",
     "terser-webpack-plugin": "5",
diff --git a/spec/execjs-compatible-dummy/yarn.lock b/spec/execjs-compatible-dummy/yarn.lock
index 12fb9d3cb..065077e3f 100644
--- a/spec/execjs-compatible-dummy/yarn.lock
+++ b/spec/execjs-compatible-dummy/yarn.lock
@@ -3365,10 +3365,10 @@ react-is@^16.13.1:
   resolved "https://registry.yarnpkg.com/react-is/-/react-is-16.13.1.tgz#789729a4dc36de2999dc156dd6c1d9c18cea56a4"
   integrity sha512-24e6ynE2H+OKt4kqsOvNd8kBpV65zoxbA4BVsEOB3ARVWQki/DHzaUoC5KuON/BiccDaCCTZBuOcfZs70kR8bQ==
 
-react-on-rails@15.0.0:
-  version "15.0.0"
-  resolved "https://registry.yarnpkg.com/react-on-rails/-/react-on-rails-15.0.0.tgz#8edc78670129394cd92293842ae88c62d4cb030b"
-  integrity sha512-Uht8HWm8ZVN8OeDz03b3YLtprzHXhFTw7hb0jl8IpOjbig91DBuurBVxr4tfuymuemGwdjhQpscho33Cnw9Atg==
+react-on-rails@16.0.1-rc.4:
+  version "16.0.1-rc.4"
+  resolved "https://registry.yarnpkg.com/react-on-rails/-/react-on-rails-16.0.1-rc.4.tgz#c4ed21579dbf8d4602f1fbff26b1ada581f2188c"
+  integrity sha512-V6unOWd/PvcbEcgtsjf/i10wf2BgZ0NBJ9Dq5jWXm9OtlzRGvIqMXsK05YkyJ1auPlaExxxJezNPC+BQKLRhEg==
 
 react-refresh@^0.14.2:
   version "0.14.2"
diff --git a/spec/react_on_rails_pro/stream_spec.rb b/spec/react_on_rails_pro/stream_spec.rb
index 86afdc65d..a97f69ffd 100644
--- a/spec/react_on_rails_pro/stream_spec.rb
+++ b/spec/react_on_rails_pro/stream_spec.rb
@@ -1,7 +1,35 @@
 # frozen_string_literal: true
 
+require "async"
+require "async/queue"
 require_relative "spec_helper"
 
+class StreamController
+  include ReactOnRailsPro::Stream
+
+  attr_reader :response
+
+  def initialize(component_queues:, initial_response: "TEMPLATE")
+    @component_queues = component_queues
+    @initial_response = initial_response
+  end
+
+  def render_to_string(**_opts)
+    @rorp_rendering_fibers = @component_queues.map do |queue|
+      Fiber.new do
+        loop do
+          chunk = queue.dequeue
+          break if chunk.nil?
+
+          Fiber.yield chunk
+        end
+      end
+    end
+
+    @initial_response
+  end
+end
+
 RSpec.describe "Streaming API" do
   let(:origin) { "http://api.example.com" }
   let(:path) { "/stream" }
@@ -342,4 +370,120 @@
       expect(mocked_block).to have_received(:call).with("First chunk")
     end
   end
+
+  describe "Component streaming concurrency" do
+    def run_stream(controller, template: "ignored")
+      Sync do |parent|
+        parent.async { controller.stream_view_containing_react_components(template: template) }
+        yield(parent)
+      end
+    end
+
+    def setup_stream_test(component_count: 2)
+      component_queues = Array.new(component_count) { Async::Queue.new }
+      controller = StreamController.new(component_queues: component_queues)
+
+      mocked_response = instance_double(ActionController::Live::Response)
+      mocked_stream = instance_double(ActionController::Live::Buffer)
+      allow(mocked_response).to receive(:stream).and_return(mocked_stream)
+      allow(mocked_stream).to receive(:write)
+      allow(mocked_stream).to receive(:close)
+      allow(controller).to receive(:response).and_return(mocked_response)
+
+      [component_queues, controller, mocked_stream]
+    end
+
+    it "streams components concurrently" do
+      queues, controller, stream = setup_stream_test
+
+      run_stream(controller) do |_parent|
+        queues[1].enqueue("B1")
+        sleep 0.05
+        expect(stream).to have_received(:write).with("B1")
+
+        queues[0].enqueue("A1")
+        sleep 0.05
+        expect(stream).to have_received(:write).with("A1")
+
+        queues[1].enqueue("B2")
+        queues[1].close
+        sleep 0.05
+
+        queues[0].enqueue("A2")
+        queues[0].close
+        sleep 0.1
+      end
+    end
+
+    it "maintains per-component ordering" do
+      queues, controller, stream = setup_stream_test
+
+      run_stream(controller) do |_parent|
+        queues[0].enqueue("X1")
+        queues[0].enqueue("X2")
+        queues[0].enqueue("X3")
+        queues[0].close
+
+        queues[1].enqueue("Y1")
+        queues[1].enqueue("Y2")
+        queues[1].close
+
+        sleep 0.2
+      end
+
+      # Verify all chunks were written
+      expect(stream).to have_received(:write).with("X1")
+      expect(stream).to have_received(:write).with("X2")
+      expect(stream).to have_received(:write).with("X3")
+      expect(stream).to have_received(:write).with("Y1")
+      expect(stream).to have_received(:write).with("Y2")
+    end
+
+    it "handles empty component list" do
+      _queues, controller, stream = setup_stream_test(component_count: 0)
+
+      run_stream(controller) do |_parent|
+        sleep 0.1
+      end
+
+      expect(stream).to have_received(:write).with("TEMPLATE")
+      expect(stream).to have_received(:close)
+    end
+
+    it "handles single component" do
+      queues, controller, stream = setup_stream_test(component_count: 1)
+
+      run_stream(controller) do |_parent|
+        queues[0].enqueue("Single1")
+        queues[0].enqueue("Single2")
+        queues[0].close
+
+        sleep 0.1
+      end
+
+      expect(stream).to have_received(:write).with("Single1")
+      expect(stream).to have_received(:write).with("Single2")
+    end
+
+    it "applies backpressure with slow writer" do
+      queues, controller, stream = setup_stream_test(component_count: 1)
+
+      write_timestamps = []
+      allow(stream).to receive(:write) do |_data|
+        write_timestamps << Process.clock_gettime(Process::CLOCK_MONOTONIC)
+        sleep 0.05
+      end
+
+      run_stream(controller) do |_parent|
+        5.times { |i| queues[0].enqueue("Chunk#{i}") }
+        queues[0].close
+
+        sleep 1
+      end
+
+      expect(write_timestamps.length).to be >= 2
+      gaps = write_timestamps.each_cons(2).map { |a, b| b - a }
+      expect(gaps.all? { |gap| gap >= 0.04 }).to be true
+    end
+  end
 end
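
Usage note (a minimal sketch, not part of the diff): the new buffer size can be tuned through the gem's standard `ReactOnRailsPro.configure` block; the initializer path and the value 32 below are illustrative assumptions.

# config/initializers/react_on_rails_pro.rb (illustrative path)
ReactOnRailsPro.configure do |config|
  # Upper bound on chunks held in the in-memory queue between the
  # per-component producer tasks and the single response-stream writer.
  # Must be a positive number; the gem's default is 64.
  config.concurrent_component_streaming_buffer_size = 32
end

A smaller value caps memory but makes fast-rendering components block sooner behind a slow client connection; a larger value absorbs bursts at the cost of more buffered chunks.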