Replace manual Fibers with Async gem primitives for component streaming #2111

base: master
Changes from 4 commits
react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

@@ -31,73 +31,47 @@ module Stream
   #
   # @see ReactOnRails::Helper#stream_react_component
   def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options)
-    @rorp_rendering_fibers = []
-    template_string = render_to_string(template: template, **render_options)
-    # View may contain extra newlines, chunk already contains a newline
-    # Having multiple newlines between chunks causes hydration errors
-    # So we strip extra newlines from the template string and add a single newline
-    response.stream.write(template_string)
-
-    begin
-      drain_streams_concurrently
-    ensure
-      response.stream.close if close_stream_at_end
-    end
-  end
-
-  private
-
-  def drain_streams_concurrently
     require "async"
     require "async/barrier"
     require "async/limited_queue"

-    return if @rorp_rendering_fibers.empty?
-
-    Sync do |parent|
-      # To avoid memory bloat, we use a limited queue to buffer chunks in memory.
+    Sync do |parent_task|
+      # Initialize async primitives for concurrent component streaming
+      @async_barrier = Async::Barrier.new
       buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
-      queue = Async::LimitedQueue.new(buffer_size)
+      @main_output_queue = Async::LimitedQueue.new(buffer_size)

-      writer = build_writer_task(parent: parent, queue: queue)
-      tasks = build_producer_tasks(parent: parent, queue: queue)
+      # Render template - components will start streaming immediately
+      template_string = render_to_string(template: template, **render_options)
+      # View may contain extra newlines, chunk already contains a newline
+      # Having multiple newlines between chunks causes hydration errors
+      # So we strip extra newlines from the template string and add a single newline
+      response.stream.write(template_string)

-      # This structure ensures that even if a producer task fails, we always
-      # signal the writer to stop and then wait for it to finish draining
-      # any remaining items from the queue before propagating the error.
       begin
-        tasks.each(&:wait)
+        drain_streams_concurrently(parent_task)
       ensure
-        # `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
-        queue.close
-        writer.wait
+        response.stream.close if close_stream_at_end
       end
     end
   end

-  def build_producer_tasks(parent:, queue:)
-    @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
-      parent.async do
-        loop do
-          chunk = fiber.resume
-          break unless chunk
+  private

-          # Will be blocked if the queue is full until a chunk is dequeued
-          queue.enqueue([idx, chunk])
-        end
-      end
-    end
-  end
+  def drain_streams_concurrently(parent_task)
+    writing_task = parent_task.async do
+      # Drain all remaining chunks from the queue to the response stream
+      while (chunk = @main_output_queue.dequeue)
+        response.stream.write(chunk)
+      end
+    end

-  def build_writer_task(parent:, queue:)
-    parent.async do
-      loop do
-        pair = queue.dequeue
-        break if pair.nil?
+    # Wait for all component streaming tasks to complete
+    @async_barrier.wait

-        _idx_from_queue, item = pair
-        response.stream.write(item)
-      end
-    end
+    # Close the queue to signal end of streaming
+    @main_output_queue.close
+    writing_task.wait
   end
 end
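Note that this hunk only shows the consumer side. How chunks actually reach @main_output_queue is not part of the hunk; the following is a rough, hypothetical sketch of what a producer registered on the barrier might look like. Only @async_barrier and @main_output_queue come from the diff; the method name stream_component_chunks and the chunks argument are invented for illustration.

  # Hypothetical producer sketch -- not code from this PR.
  # Each streamed component would run in its own task attached to the barrier,
  # pushing chunks into the shared limited queue that the writer task drains.
  def stream_component_chunks(chunks)
    @async_barrier.async do
      chunks.each do |chunk|
        # Blocks while the limited queue is full, which bounds memory usage
        @main_output_queue.enqueue(chunk)
      end
    end
  end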
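The buffer size is read from configuration. A sketch of setting it in an initializer follows; the configure-block pattern and the value 64 are assumptions, only the option name concurrent_component_streaming_buffer_size appears in the diff.

  # config/initializers/react_on_rails_pro.rb (hypothetical example)
  ReactOnRailsPro.configure do |config|
    # Maximum number of chunks buffered in Async::LimitedQueue before producers block
    config.concurrent_component_streaming_buffer_size = 64
  end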
Comment on lines +61 to 75

Contributor

Ensure queues and writer task are cleaned up even if a component task fails

Depending on how Async::Barrier#wait surfaces failures, an error raised by a component task can leave @main_output_queue unclosed and the writer task blocked in dequeue. Consider wrapping the wait/close sequence so the queue is always closed and the writer task joined, even when @async_barrier.wait raises:

  def drain_streams_concurrently(parent_task)
    writing_task = parent_task.async do
      while (chunk = @main_output_queue.dequeue)
        response.stream.write(chunk)
      end
    end

    begin
      @async_barrier.wait
    ensure
      @main_output_queue.close
      writing_task.wait
    end
  end

This keeps the steady-state behavior but guarantees termination of the writer loop in error paths. To validate this, you can inject a failure into one component task (e.g., raise inside one of the component streaming tasks) and confirm the request finishes and the stream is closed instead of hanging.

🧩 Analysis chain — scripts executed to verify the concern against the actual implementation:

  #!/bin/bash
  # Find the file and verify it exists
  find . -name "stream.rb" -path "*/react_on_rails_pro/*" 2>/dev/null | head -5

  # View the exact method and surrounding context
  sed -n '50,85p' ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

  # Search for error handling or rescue blocks in this file
  grep -n -B2 -A2 "rescue\|ensure\|raise" ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

  # Search for tests related to drain_streams_concurrently or error handling
  find . -name "*.rb" -path "*/test/*" -o -path "*/spec/*" | xargs grep -l "drain_streams_concurrently\|stream.*fail\|async.*barrier" 2>/dev/null

  # Examine the test file to see if error scenarios are covered
  cat ./react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb

  # Check if there are any other usages or dependencies on @async_barrier or @main_output_queue
  grep -n "@async_barrier\|@main_output_queue" ./react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Verdict: the concern is valid. The current method has no rescue/ensure around @async_barrier.wait, so an error raised by a component task propagates before @main_output_queue.close runs and leaves writing_task blocked on dequeue. The proposed begin/ensure wrapper above keeps steady-state behavior while ensuring termination of the writer loop and proper task cleanup in error paths.
Any reason to do it only here and not at the beginning of the file?
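If this refers to the require statements kept inside stream_view_containing_react_components (an assumption, since the comment's anchor lines are not shown here), moving them to the top of the file would look like the sketch below. The tradeoff is that requiring inside the method defers loading the async gem until streaming is actually used.

  # Sketch: requires at the top of the file instead of inside the method.
  # Module nesting is taken from the hunk header (module Stream); the full
  # namespace is not shown in this diff.
  require "async"
  require "async/barrier"
  require "async/limited_queue"

  module Stream
    def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options)
      Sync do |parent_task|
        # ... unchanged body from the diff above ...
      end
    end
  end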