Skip to content

Commit 5f80e6b

Browse files
committed
remove the concurrent_stream_drain config flag and always stream components concurrently
1 parent 5d3c03b commit 5f80e6b

File tree

3 files changed

+37
-107
lines changed

3 files changed

+37
-107
lines changed

lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
3939
response.stream.write(template_string)
4040

4141
begin
42-
if ReactOnRailsPro.configuration.concurrent_stream_drain
43-
drain_streams_concurrently
44-
else
45-
drain_streams_sequentially
46-
end
42+
drain_streams_concurrently
4743
ensure
4844
response.stream.close if close_stream_at_end
4945
end
@@ -122,22 +118,6 @@ def build_writer_task(parent:, queue:, semaphore:)
122118
end
123119
end
124120

125-
def drain_streams_sequentially
126-
@rorp_rendering_fibers.each_with_index do |fiber, idx|
127-
loop do
128-
begin
129-
chunk = fiber.resume
130-
rescue FiberError
131-
break
132-
end
133-
break unless chunk
134-
135-
log_stream_write(mode: :sequential, idx: idx, bytesize: safe_bytesize(chunk))
136-
response.stream.write(chunk)
137-
end
138-
end
139-
end
140-
141121
def log_stream_write(mode:, idx:, bytesize:)
142122
return unless ReactOnRailsPro.configuration.tracing
143123

lib/react_on_rails_pro/configuration.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class Configuration # rubocop:disable Metrics/ClassLength
5353
DEFAULT_RAISE_NON_SHELL_SERVER_RENDERING_ERRORS = false
5454
DEFAULT_ENABLE_RSC_SUPPORT = false
5555
DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH = "rsc_payload/"
56-
DEFAULT_CONCURRENT_STREAM_DRAIN = false
5756
DEFAULT_CONCURRENT_STREAM_QUEUE_CAPACITY = 64
5857

5958
attr_accessor :renderer_url, :renderer_password, :tracing,
@@ -63,7 +62,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
6362
:remote_bundle_cache_adapter, :ssr_pre_hook_js, :assets_to_copy,
6463
:renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
6564
:profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors, :enable_rsc_support,
66-
:rsc_payload_generation_url_path, :concurrent_stream_drain, :concurrent_stream_queue_capacity
65+
:rsc_payload_generation_url_path, :concurrent_stream_queue_capacity
6766

6867
def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
6968
renderer_use_fallback_exec_js: nil, prerender_caching: nil,
@@ -74,7 +73,6 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
7473
renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
7574
profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
7675
enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
77-
concurrent_stream_drain: DEFAULT_CONCURRENT_STREAM_DRAIN,
7876
concurrent_stream_queue_capacity: DEFAULT_CONCURRENT_STREAM_QUEUE_CAPACITY)
7977
self.renderer_url = renderer_url
8078
self.renderer_password = renderer_password
@@ -98,7 +96,6 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
9896
self.raise_non_shell_server_rendering_errors = raise_non_shell_server_rendering_errors
9997
self.enable_rsc_support = enable_rsc_support
10098
self.rsc_payload_generation_url_path = rsc_payload_generation_url_path
101-
self.concurrent_stream_drain = concurrent_stream_drain
10299
self.concurrent_stream_queue_capacity = concurrent_stream_queue_capacity
103100
end
104101

spec/react_on_rails_pro/stream_spec.rb

Lines changed: 35 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,6 @@ def render_to_string(**_opts)
372372
end
373373

374374
describe "Component streaming concurrency" do
375-
def with_concurrent_stream_drain(enabled)
376-
original = ReactOnRailsPro.configuration.concurrent_stream_drain
377-
ReactOnRailsPro.configuration.concurrent_stream_drain = enabled
378-
yield
379-
ensure
380-
ReactOnRailsPro.configuration.concurrent_stream_drain = original
381-
end
382-
383375
def run_stream(controller, template: "ignored")
384376
Sync do |parent|
385377
parent.async { controller.stream_view_containing_react_components(template: template) }
@@ -401,75 +393,42 @@ def setup_stream_test(component_count: 2)
401393
[component_queues, controller, mocked_stream]
402394
end
403395

404-
it "processes components concurrently vs sequentially" do
405-
seq_queues, seq_controller, seq_stream = setup_stream_test
406-
407-
with_concurrent_stream_drain(false) do
408-
run_stream(seq_controller) do |_parent|
409-
seq_queues[0].enqueue("A1")
410-
seq_queues[0].enqueue("A2")
411-
seq_queues[0].close
412-
413-
seq_queues[1].enqueue("B1")
414-
seq_queues[1].enqueue("B2")
415-
seq_queues[1].close
416-
417-
sleep 0.1
418-
end
419-
end
420-
421-
# Verify sequential behavior: all A chunks processed before B chunks start
422-
expect(seq_stream).to have_received(:write).with("TEMPLATE")
423-
expect(seq_stream).to have_received(:write).with("A1")
424-
expect(seq_stream).to have_received(:write).with("A2")
425-
expect(seq_stream).to have_received(:write).with("B1")
426-
expect(seq_stream).to have_received(:write).with("B2")
427-
428-
conc_queues, conc_controller, conc_stream = setup_stream_test
396+
it "streams components concurrently" do
397+
queues, controller, stream = setup_stream_test
429398

430-
with_concurrent_stream_drain(true) do
431-
run_stream(conc_controller) do |_parent|
432-
conc_queues[1].enqueue("B1")
433-
sleep 0.05
434-
expect(conc_stream).to have_received(:write).with("B1")
399+
run_stream(controller) do |_parent|
400+
queues[1].enqueue("B1")
401+
sleep 0.05
402+
expect(stream).to have_received(:write).with("B1")
435403

436-
conc_queues[0].enqueue("A1")
437-
sleep 0.05
438-
expect(conc_stream).to have_received(:write).with("A1")
404+
queues[0].enqueue("A1")
405+
sleep 0.05
406+
expect(stream).to have_received(:write).with("A1")
439407

440-
conc_queues[1].enqueue("B2")
441-
conc_queues[1].close
442-
sleep 0.05
408+
queues[1].enqueue("B2")
409+
queues[1].close
410+
sleep 0.05
443411

444-
conc_queues[0].enqueue("A2")
445-
conc_queues[0].close
446-
sleep 0.1
447-
end
412+
queues[0].enqueue("A2")
413+
queues[0].close
414+
sleep 0.1
448415
end
449-
450-
# Verify concurrent behavior: components can process interleaved
451-
expect(conc_stream).to have_received(:write).with("B1")
452-
expect(conc_stream).to have_received(:write).with("A1")
453-
expect(conc_stream).to have_received(:write).with("B2")
454-
expect(conc_stream).to have_received(:write).with("A2")
455416
end
456417

457418
it "maintains per-component ordering" do
458419
queues, controller, stream = setup_stream_test
459420

460-
with_concurrent_stream_drain(true) do
461-
run_stream(controller) do |_parent|
462-
queues[0].enqueue("X1")
463-
queues[0].enqueue("X2")
464-
queues[0].enqueue("X3")
465-
queues[0].close
421+
run_stream(controller) do |_parent|
422+
queues[0].enqueue("X1")
423+
queues[0].enqueue("X2")
424+
queues[0].enqueue("X3")
425+
queues[0].close
466426

467-
queues[1].enqueue("Y1")
468-
queues[1].enqueue("Y2")
469-
queues[1].close
427+
queues[1].enqueue("Y1")
428+
queues[1].enqueue("Y2")
429+
queues[1].close
470430

471-
sleep 0.2
472-
end
431+
sleep 0.2
473432
end
474433

475434
# Verify all chunks were written
@@ -483,10 +442,8 @@ def setup_stream_test(component_count: 2)
483442
it "handles empty component list" do
484443
queues, controller, stream = setup_stream_test(component_count: 0)
485444

486-
with_concurrent_stream_drain(true) do
487-
run_stream(controller) do |_parent|
488-
sleep 0.1
489-
end
445+
run_stream(controller) do |_parent|
446+
sleep 0.1
490447
end
491448

492449
expect(stream).to have_received(:write).with("TEMPLATE")
@@ -496,14 +453,12 @@ def setup_stream_test(component_count: 2)
496453
it "handles single component" do
497454
queues, controller, stream = setup_stream_test(component_count: 1)
498455

499-
with_concurrent_stream_drain(true) do
500-
run_stream(controller) do |_parent|
501-
queues[0].enqueue("Single1")
502-
queues[0].enqueue("Single2")
503-
queues[0].close
456+
run_stream(controller) do |_parent|
457+
queues[0].enqueue("Single1")
458+
queues[0].enqueue("Single2")
459+
queues[0].close
504460

505-
sleep 0.1
506-
end
461+
sleep 0.1
507462
end
508463

509464
expect(stream).to have_received(:write).with("Single1")
@@ -519,13 +474,11 @@ def setup_stream_test(component_count: 2)
519474
sleep 0.05
520475
end
521476

522-
with_concurrent_stream_drain(true) do
523-
run_stream(controller) do |_parent|
524-
5.times { |i| queues[0].enqueue("Chunk#{i}") }
525-
queues[0].close
477+
run_stream(controller) do |_parent|
478+
5.times { |i| queues[0].enqueue("Chunk#{i}") }
479+
queues[0].close
526480

527-
sleep 1
528-
end
481+
sleep 1
529482
end
530483

531484
expect(write_timestamps.length).to be >= 2

0 commit comments

Comments
 (0)