Skip to content

Commit 63973cc

Browse files
fatxpool: fix: remove invalid txs from the dropped stream controller (#8923)
While testing mortal transaction I encountered exactly the same problem as in #8490. This PR should fix the problem. fixes: #8490 --------- Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 05ca227 commit 63973cc

File tree

8 files changed

+134
-8
lines changed

8 files changed

+134
-8
lines changed

prdoc/pr_8923.prdoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
title: '`fatxpool`: fix: remove invalid txs from the dropped stream controller'
2+
doc:
3+
- audience: Node Dev
4+
description: |-
5+
While testing mortal transaction I encountered exactly the same problem as in #8490.
6+
This PR should fix the problem.
7+
8+
fixes: #8490
9+
crates:
10+
- name: sc-transaction-pool
11+
bump: minor

substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,12 @@ where
256256
let (dropped_stream_controller, dropped_stream) =
257257
MultiViewDroppedWatcherController::<ChainApi>::new();
258258

259-
let view_store =
260-
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
259+
let view_store = Arc::new(ViewStore::new(
260+
pool_api.clone(),
261+
listener,
262+
dropped_stream_controller,
263+
import_notification_sink.clone(),
264+
));
261265

262266
let dropped_monitor_task = Self::dropped_monitor_task(
263267
dropped_stream,
@@ -404,8 +408,12 @@ where
404408
let (dropped_stream_controller, dropped_stream) =
405409
MultiViewDroppedWatcherController::<ChainApi>::new();
406410

407-
let view_store =
408-
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
411+
let view_store = Arc::new(ViewStore::new(
412+
pool_api.clone(),
413+
listener,
414+
dropped_stream_controller,
415+
import_notification_sink.clone(),
416+
));
409417

410418
let dropped_monitor_task = Self::dropped_monitor_task(
411419
dropped_stream,
@@ -837,6 +845,13 @@ where
837845

838846
Ok(final_results)
839847
}
848+
849+
/// Number of notified items in import_notification_sink.
850+
///
851+
/// Internal detail, exposed only for testing.
852+
pub fn import_notification_sink_len(&self) -> usize {
853+
self.import_notification_sink.notified_items_len()
854+
}
840855
}
841856

842857
/// Converts the input view-to-statuses map into the output vector of statuses.

substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,13 @@ where
245245
already_notified_items.remove(i);
246246
});
247247
}
248+
249+
/// Lenght of the `already_notified_items` set.
250+
///
251+
/// Exposed for testing only.
252+
pub fn notified_items_len(&self) -> usize {
253+
self.already_notified_items.read().len()
254+
}
248255
}
249256

250257
#[cfg(test)]

substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,8 +697,18 @@ where
697697
let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
698698
let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();
699699

700-
self.listener
701-
.transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>());
700+
let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::<Vec<_>>();
701+
702+
//note: here the consistency is assumed: it is expected that transaction will be
703+
// actually removed from the listener with Invalid event. This means assumption that no view
704+
// is referencing tx as ready.
705+
self.listener.transactions_invalidated(&invalid_hashes_subtrees);
706+
view_store
707+
.import_notification_sink
708+
.clean_notified_items(&invalid_hashes_subtrees);
709+
view_store
710+
.dropped_stream_controller
711+
.remove_transactions(invalid_hashes_subtrees);
702712

703713
trace!(
704714
target: LOG_TARGET,

substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! Transaction pool view store. Basically block hash to view map with some utility methods.
2020
2121
use super::{
22+
import_notification_sink::MultiViewImportNotificationSink,
2223
multi_view_listener::{MultiViewListener, TxStatusStream},
2324
view::{View, ViewPoolObserver},
2425
};
@@ -171,6 +172,10 @@ where
171172
pub(super) most_recent_view: RwLock<Option<Arc<View<ChainApi>>>>,
172173
/// The controller of multi view dropped stream.
173174
pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
175+
/// Util providing an aggregated stream of transactions that were imported to ready queue in
176+
/// any view. Reference kept here for clean up purposes.
177+
pub(super) import_notification_sink:
178+
MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
174179
/// The map used to synchronize replacement of transactions between maintain and dropped
175180
/// notifcication threads. It is meant to assure that replaced transaction is also removed from
176181
/// newly built views in maintain process.
@@ -202,6 +207,10 @@ where
202207
api: Arc<ChainApi>,
203208
listener: Arc<MultiViewListener<ChainApi>>,
204209
dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
210+
import_notification_sink: MultiViewImportNotificationSink<
211+
Block::Hash,
212+
ExtrinsicHash<ChainApi>,
213+
>,
205214
) -> Self {
206215
Self {
207216
api,
@@ -210,6 +219,7 @@ where
210219
listener,
211220
most_recent_view: RwLock::from(None),
212221
dropped_stream_controller,
222+
import_notification_sink,
213223
pending_txs_tasks: Default::default(),
214224
}
215225
}

substrate/client/transaction-pool/src/graph/validated_pool.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use std::{
3636
sync::Arc,
3737
time::{Duration, Instant},
3838
};
39-
use tracing::{trace, warn, Level};
39+
use tracing::{debug, trace, warn, Level};
4040

4141
use super::{
4242
base_pool::{self as base, PruneStatus},
@@ -722,6 +722,12 @@ impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
722722
}
723723
hashes
724724
};
725+
debug!(
726+
target:LOG_TARGET,
727+
to_remove_len=to_remove.len(),
728+
futures_to_remove_len=futures_to_remove.len(),
729+
"clear_stale"
730+
);
725731
// removing old transactions
726732
self.remove_invalid(&to_remove);
727733
self.remove_invalid(&futures_to_remove);

substrate/client/transaction-pool/tests/fatp.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,51 @@ fn fatp_linear_old_ready_becoming_stale() {
484484
}
485485
}
486486

487+
#[test]
488+
fn fatp_proper_cleanup_after_mortal_tx_becoming_invalid() {
489+
sp_tracing::try_init_simple();
490+
491+
let (pool, api, _) = pool();
492+
493+
let xts = vec![uxt(Alice, 200), uxt(Alice, 201), uxt(Alice, 202)];
494+
495+
api.set_valid_till(&xts[0], 66);
496+
api.set_valid_till(&xts[1], 66);
497+
api.set_valid_till(&xts[2], 66);
498+
499+
let header01 = api.push_block(1, vec![], true);
500+
let event = new_best_block_event(&pool, None, header01.hash());
501+
block_on(pool.maintain(event));
502+
503+
xts.into_iter().for_each(|xt| {
504+
block_on(pool.submit_one(invalid_hash(), SOURCE, xt)).unwrap();
505+
});
506+
assert_eq!(pool.status_all()[&header01.hash()].ready, 3);
507+
assert_eq!(pool.status_all()[&header01.hash()].future, 0);
508+
509+
// Import enough blocks to make our transactions stale (longevity is 64)
510+
let mut prev_header = header01;
511+
for n in 2..67 {
512+
let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
513+
let event = new_best_block_event(&pool, Some(prev_header.hash()), header.hash());
514+
block_on(pool.maintain(event));
515+
516+
if n == 66 {
517+
assert_eq!(pool.status_all()[&header.hash()].ready, 0);
518+
assert_eq!(pool.status_all()[&header.hash()].future, 0);
519+
} else {
520+
assert_eq!(pool.status_all()[&header.hash()].ready, 3);
521+
assert_eq!(pool.status_all()[&header.hash()].future, 0);
522+
}
523+
prev_header = header;
524+
}
525+
526+
let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
527+
let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
528+
block_on(pool.maintain(event));
529+
assert_eq!(pool.import_notification_sink_len(), 0);
530+
}
531+
487532
#[test]
488533
fn fatp_fork_reorg() {
489534
sp_tracing::try_init_simple();

substrate/test-utils/runtime/transaction-pool/src/lib.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub struct ChainState {
8585
pub nonces: HashMap<Hash, HashMap<AccountId, u64>>,
8686
pub invalid_hashes: HashSet<Hash>,
8787
pub priorities: HashMap<Hash, u64>,
88+
pub valid_till_blocks: HashMap<Hash, u64>,
8889
}
8990

9091
/// Test Api for transaction pool.
@@ -269,6 +270,14 @@ impl TestApi {
269270
.insert(Self::hash_and_length_inner(xts).0, priority);
270271
}
271272

273+
/// Set a transaction mortality (block at which it will expire).
274+
pub fn set_valid_till(&self, xts: &Extrinsic, valid_till: u64) {
275+
self.chain
276+
.write()
277+
.valid_till_blocks
278+
.insert(Self::hash_and_length_inner(xts).0, valid_till);
279+
}
280+
272281
/// Query validation requests received.
273282
pub fn validation_requests(&self) -> Vec<Extrinsic> {
274283
self.validation_requests.read().clone()
@@ -443,11 +452,24 @@ impl ChainApi for TestApi {
443452
}
444453

445454
let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned();
455+
let longevity = self
456+
.chain
457+
.read()
458+
.valid_till_blocks
459+
.get(&self.hash_and_length(&uxt).0)
460+
.cloned()
461+
.map(|valid_till| valid_till.saturating_sub(block_number.unwrap()))
462+
.unwrap_or(64);
463+
464+
if longevity == 0 {
465+
return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::BadProof)))
466+
}
467+
446468
let mut validity = ValidTransaction {
447469
priority: priority.unwrap_or(1),
448470
requires,
449471
provides,
450-
longevity: 64,
472+
longevity,
451473
propagate: true,
452474
};
453475

0 commit comments

Comments
 (0)