From 21a78511dc9d882150f522387c78317f391be539 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Sat, 6 Dec 2025 20:59:41 +0400 Subject: [PATCH 1/4] feat: parallelize recovery --- crates/engine/tree/benches/state_root_task.rs | 4 +- .../tree/src/tree/payload_processor/mod.rs | 36 ++++++++++++++-- .../engine/tree/src/tree/payload_validator.rs | 34 +++++++++++---- crates/ethereum/evm/src/lib.rs | 7 +++- crates/evm/evm/src/engine.rs | 42 +++++++++++++++---- crates/optimism/evm/src/lib.rs | 9 ++-- 6 files changed, 106 insertions(+), 26 deletions(-) diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index b6306678b5b..51698c847be 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -229,9 +229,7 @@ fn bench_state_root(c: &mut Criterion) { black_box({ let mut handle = payload_processor.spawn( Default::default(), - core::iter::empty::< - Result, core::convert::Infallible>, - >(), + (Vec::new(), |_| unreachable!()), StateProviderBuilder::new(provider.clone(), genesis_hash, None), OverlayStateProviderFactory::new(provider), &TreeConfig::default(), diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index ed951a81a54..d6a9d235e4d 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -21,6 +21,7 @@ use executor::WorkloadExecutor; use multiproof::{SparseTrieUpdate, *}; use parking_lot::RwLock; use prewarm::PrewarmMetrics; +use rayon::iter::{ParallelBridge, ParallelIterator}; use reth_engine_primitives::ExecutableTxIterator; use reth_evm::{ execute::{ExecutableTxFor, WithTxEnv}, @@ -40,6 +41,7 @@ use reth_trie_sparse::{ }; use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds}; use std::{ + collections::BTreeMap, sync::{ atomic::AtomicBool, mpsc::{self, channel}, @@ -312,21 +314,49 @@ where mpsc::Receiver, I::Tx>, I::Error>>, usize, ) { + let (transactions, convert) = transactions.into(); + let transactions = transactions.into_iter(); // Get the transaction count for prewarming task // Use upper bound if available (more accurate), otherwise use lower bound let (lower, upper) = transactions.size_hint(); let transaction_count_hint = upper.unwrap_or(lower); + // Spawn a task that iterates through all transactions in parallel and sends them to the + // main task. + let (tx, rx) = mpsc::channel(); + self.executor.spawn_blocking(move || { + transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| { + let tx = convert(tx); + let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) }); + let _ = sender.send((idx, tx)); + }); + }); + + // Spawn a task that processes out-of-order transactions from the task above and sends them + // to prewarming and execution tasks. let (prewarm_tx, prewarm_rx) = mpsc::channel(); let (execute_tx, execute_rx) = mpsc::channel(); self.executor.spawn_blocking(move || { - for tx in transactions { - let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) }); + let mut next_for_execution = 0; + let mut queue = BTreeMap::new(); + while let Ok((idx, tx)) = rx.recv() { // only send Ok(_) variants to prewarming task if let Ok(tx) = &tx { let _ = prewarm_tx.send(tx.clone()); } - let _ = execute_tx.send(tx); + + if next_for_execution == idx { + let _ = execute_tx.send(tx); + next_for_execution += 1; + + while queue.first_key_value().is_some_and(|(idx, _)| *idx == next_for_execution) + { + let _ = execute_tx.send(queue.pop_first().unwrap().1); + next_for_execution += 1; + } + } else { + queue.insert(idx, tx); + } } }); diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index af368b47b2d..6e4298cd620 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -213,16 +213,36 @@ where Evm: ConfigureEngineEvm, { match input { - BlockOrPayload::Payload(payload) => Ok(Either::Left( - self.evm_config + BlockOrPayload::Payload(payload) => { + let (iter, convert) = self + .evm_config .tx_iterator_for_payload(payload) .map_err(NewPayloadError::other)? - .map(|res| res.map(Either::Left).map_err(NewPayloadError::other)), - )), + .into(); + + let iter = Either::Left(iter.into_iter().map(Either::Left)); + let convert = move |tx| { + let Either::Left(tx) = tx else { unreachable!() }; + convert(tx).map(Either::Left).map_err(Either::Left) + }; + + Ok(( + iter, + Box::new(convert) + as Box< + dyn Fn(Either<_, _>) -> Result, _> + Send + Sync + 'static, + >, + )) + } BlockOrPayload::Block(block) => { - Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| { - Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?)) - }))) + let iter = + Either::Right(block.body().clone_transactions().into_iter().map(Either::Right)); + let convert = move |tx: Either<_, N::SignedTx>| { + let Either::Right(tx) = tx else { unreachable!() }; + tx.try_into_recovered().map(Either::Right).map_err(Either::Right) + }; + + Ok((iter, Box::new(convert))) } } } diff --git a/crates/ethereum/evm/src/lib.rs b/crates/ethereum/evm/src/lib.rs index c0f8adc9c54..3a7473d776a 100644 --- a/crates/ethereum/evm/src/lib.rs +++ b/crates/ethereum/evm/src/lib.rs @@ -289,12 +289,15 @@ where &self, payload: &ExecutionData, ) -> Result, Self::Error> { - Ok(payload.payload.transactions().clone().into_iter().map(|tx| { + let txs = payload.payload.transactions().clone().into_iter(); + let convert = |tx: Bytes| { let tx = TxTy::::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?; let signer = tx.try_recover().map_err(AnyError::new)?; Ok::<_, AnyError>(tx.with_signer(signer)) - })) + }; + + Ok((txs, convert)) } } diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs index e8316426079..70fbd144329 100644 --- a/crates/evm/evm/src/engine.rs +++ b/crates/evm/evm/src/engine.rs @@ -18,22 +18,48 @@ pub trait ConfigureEngineEvm: ConfigureEvm { ) -> Result, Self::Error>; } -/// Iterator over executable transactions. -pub trait ExecutableTxIterator: - Iterator> + Send + 'static -{ +/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be +/// used to convert them to an executable transaction. This tuple is used in the engine to +/// parallelize heavy work like decoding or recovery. +pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static { + /// Raw transaction that can be converted to an [`ExecutableTxIterator::Tx`] + type RawTx: Send + Sync + 'static; /// The executable transaction type iterator yields. - type Tx: ExecutableTxFor + Clone + Send + Sync + 'static; + type Tx: Clone + Send + Sync + 'static; /// Errors that may occur while recovering or decoding transactions. type Error: core::error::Error + Send + Sync + 'static; + + /// Iterator over [`ExecutableTxIterator::Tx`] + type Iter: Iterator + Send + 'static; + /// Closure that can be used to convert a [`ExecutableTxIterator::RawTx`] to a + /// [`ExecutableTxIterator::Tx`]. This might involve heavy work like decoding or recovery + /// and will be parallelized in the engine. + type Convert: Fn(Self::RawTx) -> Result + Send + Sync + 'static; } -impl ExecutableTxIterator for T +impl ExecutableTxTuple for (I, F) where - Tx: ExecutableTxFor + Clone + Send + Sync + 'static, + RawTx: Send + Sync + 'static, + Tx: Clone + Send + Sync + 'static, Err: core::error::Error + Send + Sync + 'static, - T: Iterator> + Send + 'static, + I: Iterator + Send + 'static, + F: Fn(RawTx) -> Result + Send + Sync + 'static, { + type RawTx = RawTx; type Tx = Tx; type Error = Err; + + type Iter = I; + type Convert = F; +} + +/// Iterator over executable transactions. +pub trait ExecutableTxIterator: + ExecutableTxTuple> +{ +} + +impl ExecutableTxIterator for T where + T: ExecutableTxTuple> +{ } diff --git a/crates/optimism/evm/src/lib.rs b/crates/optimism/evm/src/lib.rs index e5df16ee2e7..7509edf1f81 100644 --- a/crates/optimism/evm/src/lib.rs +++ b/crates/optimism/evm/src/lib.rs @@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header}; use alloy_eips::Decodable2718; use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded}; use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv}; -use alloy_primitives::U256; +use alloy_primitives::{Bytes, U256}; use core::fmt::Debug; use op_alloy_consensus::EIP1559ParamError; use op_alloy_rpc_types_engine::OpExecutionData; @@ -265,12 +265,15 @@ where &self, payload: &OpExecutionData, ) -> Result, Self::Error> { - Ok(payload.payload.transactions().clone().into_iter().map(|encoded| { + let transactions = payload.payload.transactions().clone().into_iter(); + let convert = |encoded: Bytes| { let tx = TxTy::::decode_2718_exact(encoded.as_ref()) .map_err(AnyError::new)?; let signer = tx.try_recover().map_err(AnyError::new)?; Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer))) - })) + }; + + Ok((transactions, convert)) } } From 5caeb5200d03c8c7eb470b19b20deaf201a93e6b Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Sat, 6 Dec 2025 21:09:45 +0400 Subject: [PATCH 2/4] clippy --- crates/engine/tree/benches/state_root_task.rs | 10 +++++++++- .../tree/src/tree/payload_processor/mod.rs | 20 ++++++++++++------- examples/custom-node/src/evm/config.rs | 7 +++++-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index 51698c847be..a99562923b8 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -229,7 +229,15 @@ fn bench_state_root(c: &mut Criterion) { black_box({ let mut handle = payload_processor.spawn( Default::default(), - (Vec::new(), |_| unreachable!()), + ( + core::iter::empty::< + Result< + Recovered, + core::convert::Infallible, + >, + >(), + std::convert::identity, + ), StateProviderBuilder::new(provider.clone(), genesis_hash, None), OverlayStateProviderFactory::new(provider), &TreeConfig::default(), diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index d6a9d235e4d..368edd0a793 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -1047,13 +1047,19 @@ mod tests { let provider_factory = BlockchainProvider::new(factory).unwrap(); - let mut handle = payload_processor.spawn( - Default::default(), - core::iter::empty::, core::convert::Infallible>>(), - StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), - OverlayStateProviderFactory::new(provider_factory), - &TreeConfig::default(), - ); + let mut handle = + payload_processor.spawn( + Default::default(), + ( + core::iter::empty::< + Result, core::convert::Infallible>, + >(), + std::convert::identity, + ), + StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), + OverlayStateProviderFactory::new(provider_factory), + &TreeConfig::default(), + ); let mut state_hook = handle.state_hook(); diff --git a/examples/custom-node/src/evm/config.rs b/examples/custom-node/src/evm/config.rs index f2bd3326893..92810439a83 100644 --- a/examples/custom-node/src/evm/config.rs +++ b/examples/custom-node/src/evm/config.rs @@ -25,6 +25,7 @@ use reth_op::{ primitives::SignedTransaction, }; use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv; +use revm_primitives::Bytes; use std::sync::Arc; #[derive(Debug, Clone)] @@ -126,13 +127,15 @@ impl ConfigureEngineEvm for CustomEvmConfig { &self, payload: &CustomExecutionData, ) -> Result, Self::Error> { - Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| { + let transactions = payload.inner.payload.transactions().clone().into_iter(); + let convert = |encoded: Bytes| { let tx = CustomTransaction::decode_2718_exact(encoded.as_ref()) .map_err(Into::into) .map_err(PayloadError::Decode)?; let signer = tx.try_recover().map_err(NewPayloadError::other)?; Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer))) - })) + }; + Ok((transactions, convert)) } } From be239feed58872148c4e61517768c5a11f2dae25 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 8 Dec 2025 15:54:52 +0400 Subject: [PATCH 3/4] review fixes --- crates/engine/tree/src/tree/payload_processor/mod.rs | 5 +++-- crates/engine/tree/src/tree/payload_validator.rs | 9 ++------- crates/evm/evm/src/engine.rs | 3 +++ 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 368edd0a793..98ba7e6f553 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -349,9 +349,10 @@ where let _ = execute_tx.send(tx); next_for_execution += 1; - while queue.first_key_value().is_some_and(|(idx, _)| *idx == next_for_execution) + while let Some(entry) = queue.first_entry() && + *entry.key() == next_for_execution { - let _ = execute_tx.send(queue.pop_first().unwrap().1); + let _ = execute_tx.send(entry.remove()); next_for_execution += 1; } } else { diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 6e4298cd620..f5cd58dcb8c 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -226,13 +226,8 @@ where convert(tx).map(Either::Left).map_err(Either::Left) }; - Ok(( - iter, - Box::new(convert) - as Box< - dyn Fn(Either<_, _>) -> Result, _> + Send + Sync + 'static, - >, - )) + // Box the closure to satisfy the `Fn` bound both here and in the branch below + Ok((iter, Box::new(convert) as Box _ + Send + Sync + 'static>)) } BlockOrPayload::Block(block) => { let iter = diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs index 70fbd144329..95282b5fd92 100644 --- a/crates/evm/evm/src/engine.rs +++ b/crates/evm/evm/src/engine.rs @@ -23,6 +23,9 @@ pub trait ConfigureEngineEvm: ConfigureEvm { /// parallelize heavy work like decoding or recovery. pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static { /// Raw transaction that can be converted to an [`ExecutableTxIterator::Tx`] + /// + /// This can be any type that can be converted to an [`ExecutableTxIterator::Tx`]. For example, + /// an unrecovered transaction or just the transaction bytes. type RawTx: Send + Sync + 'static; /// The executable transaction type iterator yields. type Tx: Clone + Send + Sync + 'static; From c0c8cdb1eb9bed7a65204ed10ac9767a8efcb6ab Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 8 Dec 2025 16:17:59 +0400 Subject: [PATCH 4/4] docs --- crates/evm/evm/src/engine.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs index 95282b5fd92..48fa55162bd 100644 --- a/crates/evm/evm/src/engine.rs +++ b/crates/evm/evm/src/engine.rs @@ -22,9 +22,9 @@ pub trait ConfigureEngineEvm: ConfigureEvm { /// used to convert them to an executable transaction. This tuple is used in the engine to /// parallelize heavy work like decoding or recovery. pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static { - /// Raw transaction that can be converted to an [`ExecutableTxIterator::Tx`] + /// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`] /// - /// This can be any type that can be converted to an [`ExecutableTxIterator::Tx`]. For example, + /// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example, /// an unrecovered transaction or just the transaction bytes. type RawTx: Send + Sync + 'static; /// The executable transaction type iterator yields. @@ -32,10 +32,10 @@ pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static /// Errors that may occur while recovering or decoding transactions. type Error: core::error::Error + Send + Sync + 'static; - /// Iterator over [`ExecutableTxIterator::Tx`] + /// Iterator over [`ExecutableTxTuple::Tx`] type Iter: Iterator + Send + 'static; - /// Closure that can be used to convert a [`ExecutableTxIterator::RawTx`] to a - /// [`ExecutableTxIterator::Tx`]. This might involve heavy work like decoding or recovery + /// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a + /// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery /// and will be parallelized in the engine. type Convert: Fn(Self::RawTx) -> Result + Send + Sync + 'static; }