@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
2121use multiproof:: { SparseTrieUpdate , * } ;
2222use parking_lot:: RwLock ;
2323use prewarm:: PrewarmMetrics ;
24+ use rayon:: iter:: { ParallelBridge , ParallelIterator } ;
2425use reth_engine_primitives:: ExecutableTxIterator ;
2526use reth_evm:: {
2627 execute:: { ExecutableTxFor , WithTxEnv } ,
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
4041} ;
4142use reth_trie_sparse_parallel:: { ParallelSparseTrie , ParallelismThresholds } ;
4243use std:: {
44+ collections:: BTreeMap ,
4345 sync:: {
4446 atomic:: AtomicBool ,
4547 mpsc:: { self , channel} ,
@@ -312,21 +314,50 @@ where
312314 mpsc:: Receiver < Result < WithTxEnv < TxEnvFor < Evm > , I :: Tx > , I :: Error > > ,
313315 usize ,
314316 ) {
317+ let ( transactions, convert) = transactions. into ( ) ;
318+ let transactions = transactions. into_iter ( ) ;
315319 // Get the transaction count for prewarming task
316320 // Use upper bound if available (more accurate), otherwise use lower bound
317321 let ( lower, upper) = transactions. size_hint ( ) ;
318322 let transaction_count_hint = upper. unwrap_or ( lower) ;
319323
324+ // Spawn a task that iterates through all transactions in parallel and sends them to the
325+ // main task.
326+ let ( tx, rx) = mpsc:: channel ( ) ;
327+ self . executor . spawn_blocking ( move || {
328+ transactions. enumerate ( ) . par_bridge ( ) . for_each_with ( tx, |sender, ( idx, tx) | {
329+ let tx = convert ( tx) ;
330+ let tx = tx. map ( |tx| WithTxEnv { tx_env : tx. to_tx_env ( ) , tx : Arc :: new ( tx) } ) ;
331+ let _ = sender. send ( ( idx, tx) ) ;
332+ } ) ;
333+ } ) ;
334+
335+ // Spawn a task that processes out-of-order transactions from the task above and sends them
336+ // to prewarming and execution tasks.
320337 let ( prewarm_tx, prewarm_rx) = mpsc:: channel ( ) ;
321338 let ( execute_tx, execute_rx) = mpsc:: channel ( ) ;
322339 self . executor . spawn_blocking ( move || {
323- for tx in transactions {
324- let tx = tx. map ( |tx| WithTxEnv { tx_env : tx. to_tx_env ( ) , tx : Arc :: new ( tx) } ) ;
340+ let mut next_for_execution = 0 ;
341+ let mut queue = BTreeMap :: new ( ) ;
342+ while let Ok ( ( idx, tx) ) = rx. recv ( ) {
325343 // only send Ok(_) variants to prewarming task
326344 if let Ok ( tx) = & tx {
327345 let _ = prewarm_tx. send ( tx. clone ( ) ) ;
328346 }
329- let _ = execute_tx. send ( tx) ;
347+
348+ if next_for_execution == idx {
349+ let _ = execute_tx. send ( tx) ;
350+ next_for_execution += 1 ;
351+
352+ while let Some ( entry) = queue. first_entry ( ) &&
353+ * entry. key ( ) == next_for_execution
354+ {
355+ let _ = execute_tx. send ( entry. remove ( ) ) ;
356+ next_for_execution += 1 ;
357+ }
358+ } else {
359+ queue. insert ( idx, tx) ;
360+ }
330361 }
331362 } ) ;
332363
@@ -1017,13 +1048,19 @@ mod tests {
10171048
10181049 let provider_factory = BlockchainProvider :: new ( factory) . unwrap ( ) ;
10191050
1020- let mut handle = payload_processor. spawn (
1021- Default :: default ( ) ,
1022- core:: iter:: empty :: < Result < Recovered < TransactionSigned > , core:: convert:: Infallible > > ( ) ,
1023- StateProviderBuilder :: new ( provider_factory. clone ( ) , genesis_hash, None ) ,
1024- OverlayStateProviderFactory :: new ( provider_factory) ,
1025- & TreeConfig :: default ( ) ,
1026- ) ;
1051+ let mut handle =
1052+ payload_processor. spawn (
1053+ Default :: default ( ) ,
1054+ (
1055+ core:: iter:: empty :: <
1056+ Result < Recovered < TransactionSigned > , core:: convert:: Infallible > ,
1057+ > ( ) ,
1058+ std:: convert:: identity,
1059+ ) ,
1060+ StateProviderBuilder :: new ( provider_factory. clone ( ) , genesis_hash, None ) ,
1061+ OverlayStateProviderFactory :: new ( provider_factory) ,
1062+ & TreeConfig :: default ( ) ,
1063+ ) ;
10271064
10281065 let mut state_hook = handle. state_hook ( ) ;
10291066
0 commit comments