Skip to content

Commit fd8618d

Browse files
committed
fix(client): accumulate RPC extension hooks instead of replacing
Reth's `extend_rpc_modules` and `on_node_started` use a replacement model where each call overwrites the previous hook. This caused only the last extension's hooks to execute, making endpoints like `base_meterBundle` and `base_transactionStatus` unavailable. Introduce `BaseBuilder` wrapper that accumulates hooks in vectors, then applies them all in a single reth call when launching. This maintains the existing extension API while fixing the accumulation issue. Closes #438
1 parent 35fc4be commit fd8618d

File tree

10 files changed

+156
-25
lines changed

10 files changed

+156
-25
lines changed

crates/client/flashblocks/src/extension.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use std::sync::Arc;
55

6-
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
6+
use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
77
use reth_chain_state::CanonStateSubscriptions;
88
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
99
use tracing::info;
@@ -50,7 +50,7 @@ impl FlashblocksExtension {
5050

5151
impl BaseNodeExtension for FlashblocksExtension {
5252
/// Applies the extension to the supplied builder.
53-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
53+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder {
5454
let Some(cfg) = self.config else {
5555
info!(message = "flashblocks integration is disabled");
5656
return builder;
@@ -64,7 +64,7 @@ impl BaseNodeExtension for FlashblocksExtension {
6464
let state_for_start = state;
6565

6666
// Start state processor, subscriber, and canonical subscription after node is started
67-
let builder = builder.on_node_started(move |ctx| {
67+
let builder = builder.add_node_started_hook(move |ctx| {
6868
info!(message = "Starting Flashblocks state processor");
6969
state_for_start.start(ctx.provider().clone());
7070
subscriber.start();
@@ -84,7 +84,7 @@ impl BaseNodeExtension for FlashblocksExtension {
8484
});
8585

8686
// Extend with RPC modules
87-
builder.extend_rpc_modules(move |ctx| {
87+
builder.add_rpc_module(move |ctx| {
8888
info!(message = "Starting Flashblocks RPC");
8989

9090
let api_ext = EthApiExt::new(

crates/client/flashblocks/src/test_harness.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::{
1313
};
1414

1515
use base_client_node::{
16-
BaseNodeExtension,
16+
BaseBuilder, BaseNodeExtension,
1717
test_utils::{
1818
LocalNode, NODE_STARTUP_DELAY_MS, TestHarness, build_test_genesis, init_silenced_tracing,
1919
},
@@ -107,7 +107,7 @@ impl FlashblocksTestExtension {
107107
}
108108

109109
impl BaseNodeExtension for FlashblocksTestExtension {
110-
fn apply(self: Box<Self>, builder: base_client_node::OpBuilder) -> base_client_node::OpBuilder {
110+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder {
111111
let state = self.inner.state.clone();
112112
let receiver = self.inner.receiver.clone();
113113
let process_canonical = self.inner.process_canonical;
@@ -116,7 +116,7 @@ impl BaseNodeExtension for FlashblocksTestExtension {
116116
let state_for_rpc = state.clone();
117117

118118
// Start state processor and subscriptions after node is started
119-
let builder = builder.on_node_started(move |ctx| {
119+
let builder = builder.add_node_started_hook(move |ctx| {
120120
let provider = ctx.provider().clone();
121121

122122
// Start the state processor with the provider
@@ -152,7 +152,7 @@ impl BaseNodeExtension for FlashblocksTestExtension {
152152
Ok(())
153153
});
154154

155-
builder.extend_rpc_modules(move |ctx| {
155+
builder.add_rpc_module(move |ctx| {
156156
let fb = state_for_rpc;
157157

158158
let api_ext = EthApiExt::new(

crates/client/metering/src/extension.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use std::sync::Arc;
55

6-
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
6+
use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
77
use base_flashblocks::{FlashblocksConfig, FlashblocksState};
88
use tracing::info;
99

@@ -27,14 +27,14 @@ impl MeteringExtension {
2727

2828
impl BaseNodeExtension for MeteringExtension {
2929
/// Applies the extension to the supplied builder.
30-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
30+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder {
3131
if !self.enabled {
3232
return builder;
3333
}
3434

3535
let flashblocks_config = self.flashblocks_config;
3636

37-
builder.extend_rpc_modules(move |ctx| {
37+
builder.add_rpc_module(move |ctx| {
3838
info!(message = "Starting Metering RPC");
3939

4040
// Get flashblocks state from config, or create a default one if not configured

crates/client/node/src/builder.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//! Wrapper around the OP node builder that accumulates hooks instead of replacing them.
2+
3+
use eyre::Result;
4+
use reth_node_builder::{
5+
NodeAdapter, NodeComponentsBuilder,
6+
node::FullNode,
7+
rpc::{RethRpcAddOns, RpcContext},
8+
};
9+
use std::fmt;
10+
11+
use crate::OpBuilder;
12+
use crate::types::{OpAddOns, OpComponentsBuilder, OpNodeTypes};
13+
14+
/// Convenience alias for the OP node adapter type used by the reth builder.
15+
pub(crate) type OpNodeAdapter = NodeAdapter<
16+
OpNodeTypes,
17+
<OpComponentsBuilder as NodeComponentsBuilder<OpNodeTypes>>::Components,
18+
>;
19+
20+
/// Convenience alias for the OP Eth API type exposed by the reth RPC add-ons.
21+
type OpEthApi = <OpAddOns as RethRpcAddOns<OpNodeAdapter>>::EthApi;
22+
23+
/// Convenience alias for the full OP node handle produced after launch.
24+
type OpFullNode = FullNode<OpNodeAdapter, OpAddOns>;
25+
26+
/// Alias for the RPC context used by Base extensions.
27+
pub type BaseRpcContext<'a> = RpcContext<'a, OpNodeAdapter, OpEthApi>;
28+
29+
/// Hook type for extending RPC modules.
30+
type RpcModuleHook = Box<dyn FnMut(&mut BaseRpcContext<'_>) -> Result<()> + Send + 'static>;
31+
32+
/// Hook type for node-started callbacks.
33+
type NodeStartedHook = Box<dyn FnMut(OpFullNode) -> Result<()> + Send + 'static>;
34+
35+
/// A thin wrapper over [`OpBuilder`] that accumulates RPC and node-start hooks.
36+
pub struct BaseBuilder {
37+
builder: OpBuilder,
38+
rpc_hooks: Vec<RpcModuleHook>,
39+
node_started_hooks: Vec<NodeStartedHook>,
40+
}
41+
42+
impl BaseBuilder {
43+
/// Create a new BaseBuilder wrapping the provided OP builder.
44+
pub const fn new(builder: OpBuilder) -> Self {
45+
Self { builder, rpc_hooks: Vec::new(), node_started_hooks: Vec::new() }
46+
}
47+
48+
/// Consumes the wrapper and returns the inner builder after installing the accumulated hooks.
49+
pub fn build(self) -> OpBuilder {
50+
let BaseBuilder { mut builder, mut rpc_hooks, node_started_hooks } = self;
51+
52+
if !rpc_hooks.is_empty() {
53+
builder = builder.extend_rpc_modules(move |mut ctx: BaseRpcContext<'_>| {
54+
for hook in rpc_hooks.iter_mut() {
55+
hook(&mut ctx)?;
56+
}
57+
58+
Ok(())
59+
});
60+
}
61+
62+
if !node_started_hooks.is_empty() {
63+
builder = builder.on_node_started(move |full_node: OpFullNode| {
64+
let mut hooks = node_started_hooks;
65+
for hook in hooks.iter_mut() {
66+
hook(full_node.clone())?;
67+
}
68+
Ok(())
69+
});
70+
}
71+
72+
builder
73+
}
74+
75+
/// Adds an RPC hook that will run when RPC modules are configured.
76+
pub fn add_rpc_module<F>(mut self, hook: F) -> Self
77+
where
78+
F: FnOnce(&mut BaseRpcContext<'_>) -> Result<()> + Send + 'static,
79+
{
80+
let mut hook = Some(hook);
81+
self.rpc_hooks.push(Box::new(move |ctx| {
82+
if let Some(hook) = hook.take() {
83+
hook(ctx)?;
84+
}
85+
Ok(())
86+
}));
87+
self
88+
}
89+
90+
/// Adds a node-started hook that will run after the node has started.
91+
pub fn add_node_started_hook<F>(mut self, hook: F) -> Self
92+
where
93+
F: FnOnce(OpFullNode) -> Result<()> + Send + 'static,
94+
{
95+
let mut hook = Some(hook);
96+
self.node_started_hooks.push(Box::new(move |node| {
97+
if let Some(hook) = hook.take() {
98+
hook(node)?;
99+
}
100+
Ok(())
101+
}));
102+
self
103+
}
104+
105+
/// Launches the node after applying accumulated hooks, delegating to the provided closure.
106+
pub fn launch_with_fn<L, R>(self, launcher: L) -> R
107+
where
108+
L: FnOnce(OpBuilder) -> R,
109+
{
110+
launcher(self.build())
111+
}
112+
}
113+
114+
impl fmt::Debug for BaseBuilder {
115+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116+
f.debug_struct("BaseBuilder").finish_non_exhaustive()
117+
}
118+
}

crates/client/node/src/extension.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
33
use std::fmt::Debug;
44

5-
use crate::OpBuilder;
5+
use crate::BaseBuilder;
66

77
/// Customizes the node builder before launch.
88
///
99
/// Register extensions via [`BaseNodeRunner::install_ext`].
1010
pub trait BaseNodeExtension: Send + Sync + Debug {
1111
/// Applies the extension to the supplied builder.
12-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder;
12+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder;
1313
}
1414

1515
/// An extension that can be built from a config.

crates/client/node/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
44
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
55

6+
mod builder;
7+
pub use builder::{BaseBuilder, BaseRpcContext};
8+
69
mod extension;
710
pub use extension::{BaseNodeExtension, FromExtensionConfig};
811

crates/client/node/src/runner.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use reth_optimism_node::{OpNode, args::RollupArgs};
66
use reth_provider::providers::BlockchainProvider;
77
use tracing::info;
88

9-
use crate::{BaseNodeBuilder, BaseNodeExtension, BaseNodeHandle, FromExtensionConfig};
9+
use crate::{BaseBuilder, BaseNodeBuilder, BaseNodeExtension, BaseNodeHandle, FromExtensionConfig};
1010

1111
/// Wraps the Base node configuration and orchestrates builder wiring.
1212
#[derive(Debug)]
@@ -50,8 +50,9 @@ impl BaseNodeRunner {
5050
.with_add_ons(op_node.add_ons())
5151
.on_component_initialized(move |_ctx| Ok(()));
5252

53-
let builder =
54-
extensions.into_iter().fold(builder, |builder, extension| extension.apply(builder));
53+
let builder = extensions
54+
.into_iter()
55+
.fold(BaseBuilder::new(builder), |builder, extension| extension.apply(builder));
5556

5657
builder
5758
.launch_with_fn(|builder| {

crates/client/node/src/test_utils/node.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use reth_optimism_node::{OpNode, args::RollupArgs};
2222
use reth_provider::providers::BlockchainProvider;
2323
use reth_tasks::TaskManager;
2424

25-
use crate::{BaseNodeExtension, OpProvider, test_utils::engine::EngineApi};
25+
use crate::{BaseBuilder, BaseNodeExtension, OpProvider, test_utils::engine::EngineApi};
2626

2727
/// Convenience alias for the local blockchain provider type.
2828
pub type LocalNodeProvider = OpProvider;
@@ -103,8 +103,9 @@ impl LocalNode {
103103
.on_component_initialized(move |_ctx| Ok(()));
104104

105105
// Apply all extensions
106-
let builder =
107-
extensions.into_iter().fold(builder, |builder, extension| extension.apply(builder));
106+
let builder = extensions
107+
.into_iter()
108+
.fold(BaseBuilder::new(builder), |builder, extension| extension.apply(builder));
108109

109110
// Launch with EngineNodeLauncher
110111
let NodeHandle { node: node_handle, node_exit_future } = builder
@@ -166,6 +167,11 @@ impl LocalNode {
166167
Ok(RootProvider::<Optimism>::new(client))
167168
}
168169

170+
/// HTTP RPC address for the local node.
171+
pub const fn http_addr(&self) -> SocketAddr {
172+
self.http_api_addr
173+
}
174+
169175
/// Build an Engine API client that talks to the node's IPC endpoint.
170176
pub fn engine_api(&self) -> Result<EngineApi<crate::test_utils::engine::IpcEngine>> {
171177
EngineApi::<crate::test_utils::engine::IpcEngine>::new(self.engine_ipc_path.clone())

crates/client/node/src/types.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ use reth_optimism_chainspec::OpChainSpec;
1111
use reth_optimism_node::OpNode;
1212
use reth_provider::providers::BlockchainProvider;
1313

14-
type OpNodeTypes = FullNodeTypesAdapter<OpNode, Arc<DatabaseEnv>, OpProvider>;
15-
type OpComponentsBuilder = <OpNode as Node<OpNodeTypes>>::ComponentsBuilder;
16-
type OpAddOns = <OpNode as Node<OpNodeTypes>>::AddOns;
14+
/// Internal alias for the OP node type adapter.
15+
pub(crate) type OpNodeTypes = FullNodeTypesAdapter<OpNode, Arc<DatabaseEnv>, OpProvider>;
16+
/// Internal alias for the OP node components builder.
17+
pub(crate) type OpComponentsBuilder = <OpNode as Node<OpNodeTypes>>::ComponentsBuilder;
18+
/// Internal alias for the OP node add-ons.
19+
pub(crate) type OpAddOns = <OpNode as Node<OpNodeTypes>>::AddOns;
1720

1821
/// A [`BlockchainProvider`] instance.
1922
pub type OpProvider = BlockchainProvider<NodeTypesWithDBAdapter<OpNode, Arc<DatabaseEnv>>>;

crates/client/txpool/src/extension.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Contains the [`TxPoolExtension`] which wires up the transaction pool features
22
//! (tracing subscription and status RPC) on the Base node builder.
33
4-
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
4+
use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
55
use reth_provider::CanonStateSubscriptions;
66
use tokio_stream::wrappers::BroadcastStream;
77
use tracing::info;
@@ -35,15 +35,15 @@ impl TxPoolExtension {
3535

3636
impl BaseNodeExtension for TxPoolExtension {
3737
/// Applies the extension to the supplied builder.
38-
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
38+
fn apply(self: Box<Self>, builder: BaseBuilder) -> BaseBuilder {
3939
let config = self.config;
4040

4141
// Extend with RPC modules and optionally start tracing subscription
4242
let sequencer_rpc = config.sequencer_rpc;
4343
let tracing_enabled = config.tracing_enabled;
4444
let logs_enabled = config.tracing_logs_enabled;
4545

46-
builder.extend_rpc_modules(move |ctx| {
46+
builder.add_rpc_module(move |ctx| {
4747
info!(message = "Starting Transaction Status RPC");
4848
let proxy_api = TransactionStatusApiImpl::new(sequencer_rpc, ctx.pool().clone())
4949
.expect("Failed to create transaction status proxy");

0 commit comments

Comments
 (0)