Skip to content

Commit ea354a2

Browse files
committed
Refactor txpool last commit state
1 parent 9d7c205 commit ea354a2

File tree

2 files changed

+94
-98
lines changed

2 files changed

+94
-98
lines changed

monad-eth-txpool/src/pool/mod.rs

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use monad_chain_config::{
2727
ChainConfig, MockChainConfig,
2828
};
2929
use monad_consensus_types::{
30-
block::{BlockPolicyError, ProposedExecutionInputs},
30+
block::{BlockPolicyError, ConsensusBlockHeader, ProposedExecutionInputs},
3131
payload::RoundSignature,
3232
};
3333
use monad_crypto::certificate_signature::{
@@ -41,7 +41,7 @@ use monad_system_calls::{SystemTransactionGenerator, SYSTEM_SENDER_ETH_ADDRESS};
4141
use monad_types::{Epoch, NodeId, Round, SeqNum};
4242
use monad_validator::signature_collection::SignatureCollection;
4343
use rayon::iter::{IntoParallelIterator, ParallelIterator};
44-
use tracing::{debug, info, warn};
44+
use tracing::{debug, error, info, warn};
4545

4646
pub use self::transaction::max_eip2718_encoded_length;
4747
use self::{pending::PendingTxMap, tracked::TrackedTxMap, transaction::ValidEthTransaction};
@@ -70,6 +70,8 @@ where
7070
pending: PendingTxMap,
7171
tracked: TrackedTxMap<ST, SCT, SBT, CCT, CRT>,
7272

73+
last_commit: Option<ConsensusBlockHeader<ST, SCT, EthExecutionProtocol>>,
74+
7375
chain_id: u64,
7476
chain_revision: CRT,
7577
execution_revision: MonadExecutionRevision,
@@ -98,6 +100,8 @@ where
98100
pending: PendingTxMap::default(),
99101
tracked: TrackedTxMap::new(soft_tx_expiry, hard_tx_expiry),
100102

103+
last_commit: None,
104+
101105
chain_id,
102106
chain_revision,
103107
execution_revision,
@@ -136,7 +140,7 @@ where
136140
return;
137141
}
138142

139-
let Some(last_commit) = self.tracked.last_commit() else {
143+
let Some(last_commit) = self.last_commit.as_ref() else {
140144
event_tracker.drop_all(txs.into_iter(), EthTxPoolDropReason::PoolNotReady);
141145
return;
142146
};
@@ -202,7 +206,7 @@ where
202206

203207
let Some(tx) = self
204208
.tracked
205-
.try_insert_tx(event_tracker, tx)
209+
.try_insert_tx(event_tracker, last_commit, tx)
206210
.unwrap_or_else(|tx| {
207211
self.pending
208212
.try_insert_tx(event_tracker, tx, last_commit_base_fee)
@@ -216,6 +220,7 @@ where
216220

217221
if !self.tracked.try_promote_pending(
218222
event_tracker,
223+
last_commit,
219224
block_policy,
220225
state_backend,
221226
&mut self.pending,
@@ -235,8 +240,14 @@ where
235240
block_policy: &EthBlockPolicy<ST, SCT, CCT, CRT>,
236241
state_backend: &SBT,
237242
) {
243+
let Some(last_commit) = self.last_commit.as_ref() else {
244+
warn!("txpool promote_pending called before last committed block set");
245+
return;
246+
};
247+
238248
if !self.tracked.try_promote_pending(
239249
event_tracker,
250+
last_commit,
240251
block_policy,
241252
state_backend,
242253
&mut self.pending,
@@ -323,21 +334,42 @@ where
323334
.map(|tx| tx.length() as u64)
324335
.sum();
325336

326-
let user_transactions = self.tracked.create_proposal(
327-
event_tracker,
328-
self.chain_id,
329-
proposed_seq_num,
330-
base_fee,
331-
tx_limit - system_transactions.len(),
332-
proposal_gas_limit,
333-
proposal_byte_limit - system_txs_size,
334-
block_policy,
335-
extending_blocks.iter().collect(),
336-
state_backend,
337-
chain_config,
338-
&self.chain_revision,
339-
&self.execution_revision,
340-
)?;
337+
let user_transactions = if let Some(last_commit) = self.last_commit.as_ref() {
338+
let last_commit_seq_num = last_commit.seq_num;
339+
340+
assert!(
341+
block_policy.get_last_commit().ge(&last_commit_seq_num),
342+
"txpool received block policy with lower committed seq num"
343+
);
344+
345+
if last_commit_seq_num == block_policy.get_last_commit() {
346+
self.tracked.create_proposal(
347+
event_tracker,
348+
self.chain_id,
349+
proposed_seq_num,
350+
base_fee,
351+
tx_limit - system_transactions.len(),
352+
proposal_gas_limit,
353+
proposal_byte_limit - system_txs_size,
354+
block_policy,
355+
extending_blocks.iter().collect(),
356+
state_backend,
357+
chain_config,
358+
&self.chain_revision,
359+
&self.execution_revision,
360+
)?
361+
} else {
362+
error!(
363+
block_policy_last_commit = block_policy.get_last_commit().0,
364+
txpool_last_commit = last_commit_seq_num.0,
365+
"last commit update does not match block policy last commit"
366+
);
367+
Vec::default()
368+
}
369+
} else {
370+
error!("txpool create_proposal called before last committed block set");
371+
Vec::default()
372+
};
341373

342374
let body = EthBlockBody {
343375
transactions: system_transactions
@@ -423,13 +455,22 @@ where
423455
chain_config: &impl ChainConfig<CRT>,
424456
committed_block: EthValidatedBlock<ST, SCT>,
425457
) {
426-
let execution_revision = chain_config
427-
.get_execution_chain_revision(committed_block.header().execution_inputs.timestamp);
458+
{
459+
let seqnum = committed_block.get_seq_num();
460+
debug!(?seqnum, "txpool updating committed block");
461+
}
428462

429-
self.tracked
430-
.update_committed_block(event_tracker, committed_block, &mut self.pending);
463+
if let Some(last_commit) = self.last_commit.as_ref() {
464+
assert_eq!(
465+
committed_block.get_seq_num(),
466+
last_commit.seq_num + SeqNum(1),
467+
"txpool received out of order committed block"
468+
);
469+
}
470+
self.last_commit = Some(committed_block.header().clone());
431471

432-
self.tracked.evict_expired_txs(event_tracker);
472+
let execution_revision = chain_config
473+
.get_execution_chain_revision(committed_block.header().execution_inputs.timestamp);
433474

434475
if self.execution_revision != execution_revision {
435476
self.execution_revision = execution_revision;
@@ -438,6 +479,14 @@ where
438479
self.static_validate_all_txs(event_tracker);
439480
}
440481

482+
self.tracked.update_committed_nonce_usages(
483+
event_tracker,
484+
committed_block.nonce_usages,
485+
&mut self.pending,
486+
);
487+
488+
self.tracked.evict_expired_txs(event_tracker);
489+
441490
self.update_aggregate_metrics(event_tracker);
442491
}
443492

@@ -447,6 +496,10 @@ where
447496
chain_config: &impl ChainConfig<CRT>,
448497
last_delay_committed_blocks: Vec<EthValidatedBlock<ST, SCT>>,
449498
) {
499+
self.last_commit = last_delay_committed_blocks
500+
.last()
501+
.map(|block| block.header().clone());
502+
450503
let execution_revision = chain_config.get_execution_chain_revision(
451504
last_delay_committed_blocks
452505
.last()
@@ -455,15 +508,15 @@ where
455508
}),
456509
);
457510

458-
self.tracked.reset(last_delay_committed_blocks);
459-
460511
if self.execution_revision != execution_revision {
461512
self.execution_revision = execution_revision;
462513
info!(execution_revision =? self.execution_revision, "updating execution revision");
463514

464515
self.static_validate_all_txs(event_tracker);
465516
}
466517

518+
self.tracked.reset();
519+
467520
self.update_aggregate_metrics(event_tracker);
468521
}
469522

@@ -485,7 +538,7 @@ where
485538
pub fn get_forwardable_txs<const MIN_SEQNUM_DIFF: u64, const MAX_RETRIES: usize>(
486539
&mut self,
487540
) -> Option<impl Iterator<Item = &TxEnvelope>> {
488-
let last_commit = self.tracked.last_commit()?;
541+
let last_commit = self.last_commit.as_ref()?;
489542

490543
let last_commit_seq_num = last_commit.seq_num;
491544
let last_commit_base_fee = last_commit.execution_inputs.base_fee_per_gas;

monad-eth-txpool/src/pool/tracked/mod.rs

Lines changed: 14 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use monad_crypto::certificate_signature::{
2929
CertificateSignaturePubKey, CertificateSignatureRecoverable,
3030
};
3131
use monad_eth_block_policy::{
32-
nonce_usage::{NonceUsage, NonceUsageRetrievable},
32+
nonce_usage::{NonceUsage, NonceUsageMap},
3333
EthBlockPolicy, EthBlockPolicyBlockValidator, EthValidatedBlock,
3434
};
3535
use monad_eth_txpool_types::{EthTxPoolDropReason, EthTxPoolInternalDropReason};
@@ -70,15 +70,14 @@ where
7070
SCT: SignatureCollection<NodeIdPubKey = CertificateSignaturePubKey<ST>>,
7171
SBT: StateBackend<ST, SCT>,
7272
{
73-
last_commit: Option<ConsensusBlockHeader<ST, SCT, EthExecutionProtocol>>,
74-
soft_tx_expiry: Duration,
75-
hard_tx_expiry: Duration,
76-
7773
// By using IndexMap, we can iterate through the map with Vec-like performance and are able to
7874
// evict expired txs through the entry API.
7975
txs: IndexMap<Address, TrackedTxList>,
8076

81-
_phantom: PhantomData<(SBT, CCT, CRT)>,
77+
soft_tx_expiry: Duration,
78+
hard_tx_expiry: Duration,
79+
80+
_phantom: PhantomData<(ST, SCT, SBT, CCT, CRT)>,
8281
}
8382

8483
impl<ST, SCT, SBT, CCT, CRT> TrackedTxMap<ST, SCT, SBT, CCT, CRT>
@@ -91,20 +90,15 @@ where
9190
{
9291
pub fn new(soft_tx_expiry: Duration, hard_tx_expiry: Duration) -> Self {
9392
Self {
94-
last_commit: None,
93+
txs: IndexMap::with_capacity(MAX_ADDRESSES),
94+
9595
soft_tx_expiry,
9696
hard_tx_expiry,
9797

98-
txs: IndexMap::with_capacity(MAX_ADDRESSES),
99-
10098
_phantom: PhantomData,
10199
}
102100
}
103101

104-
pub fn last_commit(&self) -> Option<&ConsensusBlockHeader<ST, SCT, EthExecutionProtocol>> {
105-
self.last_commit.as_ref()
106-
}
107-
108102
pub fn is_empty(&self) -> bool {
109103
self.txs.is_empty()
110104
}
@@ -131,12 +125,9 @@ where
131125
pub fn try_insert_tx(
132126
&mut self,
133127
event_tracker: &mut EthTxPoolEventTracker<'_>,
128+
last_commit: &ConsensusBlockHeader<ST, SCT, EthExecutionProtocol>,
134129
tx: ValidEthTransaction,
135130
) -> Result<Option<&ValidEthTransaction>, ValidEthTransaction> {
136-
let Some(last_commit) = self.last_commit.as_ref() else {
137-
return Err(tx);
138-
};
139-
140131
let Some(tx_list) = self.txs.get_mut(tx.signer_ref()) else {
141132
return Err(tx);
142133
};
@@ -165,26 +156,6 @@ where
165156
chain_revision: &CRT,
166157
execution_revision: &MonadExecutionRevision,
167158
) -> Result<Vec<Recovered<TxEnvelope>>, BlockPolicyError> {
168-
let Some(last_commit) = &self.last_commit else {
169-
return Ok(Vec::new());
170-
};
171-
let last_commit_seq_num = last_commit.seq_num;
172-
173-
assert!(
174-
block_policy.get_last_commit().ge(&last_commit_seq_num),
175-
"txpool received block policy with lower committed seq num"
176-
);
177-
178-
if last_commit_seq_num != block_policy.get_last_commit() {
179-
error!(
180-
block_policy_last_commit = block_policy.get_last_commit().0,
181-
txpool_last_commit = last_commit_seq_num.0,
182-
"last commit update does not match block policy last commit"
183-
);
184-
185-
return Ok(Vec::new());
186-
}
187-
188159
let _timer = DropTimer::start(Duration::ZERO, |elapsed| {
189160
debug!(?elapsed, "txpool create_proposal");
190161
});
@@ -196,7 +167,7 @@ where
196167
let sequencer = ProposalSequencer::new(&self.txs, &extending_blocks, base_fee, tx_limit);
197168
let sequencer_len = sequencer.len();
198169

199-
let (mut account_balances, state_backend_lookups) = {
170+
let (account_balances, state_backend_lookups) = {
200171
let _timer = DropTimer::start(Duration::ZERO, |elapsed| {
201172
debug!(
202173
?elapsed,
@@ -267,6 +238,7 @@ where
267238
pub fn try_promote_pending(
268239
&mut self,
269240
event_tracker: &mut EthTxPoolEventTracker<'_>,
241+
last_commit: &ConsensusBlockHeader<ST, SCT, EthExecutionProtocol>,
270242
block_policy: &EthBlockPolicy<ST, SCT, CCT, CRT>,
271243
state_backend: &SBT,
272244
pending: &mut PendingTxMap,
@@ -293,18 +265,6 @@ where
293265
return true;
294266
}
295267

296-
let Some(last_commit) = &self.last_commit else {
297-
warn!("txpool attempted to promote pending before first committed block");
298-
event_tracker.drop_all(
299-
to_insert
300-
.into_values()
301-
.map(PendingTxList::into_map)
302-
.flat_map(BTreeMap::into_values)
303-
.map(ValidEthTransaction::into_raw),
304-
EthTxPoolDropReason::Internal(EthTxPoolInternalDropReason::NotReady),
305-
);
306-
return false;
307-
};
308268
let last_commit_seq_num = last_commit.seq_num;
309269

310270
let addresses = to_insert.len();
@@ -373,29 +333,15 @@ where
373333
true
374334
}
375335

376-
pub fn update_committed_block(
336+
pub fn update_committed_nonce_usages(
377337
&mut self,
378338
event_tracker: &mut EthTxPoolEventTracker<'_>,
379-
committed_block: EthValidatedBlock<ST, SCT>,
339+
nonce_usages: NonceUsageMap,
380340
pending: &mut PendingTxMap,
381341
) {
382-
{
383-
let seqnum = committed_block.get_seq_num();
384-
debug!(?seqnum, "txpool updating committed block");
385-
}
386-
387-
if let Some(last_commit) = &self.last_commit {
388-
assert_eq!(
389-
committed_block.get_seq_num(),
390-
last_commit.seq_num + SeqNum(1),
391-
"txpool received out of order committed block"
392-
);
393-
}
394-
self.last_commit = Some(committed_block.header().clone());
395-
396342
let mut insertable = MAX_ADDRESSES.saturating_sub(self.txs.len());
397343

398-
for (address, nonce_usage) in committed_block.get_nonce_usages().into_map() {
344+
for (address, nonce_usage) in nonce_usages.into_map() {
399345
match self.txs.entry(address) {
400346
IndexMapEntry::Occupied(tx_list) => {
401347
TrackedTxList::update_committed_nonce_usage(event_tracker, tx_list, nonce_usage)
@@ -461,11 +407,8 @@ where
461407
}
462408
}
463409

464-
pub fn reset(&mut self, last_delay_committed_blocks: Vec<EthValidatedBlock<ST, SCT>>) {
410+
pub fn reset(&mut self) {
465411
self.txs.clear();
466-
self.last_commit = last_delay_committed_blocks
467-
.last()
468-
.map(|block| block.header().clone())
469412
}
470413

471414
pub fn static_validate_all_txs(

0 commit comments

Comments
 (0)