Skip to content

Commit 1efe556

Browse files
feat(flashblocks): parallelize sender recovery
Addresses #282 - reduces sender recovery overhead during flashblock processing. ## Changes - Add rayon for parallel iteration over transactions - Batch ECDSA sender recovery upfront using par_iter() - Add sender_recovery_duration metric for observability - Preserve cache lookup behavior (check prev_pending_blocks first) - Maintain original error handling semantics ## Technical Approach Follows upstream reth pattern (paradigmxyz/reth#20169) - recover all senders in parallel before the sequential transaction processing loop. ## Testing All 10 existing flashblocks tests pass. ## Note Benchmarks not included as benchmark infrastructure was removed from main. The new sender_recovery_duration metric can be used for production measurement.
1 parent 328e8aa commit 1efe556

File tree

5 files changed

+33
-8
lines changed

5 files changed

+33
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ lru = "0.16.2"
140140
rand = "0.9.2"
141141
uuid = "1.19.0"
142142
time = "0.3.44"
143+
rayon = "1.11"
143144
clap = "4.5.53"
144145
eyre = "0.6.12"
145146
bytes = "1.11.0"

crates/flashblocks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ tracing.workspace = true
5454
metrics.workspace = true
5555
arc-swap.workspace = true
5656
metrics-derive.workspace = true
57+
rayon.workspace = true
5758

5859
[dev-dependencies]
5960
rand.workspace = true

crates/flashblocks/src/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ pub struct Metrics {
2323
#[metric(describe = "Time taken to process a message")]
2424
pub block_processing_duration: Histogram,
2525

26+
/// Time spent on parallel sender recovery (ECDSA operations).
27+
#[metric(describe = "Time spent on parallel sender recovery")]
28+
pub sender_recovery_duration: Histogram,
29+
2630
/// Number of Flashblocks that arrive in an unexpected order.
2731
#[metric(describe = "Number of Flashblocks that arrive in an unexpected order")]
2832
pub unexpected_block_order: Counter,

crates/flashblocks/src/processor.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use alloy_consensus::{
1111
transaction::{Recovered, SignerRecoverable, TransactionMeta},
1212
};
1313
use alloy_eips::BlockNumberOrTag;
14-
use alloy_primitives::{B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap};
14+
use alloy_primitives::{Address, B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap};
1515
use alloy_rpc_types::{TransactionTrait, Withdrawal};
1616
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
1717
use alloy_rpc_types_eth::state::StateOverride;
@@ -21,6 +21,7 @@ use eyre::eyre;
2121
use op_alloy_consensus::OpTxEnvelope;
2222
use op_alloy_network::TransactionResponse;
2323
use op_alloy_rpc_types::Transaction;
24+
use rayon::prelude::*;
2425
use reth::{
2526
chainspec::{ChainSpecProvider, EthChainSpec},
2627
providers::{BlockReaderIdExt, StateProviderFactory},
@@ -393,16 +394,33 @@ where
393394
let mut gas_used = 0;
394395
let mut next_log_index = 0;
395396

397+
// Parallel sender recovery - batch all ECDSA operations upfront
398+
let recovery_start = Instant::now();
399+
let sender_by_hash: HashMap<B256, Address> = block
400+
.body
401+
.transactions
402+
.par_iter()
403+
.map(|tx| -> eyre::Result<(B256, Address)> {
404+
let tx_hash = tx.tx_hash();
405+
let sender = match prev_pending_blocks
406+
.as_ref()
407+
.and_then(|p| p.get_transaction_sender(&tx_hash))
408+
{
409+
Some(cached) => cached,
410+
None => tx.recover_signer()?,
411+
};
412+
Ok((tx_hash, sender))
413+
})
414+
.collect::<eyre::Result<_>>()?;
415+
self.metrics.sender_recovery_duration.record(recovery_start.elapsed());
416+
396417
for (idx, transaction) in block.body.transactions.iter().enumerate() {
397418
let tx_hash = transaction.tx_hash();
398419

399-
let sender = match prev_pending_blocks
400-
.as_ref()
401-
.and_then(|p| p.get_transaction_sender(&tx_hash))
402-
{
403-
Some(sender) => sender,
404-
None => transaction.recover_signer()?,
405-
};
420+
// Sender already recovered in parallel phase
421+
let sender = *sender_by_hash
422+
.get(&tx_hash)
423+
.expect("sender must exist from parallel recovery");
406424

407425
pending_blocks_builder.with_transaction_sender(tx_hash, sender);
408426
pending_blocks_builder.increment_nonce(sender);

0 commit comments

Comments
 (0)