Skip to content

Commit 08e4ce8

Browse files
committed
feat(chain)!: introduce new TopologicalIterWithLevels
- add two new fields to `CanonicalIter`, the `canonical_roots` and `canonical_ancestors`, they're used by the `TopologicalIterWithLevels` in order to know what are the `roots` that it should start emitting from, and what are the ancestors/parents of each canonical transaction, in order to build the inputs dependencies and parents count. - add new `TopologicalIterWithLevels`, it's used to topologically iterate through the canonicalized `TxGraph`, emitting each level and it's canonical descendants in a topological order, following the correct spending order (e.g. tx1 -> tx2 and tx1, tx2 -> tx3, so it should output tx1, tx2 and then tx3 as it's spends both tx1 and tx2).
1 parent 3cb3e07 commit 08e4ce8

File tree

2 files changed

+212
-52
lines changed

2 files changed

+212
-52
lines changed

crates/chain/src/canonical_iter.rs

Lines changed: 188 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::collections::{HashMap, HashSet, VecDeque};
2-
use crate::tx_graph::{TxAncestors, TxDescendants};
2+
use crate::tx_graph::{TxAncestors, TxDescendants, TxNode};
33
use crate::{Anchor, ChainOracle, TxGraph};
44
use alloc::boxed::Box;
55
use alloc::collections::BTreeSet;
@@ -36,6 +36,9 @@ pub struct CanonicalIter<'g, A, C> {
3636
canonical: CanonicalMap<A>,
3737
not_canonical: NotCanonicalSet,
3838

39+
canonical_ancestors: HashMap<Txid, Vec<Txid>>,
40+
canonical_roots: VecDeque<Txid>,
41+
3942
queue: VecDeque<Txid>,
4043
}
4144

@@ -75,6 +78,8 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
7578
unprocessed_leftover_txs: VecDeque::new(),
7679
canonical: HashMap::new(),
7780
not_canonical: HashSet::new(),
81+
canonical_ancestors: HashMap::new(),
82+
canonical_roots: VecDeque::new(),
7883
queue: VecDeque::new(),
7984
}
8085
}
@@ -160,7 +165,7 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
160165

161166
// Any conflicts with a canonical tx can be added to `not_canonical`. Descendants
162167
// of `not_canonical` txs can also be added to `not_canonical`.
163-
for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) {
168+
for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx.clone()) {
164169
TxDescendants::new_include_root(
165170
self.tx_graph,
166171
conflict_txid,
@@ -181,6 +186,18 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
181186
detected_self_double_spend = true;
182187
return None;
183188
}
189+
190+
// Calculates all the existing ancestors for the given Txid
191+
self.canonical_ancestors.insert(
192+
this_txid,
193+
tx.clone()
194+
.input
195+
.iter()
196+
.filter(|txin| self.tx_graph.get_tx(txin.previous_output.txid).is_some())
197+
.map(|txin| txin.previous_output.txid)
198+
.collect(),
199+
);
200+
184201
canonical_entry.insert((tx, this_reason));
185202
Some(this_txid)
186203
},
@@ -190,12 +207,29 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
190207
if detected_self_double_spend {
191208
for txid in staged_queue {
192209
self.canonical.remove(&txid);
210+
self.canonical_ancestors.remove(&txid);
193211
}
194212
for txid in undo_not_canonical {
195213
self.not_canonical.remove(&txid);
196214
}
197215
} else {
198-
self.queue.extend(staged_queue);
216+
// TODO: (@oleonardolima) Can this be optimized somehow ?
217+
// Can we just do a simple lookup on the `canonical_ancestors` field ?
218+
for txid in staged_queue {
219+
let tx = self.tx_graph.get_tx(txid).expect("tx must exist");
220+
let ancestors = tx
221+
.input
222+
.iter()
223+
.map(|txin| txin.previous_output.txid)
224+
.filter_map(|prev_txid| self.tx_graph.get_tx(prev_txid))
225+
.collect::<Vec<_>>();
226+
227+
// check if it's a root: it's either a coinbase transaction or has not known
228+
// ancestors in the tx_graph
229+
if tx.is_coinbase() || ancestors.is_empty() {
230+
self.canonical_roots.push_back(txid);
231+
}
232+
}
199233
}
200234
}
201235
}
@@ -205,31 +239,21 @@ impl<A: Anchor, C: ChainOracle> Iterator for CanonicalIter<'_, A, C> {
205239

206240
fn next(&mut self) -> Option<Self::Item> {
207241
loop {
208-
if let Some(txid) = self.queue.pop_front() {
209-
let (tx, reason) = self
210-
.canonical
211-
.get(&txid)
212-
.cloned()
213-
.expect("reason must exist");
214-
return Some(Ok((txid, tx, reason)));
215-
}
216-
217-
if let Some((txid, tx)) = self.unprocessed_assumed_txs.next() {
242+
while let Some((txid, tx)) = self.unprocessed_assumed_txs.next() {
218243
if !self.is_canonicalized(txid) {
219244
self.mark_canonical(txid, tx, CanonicalReason::assumed());
220245
}
221246
}
222247

223-
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
248+
while let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
224249
if !self.is_canonicalized(txid) {
225250
if let Err(err) = self.scan_anchors(txid, tx, anchors) {
226251
return Some(Err(err));
227252
}
228253
}
229-
continue;
230254
}
231255

232-
if let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
256+
while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
233257
debug_assert!(
234258
!tx.is_coinbase(),
235259
"Coinbase txs must not have `last_seen` (in mempool) value"
@@ -238,15 +262,34 @@ impl<A: Anchor, C: ChainOracle> Iterator for CanonicalIter<'_, A, C> {
238262
let observed_in = ObservedIn::Mempool(last_seen);
239263
self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in));
240264
}
241-
continue;
242265
}
243266

244-
if let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() {
267+
while let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() {
245268
if !self.is_canonicalized(txid) && !tx.is_coinbase() {
246269
let observed_in = ObservedIn::Block(height);
247270
self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in));
248271
}
249-
continue;
272+
}
273+
274+
if !self.canonical_roots.is_empty() {
275+
let topological_iter = TopologicalIteratorWithLevels::new(
276+
self.tx_graph,
277+
self.chain,
278+
self.chain_tip,
279+
&self.canonical_ancestors,
280+
self.canonical_roots.drain(..).collect(),
281+
);
282+
283+
self.queue.extend(topological_iter);
284+
}
285+
286+
if let Some(txid) = self.queue.pop_front() {
287+
let (tx, reason) = self
288+
.canonical
289+
.get(&txid)
290+
.cloned()
291+
.expect("canonical reason must exist");
292+
return Some(Ok((txid, tx, reason)));
250293
}
251294

252295
return None;
@@ -342,3 +385,129 @@ impl<A: Clone> CanonicalReason<A> {
342385
}
343386
}
344387
}
388+
389+
struct TopologicalIteratorWithLevels<'a, A, C> {
390+
tx_graph: &'a TxGraph<A>,
391+
chain: &'a C,
392+
chain_tip: BlockId,
393+
394+
current_level: Vec<Txid>,
395+
next_level: Vec<Txid>,
396+
397+
adj_list: HashMap<Txid, Vec<Txid>>,
398+
parent_count: HashMap<Txid, usize>,
399+
400+
current_index: usize,
401+
}
402+
403+
impl<'a, A: Anchor, C: ChainOracle> TopologicalIteratorWithLevels<'a, A, C> {
404+
fn new(
405+
tx_graph: &'a TxGraph<A>,
406+
chain: &'a C,
407+
chain_tip: BlockId,
408+
ancestors_by_txid: &HashMap<Txid, Vec<Txid>>,
409+
roots: Vec<Txid>,
410+
) -> Self {
411+
let mut parent_count = HashMap::new();
412+
let mut adj_list: HashMap<Txid, Vec<Txid>> = HashMap::new();
413+
414+
for (txid, ancestors) in ancestors_by_txid {
415+
for ancestor in ancestors {
416+
adj_list.entry(*ancestor).or_default().push(*txid);
417+
*parent_count.entry(*txid).or_insert(0) += 1;
418+
}
419+
}
420+
421+
let mut current_level: Vec<Txid> = roots.to_vec();
422+
423+
// Sort the initial level by confirmation height
424+
current_level.sort_by_key(|&txid| {
425+
let tx_node = tx_graph.get_tx_node(txid).expect("tx should exist");
426+
Self::find_direct_anchor(&tx_node, chain, chain_tip)
427+
.expect("should not fail")
428+
.map(|anchor| anchor.confirmation_height_upper_bound())
429+
.unwrap_or(u32::MAX)
430+
});
431+
432+
Self {
433+
current_level,
434+
next_level: Vec::new(),
435+
adj_list,
436+
parent_count,
437+
current_index: 0,
438+
tx_graph,
439+
chain,
440+
chain_tip,
441+
}
442+
}
443+
444+
fn find_direct_anchor(
445+
tx_node: &TxNode<'_, Arc<Transaction>, A>,
446+
chain: &C,
447+
chain_tip: BlockId,
448+
) -> Result<Option<A>, C::Error> {
449+
tx_node
450+
.anchors
451+
.iter()
452+
.find_map(|a| -> Option<Result<A, C::Error>> {
453+
match chain.is_block_in_chain(a.anchor_block(), chain_tip) {
454+
Ok(Some(true)) => Some(Ok(a.clone())),
455+
Ok(Some(false)) | Ok(None) => None,
456+
Err(err) => Some(Err(err)),
457+
}
458+
})
459+
.transpose()
460+
}
461+
462+
fn advance_to_next_level(&mut self) {
463+
self.current_level = core::mem::take(&mut self.next_level);
464+
465+
// Sort by confirmation height
466+
self.current_level.sort_by_key(|&txid| {
467+
let tx_node = self.tx_graph.get_tx_node(txid).expect("tx should exist");
468+
469+
Self::find_direct_anchor(&tx_node, self.chain, self.chain_tip)
470+
.expect("should not fail")
471+
.map(|anchor| anchor.confirmation_height_upper_bound())
472+
.unwrap_or(u32::MAX)
473+
});
474+
475+
self.current_index = 0;
476+
}
477+
}
478+
479+
impl<'a, A: Anchor, C: ChainOracle> Iterator for TopologicalIteratorWithLevels<'a, A, C> {
480+
type Item = Txid;
481+
482+
fn next(&mut self) -> Option<Self::Item> {
483+
// If we've exhausted the current level, move to next
484+
if self.current_index >= self.current_level.len() {
485+
if self.next_level.is_empty() {
486+
return None;
487+
}
488+
self.advance_to_next_level();
489+
}
490+
491+
let current = self.current_level[self.current_index];
492+
self.current_index += 1;
493+
494+
// If this is the last item in current level, prepare dependents for next level
495+
if self.current_index == self.current_level.len() {
496+
// Process all dependents of all transactions in current level
497+
for &tx in &self.current_level {
498+
if let Some(dependents) = self.adj_list.get(&tx) {
499+
for &dependent in dependents {
500+
if let Some(degree) = self.parent_count.get_mut(&dependent) {
501+
*degree -= 1;
502+
if *degree == 0 {
503+
self.next_level.push(dependent);
504+
}
505+
}
506+
}
507+
}
508+
}
509+
}
510+
511+
Some(current)
512+
}
513+
}

crates/chain/tests/test_tx_graph.rs

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,7 +1554,7 @@ struct Scenario<'a> {
15541554
#[test]
15551555
fn test_list_canonical_txs_topological_order() {
15561556
// chain
1557-
let local_chain = local_chain!(
1557+
let local_chain: LocalChain<BlockHash> = local_chain!(
15581558
(0, hash!("A")),
15591559
(1, hash!("B")),
15601560
(2, hash!("C")),
@@ -1740,66 +1740,57 @@ fn test_list_canonical_txs_topological_order() {
17401740
exp_chain_txs: Vec::from(["a0", "e0", "f0", "b0", "c0", "b1", "c1", "d0"]),
17411741
}];
17421742

1743-
for (_, scenario) in scenarios.iter().enumerate() {
1743+
for scenario in scenarios {
17441744
let env = init_graph(scenario.tx_templates.iter());
17451745

1746-
let canonical_txs = env
1746+
let canonical_txids = env
17471747
.tx_graph
17481748
.list_canonical_txs(&local_chain, chain_tip, env.canonicalization_params.clone())
17491749
.map(|tx| tx.tx_node.txid)
1750-
.collect::<BTreeSet<_>>();
1750+
.collect::<Vec<_>>();
17511751

1752-
let exp_txs = scenario
1752+
let exp_txids = scenario
17531753
.exp_chain_txs
17541754
.iter()
17551755
.map(|txid| *env.tx_name_to_txid.get(txid).expect("txid must exist"))
1756-
.collect::<BTreeSet<_>>();
1756+
.collect::<Vec<_>>();
17571757

17581758
assert_eq!(
1759-
canonical_txs, exp_txs,
1759+
HashSet::<Txid>::from_iter(canonical_txids.clone()),
1760+
HashSet::<Txid>::from_iter(exp_txids.clone()),
17601761
"\n[{}] 'list_canonical_txs' failed",
17611762
scenario.name
17621763
);
17631764

1764-
let canonical_txs = canonical_txs.iter().map(|txid| *txid).collect::<Vec<_>>();
1765-
17661765
assert!(
1767-
is_txs_in_topological_order(canonical_txs, env.tx_graph),
1766+
is_txs_in_topological_order(canonical_txids, env.tx_graph),
17681767
"\n[{}] 'list_canonical_txs' failed to output the txs in topological order",
17691768
scenario.name
17701769
);
17711770
}
17721771
}
17731772

17741773
fn is_txs_in_topological_order(txs: Vec<Txid>, tx_graph: TxGraph<BlockId>) -> bool {
1775-
let enumerated_txs = txs
1776-
.iter()
1777-
.enumerate()
1778-
.map(|(i, txid)| (i, *txid))
1779-
.collect::<Vec<(usize, Txid)>>();
1774+
let mut seen: HashSet<Txid> = HashSet::new();
17801775

1781-
let txid_to_pos = enumerated_txs
1782-
.iter()
1783-
.map(|(i, txid)| (*txid, *i))
1784-
.collect::<HashMap<Txid, usize>>();
1785-
1786-
for (pos, txid) in enumerated_txs {
1787-
let descendants_pos: Vec<(&usize, Txid)> = tx_graph
1788-
.walk_descendants(txid, |_depth, this_txid| {
1789-
let pos = txid_to_pos.get(&this_txid).unwrap();
1790-
Some((pos, this_txid))
1791-
})
1776+
for txid in txs {
1777+
let tx = tx_graph.get_tx(txid).expect("should exist");
1778+
let inputs: Vec<Txid> = tx
1779+
.input
1780+
.iter()
1781+
.map(|txin| txin.previous_output.txid)
17921782
.collect();
17931783

1794-
for (desc_pos, this_txid) in descendants_pos {
1795-
if desc_pos < &pos {
1796-
println!(
1797-
"ancestor: ({:?}, {:?}) , descendant ({:?}, {:?})",
1798-
txid, pos, this_txid, desc_pos
1799-
);
1784+
// assert that all the txin's have been seen already
1785+
for input_txid in inputs {
1786+
if !seen.contains(&input_txid) {
18001787
return false;
18011788
}
18021789
}
1790+
1791+
// Add current transaction to seen set
1792+
seen.insert(txid);
18031793
}
1804-
return true;
1794+
1795+
true
18051796
}

0 commit comments

Comments
 (0)