diff --git a/monad-eth-txpool/src/pool/mod.rs b/monad-eth-txpool/src/pool/mod.rs index 2527f6883f..a9fe1934a7 100644 --- a/monad-eth-txpool/src/pool/mod.rs +++ b/monad-eth-txpool/src/pool/mod.rs @@ -27,27 +27,35 @@ use monad_chain_config::{ ChainConfig, MockChainConfig, }; use monad_consensus_types::{ - block::{BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs}, + block::{ + BlockPolicyBlockValidator, BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs, + }, payload::RoundSignature, }; use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; -use monad_eth_block_policy::{timestamp_ns_to_secs, EthBlockPolicy, EthValidatedBlock}; +use monad_eth_block_policy::{ + timestamp_ns_to_secs, EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock, +}; use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason, EthTxPoolSnapshot}; use monad_eth_types::{EthBlockBody, EthExecutionProtocol, ExtractEthAddress, ProposedEthHeader}; use monad_state_backend::{StateBackend, StateBackendError}; use monad_system_calls::{SystemTransactionGenerator, SYSTEM_SENDER_ETH_ADDRESS}; -use monad_types::{Epoch, NodeId, Round, SeqNum}; +use monad_types::{DropTimer, Epoch, NodeId, Round, SeqNum}; use monad_validator::signature_collection::SignatureCollection; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tracing::{debug, error, info, warn}; pub use self::transaction::max_eip2718_encoded_length; -use self::{pending::PendingTxMap, tracked::TrackedTxMap, transaction::ValidEthTransaction}; +use self::{ + pending::PendingTxMap, sequencer::ProposalSequencer, tracked::TrackedTxMap, + transaction::ValidEthTransaction, +}; use crate::EthTxPoolEventTracker; mod pending; +mod sequencer; mod tracked; mod transaction; @@ -224,7 +232,6 @@ where block_policy, state_backend, &mut self.pending, - 0, INSERT_TXS_MAX_PROMOTE, ) && self.pending.is_at_promote_txs_watermark() { @@ -251,7 +258,6 @@ where block_policy, state_backend, &mut self.pending, - 0, PENDING_MAX_PROMOTE, ) { warn!("txpool failed to promote during promote_pending call"); @@ -334,42 +340,18 @@ where .map(|tx| tx.length() as u64) .sum(); - let user_transactions = if let Some(last_commit) = self.last_commit.as_ref() { - let last_commit_seq_num = last_commit.seq_num; - - assert!( - block_policy.get_last_commit().ge(&last_commit_seq_num), - "txpool received block policy with lower committed seq num" - ); - - if last_commit_seq_num == block_policy.get_last_commit() { - self.tracked.create_proposal( - event_tracker, - self.chain_id, - proposed_seq_num, - base_fee, - tx_limit - system_transactions.len(), - proposal_gas_limit, - proposal_byte_limit - system_txs_size, - block_policy, - extending_blocks.iter().collect(), - state_backend, - chain_config, - &self.chain_revision, - &self.execution_revision, - )? - } else { - error!( - block_policy_last_commit = block_policy.get_last_commit().0, - txpool_last_commit = last_commit_seq_num.0, - "last commit update does not match block policy last commit" - ); - Vec::default() - } - } else { - error!("txpool create_proposal called before last committed block set"); - Vec::default() - }; + let user_transactions = self.sequence_user_transactions( + event_tracker, + proposed_seq_num, + base_fee, + tx_limit - system_transactions.len(), + proposal_gas_limit, + proposal_byte_limit - system_txs_size, + extending_blocks.iter().collect(), + block_policy, + state_backend, + chain_config, + )?; let body = EthBlockBody { transactions: system_transactions @@ -635,6 +617,124 @@ where .map(|sys_txn| sys_txn.into()) .collect_vec()) } + + pub fn sequence_user_transactions( + &mut self, + event_tracker: &mut EthTxPoolEventTracker<'_>, + proposed_seq_num: SeqNum, + base_fee: u64, + tx_limit: usize, + proposal_gas_limit: u64, + proposal_byte_limit: u64, + extending_blocks: Vec<&EthValidatedBlock>, + block_policy: &EthBlockPolicy, + state_backend: &SBT, + chain_config: &CCT, + ) -> Result>, BlockPolicyError> { + let _timer = DropTimer::start(Duration::ZERO, |elapsed| { + debug!(?elapsed, "txpool create_proposal"); + }); + + let Some(last_commit) = self.last_commit.as_ref() else { + error!("txpool create_proposal called before last committed block set"); + return Ok(Vec::default()); + }; + + let last_commit_seq_num = last_commit.seq_num; + + assert!( + block_policy.get_last_commit().ge(&last_commit_seq_num), + "txpool received block policy with lower committed seq num" + ); + + if last_commit_seq_num != block_policy.get_last_commit() { + error!( + block_policy_last_commit = block_policy.get_last_commit().0, + txpool_last_commit = last_commit_seq_num.0, + "txpool last commit update does not match block policy last commit" + ); + return Ok(Vec::default()); + } + + if tx_limit == 0 { + warn!("txpool create_proposal called with zero tx_limit"); + return Ok(Vec::default()); + } + + let sequencer = + ProposalSequencer::new(self.tracked.iter(), &extending_blocks, base_fee, tx_limit); + let sequencer_len = sequencer.len(); + + if sequencer.is_empty() { + return Ok(Vec::default()); + } + + let (account_balances, state_backend_lookups) = { + let _timer = DropTimer::start(Duration::ZERO, |elapsed| { + debug!( + ?elapsed, + "txpool create_proposal compute account base balances" + ); + }); + + let total_db_lookups_before = state_backend.total_db_lookups(); + + ( + block_policy.compute_account_base_balances( + proposed_seq_num, + state_backend, + chain_config, + Some(&extending_blocks), + sequencer.addresses(), + )?, + state_backend.total_db_lookups() - total_db_lookups_before, + ) + }; + + info!( + addresses = self.tracked.num_addresses(), + num_txs = self.tracked.num_txs(), + sequencer_len, + account_balances = account_balances.len(), + ?state_backend_lookups, + "txpool sequencing transactions" + ); + + let validator = EthBlockPolicyBlockValidator::new( + proposed_seq_num, + block_policy.get_execution_delay(), + base_fee, + &self.chain_revision, + &self.execution_revision, + )?; + + let proposal = sequencer.build_proposal( + tx_limit, + proposal_gas_limit, + proposal_byte_limit, + chain_config, + account_balances, + validator, + ); + + let proposal_num_txs = proposal.txs.len(); + + event_tracker.record_create_proposal( + self.tracked.num_addresses(), + sequencer_len, + state_backend_lookups, + proposal_num_txs, + ); + + info!( + ?proposed_seq_num, + ?proposal_num_txs, + proposal_total_gas = proposal.total_gas, + "created proposal" + ); + + Ok(proposal.txs) + } } impl EthTxPool diff --git a/monad-eth-txpool/src/pool/tracked/sequencer.rs b/monad-eth-txpool/src/pool/sequencer.rs similarity index 96% rename from monad-eth-txpool/src/pool/tracked/sequencer.rs rename to monad-eth-txpool/src/pool/sequencer.rs index 95f5a95653..011951df93 100644 --- a/monad-eth-txpool/src/pool/tracked/sequencer.rs +++ b/monad-eth-txpool/src/pool/sequencer.rs @@ -20,7 +20,6 @@ use std::{ use alloy_consensus::{transaction::Recovered, Transaction, TxEnvelope}; use alloy_primitives::Address; -use indexmap::IndexMap; use monad_chain_config::{revision::ChainRevision, ChainConfig}; use monad_consensus_types::block::{AccountBalanceState, BlockPolicyBlockValidator}; use monad_crypto::certificate_signature::{ @@ -34,8 +33,10 @@ use monad_validator::signature_collection::SignatureCollection; use rand::seq::SliceRandom; use tracing::{debug, error, trace, warn}; -use super::list::TrackedTxList; -use crate::pool::transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction}; +use crate::pool::{ + tracked::TrackedTxList, + transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction}, +}; #[derive(Debug, PartialEq, Eq)] struct OrderedTx<'a> { @@ -96,7 +97,7 @@ pub struct ProposalSequencer<'a> { impl<'a> ProposalSequencer<'a> { pub fn new( - tracked_txs: &'a IndexMap, + tracked_txs: impl Iterator, extending_blocks: &Vec<&EthValidatedBlock>, base_fee: u64, tx_limit: usize, @@ -107,7 +108,7 @@ impl<'a> ProposalSequencer<'a> { { let mut pending_nonce_usages = extending_blocks.get_nonce_usages().into_map(); - let mut heap_vec = Vec::with_capacity(tracked_txs.len()); + let mut heap_vec = Vec::default(); let mut virtual_time = 0; for (address, tx_list) in tracked_txs { @@ -139,6 +140,10 @@ impl<'a> ProposalSequencer<'a> { } } + pub fn is_empty(&self) -> bool { + self.heap.is_empty() + } + pub fn len(&self) -> usize { self.heap.len() } @@ -156,7 +161,6 @@ impl<'a> ProposalSequencer<'a> { pub fn build_proposal( mut self, - chain_id: u64, tx_limit: usize, proposal_gas_limit: u64, proposal_byte_limit: u64, diff --git a/monad-eth-txpool/src/pool/tracked/mod.rs b/monad-eth-txpool/src/pool/tracked/mod.rs index 468f7e82a4..9e414dcf57 100644 --- a/monad-eth-txpool/src/pool/tracked/mod.rs +++ b/monad-eth-txpool/src/pool/tracked/mod.rs @@ -15,22 +15,19 @@ use std::{collections::BTreeMap, marker::PhantomData, time::Duration}; -use alloy_consensus::{transaction::Recovered, TxEnvelope}; use alloy_primitives::Address; use indexmap::{map::Entry as IndexMapEntry, IndexMap}; use itertools::Itertools; use monad_chain_config::{ execution_revision::MonadExecutionRevision, revision::ChainRevision, ChainConfig, }; -use monad_consensus_types::block::{ - BlockPolicyBlockValidator, BlockPolicyError, ConsensusBlockHeader, -}; +use monad_consensus_types::block::ConsensusBlockHeader; use monad_crypto::certificate_signature::{ CertificateSignaturePubKey, CertificateSignatureRecoverable, }; use monad_eth_block_policy::{ nonce_usage::{NonceUsage, NonceUsageMap}, - EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock, + EthBlockPolicy, }; use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason}; use monad_eth_types::EthExecutionProtocol; @@ -39,7 +36,7 @@ use monad_types::{DropTimer, SeqNum}; use monad_validator::signature_collection::SignatureCollection; use tracing::{debug, error, info, warn}; -use self::{list::TrackedTxList, sequencer::ProposalSequencer}; +pub(super) use self::list::TrackedTxList; use super::{ pending::{PendingTxList, PendingTxMap}, transaction::ValidEthTransaction, @@ -47,7 +44,6 @@ use super::{ use crate::EthTxPoolEventTracker; mod list; -mod sequencer; // To produce 5k tx blocks, we need the tracked tx map to hold at least 15k addresses so that, after // pruning the txpool of up to 5k unique addresses in the last committed block update and up to 5k @@ -111,6 +107,10 @@ where self.txs.values().map(TrackedTxList::num_txs).sum() } + pub fn iter(&self) -> impl Iterator { + self.txs.iter() + } + pub fn iter_txs(&self) -> impl Iterator { self.txs.values().flat_map(TrackedTxList::iter) } @@ -140,101 +140,6 @@ where )) } - pub fn create_proposal( - &mut self, - event_tracker: &mut EthTxPoolEventTracker<'_>, - chain_id: u64, - proposed_seq_num: SeqNum, - base_fee: u64, - tx_limit: usize, - proposal_gas_limit: u64, - proposal_byte_limit: u64, - block_policy: &EthBlockPolicy, - extending_blocks: Vec<&EthValidatedBlock>, - state_backend: &SBT, - chain_config: &CCT, - chain_revision: &CRT, - execution_revision: &MonadExecutionRevision, - ) -> Result>, BlockPolicyError> { - let _timer = DropTimer::start(Duration::ZERO, |elapsed| { - debug!(?elapsed, "txpool create_proposal"); - }); - - if self.txs.is_empty() || tx_limit == 0 { - return Ok(Vec::new()); - } - - let sequencer = ProposalSequencer::new(&self.txs, &extending_blocks, base_fee, tx_limit); - let sequencer_len = sequencer.len(); - - let (account_balances, state_backend_lookups) = { - let _timer = DropTimer::start(Duration::ZERO, |elapsed| { - debug!( - ?elapsed, - "txpool create_proposal compute account base balances" - ); - }); - - let total_db_lookups_before = state_backend.total_db_lookups(); - - ( - block_policy.compute_account_base_balances( - proposed_seq_num, - state_backend, - chain_config, - Some(&extending_blocks), - sequencer.addresses(), - )?, - state_backend.total_db_lookups() - total_db_lookups_before, - ) - }; - - info!( - addresses = self.txs.len(), - num_txs = self.num_txs(), - sequencer_len, - account_balances = account_balances.len(), - ?state_backend_lookups, - "txpool sequencing transactions" - ); - - let validator = EthBlockPolicyBlockValidator::new( - proposed_seq_num, - block_policy.get_execution_delay(), - base_fee, - chain_revision, - execution_revision, - )?; - - let proposal = sequencer.build_proposal( - chain_id, - tx_limit, - proposal_gas_limit, - proposal_byte_limit, - chain_config, - account_balances, - validator, - ); - - let proposal_num_txs = proposal.txs.len(); - - event_tracker.record_create_proposal( - self.num_addresses(), - sequencer_len, - state_backend_lookups, - proposal_num_txs, - ); - - info!( - ?proposed_seq_num, - ?proposal_num_txs, - proposal_total_gas = proposal.total_gas, - "created proposal" - ); - - Ok(proposal.txs) - } - pub fn try_promote_pending( &mut self, event_tracker: &mut EthTxPoolEventTracker<'_>, @@ -242,17 +147,12 @@ where block_policy: &EthBlockPolicy, state_backend: &SBT, pending: &mut PendingTxMap, - min_promotable: usize, max_promotable: usize, ) -> bool { let Some(insertable) = MAX_ADDRESSES.checked_sub(self.txs.len()) else { return false; }; - if insertable < min_promotable { - return true; - } - let insertable = insertable.min(max_promotable); if insertable == 0 {