From ec057d04f2003eb72aca46e787a70c38c376e4a1 Mon Sep 17 00:00:00 2001 From: Andre Benedito <38482987+andr-dev@users.noreply.github.com> Date: Wed, 22 Oct 2025 16:00:01 -0400 Subject: [PATCH 1/4] Move txpool sequencer out of tracked pool --- monad-eth-txpool/src/pool/mod.rs | 182 ++++++++++++++---- .../src/pool/{tracked => }/sequencer.rs | 16 +- monad-eth-txpool/src/pool/tracked/mod.rs | 109 +---------- 3 files changed, 159 insertions(+), 148 deletions(-) rename monad-eth-txpool/src/pool/{tracked => }/sequencer.rs (96%) diff --git a/monad-eth-txpool/src/pool/mod.rs b/monad-eth-txpool/src/pool/mod.rs index 2527f6883f..beda97a190 100644 --- a/monad-eth-txpool/src/pool/mod.rs +++ b/monad-eth-txpool/src/pool/mod.rs @@ -27,27 +27,35 @@ use monad_chain_config::{ ChainConfig, MockChainConfig, }; use monad_consensus_types::{ - block::{BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs}, + block::{ + BlockPolicyBlockValidator, BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs, + }, payload::RoundSignature, }; use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; -use monad_eth_block_policy::{timestamp_ns_to_secs, EthBlockPolicy, EthValidatedBlock}; +use monad_eth_block_policy::{ + timestamp_ns_to_secs, EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock, +}; use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason, EthTxPoolSnapshot}; use monad_eth_types::{EthBlockBody, EthExecutionProtocol, ExtractEthAddress, ProposedEthHeader}; use monad_state_backend::{StateBackend, StateBackendError}; use monad_system_calls::{SystemTransactionGenerator, SYSTEM_SENDER_ETH_ADDRESS}; -use monad_types::{Epoch, NodeId, Round, SeqNum}; +use monad_types::{DropTimer, Epoch, NodeId, Round, SeqNum}; use monad_validator::signature_collection::SignatureCollection; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tracing::{debug, error, info, warn}; pub use self::transaction::max_eip2718_encoded_length; -use self::{pending::PendingTxMap, tracked::TrackedTxMap, transaction::ValidEthTransaction}; +use self::{ + pending::PendingTxMap, sequencer::ProposalSequencer, tracked::TrackedTxMap, + transaction::ValidEthTransaction, +}; use crate::EthTxPoolEventTracker; mod pending; +mod sequencer; mod tracked; mod transaction; @@ -334,42 +342,18 @@ where .map(|tx| tx.length() as u64) .sum(); - let user_transactions = if let Some(last_commit) = self.last_commit.as_ref() { - let last_commit_seq_num = last_commit.seq_num; - - assert!( - block_policy.get_last_commit().ge(&last_commit_seq_num), - "txpool received block policy with lower committed seq num" - ); - - if last_commit_seq_num == block_policy.get_last_commit() { - self.tracked.create_proposal( - event_tracker, - self.chain_id, - proposed_seq_num, - base_fee, - tx_limit - system_transactions.len(), - proposal_gas_limit, - proposal_byte_limit - system_txs_size, - block_policy, - extending_blocks.iter().collect(), - state_backend, - chain_config, - &self.chain_revision, - &self.execution_revision, - )? - } else { - error!( - block_policy_last_commit = block_policy.get_last_commit().0, - txpool_last_commit = last_commit_seq_num.0, - "last commit update does not match block policy last commit" - ); - Vec::default() - } - } else { - error!("txpool create_proposal called before last committed block set"); - Vec::default() - }; + let user_transactions = self.sequence_user_transactions( + event_tracker, + proposed_seq_num, + base_fee, + tx_limit - system_transactions.len(), + proposal_gas_limit, + proposal_byte_limit - system_txs_size, + extending_blocks.iter().collect(), + block_policy, + state_backend, + chain_config, + )?; let body = EthBlockBody { transactions: system_transactions @@ -635,6 +619,124 @@ where .map(|sys_txn| sys_txn.into()) .collect_vec()) } + + pub fn sequence_user_transactions( + &mut self, + event_tracker: &mut EthTxPoolEventTracker<'_>, + proposed_seq_num: SeqNum, + base_fee: u64, + tx_limit: usize, + proposal_gas_limit: u64, + proposal_byte_limit: u64, + extending_blocks: Vec<&EthValidatedBlock>, + block_policy: &EthBlockPolicy, + state_backend: &SBT, + chain_config: &CCT, + ) -> Result>, BlockPolicyError> { + let _timer = DropTimer::start(Duration::ZERO, |elapsed| { + debug!(?elapsed, "txpool create_proposal"); + }); + + let Some(last_commit) = self.last_commit.as_ref() else { + error!("txpool create_proposal called before last committed block set"); + return Ok(Vec::default()); + }; + + let last_commit_seq_num = last_commit.seq_num; + + assert!( + block_policy.get_last_commit().ge(&last_commit_seq_num), + "txpool received block policy with lower committed seq num" + ); + + if last_commit_seq_num != block_policy.get_last_commit() { + error!( + block_policy_last_commit = block_policy.get_last_commit().0, + txpool_last_commit = last_commit_seq_num.0, + "txpool last commit update does not match block policy last commit" + ); + return Ok(Vec::default()); + } + + if tx_limit == 0 { + warn!("txpool create_proposal called with zero tx_limit"); + return Ok(Vec::default()); + } + + let sequencer = + ProposalSequencer::new(self.tracked.iter(), &extending_blocks, base_fee, tx_limit); + let sequencer_len = sequencer.len(); + + if sequencer.is_empty() { + return Ok(Vec::default()); + } + + let (account_balances, state_backend_lookups) = { + let _timer = DropTimer::start(Duration::ZERO, |elapsed| { + debug!( + ?elapsed, + "txpool create_proposal compute account base balances" + ); + }); + + let total_db_lookups_before = state_backend.total_db_lookups(); + + ( + block_policy.compute_account_base_balances( + proposed_seq_num, + state_backend, + chain_config, + Some(&extending_blocks), + sequencer.addresses(), + )?, + state_backend.total_db_lookups() - total_db_lookups_before, + ) + }; + + info!( + addresses = self.tracked.num_addresses(), + num_txs = self.tracked.num_txs(), + sequencer_len, + account_balances = account_balances.len(), + ?state_backend_lookups, + "txpool sequencing transactions" + ); + + let validator = EthBlockPolicyBlockValidator::new( + proposed_seq_num, + block_policy.get_execution_delay(), + base_fee, + &self.chain_revision, + &self.execution_revision, + )?; + + let proposal = sequencer.build_proposal( + tx_limit, + proposal_gas_limit, + proposal_byte_limit, + chain_config, + account_balances, + validator, + ); + + let proposal_num_txs = proposal.txs.len(); + + event_tracker.record_create_proposal( + self.tracked.num_addresses(), + sequencer_len, + state_backend_lookups, + proposal_num_txs, + ); + + info!( + ?proposed_seq_num, + ?proposal_num_txs, + proposal_total_gas = proposal.total_gas, + "created proposal" + ); + + Ok(proposal.txs) + } } impl EthTxPool diff --git a/monad-eth-txpool/src/pool/tracked/sequencer.rs b/monad-eth-txpool/src/pool/sequencer.rs similarity index 96% rename from monad-eth-txpool/src/pool/tracked/sequencer.rs rename to monad-eth-txpool/src/pool/sequencer.rs index 95f5a95653..011951df93 100644 --- a/monad-eth-txpool/src/pool/tracked/sequencer.rs +++ b/monad-eth-txpool/src/pool/sequencer.rs @@ -20,7 +20,6 @@ use std::{ use alloy_consensus::{transaction::Recovered, Transaction, TxEnvelope}; use alloy_primitives::Address; -use indexmap::IndexMap; use monad_chain_config::{revision::ChainRevision, ChainConfig}; use monad_consensus_types::block::{AccountBalanceState, BlockPolicyBlockValidator}; use monad_crypto::certificate_signature::{ @@ -34,8 +33,10 @@ use monad_validator::signature_collection::SignatureCollection; use rand::seq::SliceRandom; use tracing::{debug, error, trace, warn}; -use super::list::TrackedTxList; -use crate::pool::transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction}; +use crate::pool::{ + tracked::TrackedTxList, + transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction}, +}; #[derive(Debug, PartialEq, Eq)] struct OrderedTx<'a> { @@ -96,7 +97,7 @@ pub struct ProposalSequencer<'a> { impl<'a> ProposalSequencer<'a> { pub fn new( - tracked_txs: &'a IndexMap, + tracked_txs: impl Iterator, extending_blocks: &Vec<&EthValidatedBlock>, base_fee: u64, tx_limit: usize, @@ -107,7 +108,7 @@ impl<'a> ProposalSequencer<'a> { { let mut pending_nonce_usages = extending_blocks.get_nonce_usages().into_map(); - let mut heap_vec = Vec::with_capacity(tracked_txs.len()); + let mut heap_vec = Vec::default(); let mut virtual_time = 0; for (address, tx_list) in tracked_txs { @@ -139,6 +140,10 @@ impl<'a> ProposalSequencer<'a> { } } + pub fn is_empty(&self) -> bool { + self.heap.is_empty() + } + pub fn len(&self) -> usize { self.heap.len() } @@ -156,7 +161,6 @@ impl<'a> ProposalSequencer<'a> { pub fn build_proposal( mut self, - chain_id: u64, tx_limit: usize, proposal_gas_limit: u64, proposal_byte_limit: u64, diff --git a/monad-eth-txpool/src/pool/tracked/mod.rs b/monad-eth-txpool/src/pool/tracked/mod.rs index 468f7e82a4..849a8472a4 100644 --- a/monad-eth-txpool/src/pool/tracked/mod.rs +++ b/monad-eth-txpool/src/pool/tracked/mod.rs @@ -15,22 +15,19 @@ use std::{collections::BTreeMap, marker::PhantomData, time::Duration}; -use alloy_consensus::{transaction::Recovered, TxEnvelope}; use alloy_primitives::Address; use indexmap::{map::Entry as IndexMapEntry, IndexMap}; use itertools::Itertools; use monad_chain_config::{ execution_revision::MonadExecutionRevision, revision::ChainRevision, ChainConfig, }; -use monad_consensus_types::block::{ - BlockPolicyBlockValidator, BlockPolicyError, ConsensusBlockHeader, -}; +use monad_consensus_types::block::ConsensusBlockHeader; use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; use monad_eth_block_policy::{ nonce_usage::{NonceUsage, NonceUsageMap}, - EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock, + EthBlockPolicy, }; use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason}; use monad_eth_types::EthExecutionProtocol; @@ -39,7 +36,7 @@ use monad_types::{DropTimer, SeqNum}; use monad_validator::signature_collection::SignatureCollection; use tracing::{debug, error, info, warn}; -use self::{list::TrackedTxList, sequencer::ProposalSequencer}; +pub(super) use self::list::TrackedTxList; use super::{ pending::{PendingTxList, PendingTxMap}, transaction::ValidEthTransaction, @@ -47,7 +44,6 @@ use super::{ use crate::EthTxPoolEventTracker; mod list; -mod sequencer; // To produce 5k tx blocks, we need the tracked tx map to hold at least 15k addresses so that, after // pruning the txpool of up to 5k unique addresses in the last committed block update and up to 5k @@ -111,6 +107,10 @@ where self.txs.values().map(TrackedTxList::num_txs).sum() } + pub fn iter(&self) -> impl Iterator { + self.txs.iter() + } + pub fn iter_txs(&self) -> impl Iterator { self.txs.values().flat_map(TrackedTxList::iter) } @@ -140,101 +140,6 @@ where )) } - pub fn create_proposal( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - chain_id: u64, - proposed_seq_num: SeqNum, - base_fee: u64, - tx_limit: usize, - proposal_gas_limit: u64, - proposal_byte_limit: u64, - block_policy: &EthBlockPolicy, - extending_blocks: Vec<&EthValidatedBlock>, - state_backend: &SBT, - chain_config: &CCT, - chain_revision: &CRT, - execution_revision: &MonadExecutionRevision, - ) -> Result>, BlockPolicyError> { - let _timer = DropTimer::start(Duration::ZERO, |elapsed| { - debug!(?elapsed, "txpool create_proposal"); - }); - - if self.txs.is_empty() || tx_limit == 0 { - return Ok(Vec::new()); - } - - let sequencer = ProposalSequencer::new(&self.txs, &extending_blocks, base_fee, tx_limit); - let sequencer_len = sequencer.len(); - - let (account_balances, state_backend_lookups) = { - let _timer = DropTimer::start(Duration::ZERO, |elapsed| { - debug!( - ?elapsed, - "txpool create_proposal compute account base balances" - ); - }); - - let total_db_lookups_before = state_backend.total_db_lookups(); - - ( - block_policy.compute_account_base_balances( - proposed_seq_num, - state_backend, - chain_config, - Some(&extending_blocks), - sequencer.addresses(), - )?, - state_backend.total_db_lookups() - total_db_lookups_before, - ) - }; - - info!( - addresses = self.txs.len(), - num_txs = self.num_txs(), - sequencer_len, - account_balances = account_balances.len(), - ?state_backend_lookups, - "txpool sequencing transactions" - ); - - let validator = EthBlockPolicyBlockValidator::new( - proposed_seq_num, - block_policy.get_execution_delay(), - base_fee, - chain_revision, - execution_revision, - )?; - - let proposal = sequencer.build_proposal( - chain_id, - tx_limit, - proposal_gas_limit, - proposal_byte_limit, - chain_config, - account_balances, - validator, - ); - - let proposal_num_txs = proposal.txs.len(); - - event_tracker.record_create_proposal( - self.num_addresses(), - sequencer_len, - state_backend_lookups, - proposal_num_txs, - ); - - info!( - ?proposed_seq_num, - ?proposal_num_txs, - proposal_total_gas = proposal.total_gas, - "created proposal" - ); - - Ok(proposal.txs) - } - pub fn try_promote_pending( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, From db544e1080356de088f5636bbfba8e17c1712ddf Mon Sep 17 00:00:00 2001 From: Andre Benedito <38482987+andr-dev@users.noreply.github.com> Date: Thu, 23 Oct 2025 13:49:00 -0400 Subject: [PATCH 2/4] Remove unused min_promotable param from tracked txpool --- monad-eth-txpool/src/pool/mod.rs | 2 -- monad-eth-txpool/src/pool/tracked/mod.rs | 5 ----- 2 files changed, 7 deletions(-) diff --git a/monad-eth-txpool/src/pool/mod.rs b/monad-eth-txpool/src/pool/mod.rs index beda97a190..a9fe1934a7 100644 --- a/monad-eth-txpool/src/pool/mod.rs +++ b/monad-eth-txpool/src/pool/mod.rs @@ -232,7 +232,6 @@ where block_policy, state_backend, &mut self.pending, - 0, INSERT_TXS_MAX_PROMOTE, ) && self.pending.is_at_promote_txs_watermark() { @@ -259,7 +258,6 @@ where block_policy, state_backend, &mut self.pending, - 0, PENDING_MAX_PROMOTE, ) { warn!("txpool failed to promote during promote_pending call"); diff --git a/monad-eth-txpool/src/pool/tracked/mod.rs b/monad-eth-txpool/src/pool/tracked/mod.rs index 849a8472a4..9e414dcf57 100644 --- a/monad-eth-txpool/src/pool/tracked/mod.rs +++ b/monad-eth-txpool/src/pool/tracked/mod.rs @@ -147,17 +147,12 @@ where block_policy: &EthBlockPolicy, state_backend: &SBT, pending: &mut PendingTxMap, - min_promotable: usize, max_promotable: usize, ) -> bool { let Some(insertable) = MAX_ADDRESSES.checked_sub(self.txs.len()) else { return false; }; - if insertable < min_promotable { - return true; - } - let insertable = insertable.min(max_promotable); if insertable == 0 { From 07f4e9bb6f4c19995d920b4d2877980c679f8070 Mon Sep 17 00:00:00 2001 From: Andre Benedito <38482987+andr-dev@users.noreply.github.com> Date: Fri, 19 Sep 2025 12:02:50 -0400 Subject: [PATCH 3/4] Remove pending pool from txpool --- monad-eth-txpool-executor/src/lib.rs | 20 -- monad-eth-txpool-executor/tests/executor.rs | 6 +- monad-eth-txpool-types/src/lib.rs | 11 +- monad-eth-txpool/src/event_tracker.rs | 148 +-------------- monad-eth-txpool/src/metrics.rs | 33 ---- monad-eth-txpool/src/pool/mod.rs | 162 +++++++--------- monad-eth-txpool/src/pool/pending/list.rs | 131 ------------- monad-eth-txpool/src/pool/pending/mod.rs | 167 ----------------- monad-eth-txpool/src/pool/tracked/list.rs | 65 ++++--- monad-eth-txpool/src/pool/tracked/mod.rs | 181 ++++-------------- monad-eth-txpool/tests/pool.rs | 8 +- monad-rpc/src/handlers/eth/txn.rs | 4 +- monad-rpc/src/txpool/state.rs | 194 ++------------------ monad-rpc/src/txpool/types.rs | 1 - monad-rpc/src/vpool.rs | 1 - 15 files changed, 159 insertions(+), 973 deletions(-) delete mode 100644 monad-eth-txpool/src/pool/pending/list.rs delete mode 100644 monad-eth-txpool/src/pool/pending/mod.rs diff --git a/monad-eth-txpool-executor/src/lib.rs b/monad-eth-txpool-executor/src/lib.rs index 7aff5bc933..1296be7d05 100644 --- a/monad-eth-txpool-executor/src/lib.rs +++ b/monad-eth-txpool-executor/src/lib.rs @@ -60,8 +60,6 @@ mod metrics; mod preload; mod reset; -const PROMOTE_PENDING_INTERVAL_MS: u64 = 2; - pub struct EthTxPoolExecutor where ST: CertificateSignatureRecoverable, @@ -83,7 +81,6 @@ where forwarding_manager: Pin>, preload_manager: Pin>, - promote_pending_timer: tokio::time::Interval, metrics: Arc, executor_metrics: ExecutorMetrics, @@ -138,11 +135,6 @@ where { let metrics = metrics.clone(); - let mut promote_pending_timer = - tokio::time::interval(Duration::from_millis(PROMOTE_PENDING_INTERVAL_MS)); - promote_pending_timer - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - move |command_rx, event_tx| { let pool = EthTxPool::new( soft_tx_expiry, @@ -166,7 +158,6 @@ where forwarding_manager: Box::pin(EthTxPoolForwardingManager::default()), preload_manager: Box::pin(EthTxPoolPreloadManager::default()), - promote_pending_timer, metrics, executor_metrics, @@ -493,7 +484,6 @@ where forwarding_manager, preload_manager, - promote_pending_timer, metrics, executor_metrics, @@ -619,16 +609,6 @@ where forwarding_manager.as_mut().complete_ingress(); } - while promote_pending_timer.poll_tick(cx).is_ready() { - pool.promote_pending( - &mut EthTxPoolEventTracker::new(&metrics.pool, &mut ipc_events), - block_policy, - state_backend, - ); - - promote_pending_timer.reset(); - } - while let Poll::Ready((predicted_proposal_seqnum, addresses)) = preload_manager.as_mut().poll_requests(cx) { diff --git a/monad-eth-txpool-executor/tests/executor.rs b/monad-eth-txpool-executor/tests/executor.rs index e514e9bdba..4f5ca2203e 100644 --- a/monad-eth-txpool-executor/tests/executor.rs +++ b/monad-eth-txpool-executor/tests/executor.rs @@ -100,11 +100,9 @@ async fn setup_txpool_executor_with_client() -> ( )], }]); - let (ipc_client, EthTxPoolSnapshot { pending, tracked }) = - EthTxPoolIpcClient::new(bind_path).await.unwrap(); + let (ipc_client, EthTxPoolSnapshot { txs }) = EthTxPoolIpcClient::new(bind_path).await.unwrap(); - assert!(pending.is_empty()); - assert!(tracked.is_empty()); + assert!(txs.is_empty()); (txpool_executor, ipc_client) } diff --git a/monad-eth-txpool-types/src/lib.rs b/monad-eth-txpool-types/src/lib.rs index 963d48e147..751f6e72bf 100644 --- a/monad-eth-txpool-types/src/lib.rs +++ b/monad-eth-txpool-types/src/lib.rs @@ -26,12 +26,8 @@ pub struct EthTxPoolEvent { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum EthTxPoolEventType { - /// The tx was inserted into the txpool's (pending/tracked) tx list. - Insert { - address: Address, - owned: bool, - tracked: bool, - }, + /// The tx was inserted into the txpool. + Insert { address: Address, owned: bool }, /// The tx was committed and is thus finalized. Commit, @@ -118,6 +114,5 @@ pub enum EthTxPoolEvictReason { #[derive(Debug, Serialize, Deserialize)] pub struct EthTxPoolSnapshot { - pub pending: HashSet, - pub tracked: HashSet, + pub txs: HashSet, } diff --git a/monad-eth-txpool/src/event_tracker.rs b/monad-eth-txpool/src/event_tracker.rs index 0d7f4b7948..7f12e5a9a9 100644 --- a/monad-eth-txpool/src/event_tracker.rs +++ b/monad-eth-txpool/src/event_tracker.rs @@ -43,7 +43,7 @@ impl<'a> EthTxPoolEventTracker<'a> { } } - pub fn insert_pending(&mut self, tx: &Recovered, owned: bool) { + pub fn insert(&mut self, tx: &Recovered, owned: bool) { if owned { self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst); } else { @@ -57,64 +57,11 @@ impl<'a> EthTxPoolEventTracker<'a> { EthTxPoolEventType::Insert { address: tx.signer(), owned, - tracked: false, }, ); } - pub fn insert_tracked(&mut self, tx: &Recovered, owned: bool) { - if owned { - self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst); - } else { - self.metrics - .insert_forwarded_txs - .fetch_add(1, Ordering::SeqCst); - } - - self.events.insert( - *tx.tx_hash(), - EthTxPoolEventType::Insert { - address: tx.signer(), - owned, - tracked: true, - }, - ); - } - - pub fn replace_pending( - &mut self, - address: &Address, - old_tx_hash: TxHash, - new_tx_hash: TxHash, - new_owned: bool, - ) { - if new_owned { - self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst); - } else { - self.metrics - .insert_forwarded_txs - .fetch_add(1, Ordering::SeqCst); - } - - self.events.insert( - old_tx_hash, - EthTxPoolEventType::Drop { - reason: EthTxPoolDropReason::ReplacedByHigherPriority { - replacement: new_tx_hash, - }, - }, - ); - self.events.insert( - new_tx_hash, - EthTxPoolEventType::Insert { - address: *address, - owned: new_owned, - tracked: false, - }, - ); - } - - pub fn replace_tracked( + pub fn replace( &mut self, address: &Address, old_tx_hash: TxHash, @@ -142,7 +89,6 @@ impl<'a> EthTxPoolEventTracker<'a> { EthTxPoolEventType::Insert { address: *address, owned: new_owned, - tracked: true, }, ); } @@ -216,80 +162,6 @@ impl<'a> EthTxPoolEventTracker<'a> { } } - pub fn pending_promote<'b>( - &mut self, - txs: impl Iterator)>, - ) { - self.metrics - .pending - .promote_addresses - .fetch_add(1, Ordering::SeqCst); - - for (owned, tx) in txs { - self.metrics - .pending - .promote_txs - .fetch_add(1, Ordering::SeqCst); - - self.events.insert( - *tx.tx_hash(), - EthTxPoolEventType::Insert { - address: tx.signer(), - owned, - tracked: true, - }, - ); - } - } - - pub fn pending_drop_unknown(&mut self, tx_hashes: impl Iterator) { - self.metrics - .pending - .drop_unknown_addresses - .fetch_add(1, Ordering::SeqCst); - - for tx_hash in tx_hashes { - self.metrics - .pending - .drop_unknown_txs - .fetch_add(1, Ordering::SeqCst); - - self.events.insert( - tx_hash, - EthTxPoolEventType::Drop { - reason: EthTxPoolDropReason::InsufficientBalance, - }, - ); - } - } - - pub fn pending_drop_low_nonce( - &mut self, - address: bool, - tx_hashes: impl Iterator, - ) { - if address { - self.metrics - .pending - .drop_low_nonce_addresses - .fetch_add(1, Ordering::SeqCst); - } - - for tx_hash in tx_hashes { - self.metrics - .pending - .drop_low_nonce_txs - .fetch_add(1, Ordering::SeqCst); - - self.events.insert( - tx_hash, - EthTxPoolEventType::Drop { - reason: EthTxPoolDropReason::NonceTooLow, - }, - ); - } - } - pub fn tracked_commit(&mut self, address: bool, tx_hashes: impl Iterator) { if address { self.metrics @@ -335,21 +207,7 @@ impl<'a> EthTxPoolEventTracker<'a> { } } - pub fn update_aggregate_metrics( - &mut self, - pending_addresses: u64, - pending_txs: u64, - tracked_addresses: u64, - tracked_txs: u64, - ) { - self.metrics - .pending - .addresses - .store(pending_addresses, Ordering::SeqCst); - self.metrics - .pending - .txs - .store(pending_txs, Ordering::SeqCst); + pub fn update_aggregate_metrics(&mut self, tracked_addresses: u64, tracked_txs: u64) { self.metrics .tracked .addresses diff --git a/monad-eth-txpool/src/metrics.rs b/monad-eth-txpool/src/metrics.rs index e6dd3d52e1..5aaa60b01e 100644 --- a/monad-eth-txpool/src/metrics.rs +++ b/monad-eth-txpool/src/metrics.rs @@ -41,7 +41,6 @@ pub struct EthTxPoolMetrics { pub create_proposal_available_addresses: AtomicU64, pub create_proposal_backend_lookups: AtomicU64, - pub pending: EthTxpoolPendingMetrics, pub tracked: EthTxPoolTrackedMetrics, } @@ -87,42 +86,10 @@ impl EthTxPoolMetrics { metrics["monad.bft.txpool.pool.create_proposal_backend_lookups"] = self.create_proposal_backend_lookups.load(Ordering::SeqCst); - self.pending.update(metrics); self.tracked.update(metrics); } } -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct EthTxpoolPendingMetrics { - pub addresses: AtomicU64, - pub txs: AtomicU64, - pub promote_addresses: AtomicU64, - pub promote_txs: AtomicU64, - pub drop_unknown_addresses: AtomicU64, - pub drop_unknown_txs: AtomicU64, - pub drop_low_nonce_addresses: AtomicU64, - pub drop_low_nonce_txs: AtomicU64, -} - -impl EthTxpoolPendingMetrics { - pub fn update(&self, metrics: &mut ExecutorMetrics) { - metrics["monad.bft.txpool.pool.pending.addresses"] = self.addresses.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.txs"] = self.txs.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.promote_addresses"] = - self.promote_addresses.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.promote_txs"] = - self.promote_txs.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.drop_unknown_addresses"] = - self.drop_unknown_addresses.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.drop_unknown_txs"] = - self.drop_unknown_txs.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.drop_low_nonce_addresses"] = - self.drop_low_nonce_addresses.load(Ordering::SeqCst); - metrics["monad.bft.txpool.pool.pending.drop_low_nonce_txs"] = - self.drop_low_nonce_txs.load(Ordering::SeqCst); - } -} - #[derive(Debug, Default, Serialize, Deserialize)] pub struct EthTxPoolTrackedMetrics { pub addresses: AtomicU64, diff --git a/monad-eth-txpool/src/pool/mod.rs b/monad-eth-txpool/src/pool/mod.rs index a9fe1934a7..400a1b0e61 100644 --- a/monad-eth-txpool/src/pool/mod.rs +++ b/monad-eth-txpool/src/pool/mod.rs @@ -48,24 +48,13 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tracing::{debug, error, info, warn}; pub use self::transaction::max_eip2718_encoded_length; -use self::{ - pending::PendingTxMap, sequencer::ProposalSequencer, tracked::TrackedTxMap, - transaction::ValidEthTransaction, -}; +use self::{sequencer::ProposalSequencer, tracked::TrackedTxMap, transaction::ValidEthTransaction}; use crate::EthTxPoolEventTracker; -mod pending; mod sequencer; mod tracked; mod transaction; -// This constants controls the maximum number of addresses that get promoted during the tx insertion -// process. It was set based on intuition and should be changed once we have more data on txpool -// performance. -// Each account lookup takes about 30us so this should block the thread for at most roughly 8ms. -const INSERT_TXS_MAX_PROMOTE: usize = 128; -const PENDING_MAX_PROMOTE: usize = 128; - #[derive(Clone, Debug)] pub struct EthTxPool where @@ -75,7 +64,6 @@ where CCT: ChainConfig, CRT: ChainRevision, { - pending: PendingTxMap, tracked: TrackedTxMap, last_commit: Option>, @@ -105,7 +93,6 @@ where do_local_insert: bool, ) -> Self { Self { - pending: PendingTxMap::default(), tracked: TrackedTxMap::new(soft_tx_expiry, hard_tx_expiry), last_commit: None, @@ -119,14 +106,11 @@ where } pub fn is_empty(&self) -> bool { - self.pending.is_empty() && self.tracked.is_empty() + self.tracked.is_empty() } pub fn num_txs(&self) -> usize { - self.pending - .num_txs() - .checked_add(self.tracked.num_txs()) - .expect("pool size does not overflow") + self.tracked.num_txs() } pub fn current_revision(&self) -> (&CRT, &MonadExecutionRevision) { @@ -178,18 +162,21 @@ where // the range at N-k+1. let block_seq_num = block_policy.get_last_commit() + SeqNum(1); - let addresses = txs.iter().map(ValidEthTransaction::signer).collect_vec(); + let account_balance_addresses = txs.iter().map(ValidEthTransaction::signer).collect_vec(); let account_balances = match block_policy.compute_account_base_balances( block_seq_num, state_backend, chain_config, None, - addresses.iter(), + account_balance_addresses.iter(), ) { Ok(account_balances) => account_balances, Err(err) => { - warn!(?err, "failed to insert transactions"); + warn!( + ?err, + "failed to insert transactions at account_balance lookups" + ); event_tracker.drop_all( txs.into_iter().map(ValidEthTransaction::into_raw), EthTxPoolDropReason::Internal(EthTxPoolInternalDropReason::StateBackendError), @@ -200,70 +187,70 @@ where let last_commit_base_fee = last_commit.execution_inputs.base_fee_per_gas; - for tx in txs { - if account_balances - .get(tx.signer_ref()) - .is_none_or(|account_balance_state| { - account_balance_state.balance - < last_commit_base_fee.saturating_mul(tx.gas_limit()) - }) - { - event_tracker.drop(tx.hash(), EthTxPoolDropReason::InsufficientBalance); - continue; + let txs = txs + .into_iter() + .filter(|tx| { + if account_balances + .get(tx.signer_ref()) + .is_none_or(|account_balance_state| { + account_balance_state.balance + < last_commit_base_fee.saturating_mul(tx.gas_limit()) + }) + { + event_tracker.drop(tx.hash(), EthTxPoolDropReason::InsufficientBalance); + return false; + } + + true + }) + .into_group_map_by(|tx| tx.signer()); + + let account_nonce_addresses = txs.keys().cloned().collect_vec(); + + let mut account_nonces = match block_policy.get_account_base_nonces( + block_seq_num, + state_backend, + &vec![], + account_nonce_addresses.iter(), + ) { + Ok(account_nonces) => account_nonces, + Err(err) => { + warn!( + ?err, + "failed to insert transactions at account_nonce lookups" + ); + event_tracker.drop_all( + txs.into_values() + .flatten() + .map(ValidEthTransaction::into_raw), + EthTxPoolDropReason::Internal(EthTxPoolInternalDropReason::StateBackendError), + ); + return; } + }; - let Some(tx) = self - .tracked - .try_insert_tx(event_tracker, last_commit, tx) - .unwrap_or_else(|tx| { - self.pending - .try_insert_tx(event_tracker, tx, last_commit_base_fee) - }) - else { + for (address, txs) in txs { + let Some(account_nonce) = account_nonces.remove(&address) else { + event_tracker.drop_all( + txs.into_iter().map(ValidEthTransaction::into_raw), + EthTxPoolDropReason::Internal(EthTxPoolInternalDropReason::StateBackendError), + ); continue; }; - on_insert(tx); - } - - if !self.tracked.try_promote_pending( - event_tracker, - last_commit, - block_policy, - state_backend, - &mut self.pending, - INSERT_TXS_MAX_PROMOTE, - ) && self.pending.is_at_promote_txs_watermark() - { - warn!("txpool failed to promote at pending promote txs watermark"); + self.tracked.try_insert_txs( + event_tracker, + last_commit, + address, + txs, + account_nonce, + &mut on_insert, + ); } self.update_aggregate_metrics(event_tracker); } - pub fn promote_pending( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - block_policy: &EthBlockPolicy, - state_backend: &SBT, - ) { - let Some(last_commit) = self.last_commit.as_ref() else { - warn!("txpool promote_pending called before last committed block set"); - return; - }; - - if !self.tracked.try_promote_pending( - event_tracker, - last_commit, - block_policy, - state_backend, - &mut self.pending, - PENDING_MAX_PROMOTE, - ) { - warn!("txpool failed to promote during promote_pending call"); - } - } - pub fn create_proposal( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, @@ -461,11 +448,8 @@ where self.static_validate_all_txs(event_tracker); } - self.tracked.update_committed_nonce_usages( - event_tracker, - committed_block.nonce_usages, - &mut self.pending, - ); + self.tracked + .update_committed_nonce_usages(event_tracker, committed_block.nonce_usages); self.tracked.evict_expired_txs(event_tracker); @@ -509,12 +493,6 @@ where &self.chain_revision, &self.execution_revision, ); - self.pending.static_validate_all_txs( - event_tracker, - self.chain_id, - &self.chain_revision, - &self.execution_revision, - ); } pub fn get_forwardable_txs( @@ -535,8 +513,6 @@ where fn update_aggregate_metrics(&self, event_tracker: &mut EthTxPoolEventTracker<'_>) { event_tracker.update_aggregate_metrics( - self.pending.num_addresses() as u64, - self.pending.num_txs() as u64, self.tracked.num_addresses() as u64, self.tracked.num_txs() as u64, ); @@ -544,12 +520,7 @@ where pub fn generate_snapshot(&self) -> EthTxPoolSnapshot { EthTxPoolSnapshot { - pending: self - .pending - .iter_txs() - .map(ValidEthTransaction::hash) - .collect(), - tracked: self + txs: self .tracked .iter_txs() .map(ValidEthTransaction::hash) @@ -561,7 +532,6 @@ where self.tracked .iter_txs() .map(ValidEthTransaction::signer) - .chain(self.pending.iter_txs().map(ValidEthTransaction::signer)) .unique() .collect() } diff --git a/monad-eth-txpool/src/pool/pending/list.rs b/monad-eth-txpool/src/pool/pending/list.rs deleted file mode 100644 index ab8d33044f..0000000000 --- a/monad-eth-txpool/src/pool/pending/list.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (C) 2025 Category Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use std::collections::{btree_map::Entry, BTreeMap}; - -use alloy_primitives::Address; -use monad_chain_config::{execution_revision::MonadExecutionRevision, revision::ChainRevision}; -use monad_eth_txpool_types::EthTxPoolDropReason; -use monad_types::Nonce; - -use crate::{pool::transaction::ValidEthTransaction, EthTxPoolEventTracker}; - -/// This struct ensures at the type level that nonce_map is never empty, a property used to enforce -/// that every address in the PendingTxMap has an associated pending transaction. -#[derive(Clone, Debug, Default)] -pub struct PendingTxList { - nonce_map: BTreeMap, -} - -impl PendingTxList { - pub fn iter(&self) -> impl Iterator { - self.nonce_map.values() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.nonce_map.values_mut() - } - - pub fn insert_entry<'a>( - event_tracker: &mut EthTxPoolEventTracker<'_>, - entry: indexmap::map::VacantEntry<'a, Address, Self>, - tx: ValidEthTransaction, - ) -> &'a ValidEthTransaction { - let mut nonce_map = BTreeMap::new(); - - let nonce = tx.nonce(); - - event_tracker.insert_pending(tx.raw(), tx.is_owned()); - nonce_map.insert(nonce, tx); - - let entry = entry.insert(Self { nonce_map }); - - entry.nonce_map.get(&nonce).unwrap() - } - - pub fn num_txs(&self) -> usize { - self.nonce_map.len() - } - - /// Produces a reference to the tx if it is present in the tx list after attempting to insert - /// it along with a boolean indicating if it was newly added, ie. does not replace an existing - /// tx. - pub fn try_insert_tx( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - tx: ValidEthTransaction, - last_commit_base_fee: u64, - ) -> Option<(&ValidEthTransaction, bool)> { - match self.nonce_map.entry(tx.nonce()) { - Entry::Vacant(v) => { - event_tracker.insert_pending(tx.raw(), tx.is_owned()); - Some((v.insert(tx), true)) - } - Entry::Occupied(mut entry) => { - let existing_tx = entry.get(); - - if tx.has_higher_priority(existing_tx, last_commit_base_fee) { - event_tracker.replace_pending( - tx.signer_ref(), - existing_tx.hash(), - tx.hash(), - tx.is_owned(), - ); - entry.insert(tx); - - Some((entry.into_mut(), false)) - } else { - event_tracker.drop(tx.hash(), EthTxPoolDropReason::ExistingHigherPriority); - - None - } - } - } - } - - pub fn into_map(self) -> BTreeMap { - self.nonce_map - } - - pub fn static_validate_all_txs( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - chain_id: u64, - chain_revision: &CRT, - execution_revision: &MonadExecutionRevision, - ) -> usize - where - CRT: ChainRevision, - { - let mut removed = 0; - - self.nonce_map.retain(|_, tx| { - let Err(error) = tx.static_validate( - chain_id, - chain_revision.chain_params(), - execution_revision.execution_chain_params(), - ) else { - return true; - }; - - event_tracker.drop(tx.hash(), EthTxPoolDropReason::NotWellFormed(error)); - removed += 1; - - false - }); - - removed - } -} diff --git a/monad-eth-txpool/src/pool/pending/mod.rs b/monad-eth-txpool/src/pool/pending/mod.rs deleted file mode 100644 index 72fe057b6b..0000000000 --- a/monad-eth-txpool/src/pool/pending/mod.rs +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (C) 2025 Category Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use alloy_primitives::Address; -use indexmap::IndexMap; -use monad_chain_config::{execution_revision::MonadExecutionRevision, revision::ChainRevision}; -use monad_eth_txpool_types::EthTxPoolDropReason; -use tracing::warn; - -pub use self::list::PendingTxList; -use super::transaction::ValidEthTransaction; -use crate::EthTxPoolEventTracker; - -mod list; - -// These constants were set using intuition and should be changed once we have more performance -// numbers for the txpool. -const MAX_ADDRESSES: usize = 8 * 1024; -const MAX_TXS: usize = 16 * 1024; -const PROMOTE_TXS_WATERMARK: usize = MAX_TXS * 3 / 4; - -/// Wrapper type to store byte-validated transactions and quickly query the total number of -/// transactions in the txs map. -#[derive(Clone, Debug, Default)] -pub struct PendingTxMap { - txs: IndexMap, - num_txs: usize, -} - -impl PendingTxMap { - pub fn is_empty(&self) -> bool { - self.txs.is_empty() - } - - pub fn num_addresses(&self) -> usize { - self.txs.len() - } - - pub fn num_txs(&self) -> usize { - self.num_txs - } - - pub fn is_at_promote_txs_watermark(&self) -> bool { - self.num_txs >= PROMOTE_TXS_WATERMARK - } - - pub fn iter_txs(&self) -> impl Iterator { - self.txs.values().flat_map(PendingTxList::iter) - } - - pub fn iter_mut_txs(&mut self) -> impl Iterator { - self.txs.values_mut().flat_map(PendingTxList::iter_mut) - } - - pub fn try_insert_tx( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - tx: ValidEthTransaction, - last_commit_base_fee: u64, - ) -> Option<&ValidEthTransaction> { - if self.num_txs >= MAX_TXS { - event_tracker.drop(tx.hash(), EthTxPoolDropReason::PoolFull); - return None; - } - - let num_addresses = self.txs.len(); - assert!(num_addresses <= MAX_ADDRESSES); - - match self.txs.entry(tx.signer()) { - indexmap::map::Entry::Occupied(tx_list) => { - let (tx, new_tx) = - tx_list - .into_mut() - .try_insert_tx(event_tracker, tx, last_commit_base_fee)?; - - if new_tx { - self.num_txs += 1; - } - - Some(tx) - } - indexmap::map::Entry::Vacant(v) => { - if num_addresses == MAX_ADDRESSES { - event_tracker.drop(tx.hash(), EthTxPoolDropReason::PoolFull); - return None; - } - - let tx = PendingTxList::insert_entry(event_tracker, v, tx); - self.num_txs += 1; - - Some(tx) - } - } - } - - pub fn remove(&mut self, address: &Address) -> Option { - if let Some(tx_list) = self.txs.swap_remove(address) { - self.num_txs = self - .num_txs - .checked_sub(tx_list.num_txs()) - .unwrap_or_else(|| { - warn!("txpool pending tx map underflowed on remove"); - - 0 - }); - - return Some(tx_list); - } - - None - } - - pub fn split_off(&mut self, num_addresses: usize) -> IndexMap { - if num_addresses >= self.txs.len() { - self.num_txs = 0; - return std::mem::take(&mut self.txs); - } - - let mut split = self.txs.split_off(num_addresses); - std::mem::swap(&mut split, &mut self.txs); - - self.num_txs = self - .num_txs - .checked_sub(split.values().map(PendingTxList::num_txs).sum()) - .expect("num txs does not underflow"); - - split - } - - pub fn static_validate_all_txs( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - chain_id: u64, - chain_revision: &CRT, - execution_revision: &MonadExecutionRevision, - ) where - CRT: ChainRevision, - { - self.txs.retain(|_, tx_list| { - let removed = tx_list.static_validate_all_txs( - event_tracker, - chain_id, - chain_revision, - execution_revision, - ); - - self.num_txs = self - .num_txs - .checked_sub(removed) - .expect("num txs does not underflow"); - - tx_list.num_txs() > 0 - }); - } -} diff --git a/monad-eth-txpool/src/pool/tracked/list.rs b/monad-eth-txpool/src/pool/tracked/list.rs index 09fed1f1a8..b906de9882 100644 --- a/monad-eth-txpool/src/pool/tracked/list.rs +++ b/monad-eth-txpool/src/pool/tracked/list.rs @@ -19,16 +19,14 @@ use std::{ }; use alloy_primitives::Address; +use indexmap::map::VacantEntry; use monad_chain_config::{execution_revision::MonadExecutionRevision, revision::ChainRevision}; use monad_eth_block_policy::nonce_usage::NonceUsage; use monad_eth_txpool_types::EthTxPoolDropReason; use monad_types::Nonce; use tracing::error; -use crate::{ - pool::{pending::PendingTxList, transaction::ValidEthTransaction}, - EthTxPoolEventTracker, -}; +use crate::{pool::transaction::ValidEthTransaction, EthTxPoolEventTracker}; /// Stores byte-validated transactions alongside the an account_nonce to enforce at the type level /// that all the transactions in the txs map have a nonce at least account_nonce. Similar to @@ -41,41 +39,40 @@ pub struct TrackedTxList { } impl TrackedTxList { - pub fn iter(&self) -> impl Iterator { - self.txs.values().map(|(tx, _)| tx) - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.txs.values_mut().map(|(tx, _)| tx) - } - - pub fn new_from_promote_pending( + pub fn try_new( + this_entry: VacantEntry<'_, Address, Self>, event_tracker: &mut EthTxPoolEventTracker<'_>, + txs: Vec, account_nonce: u64, - tx_list: PendingTxList, - ) -> Option { - let mut tx_list = tx_list.into_map(); - - let txs = tx_list.split_off(&account_nonce); + on_insert: &mut impl FnMut(&ValidEthTransaction), + last_commit_base_fee: u64, + tx_expiry: Duration, + ) { + let mut this = TrackedTxList { + account_nonce, + txs: BTreeMap::default(), + }; - event_tracker.pending_drop_low_nonce( - txs.is_empty(), - tx_list.values().map(ValidEthTransaction::hash), - ); + for tx in txs { + if let Some(tx) = this.try_insert_tx(event_tracker, tx, last_commit_base_fee, tx_expiry) + { + on_insert(tx); + } + } - if txs.is_empty() { - return None; + if this.txs.is_empty() { + return; } - event_tracker.pending_promote(txs.values().map(|tx| (tx.is_owned(), tx.raw()))); + this_entry.insert(this); + } - Some(Self { - account_nonce, - txs: txs - .into_iter() - .map(|(nonce, tx)| (nonce, (tx, event_tracker.now))) - .collect(), - }) + pub fn iter(&self) -> impl Iterator { + self.txs.values().map(|(tx, _)| tx) + } + + pub fn iter_mut(&mut self) -> impl Iterator { + self.txs.values_mut().map(|(tx, _)| tx) } pub fn num_txs(&self) -> usize { @@ -119,7 +116,7 @@ impl TrackedTxList { match self.txs.entry(tx.nonce()) { btree_map::Entry::Vacant(v) => { - event_tracker.insert_pending(tx.raw(), tx.is_owned()); + event_tracker.insert(tx.raw(), tx.is_owned()); Some(&v.insert((tx, event_tracker.now)).0) } btree_map::Entry::Occupied(mut entry) => { @@ -128,7 +125,7 @@ impl TrackedTxList { if tx_expired(existing_tx_insert_time, tx_expiry, &event_tracker.now) || tx.has_higher_priority(existing_tx, last_commit_base_fee) { - event_tracker.replace_tracked( + event_tracker.replace( tx.signer_ref(), existing_tx.hash(), tx.hash(), diff --git a/monad-eth-txpool/src/pool/tracked/mod.rs b/monad-eth-txpool/src/pool/tracked/mod.rs index 9e414dcf57..7a5bd4f467 100644 --- a/monad-eth-txpool/src/pool/tracked/mod.rs +++ b/monad-eth-txpool/src/pool/tracked/mod.rs @@ -13,11 +13,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeMap, marker::PhantomData, time::Duration}; +use std::{marker::PhantomData, time::Duration}; use alloy_primitives::Address; use indexmap::{map::Entry as IndexMapEntry, IndexMap}; -use itertools::Itertools; use monad_chain_config::{ execution_revision::MonadExecutionRevision, revision::ChainRevision, ChainConfig, }; @@ -25,22 +24,14 @@ use monad_consensus_types::block::ConsensusBlockHeader; use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; -use monad_eth_block_policy::{ - nonce_usage::{NonceUsage, NonceUsageMap}, - EthBlockPolicy, -}; -use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason}; +use monad_eth_block_policy::nonce_usage::NonceUsageMap; use monad_eth_types::EthExecutionProtocol; use monad_state_backend::StateBackend; -use monad_types::{DropTimer, SeqNum}; use monad_validator::signature_collection::SignatureCollection; -use tracing::{debug, error, info, warn}; +use tracing::info; pub(super) use self::list::TrackedTxList; -use super::{ - pending::{PendingTxList, PendingTxMap}, - transaction::ValidEthTransaction, -}; +use super::transaction::ValidEthTransaction; use crate::EthTxPoolEventTracker; mod list; @@ -119,161 +110,55 @@ where self.txs.values_mut().flat_map(TrackedTxList::iter_mut) } - /// Produces a reference to the tx if it was inserted, producing None when the tx signer was - /// tracked but the tx was not inserted. If the tx signer is not tracked or the tracked pool is - /// not ready to accept txs, an error is produced with the original tx. - pub fn try_insert_tx( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - last_commit: &ConsensusBlockHeader, - tx: ValidEthTransaction, - ) -> Result, ValidEthTransaction> { - let Some(tx_list) = self.txs.get_mut(tx.signer_ref()) else { - return Err(tx); - }; - - Ok(tx_list.try_insert_tx( - event_tracker, - tx, - last_commit.execution_inputs.base_fee_per_gas, - self.hard_tx_expiry, - )) - } - - pub fn try_promote_pending( + pub fn try_insert_txs( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, last_commit: &ConsensusBlockHeader, - block_policy: &EthBlockPolicy, - state_backend: &SBT, - pending: &mut PendingTxMap, - max_promotable: usize, - ) -> bool { - let Some(insertable) = MAX_ADDRESSES.checked_sub(self.txs.len()) else { - return false; - }; - - let insertable = insertable.min(max_promotable); - - if insertable == 0 { - return true; - } - - let to_insert = pending.split_off(insertable); - - if to_insert.is_empty() { - return true; - } - - let last_commit_seq_num = last_commit.seq_num; - - let addresses = to_insert.len(); - let _timer = DropTimer::start(Duration::ZERO, |elapsed| { - debug!(?elapsed, addresses, "txpool promote_pending") - }); - - let addresses = to_insert.keys().cloned().collect_vec(); - - // BlockPolicy only guarantees that data is available for seqnum (N-k, N] for some execution - // delay k. Since block_policy looks up seqnum - execution_delay, passing the last commit - // seqnum will result in a lookup outside that range. As a fix, we add 1 so the seqnum is on - // the edge of the range. - let account_nonces = match block_policy.get_account_base_nonces( - last_commit_seq_num + SeqNum(1), - state_backend, - &Vec::default(), - addresses.iter(), - ) { - Ok(account_nonces) => account_nonces, - Err(err) => { - warn!( - ?err, - "failed to lookup account nonces during promote pending" - ); - event_tracker.drop_all( - to_insert - .into_values() - .map(PendingTxList::into_map) - .flat_map(BTreeMap::into_values) - .map(ValidEthTransaction::into_raw), - EthTxPoolDropReason::Internal(EthTxPoolInternalDropReason::StateBackendError), - ); - return false; - } - }; - - for (address, pending_tx_list) in to_insert { - let Some(account_nonce) = account_nonces.get(&address) else { - error!("txpool address missing from state backend"); - - event_tracker - .pending_drop_unknown(pending_tx_list.into_map().values().map(|tx| tx.hash())); - - continue; - }; + address: Address, + txs: Vec, + account_nonce: u64, + on_insert: &mut impl FnMut(&ValidEthTransaction), + ) { + match self.txs.entry(address) { + IndexMapEntry::Occupied(o) => { + let tx_list = o.into_mut(); - match self.txs.entry(address) { - IndexMapEntry::Occupied(_) => { - unreachable!("pending address present in tracked map") - } - IndexMapEntry::Vacant(v) => { - let Some(tracked_tx_list) = TrackedTxList::new_from_promote_pending( + for tx in txs { + if let Some(tx) = tx_list.try_insert_tx( event_tracker, - *account_nonce, - pending_tx_list, - ) else { - continue; - }; - - v.insert(tracked_tx_list); + tx, + last_commit.execution_inputs.base_fee_per_gas, + self.hard_tx_expiry, + ) { + on_insert(tx); + } } } + IndexMapEntry::Vacant(v) => { + TrackedTxList::try_new( + v, + event_tracker, + txs, + account_nonce, + on_insert, + last_commit.execution_inputs.base_fee_per_gas, + self.hard_tx_expiry, + ); + } } - - true } pub fn update_committed_nonce_usages( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, nonce_usages: NonceUsageMap, - pending: &mut PendingTxMap, ) { - let mut insertable = MAX_ADDRESSES.saturating_sub(self.txs.len()); - for (address, nonce_usage) in nonce_usages.into_map() { match self.txs.entry(address) { IndexMapEntry::Occupied(tx_list) => { TrackedTxList::update_committed_nonce_usage(event_tracker, tx_list, nonce_usage) } - IndexMapEntry::Vacant(v) => match nonce_usage { - NonceUsage::Possible(_) => continue, - NonceUsage::Known(nonce) => { - if insertable == 0 { - continue; - } - - let Some(pending_tx_list) = pending.remove(&address) else { - continue; - }; - - let account_nonce = nonce - .checked_add(1) - .expect("account nonce does not overflow"); - - let Some(tracked_tx_list) = TrackedTxList::new_from_promote_pending( - event_tracker, - account_nonce, - pending_tx_list, - ) else { - continue; - }; - - insertable -= 1; - - v.insert(tracked_tx_list); - } - }, + IndexMapEntry::Vacant(_) => {} } } } diff --git a/monad-eth-txpool/tests/pool.rs b/monad-eth-txpool/tests/pool.rs index e80db5adb6..171b09d17a 100644 --- a/monad-eth-txpool/tests/pool.rs +++ b/monad-eth-txpool/tests/pool.rs @@ -822,7 +822,7 @@ fn test_intermediary_nonce_gap() { #[test] #[traced_test] fn test_nonce_exists_in_committed_block() { - // A transaction with nonce 0 should not be included in the block if the latest nonce of the account is 0 + // A transaction with nonce 0 should not be included in the block if the latest nonce of the account is 1 let tx1 = make_legacy_tx(S1, BASE_FEE, GAS_LIMIT, 0, 10); let tx2 = make_legacy_tx(S1, BASE_FEE, GAS_LIMIT, 1, 10); @@ -837,7 +837,7 @@ fn test_nonce_exists_in_committed_block() { Some(nonces), [ TxPoolTestEvent::InsertTxs { - txs: vec![(&tx1, true), (&tx2, true)], + txs: vec![(&tx1, false), (&tx2, true)], expected_pool_size_change: 1, }, TxPoolTestEvent::CreateProposal { @@ -1331,10 +1331,10 @@ fn test_large_batch_many_senders() { let txs = txs.clone(); move |pool| { - let EthTxPoolSnapshot { pending, tracked } = pool.generate_snapshot(); + let EthTxPoolSnapshot { txs: snapshot_txs } = pool.generate_snapshot(); let mut txs = txs.clone(); - txs.retain(|tx| !pending.contains(tx.tx_hash()) && !tracked.contains(tx.tx_hash())); + txs.retain(|tx| !snapshot_txs.contains(tx.tx_hash())); assert!(txs.is_empty()) } diff --git a/monad-rpc/src/handlers/eth/txn.rs b/monad-rpc/src/handlers/eth/txn.rs index d5b8e7c10d..8402646596 100644 --- a/monad-rpc/src/handlers/eth/txn.rs +++ b/monad-rpc/src/handlers/eth/txn.rs @@ -244,9 +244,7 @@ pub async fn monad_eth_sendRawTransaction( TxStatus::Dropped { reason } => { return Err(JsonRpcError::custom(reason.as_user_string())) } - TxStatus::Pending | TxStatus::Tracked | TxStatus::Committed => { - return Ok(hash.to_string()) - } + TxStatus::Tracked | TxStatus::Committed => return Ok(hash.to_string()), TxStatus::Unknown => { error!("txpool bridge sent unknown status"); } diff --git a/monad-rpc/src/txpool/state.rs b/monad-rpc/src/txpool/state.rs index c79602f5fe..023ea73347 100644 --- a/monad-rpc/src/txpool/state.rs +++ b/monad-rpc/src/txpool/state.rs @@ -118,10 +118,7 @@ impl EthTxPoolBridgeState { eviction_queue: &mut EthTxPoolBridgeEvictionQueue, snapshot: EthTxPoolSnapshot, ) { - let EthTxPoolSnapshot { - mut pending, - mut tracked, - } = snapshot; + let EthTxPoolSnapshot { mut txs } = snapshot; let now = Instant::now(); @@ -130,13 +127,7 @@ impl EthTxPoolBridgeState { self.status.retain(|tx_hash, status| { status.1 = None; - if pending.remove(tx_hash) { - status.0 = TxStatus::Pending; - eviction_queue.push_back((now, *tx_hash)); - return true; - } - - if tracked.remove(tx_hash) { + if txs.remove(tx_hash) { status.0 = TxStatus::Tracked; eviction_queue.push_back((now, *tx_hash)); return true; @@ -153,12 +144,7 @@ impl EthTxPoolBridgeState { false }); - for tx_hash in pending { - self.status.insert(tx_hash, (TxStatus::Pending, None)); - eviction_queue.push_back((now, tx_hash)); - } - - for tx_hash in tracked { + for tx_hash in txs { self.status.insert(tx_hash, (TxStatus::Tracked, None)); eviction_queue.push_back((now, tx_hash)); } @@ -191,19 +177,8 @@ impl EthTxPoolBridgeState { for EthTxPoolEvent { tx_hash, action } in events { match action { - EthTxPoolEventType::Insert { - address, - owned: _, - tracked, - } => { - insert( - tx_hash, - if tracked { - TxStatus::Tracked - } else { - TxStatus::Pending - }, - ); + EthTxPoolEventType::Insert { address, owned: _ } => { + insert(tx_hash, TxStatus::Tracked); self.hash_address.entry(tx_hash).insert(address); self.address_hashes @@ -291,8 +266,7 @@ mod test { let state = EthTxPoolBridgeState::new( &mut eviction_queue, EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::default(), + txs: HashSet::default(), }, ); let state_view = state.create_view(); @@ -332,28 +306,18 @@ mod test { async fn test_handle_events_and_snapshot() { enum TestCases { EmptySnapshot, - InsertPending, - InsertPendingSnapshot, - InsertTracked, - InsertTrackedSnapshot, + Insert, + InsertSnapshot, Drop, - Promote, - PromoteSnapshot, - DemoteSnapshot, Commit, Evict, } for test in [ TestCases::EmptySnapshot, - TestCases::InsertPending, - TestCases::InsertPendingSnapshot, - TestCases::InsertTracked, - TestCases::InsertTrackedSnapshot, + TestCases::Insert, + TestCases::InsertSnapshot, TestCases::Drop, - TestCases::Promote, - TestCases::PromoteSnapshot, - TestCases::DemoteSnapshot, TestCases::Commit, TestCases::Evict, ] { @@ -370,13 +334,12 @@ mod test { state.apply_snapshot( &mut eviction_queue, EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::default(), + txs: HashSet::default(), }, ); assert_eq!(state_view.get_status_by_hash(tx.tx_hash()), None); } - TestCases::InsertPending => { + TestCases::Insert => { state.handle_events( &mut eviction_queue, vec![EthTxPoolEvent { @@ -384,37 +347,6 @@ mod test { action: EthTxPoolEventType::Insert { address: tx.recover_signer().unwrap(), owned: true, - tracked: false, - }, - }], - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Pending) - ); - } - TestCases::InsertPendingSnapshot => { - state.apply_snapshot( - &mut eviction_queue, - EthTxPoolSnapshot { - pending: HashSet::from_iter(std::iter::once(tx.tx_hash().to_owned())), - tracked: HashSet::default(), - }, - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Pending) - ); - } - TestCases::InsertTracked => { - state.handle_events( - &mut eviction_queue, - vec![EthTxPoolEvent { - tx_hash: tx.tx_hash().to_owned(), - action: EthTxPoolEventType::Insert { - address: tx.recover_signer().unwrap(), - owned: true, - tracked: true, }, }], ); @@ -423,12 +355,11 @@ mod test { Some(TxStatus::Tracked) ); } - TestCases::InsertTrackedSnapshot => { + TestCases::InsertSnapshot => { state.apply_snapshot( &mut eviction_queue, EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::from_iter(std::iter::once(tx.tx_hash().to_owned())), + txs: HashSet::from_iter(std::iter::once(tx.tx_hash().to_owned())), }, ); assert_eq!( @@ -453,97 +384,6 @@ mod test { }) ); } - TestCases::Promote => { - state.handle_events( - &mut eviction_queue, - vec![EthTxPoolEvent { - tx_hash: tx.tx_hash().to_owned(), - action: EthTxPoolEventType::Insert { - address: tx.recover_signer().unwrap(), - owned: true, - tracked: false, - }, - }], - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Pending) - ); - - state.handle_events( - &mut eviction_queue, - vec![EthTxPoolEvent { - tx_hash: tx.tx_hash().to_owned(), - action: EthTxPoolEventType::Insert { - address: tx.recover_signer().unwrap(), - owned: true, - tracked: true, - }, - }], - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Tracked) - ); - } - TestCases::PromoteSnapshot => { - state.handle_events( - &mut eviction_queue, - vec![EthTxPoolEvent { - tx_hash: tx.tx_hash().to_owned(), - action: EthTxPoolEventType::Insert { - address: tx.recover_signer().unwrap(), - owned: true, - tracked: false, - }, - }], - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Pending) - ); - - state.apply_snapshot( - &mut eviction_queue, - EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::from_iter(std::iter::once(tx.tx_hash().to_owned())), - }, - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Tracked) - ); - } - TestCases::DemoteSnapshot => { - state.handle_events( - &mut eviction_queue, - vec![EthTxPoolEvent { - tx_hash: tx.tx_hash().to_owned(), - action: EthTxPoolEventType::Insert { - address: tx.recover_signer().unwrap(), - owned: true, - tracked: true, - }, - }], - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Tracked) - ); - - state.apply_snapshot( - &mut eviction_queue, - EthTxPoolSnapshot { - pending: HashSet::from_iter(std::iter::once(tx.tx_hash().to_owned())), - tracked: HashSet::default(), - }, - ); - assert_eq!( - state_view.get_status_by_hash(tx.tx_hash()), - Some(TxStatus::Pending) - ); - } TestCases::Commit => { state.handle_events( &mut eviction_queue, @@ -560,8 +400,7 @@ mod test { state.apply_snapshot( &mut eviction_queue, EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::default(), + txs: HashSet::default(), }, ); assert_eq!(state_view.get_status_by_hash(tx.tx_hash()), None); @@ -586,8 +425,7 @@ mod test { state.apply_snapshot( &mut eviction_queue, EthTxPoolSnapshot { - pending: HashSet::default(), - tracked: HashSet::default(), + txs: HashSet::default(), }, ); assert_eq!(state_view.get_status_by_hash(tx.tx_hash()), None); diff --git a/monad-rpc/src/txpool/types.rs b/monad-rpc/src/txpool/types.rs index 678d93f70c..598f64c524 100644 --- a/monad-rpc/src/txpool/types.rs +++ b/monad-rpc/src/txpool/types.rs @@ -21,7 +21,6 @@ pub enum TxStatus { Unknown, // Alive - Pending, Tracked, // Dead diff --git a/monad-rpc/src/vpool.rs b/monad-rpc/src/vpool.rs index b28f88e5fd..0382b10496 100644 --- a/monad-rpc/src/vpool.rs +++ b/monad-rpc/src/vpool.rs @@ -38,7 +38,6 @@ impl From for TxPoolStatusResult { fn from(value: TxStatus) -> Self { let (status, reason) = match value { TxStatus::Unknown => ("unknown", None), - TxStatus::Pending => ("pending", None), TxStatus::Tracked => ("tracked", None), TxStatus::Dropped { reason } => ("dropped", Some(reason.as_user_string())), TxStatus::Evicted { reason } => ( From ed9db524d29d847e9eda3805269593bc3bb534f0 Mon Sep 17 00:00:00 2001 From: Andre Benedito <38482987+andr-dev@users.noreply.github.com> Date: Thu, 23 Oct 2025 16:54:43 -0400 Subject: [PATCH 4/4] Add limits to txpool --- monad-eth-txpool/src/pool/tracked/limits.rs | 138 ++++++++++++++++++++ monad-eth-txpool/src/pool/tracked/list.rs | 58 +++++--- monad-eth-txpool/src/pool/tracked/mod.rs | 66 +++++----- 3 files changed, 208 insertions(+), 54 deletions(-) create mode 100644 monad-eth-txpool/src/pool/tracked/limits.rs diff --git a/monad-eth-txpool/src/pool/tracked/limits.rs b/monad-eth-txpool/src/pool/tracked/limits.rs new file mode 100644 index 0000000000..403ffd6a0d --- /dev/null +++ b/monad-eth-txpool/src/pool/tracked/limits.rs @@ -0,0 +1,138 @@ +// Copyright (C) 2025 Category Labs, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::time::Duration; + +use alloy_primitives::Address; +use indexmap::IndexMap; +use tracing::{error, info}; + +use crate::pool::transaction::ValidEthTransaction; + +// To produce 5k tx blocks, we need the tracked tx map to hold at least 15k addresses so that, after +// pruning the txpool of up to 5k unique addresses in the last committed block update and up to 5k +// unique addresses in the pending blocktree, the tracked tx map will still have at least 5k other +// addresses with at least one tx each to use when creating the next block. +const DEFAULT_MAX_ADDRESSES: usize = 16 * 1024; + +const DEFAULT_MAX_TXS: usize = 64 * 1024; + +const DEFAULT_MAX_EIP2718_BYTES: usize = 4 * 1024 * 1024 * 1024; + +#[derive(Clone, Debug)] +pub(crate) struct TrackedTxLimitsConfig { + max_addresses: usize, + max_txs: usize, + max_eip2718_bytes: usize, + + soft_evict_addresses_watermark: usize, + + soft_tx_expiry: Duration, + hard_tx_expiry: Duration, +} + +impl TrackedTxLimitsConfig { + pub fn new(soft_tx_expiry: Duration, hard_tx_expiry: Duration) -> Self { + Self { + max_addresses: DEFAULT_MAX_ADDRESSES, + max_txs: DEFAULT_MAX_TXS, + max_eip2718_bytes: DEFAULT_MAX_EIP2718_BYTES, + + soft_evict_addresses_watermark: DEFAULT_MAX_ADDRESSES - 512, + + soft_tx_expiry, + hard_tx_expiry, + } + } +} + +#[derive(Clone, Debug)] +pub(crate) struct TrackedTxLimits { + config: TrackedTxLimitsConfig, + + txs: usize, + eip2718_bytes: usize, +} + +impl TrackedTxLimits { + pub fn new(config: TrackedTxLimitsConfig) -> Self { + Self { + config, + + txs: 0, + eip2718_bytes: 0, + } + } + + pub fn build_txs_map(&self) -> IndexMap { + IndexMap::with_capacity(self.config.max_addresses) + } + + pub fn expiry_duration_during_evict(&self) -> Duration { + if self.txs < self.config.soft_evict_addresses_watermark { + self.config.hard_tx_expiry + } else { + info!(num_txs =? self.txs, "txpool limits hit soft evict addresses watermark"); + self.config.soft_tx_expiry + } + } + + pub fn expiry_duration_during_insert(&self) -> Duration { + self.config.hard_tx_expiry + } + + pub fn can_add_address(&self, addresses: usize) -> bool { + addresses < self.config.max_addresses + } + + pub fn add_tx(&mut self, tx: &ValidEthTransaction) -> bool { + let txs = self.txs + 1; + let eip2718_bytes = self.eip2718_bytes + tx.raw().eip2718_encoded_length(); + + if txs > self.config.max_txs { + return false; + } + + if eip2718_bytes > self.config.max_eip2718_bytes { + return false; + } + + self.txs = txs; + self.eip2718_bytes = eip2718_bytes; + + true + } + + pub fn remove_tx(&mut self, tx: &ValidEthTransaction) { + self.txs = self.txs.checked_sub(1).unwrap_or_else(|| { + error!("txpool txs limit underflowed, detected during remove_tx"); + 0 + }); + + self.eip2718_bytes = self + .eip2718_bytes + .checked_sub(tx.raw().eip2718_encoded_length()) + .unwrap_or_else(|| { + error!("txpool eip2718_bytes limit underflowed, detected during remove_tx"); + 0 + }); + } + + pub fn remove_txs<'a>(&mut self, txs: impl Iterator) { + for tx in txs { + self.remove_tx(tx) + } + } +} diff --git a/monad-eth-txpool/src/pool/tracked/list.rs b/monad-eth-txpool/src/pool/tracked/list.rs index b906de9882..a12937f0c0 100644 --- a/monad-eth-txpool/src/pool/tracked/list.rs +++ b/monad-eth-txpool/src/pool/tracked/list.rs @@ -26,7 +26,8 @@ use monad_eth_txpool_types::EthTxPoolDropReason; use monad_types::Nonce; use tracing::error; -use crate::{pool::transaction::ValidEthTransaction, EthTxPoolEventTracker}; +use super::{limits::TrackedTxLimits, ValidEthTransaction}; +use crate::EthTxPoolEventTracker; /// Stores byte-validated transactions alongside the an account_nonce to enforce at the type level /// that all the transactions in the txs map have a nonce at least account_nonce. Similar to @@ -42,11 +43,11 @@ impl TrackedTxList { pub fn try_new( this_entry: VacantEntry<'_, Address, Self>, event_tracker: &mut EthTxPoolEventTracker<'_>, + limit_tracker: &mut TrackedTxLimits, txs: Vec, account_nonce: u64, on_insert: &mut impl FnMut(&ValidEthTransaction), last_commit_base_fee: u64, - tx_expiry: Duration, ) { let mut this = TrackedTxList { account_nonce, @@ -54,7 +55,8 @@ impl TrackedTxList { }; for tx in txs { - if let Some(tx) = this.try_insert_tx(event_tracker, tx, last_commit_base_fee, tx_expiry) + if let Some(tx) = + this.try_insert_tx(event_tracker, limit_tracker, tx, last_commit_base_fee) { on_insert(tx); } @@ -105,9 +107,9 @@ impl TrackedTxList { pub(crate) fn try_insert_tx( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, + limit_tracker: &mut TrackedTxLimits, tx: ValidEthTransaction, last_commit_base_fee: u64, - tx_expiry: Duration, ) -> Option<&ValidEthTransaction> { if tx.nonce() < self.account_nonce { event_tracker.drop(tx.hash(), EthTxPoolDropReason::NonceTooLow); @@ -122,29 +124,37 @@ impl TrackedTxList { btree_map::Entry::Occupied(mut entry) => { let (existing_tx, existing_tx_insert_time) = entry.get(); - if tx_expired(existing_tx_insert_time, tx_expiry, &event_tracker.now) - || tx.has_higher_priority(existing_tx, last_commit_base_fee) + if !tx_expired( + existing_tx_insert_time, + limit_tracker.expiry_duration_during_insert(), + &event_tracker.now, + ) && !tx.has_higher_priority(existing_tx, last_commit_base_fee) { - event_tracker.replace( - tx.signer_ref(), - existing_tx.hash(), - tx.hash(), - tx.is_owned(), - ); - entry.insert((tx, event_tracker.now)); - - Some(&entry.into_mut().0) - } else { event_tracker.drop(tx.hash(), EthTxPoolDropReason::ExistingHigherPriority); + return None; + } - None + if !limit_tracker.add_tx(&tx) { + event_tracker.drop(tx.hash(), EthTxPoolDropReason::PoolFull); + return None; } + + event_tracker.replace( + tx.signer_ref(), + existing_tx.hash(), + tx.hash(), + tx.is_owned(), + ); + entry.insert((tx, event_tracker.now)); + + Some(&entry.into_mut().0) } } } pub fn update_committed_nonce_usage( event_tracker: &mut EthTxPoolEventTracker<'_>, + limit_tracker: &mut TrackedTxLimits, mut this: indexmap::map::OccupiedEntry<'_, Address, Self>, nonce_usage: NonceUsage, ) { @@ -164,6 +174,7 @@ impl TrackedTxList { let txs = this.get_mut().txs.split_off(&account_nonce); + limit_tracker.remove_txs(txs.values().map(|(tx, _)| tx)); event_tracker.tracked_commit( txs.is_empty(), this.get().txs.values().map(|(tx, _)| tx.hash()), @@ -180,6 +191,7 @@ impl TrackedTxList { // Produces true when the entry was removed and false otherwise pub fn evict_expired_txs( event_tracker: &mut EthTxPoolEventTracker<'_>, + limit_tracker: &mut TrackedTxLimits, mut this: indexmap::map::IndexedEntry<'_, Address, Self>, tx_expiry: Duration, ) -> bool { @@ -187,18 +199,22 @@ impl TrackedTxList { let txs = &mut this.get_mut().txs; - let mut removed_hashes = Vec::default(); + let mut removed_txs = Vec::default(); txs.retain(|_, (tx, tx_insert)| { if !tx_expired(tx_insert, tx_expiry, &now) { return true; } - removed_hashes.push(tx.hash()); + removed_txs.push(tx.clone()); false }); - event_tracker.tracked_evict_expired(txs.is_empty(), removed_hashes.into_iter()); + limit_tracker.remove_txs(removed_txs.iter()); + event_tracker.tracked_evict_expired( + txs.is_empty(), + removed_txs.iter().map(ValidEthTransaction::hash), + ); if txs.is_empty() { this.swap_remove(); @@ -211,6 +227,7 @@ impl TrackedTxList { pub fn static_validate_all_txs( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, + limit_tracker: &mut TrackedTxLimits, chain_id: u64, chain_revision: &CRT, execution_revision: &MonadExecutionRevision, @@ -227,6 +244,7 @@ impl TrackedTxList { return true; }; + limit_tracker.remove_tx(tx); event_tracker.drop(tx.hash(), EthTxPoolDropReason::NotWellFormed(error)); false diff --git a/monad-eth-txpool/src/pool/tracked/mod.rs b/monad-eth-txpool/src/pool/tracked/mod.rs index 7a5bd4f467..9b90b89fb3 100644 --- a/monad-eth-txpool/src/pool/tracked/mod.rs +++ b/monad-eth-txpool/src/pool/tracked/mod.rs @@ -25,28 +25,19 @@ use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; use monad_eth_block_policy::nonce_usage::NonceUsageMap; +use monad_eth_txpool_types::EthTxPoolDropReason; use monad_eth_types::EthExecutionProtocol; use monad_state_backend::StateBackend; use monad_validator::signature_collection::SignatureCollection; -use tracing::info; +use self::limits::TrackedTxLimits; pub(super) use self::list::TrackedTxList; use super::transaction::ValidEthTransaction; -use crate::EthTxPoolEventTracker; +use crate::{pool::tracked::limits::TrackedTxLimitsConfig, EthTxPoolEventTracker}; +mod limits; mod list; -// To produce 5k tx blocks, we need the tracked tx map to hold at least 15k addresses so that, after -// pruning the txpool of up to 5k unique addresses in the last committed block update and up to 5k -// unique addresses in the pending blocktree, the tracked tx map will still have at least 5k other -// addresses with at least one tx each to use when creating the next block. -const MAX_ADDRESSES: usize = 16 * 1024; - -// Tx batches from rpc can contain up to roughly 500 transactions. Since we don't evict based on how -// many txs are in the pool, we need to ensure that after eviction there is always space for all 500 -// txs. -const SOFT_EVICT_ADDRESSES_WATERMARK: usize = MAX_ADDRESSES - 512; - /// Stores transactions using a "snapshot" system by which each address has an associated /// account_nonce stored in the TrackedTxList which is guaranteed to be the correct /// account_nonce for the seqnum stored in last_commit_seq_num. @@ -60,9 +51,7 @@ where // By using IndexMap, we can iterate through the map with Vec-like performance and are able to // evict expired txs through the entry API. txs: IndexMap, - - soft_tx_expiry: Duration, - hard_tx_expiry: Duration, + limits: TrackedTxLimits, _phantom: PhantomData<(ST, SCT, SBT, CCT, CRT)>, } @@ -76,11 +65,13 @@ where CRT: ChainRevision, { pub fn new(soft_tx_expiry: Duration, hard_tx_expiry: Duration) -> Self { - Self { - txs: IndexMap::with_capacity(MAX_ADDRESSES), + let limits_config = TrackedTxLimitsConfig::new(soft_tx_expiry, hard_tx_expiry); + + let limits = TrackedTxLimits::new(limits_config); - soft_tx_expiry, - hard_tx_expiry, + Self { + txs: limits.build_txs_map(), + limits, _phantom: PhantomData, } @@ -119,6 +110,8 @@ where account_nonce: u64, on_insert: &mut impl FnMut(&ValidEthTransaction), ) { + let txs_len = self.txs.len(); + match self.txs.entry(address) { IndexMapEntry::Occupied(o) => { let tx_list = o.into_mut(); @@ -126,23 +119,31 @@ where for tx in txs { if let Some(tx) = tx_list.try_insert_tx( event_tracker, + &mut self.limits, tx, last_commit.execution_inputs.base_fee_per_gas, - self.hard_tx_expiry, ) { on_insert(tx); } } } IndexMapEntry::Vacant(v) => { + if self.limits.can_add_address(txs_len) { + event_tracker.drop_all( + txs.into_iter().map(ValidEthTransaction::into_raw), + EthTxPoolDropReason::PoolFull, + ); + return; + } + TrackedTxList::try_new( v, event_tracker, + &mut self.limits, txs, account_nonce, on_insert, last_commit.execution_inputs.base_fee_per_gas, - self.hard_tx_expiry, ); } } @@ -155,23 +156,19 @@ where ) { for (address, nonce_usage) in nonce_usages.into_map() { match self.txs.entry(address) { - IndexMapEntry::Occupied(tx_list) => { - TrackedTxList::update_committed_nonce_usage(event_tracker, tx_list, nonce_usage) - } + IndexMapEntry::Occupied(tx_list) => TrackedTxList::update_committed_nonce_usage( + event_tracker, + &mut self.limits, + tx_list, + nonce_usage, + ), IndexMapEntry::Vacant(_) => {} } } } pub fn evict_expired_txs(&mut self, event_tracker: &mut EthTxPoolEventTracker<'_>) { - let num_txs = self.num_txs(); - - let tx_expiry = if num_txs < SOFT_EVICT_ADDRESSES_WATERMARK { - self.hard_tx_expiry - } else { - info!(?num_txs, "txpool hit soft evict addresses watermark"); - self.soft_tx_expiry - }; + let tx_expiry = self.limits.expiry_duration_during_evict(); let mut idx = 0; @@ -184,7 +181,7 @@ where break; }; - if TrackedTxList::evict_expired_txs(event_tracker, entry, tx_expiry) { + if TrackedTxList::evict_expired_txs(event_tracker, &mut self.limits, entry, tx_expiry) { continue; } @@ -206,6 +203,7 @@ where self.txs.retain(|_, tx_list| { tx_list.static_validate_all_txs( event_tracker, + &mut self.limits, chain_id, chain_revision, execution_revision,