Skip to content

Commit 2f3e12c

Browse files
committed
fix(client): accumulate RPC extension hooks instead of replacing
Reth's `extend_rpc_modules` and `on_node_started` use a replacement model where each call overwrites the previous hook. This caused only the last extension's hooks to execute, making endpoints like `base_meterBundle` and `base_transactionStatus` unavailable. Introduce `BaseBuilder` wrapper that accumulates hooks in vectors, then applies them all in a single reth call when launching. This maintains the existing extension API while fixing the accumulation issue. Closes #438
1 parent 8ba470e commit 2f3e12c

File tree

11 files changed

+234
-94
lines changed

11 files changed

+234
-94
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client/flashblocks/src/extension.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use std::sync::Arc;
55

6-
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
6+
use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
77
use futures_util::TryStreamExt;
88
use reth_exex::ExExEvent;
99
use tracing::info;
@@ -50,7 +50,7 @@ impl FlashblocksExtension {
5050

5151
impl BaseNodeExtension for FlashblocksExtension {
5252
/// Applies the extension to the supplied builder.
53-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
53+
fn apply(self: Box<Self>, mut builder: BaseBuilder) -> BaseBuilder {
5454
let Some(cfg) = self.config else {
5555
info!(message = "flashblocks integration is disabled");
5656
return builder;
@@ -63,8 +63,8 @@ impl BaseNodeExtension for FlashblocksExtension {
6363
let state_for_rpc = state.clone();
6464
let state_for_start = state;
6565

66-
// Install the canon ExEx
67-
let builder = builder.install_exex("flashblocks-canon", move |mut ctx| {
66+
// Install the canon ExEx (directly on inner builder - no accumulation needed)
67+
builder.builder = builder.builder.install_exex("flashblocks-canon", move |mut ctx| {
6868
let fb = state_for_exex;
6969
async move {
7070
Ok(async move {
@@ -84,15 +84,15 @@ impl BaseNodeExtension for FlashblocksExtension {
8484
});
8585

8686
// Start state processor and subscriber after node is started
87-
let builder = builder.on_node_started(move |ctx| {
87+
let builder = builder.add_node_started_hook(move |ctx| {
8888
info!(message = "Starting Flashblocks state processor");
8989
state_for_start.start(ctx.provider().clone());
9090
subscriber.start();
9191
Ok(())
9292
});
9393

9494
// Extend with RPC modules
95-
builder.extend_rpc_modules(move |ctx| {
95+
builder.add_rpc_module(move |ctx| {
9696
info!(message = "Starting Flashblocks RPC");
9797

9898
let fb = state_for_rpc;

crates/client/flashblocks/src/test_harness.rs

Lines changed: 67 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,19 @@ use std::{
1313
};
1414

1515
use base_client_node::{
16-
BaseNodeExtension,
16+
BaseBuilder, BaseNodeExtension,
1717
test_utils::{
1818
LocalNode, NODE_STARTUP_DELAY_MS, TestHarness, build_test_genesis, init_silenced_tracing,
1919
},
2020
};
2121
use base_flashtypes::Flashblock;
2222
use derive_more::Deref;
2323
use eyre::Result;
24+
use futures_util::TryStreamExt as _;
25+
use reth_exex::ExExEvent;
2426
use reth_optimism_chainspec::OpChainSpec;
2527
use reth_provider::CanonStateSubscriptions;
2628
use tokio::sync::{mpsc, oneshot};
27-
use tokio_stream::StreamExt;
2829

2930
use crate::{
3031
EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver,
@@ -107,82 +108,82 @@ impl FlashblocksTestExtension {
107108
}
108109

109110
impl BaseNodeExtension for FlashblocksTestExtension {
110-
fn apply(self: Box<Self>, builder: base_client_node::OpBuilder) -> base_client_node::OpBuilder {
111+
fn apply(self: Box<Self>, mut builder: BaseBuilder) -> BaseBuilder {
111112
let state = self.inner.state.clone();
112113
let receiver = self.inner.receiver.clone();
113114
let process_canonical = self.inner.process_canonical;
114115

115116
let state_for_exex = state.clone();
116117
let state_for_rpc = state.clone();
117118

118-
builder
119-
.install_exex("flashblocks-canon", move |mut ctx| {
120-
let fb = state_for_exex.clone();
121-
async move {
122-
Ok(async move {
123-
use reth_exex::ExExEvent;
124-
while let Some(note) = ctx.notifications.try_next().await? {
125-
if let Some(committed) = note.committed_chain() {
126-
let hash = committed.tip().num_hash();
127-
if process_canonical {
128-
// Many suites drive canonical updates manually to reproduce race conditions, so
129-
// allowing this to be disabled keeps canonical replay deterministic.
130-
let chain = Arc::unwrap_or_clone(committed);
131-
for (_, block) in chain.into_blocks() {
132-
fb.on_canonical_block_received(block);
133-
}
119+
// Install the canon ExEx (directly on inner builder - no accumulation needed)
120+
builder.builder = builder.builder.install_exex("flashblocks-canon", move |mut ctx| {
121+
let fb = state_for_exex.clone();
122+
async move {
123+
Ok(async move {
124+
while let Some(note) = ctx.notifications.try_next().await? {
125+
if let Some(committed) = note.committed_chain() {
126+
let hash = committed.tip().num_hash();
127+
if process_canonical {
128+
// Many suites drive canonical updates manually to reproduce race conditions, so
129+
// allowing this to be disabled keeps canonical replay deterministic.
130+
let chain = Arc::unwrap_or_clone(committed);
131+
for (_, block) in chain.into_blocks() {
132+
fb.on_canonical_block_received(block);
134133
}
135-
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
136134
}
135+
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
137136
}
138-
Ok(())
139-
})
140-
}
141-
})
142-
.extend_rpc_modules(move |ctx| {
143-
let fb = state_for_rpc;
144-
let provider = ctx.provider().clone();
145-
146-
// Start the state processor with the provider
147-
fb.start(provider.clone());
148-
149-
let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
150-
ctx.provider().subscribe_to_canonical_state(),
151-
);
152-
tokio::spawn(async move {
153-
use tokio_stream::StreamExt;
154-
while let Some(Ok(notification)) = canon_stream.next().await {
155-
provider.canonical_in_memory_state().notify_canon_state(notification);
156-
}
157-
});
158-
let api_ext = EthApiExt::new(
159-
ctx.registry.eth_api().clone(),
160-
ctx.registry.eth_handlers().filter.clone(),
161-
fb.clone(),
162-
);
163-
ctx.modules.replace_configured(api_ext.into_rpc())?;
164-
165-
// Register eth_subscribe subscription endpoint for flashblocks
166-
// Uses replace_configured since eth_subscribe already exists from reth's standard module
167-
// Pass eth_api to enable proxying standard subscription types to reth's implementation
168-
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
169-
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;
170-
171-
let fb_for_task = fb.clone();
172-
let mut receiver = receiver
173-
.lock()
174-
.expect("flashblock receiver mutex poisoned")
175-
.take()
176-
.expect("flashblock receiver should only be initialized once");
177-
tokio::spawn(async move {
178-
while let Some((payload, tx)) = receiver.recv().await {
179-
fb_for_task.on_flashblock_received(payload);
180-
let _ = tx.send(());
181137
}
182-
});
138+
Ok(())
139+
})
140+
}
141+
});
142+
143+
builder.add_rpc_module(move |ctx| {
144+
let fb = state_for_rpc;
145+
let provider = ctx.provider().clone();
146+
147+
// Start the state processor with the provider
148+
fb.start(provider.clone());
149+
150+
let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
151+
ctx.provider().subscribe_to_canonical_state(),
152+
);
153+
tokio::spawn(async move {
154+
use tokio_stream::StreamExt;
155+
while let Some(Ok(notification)) = canon_stream.next().await {
156+
provider.canonical_in_memory_state().notify_canon_state(notification);
157+
}
158+
});
159+
let api_ext = EthApiExt::new(
160+
ctx.registry.eth_api().clone(),
161+
ctx.registry.eth_handlers().filter.clone(),
162+
fb.clone(),
163+
);
164+
ctx.modules.replace_configured(api_ext.into_rpc())?;
165+
166+
// Register eth_subscribe subscription endpoint for flashblocks
167+
// Uses replace_configured since eth_subscribe already exists from reth's standard module
168+
// Pass eth_api to enable proxying standard subscription types to reth's implementation
169+
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
170+
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;
171+
172+
let fb_for_task = fb.clone();
173+
let mut receiver = receiver
174+
.lock()
175+
.expect("flashblock receiver mutex poisoned")
176+
.take()
177+
.expect("flashblock receiver should only be initialized once");
178+
tokio::spawn(async move {
179+
while let Some((payload, tx)) = receiver.recv().await {
180+
fb_for_task.on_flashblock_received(payload);
181+
let _ = tx.send(());
182+
}
183+
});
183184

184-
Ok(())
185-
})
185+
Ok(())
186+
})
186187
}
187188
}
188189

crates/client/metering/src/extension.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use std::sync::Arc;
55

6-
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
6+
use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
77
use base_flashblocks::{FlashblocksConfig, FlashblocksState};
88
use tracing::info;
99

@@ -27,14 +27,14 @@ impl MeteringExtension {
2727

2828
impl BaseNodeExtension for MeteringExtension {
2929
/// Applies the extension to the supplied builder.
30-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
30+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder {
3131
if !self.enabled {
3232
return builder;
3333
}
3434

3535
let flashblocks_config = self.flashblocks_config;
3636

37-
builder.extend_rpc_modules(move |ctx| {
37+
builder.add_rpc_module(move |ctx| {
3838
info!(message = "Starting Metering RPC");
3939

4040
// Get flashblocks state from config, or create a default one if not configured

crates/client/node/src/builder.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
//! Wrapper around the OP node builder that accumulates hooks instead of replacing them.
2+
3+
use std::fmt;
4+
5+
use eyre::Result;
6+
use reth_node_builder::{
7+
NodeAdapter, NodeComponentsBuilder,
8+
node::FullNode,
9+
rpc::{RethRpcAddOns, RpcContext},
10+
};
11+
12+
use crate::{
13+
OpBuilder,
14+
types::{OpAddOns, OpComponentsBuilder, OpNodeTypes},
15+
};
16+
17+
/// Convenience alias for the OP node adapter type used by the reth builder.
18+
pub(crate) type OpNodeAdapter = NodeAdapter<
19+
OpNodeTypes,
20+
<OpComponentsBuilder as NodeComponentsBuilder<OpNodeTypes>>::Components,
21+
>;
22+
23+
/// Convenience alias for the OP Eth API type exposed by the reth RPC add-ons.
24+
type OpEthApi = <OpAddOns as RethRpcAddOns<OpNodeAdapter>>::EthApi;
25+
26+
/// Convenience alias for the full OP node handle produced after launch.
27+
type OpFullNode = FullNode<OpNodeAdapter, OpAddOns>;
28+
29+
/// Alias for the RPC context used by Base extensions.
30+
pub type BaseRpcContext<'a> = RpcContext<'a, OpNodeAdapter, OpEthApi>;
31+
32+
/// Hook type for extending RPC modules.
33+
type RpcModuleHook = Box<dyn FnMut(&mut BaseRpcContext<'_>) -> Result<()> + Send + 'static>;
34+
35+
/// Hook type for node-started callbacks.
36+
type NodeStartedHook = Box<dyn FnMut(OpFullNode) -> Result<()> + Send + 'static>;
37+
38+
/// A thin wrapper over [`OpBuilder`] that accumulates RPC and node-start hooks.
39+
pub struct BaseBuilder {
40+
/// The underlying OP node builder.
41+
///
42+
/// Exposed publicly so extensions can call methods that don't need accumulation
43+
/// (e.g., `install_exex`) directly on the inner builder.
44+
pub builder: OpBuilder,
45+
rpc_hooks: Vec<RpcModuleHook>,
46+
node_started_hooks: Vec<NodeStartedHook>,
47+
}
48+
49+
impl BaseBuilder {
50+
/// Create a new BaseBuilder wrapping the provided OP builder.
51+
pub const fn new(builder: OpBuilder) -> Self {
52+
Self { builder, rpc_hooks: Vec::new(), node_started_hooks: Vec::new() }
53+
}
54+
55+
/// Consumes the wrapper and returns the inner builder after installing the accumulated hooks.
56+
pub fn build(self) -> OpBuilder {
57+
let Self { mut builder, mut rpc_hooks, node_started_hooks } = self;
58+
59+
if !rpc_hooks.is_empty() {
60+
builder = builder.extend_rpc_modules(move |mut ctx: BaseRpcContext<'_>| {
61+
for hook in rpc_hooks.iter_mut() {
62+
hook(&mut ctx)?;
63+
}
64+
65+
Ok(())
66+
});
67+
}
68+
69+
if !node_started_hooks.is_empty() {
70+
builder = builder.on_node_started(move |full_node: OpFullNode| {
71+
let mut hooks = node_started_hooks;
72+
for hook in hooks.iter_mut() {
73+
hook(full_node.clone())?;
74+
}
75+
Ok(())
76+
});
77+
}
78+
79+
builder
80+
}
81+
82+
/// Adds an RPC hook that will run when RPC modules are configured.
83+
pub fn add_rpc_module<F>(mut self, hook: F) -> Self
84+
where
85+
F: FnOnce(&mut BaseRpcContext<'_>) -> Result<()> + Send + 'static,
86+
{
87+
let mut hook = Some(hook);
88+
self.rpc_hooks.push(Box::new(move |ctx| {
89+
if let Some(hook) = hook.take() {
90+
hook(ctx)?;
91+
}
92+
Ok(())
93+
}));
94+
self
95+
}
96+
97+
/// Adds a node-started hook that will run after the node has started.
98+
pub fn add_node_started_hook<F>(mut self, hook: F) -> Self
99+
where
100+
F: FnOnce(OpFullNode) -> Result<()> + Send + 'static,
101+
{
102+
let mut hook = Some(hook);
103+
self.node_started_hooks.push(Box::new(move |node| {
104+
if let Some(hook) = hook.take() {
105+
hook(node)?;
106+
}
107+
Ok(())
108+
}));
109+
self
110+
}
111+
112+
/// Launches the node after applying accumulated hooks, delegating to the provided closure.
113+
pub fn launch_with_fn<L, R>(self, launcher: L) -> R
114+
where
115+
L: FnOnce(OpBuilder) -> R,
116+
{
117+
launcher(self.build())
118+
}
119+
}
120+
121+
impl fmt::Debug for BaseBuilder {
122+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123+
f.debug_struct("BaseBuilder").finish_non_exhaustive()
124+
}
125+
}

crates/client/node/src/extension.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
33
use std::fmt::Debug;
44

5-
use crate::OpBuilder;
5+
use crate::BaseBuilder;
66

77
/// Customizes the node builder before launch.
88
///
99
/// Register extensions via [`BaseNodeRunner::install_ext`].
1010
pub trait BaseNodeExtension: Send + Sync + Debug {
1111
/// Applies the extension to the supplied builder.
12-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder;
12+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder;
1313
}
1414

1515
/// An extension that can be built from a config.

0 commit comments

Comments
 (0)