diff --git a/Cargo.lock b/Cargo.lock index 2396da1b..874e7cac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1671,11 +1671,11 @@ dependencies = [ "op-alloy-rpc-types", "rand 0.9.2", "rayon", + "reth-chain-state", "reth-chainspec", "reth-db", "reth-db-common", "reth-evm", - "reth-exex", "reth-optimism-chainspec", "reth-optimism-evm", "reth-optimism-node", @@ -1815,7 +1815,6 @@ dependencies = [ "jsonrpsee", "lru 0.16.3", "metrics", - "reth-exex", "reth-node-api", "reth-primitives-traits", "reth-provider", @@ -1824,6 +1823,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tracing", ] diff --git a/crates/builder/op-rbuilder/src/tests/framework/instance.rs b/crates/builder/op-rbuilder/src/tests/framework/instance.rs index 73d9950f..9092623e 100644 --- a/crates/builder/op-rbuilder/src/tests/framework/instance.rs +++ b/crates/builder/op-rbuilder/src/tests/framework/instance.rs @@ -60,7 +60,7 @@ use crate::{ /// This is necessary because clap reads env vars for args with `env = "..."` attributes, /// and external OTEL env vars (e.g., `OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf`) may contain /// values that are incompatible with the CLI's expected values. -fn clear_otel_env_vars() { +pub fn clear_otel_env_vars() { for key in [ "OTEL_EXPORTER_OTLP_ENDPOINT", "OTEL_EXPORTER_OTLP_HEADERS", diff --git a/crates/builder/op-rbuilder/src/tests/framework/macros/src/lib.rs b/crates/builder/op-rbuilder/src/tests/framework/macros/src/lib.rs index fef7f8df..0957dc03 100644 --- a/crates/builder/op-rbuilder/src/tests/framework/macros/src/lib.rs +++ b/crates/builder/op-rbuilder/src/tests/framework/macros/src/lib.rs @@ -239,6 +239,7 @@ pub fn rb_test(args: TokenStream, input: TokenStream) -> TokenStream { let _guard = tracing::subscriber::set_global_default(subscriber); tracing::info!("{} start", stringify!(#test_name)); + crate::tests::clear_otel_env_vars(); let instance = #instance_init; #original_name(instance).await } diff --git a/crates/client/flashblocks/Cargo.toml b/crates/client/flashblocks/Cargo.toml index 832e4d95..f7dc1c46 100644 --- a/crates/client/flashblocks/Cargo.toml +++ b/crates/client/flashblocks/Cargo.toml @@ -16,6 +16,7 @@ test-utils = [ "base-client-node/test-utils", "dep:derive_more", "dep:eyre", + "reth-chain-state/test-utils", "reth-chainspec/test-utils", "reth-evm/test-utils", "reth-primitives/test-utils", @@ -29,7 +30,7 @@ base-flashtypes.workspace = true base-client-node.workspace = true # reth -reth-exex.workspace = true +reth-chain-state.workspace = true reth-evm.workspace = true reth-primitives.workspace = true reth-rpc.workspace = true diff --git a/crates/client/flashblocks/src/extension.rs b/crates/client/flashblocks/src/extension.rs index 17f2b659..0dc4c40c 100644 --- a/crates/client/flashblocks/src/extension.rs +++ b/crates/client/flashblocks/src/extension.rs @@ -1,11 +1,11 @@ //! Contains the [`FlashblocksExtension`] which wires up the flashblocks feature -//! (both the canon ExEx and RPC surface) on the Base node builder. +//! (canonical block subscription and RPC surface) on the Base node builder. use std::sync::Arc; use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; -use futures_util::TryStreamExt; -use reth_exex::ExExEvent; +use reth_chain_state::CanonStateSubscriptions; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; use tracing::info; use url::Url; @@ -34,7 +34,7 @@ impl FlashblocksConfig { } } -/// Helper struct that wires the Flashblocks feature (canon ExEx and RPC) into the node builder. +/// Helper struct that wires the Flashblocks feature (canonical subscription and RPC) into the node builder. #[derive(Debug)] pub struct FlashblocksExtension { /// Optional Flashblocks configuration (includes state). @@ -59,35 +59,27 @@ impl BaseNodeExtension for FlashblocksExtension { let state = cfg.state; let mut subscriber = FlashblocksSubscriber::new(state.clone(), cfg.websocket_url); - let state_for_exex = state.clone(); + let state_for_canonical = state.clone(); let state_for_rpc = state.clone(); let state_for_start = state; - // Install the canon ExEx - let builder = builder.install_exex("flashblocks-canon", move |mut ctx| { - let fb = state_for_exex; - async move { - Ok(async move { - while let Some(note) = ctx.notifications.try_next().await? { - if let Some(committed) = note.committed_chain() { - let tip = committed.tip().num_hash(); - let chain = Arc::unwrap_or_clone(committed); - for (_, block) in chain.into_blocks() { - fb.on_canonical_block_received(block); - } - let _ = ctx.events.send(ExExEvent::FinishedHeight(tip)); - } - } - Ok(()) - }) - } - }); - - // Start state processor and subscriber after node is started + // Start state processor, subscriber, and canonical subscription after node is started let builder = builder.on_node_started(move |ctx| { info!(message = "Starting Flashblocks state processor"); state_for_start.start(ctx.provider().clone()); subscriber.start(); + + let mut canonical_stream = + BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); + tokio::spawn(async move { + while let Some(Ok(notification)) = canonical_stream.next().await { + let committed = notification.committed(); + for block in committed.blocks_iter() { + state_for_canonical.on_canonical_block_received(block.clone()); + } + } + }); + Ok(()) }); @@ -95,19 +87,17 @@ impl BaseNodeExtension for FlashblocksExtension { builder.extend_rpc_modules(move |ctx| { info!(message = "Starting Flashblocks RPC"); - let fb = state_for_rpc; - let api_ext = EthApiExt::new( ctx.registry.eth_api().clone(), ctx.registry.eth_handlers().filter.clone(), - fb.clone(), + state_for_rpc.clone(), ); ctx.modules.replace_configured(api_ext.into_rpc())?; // Register the eth_subscribe subscription endpoint for flashblocks // Uses replace_configured since eth_subscribe already exists from reth's standard module // Pass eth_api to enable proxying standard subscription types to reth's implementation - let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb); + let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), state_for_rpc); ctx.modules.replace_configured(eth_pubsub.into_rpc())?; Ok(()) diff --git a/crates/client/flashblocks/src/test_harness.rs b/crates/client/flashblocks/src/test_harness.rs index 654b2968..2b8feffe 100644 --- a/crates/client/flashblocks/src/test_harness.rs +++ b/crates/client/flashblocks/src/test_harness.rs @@ -21,10 +21,10 @@ use base_client_node::{ use base_flashtypes::Flashblock; use derive_more::Deref; use eyre::Result; +use reth_chain_state::CanonStateSubscriptions; use reth_optimism_chainspec::OpChainSpec; -use reth_provider::CanonStateSubscriptions; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; use crate::{ EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver, @@ -61,7 +61,7 @@ impl FlashblocksParts { /// Test extension for flashblocks functionality. /// -/// This extension wires up the flashblocks ExEx and RPC modules for testing, +/// This extension wires up the flashblocks canonical subscription and RPC modules for testing, /// with optional control over canonical block processing. #[derive(Clone, Debug)] pub struct FlashblocksTestExtension { @@ -112,77 +112,77 @@ impl BaseNodeExtension for FlashblocksTestExtension { let receiver = self.inner.receiver.clone(); let process_canonical = self.inner.process_canonical; - let state_for_exex = state.clone(); + let state_for_start = state.clone(); let state_for_rpc = state.clone(); - builder - .install_exex("flashblocks-canon", move |mut ctx| { - let fb = state_for_exex.clone(); - async move { - Ok(async move { - use reth_exex::ExExEvent; - while let Some(note) = ctx.notifications.try_next().await? { - if let Some(committed) = note.committed_chain() { - let hash = committed.tip().num_hash(); - if process_canonical { - // Many suites drive canonical updates manually to reproduce race conditions, so - // allowing this to be disabled keeps canonical replay deterministic. - let chain = Arc::unwrap_or_clone(committed); - for (_, block) in chain.into_blocks() { - fb.on_canonical_block_received(block); - } - } - let _ = ctx.events.send(ExExEvent::FinishedHeight(hash)); - } - } - Ok(()) - }) + // Start state processor and subscriptions after node is started + let builder = builder.on_node_started(move |ctx| { + let provider = ctx.provider().clone(); + + // Start the state processor with the provider + state_for_start.start(provider.clone()); + + // Spawn a task to forward canonical state notifications to the in-memory state + let provider_for_notify = provider.clone(); + let mut canon_notify_stream = + BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); + tokio::spawn(async move { + while let Some(Ok(notification)) = canon_notify_stream.next().await { + provider_for_notify + .canonical_in_memory_state() + .notify_canon_state(notification); } - }) - .extend_rpc_modules(move |ctx| { - let fb = state_for_rpc; - let provider = ctx.provider().clone(); - - // Start the state processor with the provider - fb.start(provider.clone()); + }); - let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( - ctx.provider().subscribe_to_canonical_state(), - ); + // If process_canonical is enabled, spawn a task to process canonical blocks + if process_canonical { + let state_for_canonical = state_for_start.clone(); + let mut canonical_stream = + BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); tokio::spawn(async move { - use tokio_stream::StreamExt; - while let Some(Ok(notification)) = canon_stream.next().await { - provider.canonical_in_memory_state().notify_canon_state(notification); - } - }); - let api_ext = EthApiExt::new( - ctx.registry.eth_api().clone(), - ctx.registry.eth_handlers().filter.clone(), - fb.clone(), - ); - ctx.modules.replace_configured(api_ext.into_rpc())?; - - // Register eth_subscribe subscription endpoint for flashblocks - // Uses replace_configured since eth_subscribe already exists from reth's standard module - // Pass eth_api to enable proxying standard subscription types to reth's implementation - let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); - ctx.modules.replace_configured(eth_pubsub.into_rpc())?; - - let fb_for_task = fb.clone(); - let mut receiver = receiver - .lock() - .expect("flashblock receiver mutex poisoned") - .take() - .expect("flashblock receiver should only be initialized once"); - tokio::spawn(async move { - while let Some((payload, tx)) = receiver.recv().await { - fb_for_task.on_flashblock_received(payload); - let _ = tx.send(()); + while let Some(Ok(notification)) = canonical_stream.next().await { + let committed = notification.committed(); + for block in committed.blocks_iter() { + state_for_canonical.on_canonical_block_received(block.clone()); + } } }); + } + + Ok(()) + }); + + builder.extend_rpc_modules(move |ctx| { + let fb = state_for_rpc; + + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + fb.clone(), + ); + ctx.modules.replace_configured(api_ext.into_rpc())?; + + // Register eth_subscribe subscription endpoint for flashblocks + // Uses replace_configured since eth_subscribe already exists from reth's standard module + // Pass eth_api to enable proxying standard subscription types to reth's implementation + let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); + ctx.modules.replace_configured(eth_pubsub.into_rpc())?; + + let fb_for_task = fb.clone(); + let mut receiver = receiver + .lock() + .expect("flashblock receiver mutex poisoned") + .take() + .expect("flashblock receiver should only be initialized once"); + tokio::spawn(async move { + while let Some((payload, tx)) = receiver.recv().await { + fb_for_task.on_flashblock_received(payload); + let _ = tx.send(()); + } + }); - Ok(()) - }) + Ok(()) + }) } } diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index 91abe19f..d78a8172 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -16,7 +16,6 @@ workspace = true base-client-node.workspace = true # reth -reth-exex.workspace = true reth-tracing.workspace = true reth-transaction-pool.workspace = true reth-node-api.workspace = true @@ -31,11 +30,11 @@ jsonrpsee.workspace = true # async tokio.workspace = true +tokio-stream.workspace = true futures.workspace = true # misc lru.workspace = true -eyre.workspace = true chrono.workspace = true metrics.workspace = true derive_more = { workspace = true, features = ["display"] } @@ -43,6 +42,7 @@ tracing.workspace = true serde.workspace = true [dev-dependencies] +eyre.workspace = true httpmock.workspace = true serde_json.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } diff --git a/crates/client/txpool/src/exex.rs b/crates/client/txpool/src/exex.rs deleted file mode 100644 index dd4797c7..00000000 --- a/crates/client/txpool/src/exex.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Tracex execution extension wiring. - -use eyre::Result; -use futures::StreamExt; -use reth_exex::ExExContext; -use reth_node_api::FullNodeComponents; -use reth_tracing::tracing::debug; -use reth_transaction_pool::TransactionPool; - -use crate::tracker::Tracker; - -/// Execution extension that tracks transaction timing from mempool to inclusion. -/// -/// Monitors transaction lifecycle events and records timing metrics. -pub async fn tracex_exex( - mut ctx: ExExContext, - enable_logs: bool, -) -> Result<()> { - debug!(target: "tracex", "Starting transaction tracking ExEx"); - let mut tracker = Tracker::new(enable_logs); - - // Subscribe to events from the mempool. - let pool = ctx.pool().clone(); - let mut all_events_stream = pool.all_transactions_event_listener(); - - loop { - tokio::select! { - // Track # of transactions dropped and replaced. - Some(full_event) = all_events_stream.next() => tracker.handle_event(full_event), - - // Use chain notifications to track time to inclusion. - Some(notification) = ctx.notifications.next() => { - match notification { - Ok(notification) => ctx.events.send(tracker.handle_notification(notification))?, - Err(e) => { - debug!(target: "tracex", error = %e, "Notification error"); - return Err(e); - } - } - } - } - } -} diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index 3800db71..f58b2563 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -1,10 +1,12 @@ //! Contains the [`TxPoolExtension`] which wires up the transaction pool features -//! (tracing ExEx and status RPC) on the Base node builder. +//! (tracing subscription and status RPC) on the Base node builder. use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; +use reth_provider::CanonStateSubscriptions; +use tokio_stream::wrappers::BroadcastStream; use tracing::info; -use crate::{TransactionStatusApiImpl, TransactionStatusApiServer, tracex_exex}; +use crate::{TransactionStatusApiImpl, TransactionStatusApiServer, tracex_subscription}; /// Transaction pool configuration. #[derive(Debug, Clone)] @@ -36,20 +38,25 @@ impl BaseNodeExtension for TxPoolExtension { fn apply(self: Box, builder: OpBuilder) -> OpBuilder { let config = self.config; - // Install the tracing ExEx if enabled + // Extend with RPC modules and optionally start tracing subscription + let sequencer_rpc = config.sequencer_rpc; + let tracing_enabled = config.tracing_enabled; let logs_enabled = config.tracing_logs_enabled; - let builder = - builder.install_exex_if(config.tracing_enabled, "tracex", move |ctx| async move { - Ok(tracex_exex(ctx, logs_enabled)) - }); - // Extend with RPC modules - let sequencer_rpc = config.sequencer_rpc; builder.extend_rpc_modules(move |ctx| { info!(message = "Starting Transaction Status RPC"); let proxy_api = TransactionStatusApiImpl::new(sequencer_rpc, ctx.pool().clone()) .expect("Failed to create transaction status proxy"); ctx.modules.merge_configured(proxy_api.into_rpc())?; + + // Start the tracing subscription if enabled + if tracing_enabled { + let canonical_stream = + BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); + let pool = ctx.pool().clone(); + tokio::spawn(tracex_subscription(canonical_stream, pool, logs_enabled)); + } + Ok(()) }) } diff --git a/crates/client/txpool/src/lib.rs b/crates/client/txpool/src/lib.rs index 3757caa4..045e4736 100644 --- a/crates/client/txpool/src/lib.rs +++ b/crates/client/txpool/src/lib.rs @@ -6,8 +6,8 @@ mod events; pub use events::{EventLog, Pool, TxEvent}; -mod exex; -pub use exex::tracex_exex; +mod subscription; +pub use subscription::tracex_subscription; mod rpc; pub use rpc::{ diff --git a/crates/client/txpool/src/subscription.rs b/crates/client/txpool/src/subscription.rs new file mode 100644 index 00000000..f02666de --- /dev/null +++ b/crates/client/txpool/src/subscription.rs @@ -0,0 +1,42 @@ +//! Tracex canonical block subscription. + +use futures::StreamExt; +use reth_node_api::NodePrimitives; +use reth_provider::CanonStateNotification; +use reth_tracing::tracing::debug; +use reth_transaction_pool::TransactionPool; +use tokio_stream::wrappers::BroadcastStream; + +use crate::tracker::Tracker; + +/// Subscription task that tracks transaction timing from mempool to block inclusion. +/// +/// Monitors transaction lifecycle events and records timing metrics by listening +/// to canonical state notifications and mempool events. +pub async fn tracex_subscription( + canonical_stream: BroadcastStream>, + pool: Pool, + enable_logs: bool, +) where + N: NodePrimitives, + Pool: TransactionPool + 'static, +{ + debug!(target: "tracex", "Starting transaction tracking subscription"); + let mut tracker = Tracker::new(enable_logs); + + // Subscribe to events from the mempool. + let mut all_events_stream = pool.all_transactions_event_listener(); + let mut canonical_stream = canonical_stream; + + loop { + tokio::select! { + // Track # of transactions dropped and replaced. + Some(full_event) = all_events_stream.next() => tracker.handle_event(full_event), + + // Use canonical state notifications to track time to inclusion. + Some(Ok(notification)) = canonical_stream.next() => { + tracker.handle_canon_state_notification(notification); + } + } + } +} diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index f40d447a..8923ba06 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -8,10 +8,9 @@ use std::{ use alloy_primitives::TxHash; use chrono::Local; use lru::LruCache; -use reth_exex::{ExExEvent, ExExNotification}; use reth_node_api::{BlockBody, NodePrimitives}; -use reth_primitives_traits::{AlloyBlockHeader, transaction::TxHashRef}; -use reth_provider::Chain; +use reth_primitives_traits::transaction::TxHashRef; +use reth_provider::{CanonStateNotification, Chain}; use reth_tracing::tracing::{debug, info}; use reth_transaction_pool::{FullTransactionEvent, PoolTransaction}; @@ -65,26 +64,12 @@ impl Tracker { } } - /// Parse [`ExExNotification`]s and update the tracker. - pub fn handle_notification( + /// Parse [`CanonStateNotification`]s and update the tracker. + pub fn handle_canon_state_notification( &mut self, - notification: ExExNotification, - ) -> ExExEvent { - match notification { - ExExNotification::ChainCommitted { new } => { - self.track_committed_chain(&new); - ExExEvent::FinishedHeight(new.tip().num_hash()) - } - ExExNotification::ChainReorged { old: _, new } => { - debug!(target: "tracex", tip = ?new.tip().number(), "Chain reorg detected"); - self.track_committed_chain(&new); - ExExEvent::FinishedHeight(new.tip().num_hash()) - } - ExExNotification::ChainReverted { old } => { - debug!(target: "tracex", old_tip = ?old.tip().number(), "Chain reverted"); - ExExEvent::FinishedHeight(old.tip().num_hash()) - } - } + notification: CanonStateNotification, + ) { + self.track_committed_chain(¬ification.committed()); } fn track_committed_chain(&mut self, chain: &Chain) {