Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions monad-eth-txpool-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ mod metrics;
mod preload;
mod reset;

const PROMOTE_PENDING_INTERVAL_MS: u64 = 2;

pub struct EthTxPoolExecutor<ST, SCT, SBT, CCT, CRT>
where
ST: CertificateSignatureRecoverable,
Expand All @@ -83,7 +81,6 @@ where

forwarding_manager: Pin<Box<EthTxPoolForwardingManager>>,
preload_manager: Pin<Box<EthTxPoolPreloadManager>>,
promote_pending_timer: tokio::time::Interval,

metrics: Arc<EthTxPoolExecutorMetrics>,
executor_metrics: ExecutorMetrics,
Expand Down Expand Up @@ -138,11 +135,6 @@ where
{
let metrics = metrics.clone();

let mut promote_pending_timer =
tokio::time::interval(Duration::from_millis(PROMOTE_PENDING_INTERVAL_MS));
promote_pending_timer
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

move |command_rx, event_tx| {
let pool = EthTxPool::new(
soft_tx_expiry,
Expand All @@ -166,7 +158,6 @@ where

forwarding_manager: Box::pin(EthTxPoolForwardingManager::default()),
preload_manager: Box::pin(EthTxPoolPreloadManager::default()),
promote_pending_timer,

metrics,
executor_metrics,
Expand Down Expand Up @@ -493,7 +484,6 @@ where

forwarding_manager,
preload_manager,
promote_pending_timer,

metrics,
executor_metrics,
Expand Down Expand Up @@ -619,16 +609,6 @@ where
forwarding_manager.as_mut().complete_ingress();
}

while promote_pending_timer.poll_tick(cx).is_ready() {
pool.promote_pending(
&mut EthTxPoolEventTracker::new(&metrics.pool, &mut ipc_events),
block_policy,
state_backend,
);

promote_pending_timer.reset();
}

while let Poll::Ready((predicted_proposal_seqnum, addresses)) =
preload_manager.as_mut().poll_requests(cx)
{
Expand Down
6 changes: 2 additions & 4 deletions monad-eth-txpool-executor/tests/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ async fn setup_txpool_executor_with_client() -> (
)],
}]);

let (ipc_client, EthTxPoolSnapshot { pending, tracked }) =
EthTxPoolIpcClient::new(bind_path).await.unwrap();
let (ipc_client, EthTxPoolSnapshot { txs }) = EthTxPoolIpcClient::new(bind_path).await.unwrap();

assert!(pending.is_empty());
assert!(tracked.is_empty());
assert!(txs.is_empty());

(txpool_executor, ipc_client)
}
Expand Down
11 changes: 3 additions & 8 deletions monad-eth-txpool-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ pub struct EthTxPoolEvent {

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthTxPoolEventType {
/// The tx was inserted into the txpool's (pending/tracked) tx list.
Insert {
address: Address,
owned: bool,
tracked: bool,
},
/// The tx was inserted into the txpool.
Insert { address: Address, owned: bool },

/// The tx was committed and is thus finalized.
Commit,
Expand Down Expand Up @@ -118,6 +114,5 @@ pub enum EthTxPoolEvictReason {

#[derive(Debug, Serialize, Deserialize)]
pub struct EthTxPoolSnapshot {
pub pending: HashSet<TxHash>,
pub tracked: HashSet<TxHash>,
pub txs: HashSet<TxHash>,
}
148 changes: 3 additions & 145 deletions monad-eth-txpool/src/event_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<'a> EthTxPoolEventTracker<'a> {
}
}

pub fn insert_pending(&mut self, tx: &Recovered<TxEnvelope>, owned: bool) {
pub fn insert(&mut self, tx: &Recovered<TxEnvelope>, owned: bool) {
if owned {
self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst);
} else {
Expand All @@ -57,64 +57,11 @@ impl<'a> EthTxPoolEventTracker<'a> {
EthTxPoolEventType::Insert {
address: tx.signer(),
owned,
tracked: false,
},
);
}

pub fn insert_tracked(&mut self, tx: &Recovered<TxEnvelope>, owned: bool) {
if owned {
self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst);
} else {
self.metrics
.insert_forwarded_txs
.fetch_add(1, Ordering::SeqCst);
}

self.events.insert(
*tx.tx_hash(),
EthTxPoolEventType::Insert {
address: tx.signer(),
owned,
tracked: true,
},
);
}

pub fn replace_pending(
&mut self,
address: &Address,
old_tx_hash: TxHash,
new_tx_hash: TxHash,
new_owned: bool,
) {
if new_owned {
self.metrics.insert_owned_txs.fetch_add(1, Ordering::SeqCst);
} else {
self.metrics
.insert_forwarded_txs
.fetch_add(1, Ordering::SeqCst);
}

self.events.insert(
old_tx_hash,
EthTxPoolEventType::Drop {
reason: EthTxPoolDropReason::ReplacedByHigherPriority {
replacement: new_tx_hash,
},
},
);
self.events.insert(
new_tx_hash,
EthTxPoolEventType::Insert {
address: *address,
owned: new_owned,
tracked: false,
},
);
}

pub fn replace_tracked(
pub fn replace(
&mut self,
address: &Address,
old_tx_hash: TxHash,
Expand Down Expand Up @@ -142,7 +89,6 @@ impl<'a> EthTxPoolEventTracker<'a> {
EthTxPoolEventType::Insert {
address: *address,
owned: new_owned,
tracked: true,
},
);
}
Expand Down Expand Up @@ -216,80 +162,6 @@ impl<'a> EthTxPoolEventTracker<'a> {
}
}

pub fn pending_promote<'b>(
&mut self,
txs: impl Iterator<Item = (bool, &'b Recovered<TxEnvelope>)>,
) {
self.metrics
.pending
.promote_addresses
.fetch_add(1, Ordering::SeqCst);

for (owned, tx) in txs {
self.metrics
.pending
.promote_txs
.fetch_add(1, Ordering::SeqCst);

self.events.insert(
*tx.tx_hash(),
EthTxPoolEventType::Insert {
address: tx.signer(),
owned,
tracked: true,
},
);
}
}

pub fn pending_drop_unknown(&mut self, tx_hashes: impl Iterator<Item = TxHash>) {
self.metrics
.pending
.drop_unknown_addresses
.fetch_add(1, Ordering::SeqCst);

for tx_hash in tx_hashes {
self.metrics
.pending
.drop_unknown_txs
.fetch_add(1, Ordering::SeqCst);

self.events.insert(
tx_hash,
EthTxPoolEventType::Drop {
reason: EthTxPoolDropReason::InsufficientBalance,
},
);
}
}

pub fn pending_drop_low_nonce(
&mut self,
address: bool,
tx_hashes: impl Iterator<Item = TxHash>,
) {
if address {
self.metrics
.pending
.drop_low_nonce_addresses
.fetch_add(1, Ordering::SeqCst);
}

for tx_hash in tx_hashes {
self.metrics
.pending
.drop_low_nonce_txs
.fetch_add(1, Ordering::SeqCst);

self.events.insert(
tx_hash,
EthTxPoolEventType::Drop {
reason: EthTxPoolDropReason::NonceTooLow,
},
);
}
}

pub fn tracked_commit(&mut self, address: bool, tx_hashes: impl Iterator<Item = TxHash>) {
if address {
self.metrics
Expand Down Expand Up @@ -335,21 +207,7 @@ impl<'a> EthTxPoolEventTracker<'a> {
}
}

pub fn update_aggregate_metrics(
&mut self,
pending_addresses: u64,
pending_txs: u64,
tracked_addresses: u64,
tracked_txs: u64,
) {
self.metrics
.pending
.addresses
.store(pending_addresses, Ordering::SeqCst);
self.metrics
.pending
.txs
.store(pending_txs, Ordering::SeqCst);
pub fn update_aggregate_metrics(&mut self, tracked_addresses: u64, tracked_txs: u64) {
self.metrics
.tracked
.addresses
Expand Down
33 changes: 0 additions & 33 deletions monad-eth-txpool/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct EthTxPoolMetrics {
pub create_proposal_available_addresses: AtomicU64,
pub create_proposal_backend_lookups: AtomicU64,

pub pending: EthTxpoolPendingMetrics,
pub tracked: EthTxPoolTrackedMetrics,
}

Expand Down Expand Up @@ -87,42 +86,10 @@ impl EthTxPoolMetrics {
metrics["monad.bft.txpool.pool.create_proposal_backend_lookups"] =
self.create_proposal_backend_lookups.load(Ordering::SeqCst);

self.pending.update(metrics);
self.tracked.update(metrics);
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EthTxpoolPendingMetrics {
pub addresses: AtomicU64,
pub txs: AtomicU64,
pub promote_addresses: AtomicU64,
pub promote_txs: AtomicU64,
pub drop_unknown_addresses: AtomicU64,
pub drop_unknown_txs: AtomicU64,
pub drop_low_nonce_addresses: AtomicU64,
pub drop_low_nonce_txs: AtomicU64,
}

impl EthTxpoolPendingMetrics {
pub fn update(&self, metrics: &mut ExecutorMetrics) {
metrics["monad.bft.txpool.pool.pending.addresses"] = self.addresses.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.txs"] = self.txs.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.promote_addresses"] =
self.promote_addresses.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.promote_txs"] =
self.promote_txs.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.drop_unknown_addresses"] =
self.drop_unknown_addresses.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.drop_unknown_txs"] =
self.drop_unknown_txs.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.drop_low_nonce_addresses"] =
self.drop_low_nonce_addresses.load(Ordering::SeqCst);
metrics["monad.bft.txpool.pool.pending.drop_low_nonce_txs"] =
self.drop_low_nonce_txs.load(Ordering::SeqCst);
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EthTxPoolTrackedMetrics {
pub addresses: AtomicU64,
Expand Down
Loading