Skip to content

Commit 34ccc7a

Browse files
committed
par payload tx iter
1 parent aa4a888 commit 34ccc7a

File tree

1 file changed

+12
-8
lines changed
  • crates/engine/tree/src/tree/payload_processor

1 file changed

+12
-8
lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
2121
use multiproof::{SparseTrieUpdate, *};
2222
use parking_lot::RwLock;
2323
use prewarm::PrewarmMetrics;
24+
use rayon::iter::{ParallelBridge, ParallelIterator};
2425
use reth_engine_primitives::ExecutableTxIterator;
2526
use reth_evm::{
2627
execute::{ExecutableTxFor, WithTxEnv},
@@ -323,14 +324,17 @@ where
323324
let (prewarm_tx, prewarm_rx) = mpsc::channel();
324325
let (execute_tx, execute_rx) = mpsc::channel();
325326
self.executor.spawn_blocking(move || {
326-
for tx in transactions {
327-
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
328-
// only send Ok(_) variants to prewarming task
329-
if let Ok(tx) = &tx {
330-
let _ = prewarm_tx.send(tx.clone());
331-
}
332-
let _ = execute_tx.send(tx);
333-
}
327+
transactions.par_bridge().for_each_with(
328+
(prewarm_tx, execute_tx),
329+
|(prewarm_tx, execute_tx), tx| {
330+
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
331+
// only send Ok(_) variants to prewarming task
332+
if let Ok(tx) = &tx {
333+
let _ = prewarm_tx.send(tx.clone());
334+
}
335+
let _ = execute_tx.send(tx);
336+
},
337+
);
334338
});
335339

336340
(prewarm_rx, execute_rx, transaction_count_hint)

0 commit comments

Comments
 (0)