Skip to content

Commit 6f94207

Browse files
Add incremental rendering support with bidirectional streaming
Step 3: Implement render_code_with_incremental_updates and supporting infrastructure This commit adds support for incremental rendering using HTTP2 bidirectional streaming, enabling progressive resolution of async props during server-side rendering. ## Key Changes: **AsyncPropsEmitter** (lib/react_on_rails_pro/async_props_emitter.rb): - Created emitter class for sending async prop updates to Node renderer - Implements `call(prop_name, prop_value)` for emitting updates - Owns generation of `end_stream_chunk` for onRequestClosedUpdateChunk - Encapsulates all update chunk and end stream JavaScript generation **StreamRequest** (lib/react_on_rails_pro/stream_request.rb): - Added Async::Barrier support for non-blocking concurrent task management - Wraps each_chunk execution in Sync block with barrier - Passes barrier to request_executor for spawning async tasks - Ensures proper cleanup and synchronization of async operations **Request** (lib/react_on_rails_pro/request.rb): - Refactored connection architecture with two separate connections: - standard_connection: Uses :stream plugin for regular streaming - incremental_connection: Uses :stream_bidi for bidirectional streaming - Implemented render_code_with_incremental_updates: - Creates bidirectional streaming request with NDJSON format - Spawns async props block in background using barrier.async - Properly closes request stream after async block completes - Selects bundle timestamp based on RSC vs standard rendering - Added build_initial_incremental_request helper using emitter **Tests**: - Added comprehensive tests for render_code_with_incremental_updates - Tests verify NDJSON request format, barrier.async spawning, and emitter usage - Tests for rsc_bundle_hash selection when is_rsc_payload is true - Uses unverified doubles for HTTPX streaming interfaces with justification - Updated stream_request_spec to test barrier parameter passing ## Technical Details: - Uses Async gem with Async::Barrier for concurrent async prop resolution - HTTP2 bidirectional streaming via HTTPX :stream_bidi plugin - NDJSON protocol for streaming communication - Proper separation of concerns: AsyncPropsEmitter owns chunk generation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 9a1209e commit 6f94207

File tree

7 files changed

+262
-23
lines changed

7 files changed

+262
-23
lines changed

react_on_rails_pro/app/helpers/react_on_rails_pro_helper.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ def stream_react_component(component_name, options = {})
137137
end
138138
end
139139

140+
def stream_react_component_with_async_props(component_name, options = {}, &props_block)
141+
options[:async_props_block] = props_block
142+
stream_react_component(component_name, options)
143+
end
144+
140145
# Renders the React Server Component (RSC) payload for a given component. This helper generates
141146
# a special format designed by React for serializing server components and transmitting them
142147
# to the client.

react_on_rails_pro/lib/react_on_rails_pro/async_props_emitter.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ def call(prop_name, prop_value)
2121
# Continue - don't abort entire render because one prop failed
2222
end
2323

24+
# Generates the chunk that should be executed when the request stream closes
25+
# This tells the asyncPropsManager to end the stream
26+
def end_stream_chunk
27+
{
28+
bundleTimestamp: @bundle_timestamp,
29+
updateChunk: generate_end_stream_js
30+
}
31+
end
32+
2433
private
2534

2635
def generate_update_chunk(prop_name, value)
@@ -38,5 +47,14 @@ def generate_set_prop_js(prop_name, value)
3847
})()
3948
JS
4049
end
50+
51+
def generate_end_stream_js
52+
<<~JS.strip
53+
(function(){
54+
var asyncPropsManager = sharedExecutionContext.get("asyncPropsManager");
55+
asyncPropsManager.endStream();
56+
})()
57+
JS
58+
end
4159
end
4260
end

react_on_rails_pro/lib/react_on_rails_pro/request.rb

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
require "uri"
44
require "httpx"
55
require_relative "stream_request"
6+
require_relative "async_props_emitter"
67

78
module ReactOnRailsPro
89
class Request # rubocop:disable Metrics/ClassLength
910
class << self
1011
def reset_connection
11-
@connection&.close
12-
@connection = create_connection
12+
@standard_connection&.close
13+
@incremental_connection&.close
14+
@standard_connection = nil
15+
@incremental_connection = nil
1316
end
1417

1518
def render_code(path, js_code, send_bundle)
@@ -27,7 +30,7 @@ def render_code_as_stream(path, js_code, is_rsc_payload:)
2730
"rendering any RSC payload."
2831
end
2932

30-
ReactOnRailsPro::StreamRequest.create do |send_bundle|
33+
ReactOnRailsPro::StreamRequest.create do |send_bundle, _barrier|
3134
if send_bundle
3235
Rails.logger.info { "[ReactOnRailsPro] Sending bundle to the node renderer" }
3336
upload_assets
@@ -38,6 +41,45 @@ def render_code_as_stream(path, js_code, is_rsc_payload:)
3841
end
3942
end
4043

44+
def render_code_with_incremental_updates(path, js_code, async_props_block:, is_rsc_payload:)
45+
Rails.logger.info { "[ReactOnRailsPro] Perform incremental rendering request #{path}" }
46+
47+
# Determine bundle timestamp based on RSC support
48+
pool = ReactOnRailsPro::ServerRenderingPool::NodeRenderingPool
49+
bundle_timestamp = is_rsc_payload ? pool.rsc_bundle_hash : pool.server_bundle_hash
50+
51+
ReactOnRailsPro::StreamRequest.create do |send_bundle, barrier|
52+
if send_bundle
53+
Rails.logger.info { "[ReactOnRailsPro] Sending bundle to the node renderer" }
54+
upload_assets
55+
end
56+
57+
# Build bidirectional streaming request
58+
request = incremental_connection.build_request(
59+
"POST",
60+
path,
61+
headers: { "content-type" => "application/x-ndjson" },
62+
body: []
63+
)
64+
65+
# Create emitter and use it to generate initial request data
66+
emitter = ReactOnRailsPro::AsyncPropsEmitter.new(bundle_timestamp, request)
67+
initial_data = build_initial_incremental_request(js_code, emitter)
68+
request.write("#{initial_data.to_json}\n")
69+
70+
response = incremental_connection.request(request, stream: true)
71+
72+
# Execute async props block in background using barrier
73+
barrier.async do
74+
async_props_block.call(emitter)
75+
ensure
76+
request.close
77+
end
78+
79+
response
80+
end
81+
end
82+
4183
def upload_assets
4284
Rails.logger.info { "[ReactOnRailsPro] Uploading assets" }
4385

@@ -87,8 +129,14 @@ def asset_exists_on_vm_renderer?(filename)
87129

88130
private
89131

132+
# rubocop:disable Naming/MemoizedInstanceVariableName
90133
def connection
91-
@connection ||= create_connection
134+
@standard_connection ||= create_standard_connection
135+
end
136+
# rubocop:enable Naming/MemoizedInstanceVariableName
137+
138+
def incremental_connection
139+
@incremental_connection ||= create_incremental_connection
92140
end
93141

94142
def perform_request(path, **post_options) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity
@@ -222,7 +270,22 @@ def common_form_data
222270
ReactOnRailsPro::Utils.common_form_data
223271
end
224272

225-
def create_connection # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
273+
def build_initial_incremental_request(js_code, emitter)
274+
common_form_data.merge(
275+
renderingRequest: js_code,
276+
onRequestClosedUpdateChunk: emitter.end_stream_chunk
277+
)
278+
end
279+
280+
def create_standard_connection
281+
build_connection_config.plugin(:stream)
282+
end
283+
284+
def create_incremental_connection
285+
build_connection_config.plugin(:stream_bidi)
286+
end
287+
288+
def build_connection_config # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
226289
url = ReactOnRailsPro.configuration.renderer_url
227290
Rails.logger.info do
228291
"[ReactOnRailsPro] Setting up Node Renderer connection to #{url}"
@@ -266,7 +329,6 @@ def create_connection # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
266329
nil
267330
end
268331
)
269-
.plugin(:stream_bidi)
270332
# See https://www.rubydoc.info/gems/httpx/1.3.3/HTTPX%2FOptions:initialize for the available options
271333
.with(
272334
origin: url,

react_on_rails_pro/lib/react_on_rails_pro/stream_request.rb

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# frozen_string_literal: true
22

3+
require "async"
4+
require "async/barrier"
5+
36
module ReactOnRailsPro
47
class StreamDecorator
58
def initialize(component)
@@ -92,22 +95,28 @@ def initialize(&request_block)
9295
def each_chunk(&block)
9396
return enum_for(:each_chunk) unless block
9497

95-
send_bundle = false
96-
error_body = +""
97-
loop do
98-
stream_response = @request_executor.call(send_bundle)
99-
100-
# Chunks can be merged during streaming, so we separate them by newlines
101-
# Also, we check the status code inside the loop block because calling `status` outside the loop block
102-
# is blocking, it will wait for the response to be fully received
103-
# Look at the spec of `status` in `spec/react_on_rails_pro/stream_spec.rb` for more details
104-
process_response_chunks(stream_response, error_body, &block)
105-
break
106-
rescue HTTPX::HTTPError => e
107-
send_bundle = handle_http_error(e, error_body, send_bundle)
108-
rescue HTTPX::ReadTimeoutError => e
109-
raise ReactOnRailsPro::Error, "Time out error while server side render streaming a component.\n" \
110-
"Original error:\n#{e}\n#{e.backtrace}"
98+
Sync do
99+
barrier = Async::Barrier.new
100+
101+
send_bundle = false
102+
error_body = +""
103+
loop do
104+
stream_response = @request_executor.call(send_bundle, barrier)
105+
106+
# Chunks can be merged during streaming, so we separate them by newlines
107+
# Also, we check the status code inside the loop block because calling `status` outside the loop block
108+
# is blocking, it will wait for the response to be fully received
109+
# Look at the spec of `status` in `spec/react_on_rails_pro/stream_spec.rb` for more details
110+
process_response_chunks(stream_response, error_body, &block)
111+
break
112+
rescue HTTPX::HTTPError => e
113+
send_bundle = handle_http_error(e, error_body, send_bundle)
114+
rescue HTTPX::ReadTimeoutError => e
115+
raise ReactOnRailsPro::Error, "Time out error while server side render streaming a component.\n" \
116+
"Original error:\n#{e}\n#{e.backtrace}"
117+
end
118+
119+
barrier.wait
111120
end
112121
end
113122

react_on_rails_pro/spec/dummy/Procfile.dev

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Procfile for development with hot reloading of JavaScript and CSS
22

3-
rails: rails s -p 3000
3+
# rails: rails s -p 3000
44

55
# Run the hot reload server for client development
66
webpack-dev-server: HMR=true bin/shakapacker-dev-server

react_on_rails_pro/spec/react_on_rails_pro/request_spec.rb

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
require_relative "spec_helper"
44
require "fakefs/safe"
5+
require "httpx"
6+
7+
HTTPX::Plugins.load_plugin(:stream)
58

69
describe ReactOnRailsPro::Request do
710
let(:logger_mock) { instance_double(ActiveSupport::Logger).as_null_object }
@@ -218,4 +221,102 @@
218221
end
219222
end
220223
end
224+
225+
# Unverified doubles are required for HTTPX bidirectional streaming because:
226+
# 1. HTTPX::StreamResponse doesn't define status in its interface (causes verified double failures)
227+
# 2. The :stream_bidi plugin adds methods (#write, #close, #build_request) not in standard interfaces
228+
# 3. Using double(ClassName) documents the class while allowing interface flexibility
229+
# rubocop:disable RSpec/VerifiedDoubles, RSpec/MultipleMemoizedHelpers
230+
describe "render_code_with_incremental_updates" do
231+
let(:js_code) { "console.log('incremental rendering');" }
232+
let(:async_props_block) { proc { |_emitter| } }
233+
let(:mock_request) { double(HTTPX::Request) }
234+
let(:mock_response) { double(HTTPX::StreamResponse, status: 200) }
235+
let(:mock_connection) { double(HTTPX::Session) }
236+
237+
before do
238+
allow(ReactOnRailsPro::ServerRenderingPool::NodeRenderingPool).to receive_messages(
239+
server_bundle_hash: "server_bundle.js",
240+
rsc_bundle_hash: "rsc_bundle.js"
241+
)
242+
243+
allow(mock_connection).to receive_messages(build_request: mock_request, request: mock_response)
244+
allow(mock_request).to receive(:close)
245+
allow(mock_request).to receive(:write)
246+
allow(mock_response).to receive(:is_a?).with(HTTPX::ErrorResponse).and_return(false)
247+
allow(mock_response).to receive(:each).and_yield("chunk\n")
248+
allow(described_class).to receive(:incremental_connection).and_return(mock_connection)
249+
250+
# Stub AsyncPropsEmitter to return a mock with end_stream_chunk
251+
allow(ReactOnRailsPro::AsyncPropsEmitter).to receive(:new) do |_bundle_timestamp, _request|
252+
double(
253+
ReactOnRailsPro::AsyncPropsEmitter,
254+
end_stream_chunk: { bundleTimestamp: "mocked", updateChunk: "mocked_js" }
255+
)
256+
end
257+
end
258+
259+
it "creates NDJSON request with correct initial data" do
260+
stream = described_class.render_code_with_incremental_updates(
261+
"/render-incremental",
262+
js_code,
263+
async_props_block: async_props_block,
264+
is_rsc_payload: false
265+
)
266+
267+
stream.each_chunk(&:itself)
268+
269+
expect(mock_connection).to have_received(:build_request).with(
270+
"POST",
271+
"/render-incremental",
272+
headers: { "content-type" => "application/x-ndjson" },
273+
body: []
274+
)
275+
expect(mock_request).to have_received(:write).at_least(:once)
276+
end
277+
278+
it "spawns barrier.async task and passes emitter to async_props_block" do
279+
emitter_received = nil
280+
test_async_props_block = proc { |emitter| emitter_received = emitter }
281+
282+
# Allow real emitter to be created for this test
283+
allow(ReactOnRailsPro::AsyncPropsEmitter).to receive(:new).and_call_original
284+
285+
stream = described_class.render_code_with_incremental_updates(
286+
"/render-incremental",
287+
js_code,
288+
async_props_block: test_async_props_block,
289+
is_rsc_payload: false
290+
)
291+
292+
stream.each_chunk(&:itself)
293+
294+
expect(emitter_received).to be_a(ReactOnRailsPro::AsyncPropsEmitter)
295+
end
296+
297+
it "uses rsc_bundle_hash when is_rsc_payload is true" do
298+
allow(ReactOnRailsPro.configuration).to receive(:enable_rsc_support).and_return(true)
299+
300+
emitter_captured = nil
301+
allow(ReactOnRailsPro::AsyncPropsEmitter).to receive(:new) do |bundle_timestamp, request_stream|
302+
emitter_captured = { bundle_timestamp: bundle_timestamp, request_stream: request_stream }
303+
double(
304+
ReactOnRailsPro::AsyncPropsEmitter,
305+
end_stream_chunk: { bundleTimestamp: bundle_timestamp, updateChunk: "mocked_js" }
306+
)
307+
end
308+
309+
stream = described_class.render_code_with_incremental_updates(
310+
"/render-incremental",
311+
js_code,
312+
async_props_block: async_props_block,
313+
is_rsc_payload: true
314+
)
315+
316+
stream.each_chunk(&:itself)
317+
318+
expect(emitter_captured[:bundle_timestamp]).to eq("rsc_bundle.js")
319+
end
320+
end
321+
# rubocop:enable RSpec/VerifiedDoubles, RSpec/MultipleMemoizedHelpers
221322
end

react_on_rails_pro/spec/react_on_rails_pro/stream_request_spec.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
require_relative "spec_helper"
44
require "react_on_rails_pro/stream_request"
5+
require "async/barrier"
6+
require "httpx"
7+
8+
HTTPX::Plugins.load_plugin(:stream)
59

610
RSpec.describe ReactOnRailsPro::StreamRequest do
711
describe ".create" do
@@ -10,4 +14,44 @@
1014
expect(result).to be_a(ReactOnRailsPro::StreamDecorator)
1115
end
1216
end
17+
18+
# Unverified doubles are required for streaming responses because:
19+
# 1. HTTP streaming responses don't have a dedicated class type in HTTPX
20+
# 2. The #each method for streaming is added dynamically at runtime
21+
# 3. The interface varies based on the streaming mode (HTTP/2, chunked, etc.)
22+
# rubocop:disable RSpec/VerifiedDoubles
23+
describe "#each_chunk with barrier" do
24+
it "passes barrier to request_executor block" do
25+
barrier_received = nil
26+
mock_response = double(HTTPX::StreamResponse, status: 200)
27+
allow(mock_response).to receive(:is_a?).with(HTTPX::ErrorResponse).and_return(false)
28+
allow(mock_response).to receive(:each).and_yield("chunk\n")
29+
30+
stream = described_class.create do |_send_bundle, barrier|
31+
barrier_received = barrier
32+
mock_response
33+
end
34+
35+
stream.each_chunk(&:itself)
36+
37+
expect(barrier_received).to be_a(Async::Barrier)
38+
end
39+
40+
it "calls barrier.wait after yielding chunks" do
41+
barrier = Async::Barrier.new
42+
allow(Async::Barrier).to receive(:new).and_return(barrier)
43+
expect(barrier).to receive(:wait)
44+
45+
mock_response = double(HTTPX::StreamResponse, status: 200)
46+
allow(mock_response).to receive(:is_a?).with(HTTPX::ErrorResponse).and_return(false)
47+
allow(mock_response).to receive(:each).and_yield("chunk\n")
48+
49+
stream = described_class.create do |_send_bundle, _barrier|
50+
mock_response
51+
end
52+
53+
stream.each_chunk(&:itself)
54+
end
55+
end
56+
# rubocop:enable RSpec/VerifiedDoubles
1357
end

0 commit comments

Comments
 (0)