Skip to content

Commit ce3dead

Browse files
committed
spike: simplify test harness
1 parent e58f1ad commit ce3dead

File tree

13 files changed

+317
-295
lines changed

13 files changed

+317
-295
lines changed

Cargo.lock

Lines changed: 2 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client/flashblocks/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ repository.workspace = true
1111
[lints]
1212
workspace = true
1313

14+
[features]
15+
default = []
16+
test-utils = [
17+
"dep:base-reth-test-utils",
18+
"dep:reth-e2e-test-utils",
19+
"dep:reth-optimism-node",
20+
"dep:reth-provider",
21+
"dep:derive_more",
22+
]
23+
1424
[dependencies]
1525
# workspace
1626
base-flashtypes.workspace = true
@@ -68,7 +78,15 @@ arc-swap.workspace = true
6878
metrics-derive.workspace = true
6979
rayon.workspace = true
7080

81+
# test-utils feature dependencies
82+
base-reth-test-utils = { workspace = true, optional = true }
83+
reth-e2e-test-utils = { workspace = true, optional = true }
84+
reth-optimism-node = { workspace = true, optional = true }
85+
reth-provider = { workspace = true, optional = true }
86+
derive_more = { workspace = true, features = ["deref"], optional = true }
87+
7188
[dev-dependencies]
89+
base-reth-flashblocks = { path = ".", features = ["test-utils"] }
7290
rstest.workspace = true
7391
rand.workspace = true
7492
eyre.workspace = true

crates/client/flashblocks/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,6 @@ pub use extension::{FlashblocksCanonConfig, FlashblocksCanonExtension, Flashbloc
4949

5050
mod rpc_extension;
5151
pub use rpc_extension::FlashblocksRpcExtension;
52+
53+
#[cfg(feature = "test-utils")]
54+
pub mod test_utils;

crates/client/test-utils/src/flashblocks_harness.rs renamed to crates/client/flashblocks/src/test_utils/harness.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,17 @@
33
use std::sync::Arc;
44

55
use base_flashtypes::Flashblock;
6+
use base_reth_test_utils::{
7+
OpAddOns, OpBuilder, TestHarness, default_launcher, init_silenced_tracing,
8+
};
69
use derive_more::Deref;
710
use eyre::Result;
811
use futures_util::Future;
912
use reth::builder::NodeHandle;
1013
use reth_e2e_test_utils::Adapter;
1114
use reth_optimism_node::OpNode;
1215

13-
use crate::{
14-
harness::TestHarness,
15-
init_silenced_tracing,
16-
node::{
17-
FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState, OpAddOns, OpBuilder,
18-
default_launcher,
19-
},
20-
};
16+
use super::{FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState};
2117

2218
/// Helper that exposes [`TestHarness`] conveniences plus Flashblocks helpers.
2319
#[derive(Debug, Deref)]
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
//! Test utilities for flashblocks integration tests.
2+
//!
3+
//! This module provides test harnesses and helpers for testing flashblocks functionality.
4+
//! It is gated behind the `test-utils` feature flag.
5+
6+
mod harness;
7+
pub use harness::FlashblocksHarness;
8+
9+
use std::{
10+
fmt,
11+
sync::{Arc, Mutex},
12+
};
13+
14+
use base_reth_test_utils::{
15+
LocalNode, LocalNodeProvider, OpAddOns, OpBuilder, default_launcher, init_silenced_tracing,
16+
};
17+
use eyre::Result;
18+
use futures_util::Future;
19+
use once_cell::sync::OnceCell;
20+
use reth::builder::NodeHandle;
21+
use reth_e2e_test_utils::Adapter;
22+
use reth_exex::ExExEvent;
23+
use reth_optimism_node::OpNode;
24+
use tokio::sync::{mpsc, oneshot};
25+
use tokio_stream::StreamExt;
26+
27+
use crate::{
28+
EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver,
29+
FlashblocksState,
30+
};
31+
32+
use base_flashtypes::Flashblock;
33+
use reth_provider::CanonStateSubscriptions;
34+
35+
/// Convenience alias for the Flashblocks state backing the local node.
36+
pub type LocalFlashblocksState = FlashblocksState<LocalNodeProvider>;
37+
38+
/// Components that allow tests to interact with the Flashblocks worker tasks.
39+
#[derive(Clone)]
40+
pub struct FlashblocksParts {
41+
sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>,
42+
state: Arc<LocalFlashblocksState>,
43+
}
44+
45+
impl fmt::Debug for FlashblocksParts {
46+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47+
f.debug_struct("FlashblocksParts").finish_non_exhaustive()
48+
}
49+
}
50+
51+
impl FlashblocksParts {
52+
/// Clone the shared [`FlashblocksState`] handle.
53+
pub fn state(&self) -> Arc<LocalFlashblocksState> {
54+
self.state.clone()
55+
}
56+
57+
/// Send a flashblock to the background processor and wait until it is handled.
58+
pub async fn send(&self, flashblock: Flashblock) -> Result<()> {
59+
let (tx, rx) = oneshot::channel();
60+
self.sender.send((flashblock, tx)).await.map_err(|err| eyre::eyre!(err))?;
61+
rx.await.map_err(|err| eyre::eyre!(err))?;
62+
Ok(())
63+
}
64+
}
65+
66+
#[derive(Clone)]
67+
struct FlashblocksNodeExtensions {
68+
inner: Arc<FlashblocksNodeExtensionsInner>,
69+
}
70+
71+
struct FlashblocksNodeExtensionsInner {
72+
sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>,
73+
#[allow(clippy::type_complexity)]
74+
receiver: Arc<Mutex<Option<mpsc::Receiver<(Flashblock, oneshot::Sender<()>)>>>>,
75+
fb_cell: Arc<OnceCell<Arc<LocalFlashblocksState>>>,
76+
process_canonical: bool,
77+
}
78+
79+
impl FlashblocksNodeExtensions {
80+
fn new(process_canonical: bool) -> Self {
81+
let (sender, receiver) = mpsc::channel::<(Flashblock, oneshot::Sender<()>)>(100);
82+
let inner = FlashblocksNodeExtensionsInner {
83+
sender,
84+
receiver: Arc::new(Mutex::new(Some(receiver))),
85+
fb_cell: Arc::new(OnceCell::new()),
86+
process_canonical,
87+
};
88+
Self { inner: Arc::new(inner) }
89+
}
90+
91+
fn apply(&self, builder: OpBuilder) -> OpBuilder {
92+
let fb_cell = self.inner.fb_cell.clone();
93+
let receiver = self.inner.receiver.clone();
94+
let process_canonical = self.inner.process_canonical;
95+
96+
let fb_cell_for_exex = fb_cell.clone();
97+
98+
builder
99+
.install_exex("flashblocks-canon", move |mut ctx| {
100+
let fb_cell = fb_cell_for_exex.clone();
101+
let process_canonical = process_canonical;
102+
async move {
103+
let provider = ctx.provider().clone();
104+
let fb = init_flashblocks_state(&fb_cell, &provider);
105+
Ok(async move {
106+
while let Some(note) = ctx.notifications.try_next().await? {
107+
if let Some(committed) = note.committed_chain() {
108+
let hash = committed.tip().num_hash();
109+
if process_canonical {
110+
// Many suites drive canonical updates manually to reproduce race conditions, so
111+
// allowing this to be disabled keeps canonical replay deterministic.
112+
let chain = Arc::unwrap_or_clone(committed);
113+
for (_, block) in chain.into_blocks() {
114+
fb.on_canonical_block_received(block);
115+
}
116+
}
117+
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
118+
}
119+
}
120+
Ok(())
121+
})
122+
}
123+
})
124+
.extend_rpc_modules(move |ctx| {
125+
let fb_cell = fb_cell.clone();
126+
let provider = ctx.provider().clone();
127+
let fb = init_flashblocks_state(&fb_cell, &provider);
128+
129+
let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
130+
ctx.provider().subscribe_to_canonical_state(),
131+
);
132+
tokio::spawn(async move {
133+
use tokio_stream::StreamExt;
134+
while let Some(Ok(notification)) = canon_stream.next().await {
135+
provider.canonical_in_memory_state().notify_canon_state(notification);
136+
}
137+
});
138+
let api_ext = EthApiExt::new(
139+
ctx.registry.eth_api().clone(),
140+
ctx.registry.eth_handlers().filter.clone(),
141+
fb.clone(),
142+
);
143+
ctx.modules.replace_configured(api_ext.into_rpc())?;
144+
145+
// Register eth_subscribe subscription endpoint for flashblocks
146+
// Uses replace_configured since eth_subscribe already exists from reth's standard module
147+
// Pass eth_api to enable proxying standard subscription types to reth's implementation
148+
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
149+
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;
150+
151+
let fb_for_task = fb.clone();
152+
let mut receiver = receiver
153+
.lock()
154+
.expect("flashblock receiver mutex poisoned")
155+
.take()
156+
.expect("flashblock receiver should only be initialized once");
157+
tokio::spawn(async move {
158+
while let Some((payload, tx)) = receiver.recv().await {
159+
fb_for_task.on_flashblock_received(payload);
160+
let _ = tx.send(());
161+
}
162+
});
163+
164+
Ok(())
165+
})
166+
}
167+
168+
fn wrap_launcher<L, LRet>(&self, launcher: L) -> impl FnOnce(OpBuilder) -> LRet
169+
where
170+
L: FnOnce(OpBuilder) -> LRet,
171+
{
172+
let extensions = self.clone();
173+
move |builder| {
174+
let builder = extensions.apply(builder);
175+
launcher(builder)
176+
}
177+
}
178+
179+
fn parts(&self) -> Result<FlashblocksParts> {
180+
let state = self.inner.fb_cell.get().ok_or_else(|| {
181+
eyre::eyre!("FlashblocksState should be initialized during node launch")
182+
})?;
183+
Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: state.clone() })
184+
}
185+
}
186+
187+
fn init_flashblocks_state(
188+
cell: &Arc<OnceCell<Arc<LocalFlashblocksState>>>,
189+
provider: &LocalNodeProvider,
190+
) -> Arc<LocalFlashblocksState> {
191+
cell.get_or_init(|| {
192+
let fb = Arc::new(FlashblocksState::new(provider.clone(), 5));
193+
fb.start();
194+
fb
195+
})
196+
.clone()
197+
}
198+
199+
/// Local node wrapper that exposes helpers specific to Flashblocks tests.
200+
pub struct FlashblocksLocalNode {
201+
node: LocalNode,
202+
parts: FlashblocksParts,
203+
}
204+
205+
impl fmt::Debug for FlashblocksLocalNode {
206+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207+
f.debug_struct("FlashblocksLocalNode")
208+
.field("node", &self.node)
209+
.field("parts", &self.parts)
210+
.finish()
211+
}
212+
}
213+
214+
impl FlashblocksLocalNode {
215+
/// Launch a flashblocks-enabled node using the default launcher.
216+
pub async fn new() -> Result<Self> {
217+
Self::with_launcher(default_launcher).await
218+
}
219+
220+
/// Builds a flashblocks-enabled node with canonical block streaming disabled so tests can call
221+
/// `FlashblocksState::on_canonical_block_received` at precise points.
222+
pub async fn manual_canonical() -> Result<Self> {
223+
Self::with_manual_canonical_launcher(default_launcher).await
224+
}
225+
226+
/// Launch a flashblocks-enabled node with a custom launcher and canonical processing enabled.
227+
pub async fn with_launcher<L, LRet>(launcher: L) -> Result<Self>
228+
where
229+
L: FnOnce(OpBuilder) -> LRet,
230+
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
231+
{
232+
Self::with_launcher_inner(launcher, true).await
233+
}
234+
235+
/// Same as [`Self::with_launcher`] but leaves canonical processing to the caller.
236+
pub async fn with_manual_canonical_launcher<L, LRet>(launcher: L) -> Result<Self>
237+
where
238+
L: FnOnce(OpBuilder) -> LRet,
239+
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
240+
{
241+
Self::with_launcher_inner(launcher, false).await
242+
}
243+
244+
async fn with_launcher_inner<L, LRet>(launcher: L, process_canonical: bool) -> Result<Self>
245+
where
246+
L: FnOnce(OpBuilder) -> LRet,
247+
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
248+
{
249+
init_silenced_tracing();
250+
let extensions = FlashblocksNodeExtensions::new(process_canonical);
251+
let wrapped_launcher = extensions.wrap_launcher(launcher);
252+
let node = LocalNode::new(wrapped_launcher).await?;
253+
254+
let parts = extensions.parts()?;
255+
Ok(Self { node, parts })
256+
}
257+
258+
/// Access the shared Flashblocks state for assertions or manual driving.
259+
pub fn flashblocks_state(&self) -> Arc<LocalFlashblocksState> {
260+
self.parts.state()
261+
}
262+
263+
/// Send a flashblock through the background processor and await completion.
264+
pub async fn send_flashblock(&self, flashblock: Flashblock) -> Result<()> {
265+
self.parts.send(flashblock).await
266+
}
267+
268+
/// Split the wrapper into the underlying node plus flashblocks parts.
269+
pub fn into_parts(self) -> (LocalNode, FlashblocksParts) {
270+
(self.node, self.parts)
271+
}
272+
273+
/// Borrow the underlying [`LocalNode`].
274+
pub fn as_node(&self) -> &LocalNode {
275+
&self.node
276+
}
277+
}

crates/client/flashblocks/tests/eip7702_tests.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ use alloy_sol_types::SolCall;
1111
use base_flashtypes::{
1212
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata,
1313
};
14-
use base_reth_test_utils::{
15-
Account, FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync,
16-
};
14+
use base_reth_flashblocks::test_utils::FlashblocksHarness;
15+
use base_reth_test_utils::{Account, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync};
1716
use eyre::Result;
1817
use op_alloy_network::ReceiptResponse;
1918

0 commit comments

Comments
 (0)