Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/client/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ repository.workspace = true
[lints]
workspace = true

[features]
default = []
test-utils = [
"dep:base-reth-test-utils",
"dep:reth-e2e-test-utils",
"dep:reth-optimism-node",
"dep:reth-provider",
"dep:derive_more",
]

[dependencies]
# workspace
base-flashtypes.workspace = true
Expand Down Expand Up @@ -68,7 +78,15 @@ arc-swap.workspace = true
metrics-derive.workspace = true
rayon.workspace = true

# test-utils feature dependencies
base-reth-test-utils = { workspace = true, optional = true }
reth-e2e-test-utils = { workspace = true, optional = true }
reth-optimism-node = { workspace = true, optional = true }
reth-provider = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["deref"], optional = true }

[dev-dependencies]
base-reth-flashblocks = { path = ".", features = ["test-utils"] }
rstest.workspace = true
rand.workspace = true
eyre.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/client/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ pub use extension::{FlashblocksCanonConfig, FlashblocksCanonExtension, Flashbloc

mod rpc_extension;
pub use rpc_extension::FlashblocksRpcExtension;

#[cfg(feature = "test-utils")]
pub mod test_utils;
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@
use std::sync::Arc;

use base_flashtypes::Flashblock;
use base_reth_test_utils::{
OpAddOns, OpBuilder, TestHarness, default_launcher, init_silenced_tracing,
};
use derive_more::Deref;
use eyre::Result;
use futures_util::Future;
use reth::builder::NodeHandle;
use reth_e2e_test_utils::Adapter;
use reth_optimism_node::OpNode;

use crate::{
harness::TestHarness,
init_silenced_tracing,
node::{
FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState, OpAddOns, OpBuilder,
default_launcher,
},
};
use super::{FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState};

/// Helper that exposes [`TestHarness`] conveniences plus Flashblocks helpers.
#[derive(Debug, Deref)]
Expand Down
275 changes: 275 additions & 0 deletions crates/client/flashblocks/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
//! Test utilities for flashblocks integration tests.
//!
//! This module provides test harnesses and helpers for testing flashblocks functionality.
//! It is gated behind the `test-utils` feature flag.

mod harness;
use std::{
fmt,
sync::{Arc, Mutex},
};

use base_flashtypes::Flashblock;
use base_reth_test_utils::{
LocalNode, LocalNodeProvider, OpAddOns, OpBuilder, default_launcher, init_silenced_tracing,
};
use eyre::Result;
use futures_util::Future;
pub use harness::FlashblocksHarness;
use once_cell::sync::OnceCell;
use reth::builder::NodeHandle;
use reth_e2e_test_utils::Adapter;
use reth_exex::ExExEvent;
use reth_optimism_node::OpNode;
use reth_provider::CanonStateSubscriptions;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;

use crate::{
EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver,
FlashblocksState,
};

/// Convenience alias for the Flashblocks state backing the local node.
pub type LocalFlashblocksState = FlashblocksState<LocalNodeProvider>;

/// Components that allow tests to interact with the Flashblocks worker tasks.
#[derive(Clone)]
pub struct FlashblocksParts {
sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>,
state: Arc<LocalFlashblocksState>,
}

impl fmt::Debug for FlashblocksParts {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FlashblocksParts").finish_non_exhaustive()
}
}

impl FlashblocksParts {
/// Clone the shared [`FlashblocksState`] handle.
pub fn state(&self) -> Arc<LocalFlashblocksState> {
self.state.clone()
}

/// Send a flashblock to the background processor and wait until it is handled.
pub async fn send(&self, flashblock: Flashblock) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.sender.send((flashblock, tx)).await.map_err(|err| eyre::eyre!(err))?;
rx.await.map_err(|err| eyre::eyre!(err))?;
Ok(())
}
}

#[derive(Clone)]
struct FlashblocksNodeExtensions {
inner: Arc<FlashblocksNodeExtensionsInner>,
}

struct FlashblocksNodeExtensionsInner {
sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>,
#[allow(clippy::type_complexity)]
receiver: Arc<Mutex<Option<mpsc::Receiver<(Flashblock, oneshot::Sender<()>)>>>>,
fb_cell: Arc<OnceCell<Arc<LocalFlashblocksState>>>,
process_canonical: bool,
}

impl FlashblocksNodeExtensions {
fn new(process_canonical: bool) -> Self {
let (sender, receiver) = mpsc::channel::<(Flashblock, oneshot::Sender<()>)>(100);
let inner = FlashblocksNodeExtensionsInner {
sender,
receiver: Arc::new(Mutex::new(Some(receiver))),
fb_cell: Arc::new(OnceCell::new()),
process_canonical,
};
Self { inner: Arc::new(inner) }
}

fn apply(&self, builder: OpBuilder) -> OpBuilder {
let fb_cell = self.inner.fb_cell.clone();
let receiver = self.inner.receiver.clone();
let process_canonical = self.inner.process_canonical;

let fb_cell_for_exex = fb_cell.clone();

builder
.install_exex("flashblocks-canon", move |mut ctx| {
let fb_cell = fb_cell_for_exex.clone();
let process_canonical = process_canonical;
async move {
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);
Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let hash = committed.tip().num_hash();
if process_canonical {
// Many suites drive canonical updates manually to reproduce race conditions, so
// allowing this to be disabled keeps canonical replay deterministic.
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
}
}
Ok(())
})
}
})
.extend_rpc_modules(move |ctx| {
let fb_cell = fb_cell.clone();
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);

let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
ctx.provider().subscribe_to_canonical_state(),
);
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(Ok(notification)) = canon_stream.next().await {
provider.canonical_in_memory_state().notify_canon_state(notification);
}
});
let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
}
});

Ok(())
})
}

fn wrap_launcher<L, LRet>(&self, launcher: L) -> impl FnOnce(OpBuilder) -> LRet
where
L: FnOnce(OpBuilder) -> LRet,
{
let extensions = self.clone();
move |builder| {
let builder = extensions.apply(builder);
launcher(builder)
}
}

fn parts(&self) -> Result<FlashblocksParts> {
let state = self.inner.fb_cell.get().ok_or_else(|| {
eyre::eyre!("FlashblocksState should be initialized during node launch")
})?;
Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: state.clone() })
}
}

fn init_flashblocks_state(
cell: &Arc<OnceCell<Arc<LocalFlashblocksState>>>,
provider: &LocalNodeProvider,
) -> Arc<LocalFlashblocksState> {
cell.get_or_init(|| {
let fb = Arc::new(FlashblocksState::new(provider.clone(), 5));
fb.start();
fb
})
.clone()
}

/// Local node wrapper that exposes helpers specific to Flashblocks tests.
pub struct FlashblocksLocalNode {
node: LocalNode,
parts: FlashblocksParts,
}

impl fmt::Debug for FlashblocksLocalNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FlashblocksLocalNode")
.field("node", &self.node)
.field("parts", &self.parts)
.finish()
}
}

impl FlashblocksLocalNode {
/// Launch a flashblocks-enabled node using the default launcher.
pub async fn new() -> Result<Self> {
Self::with_launcher(default_launcher).await
}

/// Builds a flashblocks-enabled node with canonical block streaming disabled so tests can call
/// `FlashblocksState::on_canonical_block_received` at precise points.
pub async fn manual_canonical() -> Result<Self> {
Self::with_manual_canonical_launcher(default_launcher).await
}

/// Launch a flashblocks-enabled node with a custom launcher and canonical processing enabled.
pub async fn with_launcher<L, LRet>(launcher: L) -> Result<Self>
where
L: FnOnce(OpBuilder) -> LRet,
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
{
Self::with_launcher_inner(launcher, true).await
}

/// Same as [`Self::with_launcher`] but leaves canonical processing to the caller.
pub async fn with_manual_canonical_launcher<L, LRet>(launcher: L) -> Result<Self>
where
L: FnOnce(OpBuilder) -> LRet,
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
{
Self::with_launcher_inner(launcher, false).await
}

async fn with_launcher_inner<L, LRet>(launcher: L, process_canonical: bool) -> Result<Self>
where
L: FnOnce(OpBuilder) -> LRet,
LRet: Future<Output = eyre::Result<NodeHandle<Adapter<OpNode>, OpAddOns>>>,
{
init_silenced_tracing();
let extensions = FlashblocksNodeExtensions::new(process_canonical);
let wrapped_launcher = extensions.wrap_launcher(launcher);
let node = LocalNode::new(wrapped_launcher).await?;

let parts = extensions.parts()?;
Ok(Self { node, parts })
}

/// Access the shared Flashblocks state for assertions or manual driving.
pub fn flashblocks_state(&self) -> Arc<LocalFlashblocksState> {
self.parts.state()
}

/// Send a flashblock through the background processor and await completion.
pub async fn send_flashblock(&self, flashblock: Flashblock) -> Result<()> {
self.parts.send(flashblock).await
}

/// Split the wrapper into the underlying node plus flashblocks parts.
pub fn into_parts(self) -> (LocalNode, FlashblocksParts) {
(self.node, self.parts)
}

/// Borrow the underlying [`LocalNode`].
pub fn as_node(&self) -> &LocalNode {
&self.node
}
}
5 changes: 2 additions & 3 deletions crates/client/flashblocks/tests/eip7702_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use alloy_sol_types::SolCall;
use base_flashtypes::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata,
};
use base_reth_test_utils::{
Account, FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync,
};
use base_reth_flashblocks::test_utils::FlashblocksHarness;
use base_reth_test_utils::{Account, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync};
use eyre::Result;
use op_alloy_network::ReceiptResponse;

Expand Down
Loading
Loading