Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 142 additions & 42 deletions monad-eth-txpool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,35 @@ use monad_chain_config::{
ChainConfig, MockChainConfig,
};
use monad_consensus_types::{
block::{BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs},
block::{
BlockPolicyBlockValidator, BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs,
},
payload::RoundSignature,
};
use monad_crypto::certificate_signature::{
CertificateSignaturePubKey, CertificateSignatureRecoverable,
};
use monad_eth_block_policy::{timestamp_ns_to_secs, EthBlockPolicy, EthValidatedBlock};
use monad_eth_block_policy::{
timestamp_ns_to_secs, EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock,
};
use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason, EthTxPoolSnapshot};
use monad_eth_types::{EthBlockBody, EthExecutionProtocol, ExtractEthAddress, ProposedEthHeader};
use monad_state_backend::{StateBackend, StateBackendError};
use monad_system_calls::{SystemTransactionGenerator, SYSTEM_SENDER_ETH_ADDRESS};
use monad_types::{Epoch, NodeId, Round, SeqNum};
use monad_types::{DropTimer, Epoch, NodeId, Round, SeqNum};
use monad_validator::signature_collection::SignatureCollection;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::{debug, error, info, warn};

pub use self::transaction::max_eip2718_encoded_length;
use self::{pending::PendingTxMap, tracked::TrackedTxMap, transaction::ValidEthTransaction};
use self::{
pending::PendingTxMap, sequencer::ProposalSequencer, tracked::TrackedTxMap,
transaction::ValidEthTransaction,
};
use crate::EthTxPoolEventTracker;

mod pending;
mod sequencer;
mod tracked;
mod transaction;

Expand Down Expand Up @@ -224,7 +232,6 @@ where
block_policy,
state_backend,
&mut self.pending,
0,
INSERT_TXS_MAX_PROMOTE,
) && self.pending.is_at_promote_txs_watermark()
{
Expand All @@ -251,7 +258,6 @@ where
block_policy,
state_backend,
&mut self.pending,
0,
PENDING_MAX_PROMOTE,
) {
warn!("txpool failed to promote during promote_pending call");
Expand Down Expand Up @@ -334,42 +340,18 @@ where
.map(|tx| tx.length() as u64)
.sum();

let user_transactions = if let Some(last_commit) = self.last_commit.as_ref() {
let last_commit_seq_num = last_commit.seq_num;

assert!(
block_policy.get_last_commit().ge(&last_commit_seq_num),
"txpool received block policy with lower committed seq num"
);

if last_commit_seq_num == block_policy.get_last_commit() {
self.tracked.create_proposal(
event_tracker,
self.chain_id,
proposed_seq_num,
base_fee,
tx_limit - system_transactions.len(),
proposal_gas_limit,
proposal_byte_limit - system_txs_size,
block_policy,
extending_blocks.iter().collect(),
state_backend,
chain_config,
&self.chain_revision,
&self.execution_revision,
)?
} else {
error!(
block_policy_last_commit = block_policy.get_last_commit().0,
txpool_last_commit = last_commit_seq_num.0,
"last commit update does not match block policy last commit"
);
Vec::default()
}
} else {
error!("txpool create_proposal called before last committed block set");
Vec::default()
};
let user_transactions = self.sequence_user_transactions(
event_tracker,
proposed_seq_num,
base_fee,
tx_limit - system_transactions.len(),
proposal_gas_limit,
proposal_byte_limit - system_txs_size,
extending_blocks.iter().collect(),
block_policy,
state_backend,
chain_config,
)?;

let body = EthBlockBody {
transactions: system_transactions
Expand Down Expand Up @@ -635,6 +617,124 @@ where
.map(|sys_txn| sys_txn.into())
.collect_vec())
}

pub fn sequence_user_transactions(
&mut self,
event_tracker: &mut EthTxPoolEventTracker<'_>,
proposed_seq_num: SeqNum,
base_fee: u64,
tx_limit: usize,
proposal_gas_limit: u64,
proposal_byte_limit: u64,
extending_blocks: Vec<&EthValidatedBlock<ST, SCT>>,
block_policy: &EthBlockPolicy<ST, SCT, CCT, CRT>,
state_backend: &SBT,
chain_config: &CCT,
) -> Result<Vec<Recovered<TxEnvelope>>, BlockPolicyError> {
let _timer = DropTimer::start(Duration::ZERO, |elapsed| {
debug!(?elapsed, "txpool create_proposal");
});

let Some(last_commit) = self.last_commit.as_ref() else {
error!("txpool create_proposal called before last committed block set");
return Ok(Vec::default());
};

let last_commit_seq_num = last_commit.seq_num;

assert!(
block_policy.get_last_commit().ge(&last_commit_seq_num),
"txpool received block policy with lower committed seq num"
);

if last_commit_seq_num != block_policy.get_last_commit() {
error!(
block_policy_last_commit = block_policy.get_last_commit().0,
txpool_last_commit = last_commit_seq_num.0,
"txpool last commit update does not match block policy last commit"
);
return Ok(Vec::default());
}

if tx_limit == 0 {
warn!("txpool create_proposal called with zero tx_limit");
return Ok(Vec::default());
}

let sequencer =
ProposalSequencer::new(self.tracked.iter(), &extending_blocks, base_fee, tx_limit);
let sequencer_len = sequencer.len();

if sequencer.is_empty() {
return Ok(Vec::default());
}

let (account_balances, state_backend_lookups) = {
let _timer = DropTimer::start(Duration::ZERO, |elapsed| {
debug!(
?elapsed,
"txpool create_proposal compute account base balances"
);
});

let total_db_lookups_before = state_backend.total_db_lookups();

(
block_policy.compute_account_base_balances(
proposed_seq_num,
state_backend,
chain_config,
Some(&extending_blocks),
sequencer.addresses(),
)?,
state_backend.total_db_lookups() - total_db_lookups_before,
)
};

info!(
addresses = self.tracked.num_addresses(),
num_txs = self.tracked.num_txs(),
sequencer_len,
account_balances = account_balances.len(),
?state_backend_lookups,
"txpool sequencing transactions"
);

let validator = EthBlockPolicyBlockValidator::new(
proposed_seq_num,
block_policy.get_execution_delay(),
base_fee,
&self.chain_revision,
&self.execution_revision,
)?;

let proposal = sequencer.build_proposal(
tx_limit,
proposal_gas_limit,
proposal_byte_limit,
chain_config,
account_balances,
validator,
);

let proposal_num_txs = proposal.txs.len();

event_tracker.record_create_proposal(
self.tracked.num_addresses(),
sequencer_len,
state_backend_lookups,
proposal_num_txs,
);

info!(
?proposed_seq_num,
?proposal_num_txs,
proposal_total_gas = proposal.total_gas,
"created proposal"
);

Ok(proposal.txs)
}
}

impl<ST, SCT, SBT> EthTxPool<ST, SCT, SBT, MockChainConfig, MockChainRevision>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{

use alloy_consensus::{transaction::Recovered, Transaction, TxEnvelope};
use alloy_primitives::Address;
use indexmap::IndexMap;
use monad_chain_config::{revision::ChainRevision, ChainConfig};
use monad_consensus_types::block::{AccountBalanceState, BlockPolicyBlockValidator};
use monad_crypto::certificate_signature::{
Expand All @@ -34,8 +33,10 @@ use monad_validator::signature_collection::SignatureCollection;
use rand::seq::SliceRandom;
use tracing::{debug, error, trace, warn};

use super::list::TrackedTxList;
use crate::pool::transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction};
use crate::pool::{
tracked::TrackedTxList,
transaction::{ValidEthRecoveredAuthorization, ValidEthTransaction},
};

#[derive(Debug, PartialEq, Eq)]
struct OrderedTx<'a> {
Expand Down Expand Up @@ -96,7 +97,7 @@ pub struct ProposalSequencer<'a> {

impl<'a> ProposalSequencer<'a> {
pub fn new<ST, SCT>(
tracked_txs: &'a IndexMap<Address, TrackedTxList>,
tracked_txs: impl Iterator<Item = (&'a Address, &'a TrackedTxList)>,
extending_blocks: &Vec<&EthValidatedBlock<ST, SCT>>,
base_fee: u64,
tx_limit: usize,
Expand All @@ -107,7 +108,7 @@ impl<'a> ProposalSequencer<'a> {
{
let mut pending_nonce_usages = extending_blocks.get_nonce_usages().into_map();

let mut heap_vec = Vec::with_capacity(tracked_txs.len());
let mut heap_vec = Vec::default();
let mut virtual_time = 0;

for (address, tx_list) in tracked_txs {
Expand Down Expand Up @@ -139,6 +140,10 @@ impl<'a> ProposalSequencer<'a> {
}
}

pub fn is_empty(&self) -> bool {
self.heap.is_empty()
}

pub fn len(&self) -> usize {
self.heap.len()
}
Expand All @@ -156,7 +161,6 @@ impl<'a> ProposalSequencer<'a> {

pub fn build_proposal<CCT, CRT>(
mut self,
chain_id: u64,
tx_limit: usize,
proposal_gas_limit: u64,
proposal_byte_limit: u64,
Expand Down
Loading
Loading