Skip to content
2 changes: 1 addition & 1 deletion crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ where
/// Remove transaction and its dependents.
pub fn remove_transaction_and_dependents(
&mut self,
tx_ids: Vec<TxId>,
tx_ids: impl Iterator<Item = TxId>,
) -> Vec<ArcPoolTx> {
let mut removed_transactions = vec![];
for tx_id in tx_ids {
Expand Down
30 changes: 26 additions & 4 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,24 @@ where
let expired_txs = height_expiration_txs.remove(&height);
if let Some(expired_txs) = expired_txs {
let mut tx_pool = self.pool.write();
removed_txs
.extend(tx_pool.remove_transaction_and_dependents(expired_txs));
removed_txs.extend(
tx_pool
.remove_transaction_and_dependents(expired_txs.into_iter()),
);
}
}
}
for tx in removed_txs {
{
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
}
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
Expand Down Expand Up @@ -486,7 +498,7 @@ where
if expiration < u32::MAX.into() {
let mut lock = height_expiration_txs.write();
let block_height_expiration = lock.entry(expiration).or_default();
block_height_expiration.push(tx_id);
block_height_expiration.insert(tx_id);
}

let duration = submitted_time
Expand Down Expand Up @@ -662,10 +674,20 @@ where
let removed;
{
let mut pool = self.pool.write();
removed = pool.remove_transaction_and_dependents(txs_to_remove);
removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter());
}

for tx in removed {
{
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
}
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
Expand Down
3 changes: 2 additions & 1 deletion crates/services/txpool_v2/src/service/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use fuel_core_types::{
use std::{
collections::{
BTreeMap,
HashSet,
VecDeque,
},
time::SystemTime,
};

pub(super) struct TransactionPruner {
pub time_txs_submitted: Shared<VecDeque<(SystemTime, TxId)>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, Vec<TxId>>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, HashSet<TxId>>>,
pub ttl_timer: tokio::time::Interval,
pub txs_ttl: tokio::time::Duration,
}
Loading