Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions crates/engine/tree/benches/state_root_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
(
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),
Expand Down
56 changes: 46 additions & 10 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
Expand All @@ -40,6 +41,7 @@ use reth_trie_sparse::{
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
collections::BTreeMap,
sync::{
atomic::AtomicBool,
mpsc::{self, channel},
Expand Down Expand Up @@ -312,21 +314,49 @@ where
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_iter();
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);

// Spawn a task that iterates through all transactions in parallel and sends them to the
// main task.
let (tx, rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let _ = sender.send((idx, tx));
});
});

// Spawn a task that processes out-of-order transactions from the task above and sends them
// to prewarming and execution tasks.
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
for tx in transactions {
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = rx.recv() {
// only send Ok(_) variants to prewarming task
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);

if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;

while queue.first_key_value().is_some_and(|(idx, _)| *idx == next_for_execution)
{
let _ = execute_tx.send(queue.pop_first().unwrap().1);
next_for_execution += 1;
}
} else {
queue.insert(idx, tx);
}
}
});

Expand Down Expand Up @@ -1017,13 +1047,19 @@ mod tests {

let provider_factory = BlockchainProvider::new(factory).unwrap();

let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut handle =
payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);

let mut state_hook = handle.state_hook();

Expand Down
34 changes: 27 additions & 7 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,36 @@ where
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
BlockOrPayload::Payload(payload) => Ok(Either::Left(
self.evm_config
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
)),
.into();

let iter = Either::Left(iter.into_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};

Ok((
iter,
Box::new(convert)
as Box<
dyn Fn(Either<_, _>) -> Result<Either<_, _>, _> + Send + Sync + 'static,
>,
))
}
BlockOrPayload::Block(block) => {
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
})))
let iter =
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};

Ok((iter, Box::new(convert)))
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions crates/ethereum/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,15 @@ where
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
let txs = payload.payload.transactions().clone().into_iter();
let convert = |tx: Bytes| {
let tx =
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(tx.with_signer(signer))
}))
};

Ok((txs, convert))
}
}

Expand Down
42 changes: 34 additions & 8 deletions crates/evm/evm/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,48 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
}

/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
{
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxIterator::Tx`]
type RawTx: Send + Sync + 'static;
/// The executable transaction type iterator yields.
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
type Tx: Clone + Send + Sync + 'static;
/// Errors that may occur while recovering or decoding transactions.
type Error: core::error::Error + Send + Sync + 'static;

/// Iterator over [`ExecutableTxIterator::Tx`]
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
/// Closure that can be used to convert a [`ExecutableTxIterator::RawTx`] to a
/// [`ExecutableTxIterator::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
}

impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
I: Iterator<Item = RawTx> + Send + 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
type Tx = Tx;
type Error = Err;

type Iter = I;
type Convert = F;
}

/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}

impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}
9 changes: 6 additions & 3 deletions crates/optimism/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
use alloy_eips::Decodable2718;
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use core::fmt::Debug;
use op_alloy_consensus::EIP1559ParamError;
use op_alloy_rpc_types_engine::OpExecutionData;
Expand Down Expand Up @@ -265,12 +265,15 @@ where
&self,
payload: &OpExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
.map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};

Ok((transactions, convert))
}
}

Expand Down
7 changes: 5 additions & 2 deletions examples/custom-node/src/evm/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use reth_op::{
primitives::SignedTransaction,
};
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
use revm_primitives::Bytes;
use std::sync::Arc;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
&self,
payload: &CustomExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.inner.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
.map_err(Into::into)
.map_err(PayloadError::Decode)?;
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};
Ok((transactions, convert))
}
}

Expand Down
Loading