From f90de1ada10e269c5e4078e203c8a9814c161d68 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 16 Jan 2025 11:37:40 +0100 Subject: [PATCH 1/7] Update `height_expiration_txs` to not grow indefinitely --- crates/services/txpool_v2/src/pool.rs | 2 +- crates/services/txpool_v2/src/service.rs | 30 ++++++++++++++++--- .../services/txpool_v2/src/service/pruner.rs | 3 +- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index de80ded6b7e..67ac3c1670c 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -525,7 +525,7 @@ where /// Remove transaction and its dependents. pub fn remove_transaction_and_dependents( &mut self, - tx_ids: Vec, + tx_ids: impl Iterator, ) -> Vec { let mut removed_transactions = vec![]; for tx_id in tx_ids { diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index 5ae046c757e..fba2754218a 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -345,12 +345,24 @@ where let expired_txs = height_expiration_txs.remove(&height); if let Some(expired_txs) = expired_txs { let mut tx_pool = self.pool.write(); - removed_txs - .extend(tx_pool.remove_transaction_and_dependents(expired_txs)); + removed_txs.extend( + tx_pool + .remove_transaction_and_dependents(expired_txs.into_iter()), + ); } } } for tx in removed_txs { + { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); + } + } + } self.shared_state .tx_status_sender .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); @@ -486,7 +498,7 @@ where if expiration < u32::MAX.into() { let mut lock = height_expiration_txs.write(); let block_height_expiration = lock.entry(expiration).or_default(); - block_height_expiration.push(tx_id); + block_height_expiration.insert(tx_id); } let duration = submitted_time @@ -662,10 +674,20 @@ where let removed; { let mut pool = self.pool.write(); - removed = pool.remove_transaction_and_dependents(txs_to_remove); + removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter()); } for tx in removed { + { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); + } + } + } self.shared_state .tx_status_sender .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); diff --git a/crates/services/txpool_v2/src/service/pruner.rs b/crates/services/txpool_v2/src/service/pruner.rs index 7551afa31f3..a7c76429f59 100644 --- a/crates/services/txpool_v2/src/service/pruner.rs +++ b/crates/services/txpool_v2/src/service/pruner.rs @@ -6,6 +6,7 @@ use fuel_core_types::{ use std::{ collections::{ BTreeMap, + HashSet, VecDeque, }, time::SystemTime, @@ -13,7 +14,7 @@ use std::{ pub(super) struct TransactionPruner { pub time_txs_submitted: Shared>, - pub height_expiration_txs: Shared>>, + pub height_expiration_txs: Shared>>, pub ttl_timer: tokio::time::Interval, pub txs_ttl: tokio::time::Duration, } From b38de2075af213cdbfa3c6c8c1eaac7e601a4422 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 16 Jan 2025 15:02:13 +0100 Subject: [PATCH 2/7] Add changelog and change lock times --- CHANGELOG.md | 1 + crates/services/txpool_v2/src/service.rs | 25 +++++++++++------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0513c5eb1e0..216bd70fcfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id. +- [2579](https://github.com/FuelLabs/fuel-core/pull/2579): Clear expiration txs cache in transaction pool based on inserted transactions ## [Version 0.41.0] diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index fba2754218a..7eeb788393c 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -352,15 +352,12 @@ where } } } + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); for tx in removed_txs { - { - let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); - let expiration = tx.expiration(); - if expiration < u32::MAX.into() { - if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) - { - expired_txs.remove(&tx.id()); - } + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) { + expired_txs.remove(&tx.id()); } } self.shared_state @@ -677,9 +674,9 @@ where removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter()); } - for tx in removed { - { - let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + for tx in removed { let expiration = tx.expiration(); if expiration < u32::MAX.into() { if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) @@ -687,10 +684,10 @@ where expired_txs.remove(&tx.id()); } } + self.shared_state + .tx_status_sender + .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); } - self.shared_state - .tx_status_sender - .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); } { From 293ef6f0ef1bbdf8daaf61f405fcadb26c4f57de Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 20 Jan 2025 09:47:03 +0100 Subject: [PATCH 3/7] fix logic deletion --- crates/services/txpool_v2/src/pool.rs | 7 +++++- crates/services/txpool_v2/src/service.rs | 31 +++++++++++++++--------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 67ac3c1670c..b93414b9121 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -354,7 +354,8 @@ where /// Remove transaction but keep its dependents. /// The dependents become executables. - pub fn remove_transaction(&mut self, tx_ids: Vec) { + pub fn remove_transactions(&mut self, tx_ids: Vec) -> Vec> { + let mut removed_transactions = Vec::with_capacity(tx_ids.len()); for tx_id in tx_ids { if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { let dependents: Vec = @@ -383,10 +384,14 @@ where .new_executable_transaction(dependent, storage_data); } self.update_components_and_caches_on_removal(iter::once(&transaction)); + removed_transactions.push(Some(transaction.transaction)); + } else { + removed_transactions.push(None); } } self.update_stats(); + removed_transactions } /// Check if the pool has enough space to store a transaction. diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index 7eeb788393c..2c2ce228cee 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -315,16 +315,32 @@ where { fn import_block(&mut self, result: SharedImportResult) { let new_height = *result.sealed_block.entity.header().height(); - let executed_transaction = result.tx_status.iter().map(|s| s.id).collect(); + let executed_transactions = result.tx_status.iter().map(|s| s.id).collect(); // We don't want block importer way for us to process the result. drop(result); - { + let removed_transactions = { let mut tx_pool = self.pool.write(); - tx_pool.remove_transaction(executed_transaction); + let removed_transactions = tx_pool.remove_transactions(executed_transactions); if !tx_pool.is_empty() { self.shared_state.new_txs_notifier.send_replace(()); } + removed_transactions + }; + if !removed_transactions.is_empty() { + let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); + for tx in removed_transactions { + if let Some(tx) = tx { + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = + height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); + } + } + } + } } { @@ -352,14 +368,7 @@ where } } } - let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); for tx in removed_txs { - let expiration = tx.expiration(); - if expiration < u32::MAX.into() { - if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) { - expired_txs.remove(&tx.id()); - } - } self.shared_state .tx_status_sender .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); @@ -674,7 +683,7 @@ where removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter()); } - { + if !removed.is_empty() { let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); for tx in removed { let expiration = tx.expiration(); From 396701e6e735467bd28e690e79a48ec1202996b9 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 20 Jan 2025 10:37:40 +0100 Subject: [PATCH 4/7] clippy --- crates/services/txpool_v2/src/service.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index 2c2ce228cee..cea5cf4aa1a 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -329,15 +329,12 @@ where }; if !removed_transactions.is_empty() { let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); - for tx in removed_transactions { - if let Some(tx) = tx { - let expiration = tx.expiration(); - if expiration < u32::MAX.into() { - if let Some(expired_txs) = - height_expiration_txs.get_mut(&expiration) - { - expired_txs.remove(&tx.id()); - } + for tx in removed_transactions.into_iter().flatten() { + let expiration = tx.expiration(); + if expiration < u32::MAX.into() { + if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) + { + expired_txs.remove(&tx.id()); } } } From 3f8b6acfb9db7a2df436f5ba1637a57310462eb8 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 20 Jan 2025 16:52:51 +0100 Subject: [PATCH 5/7] Remove option from `remove_transactions` --- crates/services/txpool_v2/src/pool.rs | 6 ++---- crates/services/txpool_v2/src/service.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index b93414b9121..cbd886d3141 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -354,7 +354,7 @@ where /// Remove transaction but keep its dependents. /// The dependents become executables. - pub fn remove_transactions(&mut self, tx_ids: Vec) -> Vec> { + pub fn remove_transactions(&mut self, tx_ids: Vec) -> Vec { let mut removed_transactions = Vec::with_capacity(tx_ids.len()); for tx_id in tx_ids { if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { @@ -384,9 +384,7 @@ where .new_executable_transaction(dependent, storage_data); } self.update_components_and_caches_on_removal(iter::once(&transaction)); - removed_transactions.push(Some(transaction.transaction)); - } else { - removed_transactions.push(None); + removed_transactions.push(transaction.transaction); } } diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index cea5cf4aa1a..b1a9446fdc3 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -329,7 +329,7 @@ where }; if !removed_transactions.is_empty() { let mut height_expiration_txs = self.pruner.height_expiration_txs.write(); - for tx in removed_transactions.into_iter().flatten() { + for tx in removed_transactions.into_iter() { let expiration = tx.expiration(); if expiration < u32::MAX.into() { if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration) From ff1ce24ec9577c497f6ebd10e5c97744eee0a876 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 27 Jan 2025 17:25:27 +0100 Subject: [PATCH 6/7] use iterator in remove_transactions --- crates/services/txpool_v2/src/pool.rs | 7 +++++-- crates/services/txpool_v2/src/service.rs | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index cbd886d3141..81a8143423a 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -354,8 +354,11 @@ where /// Remove transaction but keep its dependents. /// The dependents become executables. - pub fn remove_transactions(&mut self, tx_ids: Vec) -> Vec { - let mut removed_transactions = Vec::with_capacity(tx_ids.len()); + pub fn remove_transactions( + &mut self, + tx_ids: impl Iterator, + ) -> Vec { + let mut removed_transactions = Vec::with_capacity(tx_ids.size_hint().0); for tx_id in tx_ids { if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { let dependents: Vec = diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index b1a9446fdc3..2c0f3fbf06d 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -315,13 +315,15 @@ where { fn import_block(&mut self, result: SharedImportResult) { let new_height = *result.sealed_block.entity.header().height(); - let executed_transactions = result.tx_status.iter().map(|s| s.id).collect(); + let executed_transactions: Vec = + result.tx_status.iter().map(|s| s.id).collect(); // We don't want block importer way for us to process the result. drop(result); let removed_transactions = { let mut tx_pool = self.pool.write(); - let removed_transactions = tx_pool.remove_transactions(executed_transactions); + let removed_transactions = + tx_pool.remove_transactions(executed_transactions.into_iter()); if !tx_pool.is_empty() { self.shared_state.new_txs_notifier.send_replace(()); } From b976f140b0a3f53afe7de1404e88aca1f5223483 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 20 Feb 2025 00:31:14 +0100 Subject: [PATCH 7/7] Add changelog --- .changes/added/2579.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changes/added/2579.md diff --git a/.changes/added/2579.md b/.changes/added/2579.md new file mode 100644 index 00000000000..adc3a5d4c23 --- /dev/null +++ b/.changes/added/2579.md @@ -0,0 +1 @@ +Clear expiration txs cache in transaction pool based on inserted transactions \ No newline at end of file