Skip to content

Commit 449cc7b

Browse files
Replace manual Fiber management with async gem primitives (#2111)
## Problem We need to run async gem code inside the `each_chunk` function for the upcoming React on Rails async props implementation. However, this raises the error: ``` Running scheduler on non-blocking fiber ``` Eliminates manual Fiber.new/yield in streaming architecture, replacing with Async::Barrier, Async::Queue, and Async::Variable. This enables async gem code to run inside component streaming without "Running scheduler on non-blocking fiber" errors. Key changes: - Wrapped streaming in Sync blocks for async context - Replaced @rorp_rendering_fibers with Async::Barrier - Replaced fiber-based queuing with Async::LimitedQueue - Implemented concurrent client stream writing (non-blocking) - Added on_complete callback pattern for cache writes - Updated all tests to use async primitives
1 parent 79e334c commit 449cc7b

File tree

4 files changed

+181
-125
lines changed

4 files changed

+181
-125
lines changed

react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ def stream_react_component(component_name, options = {})
128128
# Because setting prerender to false is equivalent to calling react_component with prerender: false
129129
options[:prerender] = true
130130
options = options.merge(immediate_hydration: true) unless options.key?(:immediate_hydration)
131-
run_stream_inside_fiber do
131+
132+
# Extract streaming-specific callback
133+
on_complete = options.delete(:on_complete)
134+
135+
consumer_stream_async(on_complete: on_complete) do
132136
internal_stream_react_component(component_name, options)
133137
end
134138
end
@@ -185,7 +189,11 @@ def rsc_payload_react_component(component_name, options = {})
185189
# rsc_payload_react_component doesn't have the prerender option
186190
# Because setting prerender to false will not do anything
187191
options[:prerender] = true
188-
run_stream_inside_fiber do
192+
193+
# Extract streaming-specific callback
194+
on_complete = options.delete(:on_complete)
195+
196+
consumer_stream_async(on_complete: on_complete) do
189197
internal_rsc_payload_react_component(component_name, options)
190198
end
191199
end
@@ -246,30 +254,29 @@ def handle_stream_cache_hit(component_name, raw_options, auto_load_bundle, cache
246254
load_pack_for_generated_component(component_name, render_options)
247255

248256
initial_result, *rest_chunks = cached_chunks
249-
hit_fiber = Fiber.new do
250-
rest_chunks.each { |chunk| Fiber.yield(chunk) }
251-
nil
257+
258+
# Enqueue remaining chunks asynchronously
259+
@async_barrier.async do
260+
rest_chunks.each { |chunk| @main_output_queue.enqueue(chunk) }
252261
end
253-
@rorp_rendering_fibers << hit_fiber
262+
263+
# Return first chunk directly
254264
initial_result
255265
end
256266

257267
def handle_stream_cache_miss(component_name, raw_options, auto_load_bundle, view_cache_key, &block)
258-
# Kick off the normal streaming helper to get the initial result and the original fiber
259-
initial_result = render_stream_component_with_props(component_name, raw_options, auto_load_bundle, &block)
260-
original_fiber = @rorp_rendering_fibers.pop
261-
262-
buffered_chunks = [initial_result]
263-
wrapper_fiber = Fiber.new do
264-
while (chunk = original_fiber.resume)
265-
buffered_chunks << chunk
266-
Fiber.yield(chunk)
267-
end
268-
Rails.cache.write(view_cache_key, buffered_chunks, raw_options[:cache_options] || {})
269-
nil
270-
end
271-
@rorp_rendering_fibers << wrapper_fiber
272-
initial_result
268+
cache_aware_options = raw_options.merge(
269+
on_complete: lambda { |chunks|
270+
Rails.cache.write(view_cache_key, chunks, raw_options[:cache_options] || {})
271+
}
272+
)
273+
274+
render_stream_component_with_props(
275+
component_name,
276+
cache_aware_options,
277+
auto_load_bundle,
278+
&block
279+
)
273280
end
274281

275282
def render_stream_component_with_props(component_name, raw_options, auto_load_bundle)
@@ -291,25 +298,46 @@ def check_caching_options!(raw_options, block)
291298
raise ReactOnRailsPro::Error, "Option 'cache_key' is required for React on Rails caching"
292299
end
293300

294-
def run_stream_inside_fiber
295-
if @rorp_rendering_fibers.nil?
301+
def consumer_stream_async(on_complete:)
302+
require "async/variable"
303+
304+
if @async_barrier.nil?
296305
raise ReactOnRails::Error,
297306
"You must call stream_view_containing_react_components to render the view containing the react component"
298307
end
299308

300-
rendering_fiber = Fiber.new do
309+
# Create a variable to hold the first chunk for synchronous return
310+
first_chunk_var = Async::Variable.new
311+
all_chunks = [] if on_complete # Only collect if callback provided
312+
313+
# Start an async task on the barrier to stream all chunks
314+
@async_barrier.async do
301315
stream = yield
316+
is_first = true
317+
302318
stream.each_chunk do |chunk|
303-
Fiber.yield chunk
319+
all_chunks << chunk if on_complete # Collect for callback
320+
321+
if is_first
322+
# Store first chunk in variable for synchronous access
323+
first_chunk_var.value = chunk
324+
is_first = false
325+
else
326+
# Enqueue remaining chunks to main output queue
327+
@main_output_queue.enqueue(chunk)
328+
end
304329
end
305-
end
306330

307-
@rorp_rendering_fibers << rendering_fiber
331+
# Handle case where stream has no chunks
332+
first_chunk_var.value = nil if is_first
333+
334+
# Call callback with all chunks when streaming completes
335+
on_complete&.call(all_chunks)
336+
end
308337

309-
# return the first chunk of the fiber
310-
# It contains the initial html of the component
311-
# all updates will be appended to the stream sent to browser
312-
rendering_fiber.resume
338+
# Wait for and return the first chunk (blocking)
339+
first_chunk_var.wait
340+
first_chunk_var.value
313341
end
314342

315343
def internal_stream_react_component(component_name, options = {})

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 32 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -31,73 +31,54 @@ module Stream
3131
#
3232
# @see ReactOnRails::Helper#stream_react_component
3333
def stream_view_containing_react_components(template:, close_stream_at_end: true, **render_options)
34-
@rorp_rendering_fibers = []
35-
template_string = render_to_string(template: template, **render_options)
36-
# View may contain extra newlines, chunk already contains a newline
37-
# Having multiple newlines between chunks causes hydration errors
38-
# So we strip extra newlines from the template string and add a single newline
39-
response.stream.write(template_string)
40-
41-
begin
42-
drain_streams_concurrently
43-
ensure
44-
response.stream.close if close_stream_at_end
45-
end
46-
end
47-
48-
private
49-
50-
def drain_streams_concurrently
5134
require "async"
35+
require "async/barrier"
5236
require "async/limited_queue"
5337

54-
return if @rorp_rendering_fibers.empty?
55-
56-
Sync do |parent|
57-
# To avoid memory bloat, we use a limited queue to buffer chunks in memory.
38+
Sync do |parent_task|
39+
# Initialize async primitives for concurrent component streaming
40+
@async_barrier = Async::Barrier.new
5841
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
59-
queue = Async::LimitedQueue.new(buffer_size)
42+
@main_output_queue = Async::LimitedQueue.new(buffer_size)
6043

61-
writer = build_writer_task(parent: parent, queue: queue)
62-
tasks = build_producer_tasks(parent: parent, queue: queue)
44+
# Render template - components will start streaming immediately
45+
template_string = render_to_string(template: template, **render_options)
46+
# View may contain extra newlines, chunk already contains a newline
47+
# Having multiple newlines between chunks causes hydration errors
48+
# So we strip extra newlines from the template string and add a single newline
49+
response.stream.write(template_string)
6350

64-
# This structure ensures that even if a producer task fails, we always
65-
# signal the writer to stop and then wait for it to finish draining
66-
# any remaining items from the queue before propagating the error.
6751
begin
68-
tasks.each(&:wait)
69-
ensure
70-
# `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
71-
queue.close
72-
writer.wait
52+
drain_streams_concurrently(parent_task)
53+
# Do not close the response stream in an ensure block.
54+
# If an error occurs we may need the stream open to send diagnostic/error details
55+
# (for example, ApplicationController#rescue_from in the dummy app).
56+
response.stream.close if close_stream_at_end
7357
end
7458
end
7559
end
7660

77-
def build_producer_tasks(parent:, queue:)
78-
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
79-
parent.async do
80-
loop do
81-
chunk = fiber.resume
82-
break unless chunk
61+
private
8362

84-
# Will be blocked if the queue is full until a chunk is dequeued
85-
queue.enqueue([idx, chunk])
86-
end
63+
def drain_streams_concurrently(parent_task)
64+
writing_task = parent_task.async do
65+
# Drain all remaining chunks from the queue to the response stream
66+
while (chunk = @main_output_queue.dequeue)
67+
response.stream.write(chunk)
8768
end
8869
end
89-
end
9070

91-
def build_writer_task(parent:, queue:)
92-
parent.async do
93-
loop do
94-
pair = queue.dequeue
95-
break if pair.nil?
96-
97-
_idx_from_queue, item = pair
98-
response.stream.write(item)
99-
end
71+
# Wait for all component streaming tasks to complete
72+
begin
73+
@async_barrier.wait
74+
rescue StandardError => e
75+
@async_barrier.stop
76+
raise e
10077
end
78+
ensure
79+
# Close the queue to signal end of streaming
80+
@main_output_queue.close
81+
writing_task.wait
10182
end
10283
end
10384
end

0 commit comments

Comments
 (0)