Skip to content

Commit ae7591c

Browse files
committed
feat(chain)!: introduce new TopologicalIterWithLevels
- add two new fields to `CanonicalIter`, the `canonical_roots` and `canonical_ancestors`, they're used by the `TopologicalIterWithLevels` in order to know what are the `roots` that it should start emitting from, and what are the ancestors/parents of each canonical transaction, in order to build the inputs dependencies and parents count. - add new `TopologicalIterWithLevels`, it's used to topologically iterate through the canonicalized `TxGraph`, emitting each level and it's canonical descendants in a topological order, following the correct spending order (e.g. tx1 -> tx2 and tx1, tx2 -> tx3, so it should output tx1, tx2 and then tx3 as it's spends both tx1 and tx2).
1 parent 1682def commit ae7591c

File tree

2 files changed

+213
-50
lines changed

2 files changed

+213
-50
lines changed

crates/chain/src/canonical_iter.rs

Lines changed: 191 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use core::u32;
2+
13
use crate::collections::{HashMap, HashSet, VecDeque};
2-
use crate::tx_graph::{TxAncestors, TxDescendants};
4+
use crate::tx_graph::{TxAncestors, TxDescendants, TxNode};
35
use crate::{Anchor, ChainOracle, TxGraph};
46
use alloc::boxed::Box;
57
use alloc::collections::BTreeSet;
@@ -36,6 +38,9 @@ pub struct CanonicalIter<'g, A, C> {
3638
canonical: CanonicalMap<A>,
3739
not_canonical: NotCanonicalSet,
3840

41+
canonical_ancestors: HashMap<Txid, Vec<Txid>>,
42+
canonical_roots: VecDeque<Txid>,
43+
3944
queue: VecDeque<Txid>,
4045
}
4146

@@ -75,6 +80,8 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
7580
unprocessed_leftover_txs: VecDeque::new(),
7681
canonical: HashMap::new(),
7782
not_canonical: HashSet::new(),
83+
canonical_ancestors: HashMap::new(),
84+
canonical_roots: VecDeque::new(),
7885
queue: VecDeque::new(),
7986
}
8087
}
@@ -160,7 +167,7 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
160167

161168
// Any conflicts with a canonical tx can be added to `not_canonical`. Descendants
162169
// of `not_canonical` txs can also be added to `not_canonical`.
163-
for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) {
170+
for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx.clone()) {
164171
TxDescendants::new_include_root(
165172
self.tx_graph,
166173
conflict_txid,
@@ -181,6 +188,18 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
181188
detected_self_double_spend = true;
182189
return None;
183190
}
191+
192+
// Calculates all the existing ancestors for the given Txid
193+
self.canonical_ancestors.insert(
194+
this_txid,
195+
tx.clone()
196+
.input
197+
.iter()
198+
.filter(|txin| self.tx_graph.get_tx(txin.previous_output.txid).is_some())
199+
.map(|txin| txin.previous_output.txid)
200+
.collect(),
201+
);
202+
184203
canonical_entry.insert((tx, this_reason));
185204
Some(this_txid)
186205
},
@@ -190,12 +209,30 @@ impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> {
190209
if detected_self_double_spend {
191210
for txid in staged_queue {
192211
self.canonical.remove(&txid);
212+
self.canonical_ancestors.remove(&txid);
193213
}
194214
for txid in undo_not_canonical {
195215
self.not_canonical.remove(&txid);
196216
}
197217
} else {
198-
self.queue.extend(staged_queue);
218+
// TODO: (@oleonardolima) Can this be optimized somehow ?
219+
// Can we just do a simple lookup on the `canonical_ancestors` field ?
220+
// println!("{:?}", self.canonical_ancestors);
221+
for txid in staged_queue {
222+
let tx = self.tx_graph.get_tx(txid).expect("tx must exist");
223+
let ancestors = tx
224+
.input
225+
.iter()
226+
.map(|txin| txin.previous_output.txid)
227+
.filter_map(|prev_txid| self.tx_graph.get_tx(prev_txid))
228+
.collect::<Vec<_>>();
229+
230+
// check if it's a root: it's either a coinbase transaction or has not known ancestors in the tx_graph
231+
if tx.is_coinbase() || ancestors.is_empty() {
232+
self.canonical_roots.push_back(txid);
233+
}
234+
}
235+
println!("canonical_roots: {:?}", self.canonical_roots);
199236
}
200237
}
201238
}
@@ -205,31 +242,21 @@ impl<A: Anchor, C: ChainOracle> Iterator for CanonicalIter<'_, A, C> {
205242

206243
fn next(&mut self) -> Option<Self::Item> {
207244
loop {
208-
if let Some(txid) = self.queue.pop_front() {
209-
let (tx, reason) = self
210-
.canonical
211-
.get(&txid)
212-
.cloned()
213-
.expect("reason must exist");
214-
return Some(Ok((txid, tx, reason)));
215-
}
216-
217-
if let Some((txid, tx)) = self.unprocessed_assumed_txs.next() {
245+
while let Some((txid, tx)) = self.unprocessed_assumed_txs.next() {
218246
if !self.is_canonicalized(txid) {
219247
self.mark_canonical(txid, tx, CanonicalReason::assumed());
220248
}
221249
}
222250

223-
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
251+
while let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
224252
if !self.is_canonicalized(txid) {
225253
if let Err(err) = self.scan_anchors(txid, tx, anchors) {
226254
return Some(Err(err));
227255
}
228256
}
229-
continue;
230257
}
231258

232-
if let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
259+
while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
233260
debug_assert!(
234261
!tx.is_coinbase(),
235262
"Coinbase txs must not have `last_seen` (in mempool) value"
@@ -238,15 +265,34 @@ impl<A: Anchor, C: ChainOracle> Iterator for CanonicalIter<'_, A, C> {
238265
let observed_in = ObservedIn::Mempool(last_seen);
239266
self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in));
240267
}
241-
continue;
242268
}
243269

244-
if let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() {
270+
while let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() {
245271
if !self.is_canonicalized(txid) && !tx.is_coinbase() {
246272
let observed_in = ObservedIn::Block(height);
247273
self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in));
248274
}
249-
continue;
275+
}
276+
277+
if !self.canonical_roots.clone().is_empty() {
278+
let topological_iter = TopologicalIteratorWithLevels::new(
279+
self.tx_graph,
280+
self.chain,
281+
self.chain_tip,
282+
&self.canonical_ancestors,
283+
self.canonical_roots.drain(..).collect(),
284+
);
285+
286+
self.queue.extend(topological_iter);
287+
}
288+
289+
while let Some(txid) = self.queue.pop_front() {
290+
let (tx, reason) = self
291+
.canonical
292+
.get(&txid)
293+
.cloned()
294+
.expect("canonical reason must exist");
295+
return Some(Ok((txid, tx, reason)));
250296
}
251297

252298
return None;
@@ -342,3 +388,129 @@ impl<A: Clone> CanonicalReason<A> {
342388
}
343389
}
344390
}
391+
392+
struct TopologicalIteratorWithLevels<'a, A, C> {
393+
tx_graph: &'a TxGraph<A>,
394+
chain: &'a C,
395+
chain_tip: BlockId,
396+
397+
current_level: Vec<Txid>,
398+
next_level: Vec<Txid>,
399+
400+
adj_list: HashMap<Txid, Vec<Txid>>,
401+
parent_count: HashMap<Txid, usize>,
402+
403+
current_index: usize,
404+
}
405+
406+
impl<'a, A: Anchor, C: ChainOracle> TopologicalIteratorWithLevels<'a, A, C> {
407+
fn new(
408+
tx_graph: &'a TxGraph<A>,
409+
chain: &'a C,
410+
chain_tip: BlockId,
411+
ancestors_by_txid: &HashMap<Txid, Vec<Txid>>,
412+
roots: Vec<Txid>,
413+
) -> Self {
414+
let mut parent_count = HashMap::new();
415+
let mut adj_list: HashMap<Txid, Vec<Txid>> = HashMap::new();
416+
417+
for (txid, ancestors) in ancestors_by_txid {
418+
for ancestor in ancestors {
419+
adj_list.entry(*ancestor).or_default().push(*txid);
420+
*parent_count.entry(*txid).or_insert(0) += 1;
421+
}
422+
}
423+
424+
let mut current_level: Vec<Txid> = roots.to_vec();
425+
426+
// Sort the initial level by confirmation height
427+
current_level.sort_by_key(|&txid| {
428+
let tx_node = tx_graph.get_tx_node(txid).expect("tx should exist");
429+
Self::find_direct_anchor(&tx_node, chain, chain_tip)
430+
.expect("should not fail")
431+
.map(|anchor| anchor.confirmation_height_upper_bound())
432+
.unwrap_or(u32::MAX)
433+
});
434+
435+
Self {
436+
current_level,
437+
next_level: Vec::new(),
438+
adj_list,
439+
parent_count,
440+
current_index: 0,
441+
tx_graph,
442+
chain,
443+
chain_tip,
444+
}
445+
}
446+
447+
fn find_direct_anchor(
448+
tx_node: &TxNode<'_, Arc<Transaction>, A>,
449+
chain: &C,
450+
chain_tip: BlockId,
451+
) -> Result<Option<A>, C::Error> {
452+
tx_node
453+
.anchors
454+
.iter()
455+
.find_map(|a| -> Option<Result<A, C::Error>> {
456+
match chain.is_block_in_chain(a.anchor_block(), chain_tip) {
457+
Ok(Some(true)) => Some(Ok(a.clone())),
458+
Ok(Some(false)) | Ok(None) => None,
459+
Err(err) => Some(Err(err)),
460+
}
461+
})
462+
.transpose()
463+
}
464+
465+
fn advance_to_next_level(&mut self) {
466+
self.current_level = std::mem::take(&mut self.next_level);
467+
468+
// Sort by confirmation height
469+
self.current_level.sort_by_key(|&txid| {
470+
let tx_node = self.tx_graph.get_tx_node(txid).expect("tx should exist");
471+
472+
Self::find_direct_anchor(&tx_node, self.chain, self.chain_tip)
473+
.expect("should not fail")
474+
.map(|anchor| anchor.confirmation_height_upper_bound())
475+
.unwrap_or(u32::MAX)
476+
});
477+
478+
self.current_index = 0;
479+
}
480+
}
481+
482+
impl<'a, A: Anchor, C: ChainOracle> Iterator for TopologicalIteratorWithLevels<'a, A, C> {
483+
type Item = Txid;
484+
485+
fn next(&mut self) -> Option<Self::Item> {
486+
// If we've exhausted the current level, move to next
487+
if self.current_index >= self.current_level.len() {
488+
if self.next_level.is_empty() {
489+
return None;
490+
}
491+
self.advance_to_next_level();
492+
}
493+
494+
let current = self.current_level[self.current_index];
495+
self.current_index += 1;
496+
497+
// If this is the last item in current level, prepare dependents for next level
498+
if self.current_index == self.current_level.len() {
499+
// Process all dependents of all transactions in current level
500+
for &tx in &self.current_level {
501+
if let Some(dependents) = self.adj_list.get(&tx) {
502+
for &dependent in dependents {
503+
if let Some(degree) = self.parent_count.get_mut(&dependent) {
504+
*degree -= 1;
505+
if *degree == 0 {
506+
self.next_level.push(dependent);
507+
}
508+
}
509+
}
510+
}
511+
}
512+
}
513+
514+
Some(current)
515+
}
516+
}

crates/chain/tests/test_tx_graph.rs

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,63 +1743,54 @@ fn test_list_canonical_txs_topological_order() {
17431743
for (_, scenario) in scenarios.iter().enumerate() {
17441744
let env = init_graph(scenario.tx_templates.iter());
17451745

1746-
let canonical_txs = env
1746+
let canonical_txids = env
17471747
.tx_graph
17481748
.list_canonical_txs(&local_chain, chain_tip, env.canonicalization_params.clone())
17491749
.map(|tx| tx.tx_node.txid)
1750-
.collect::<BTreeSet<_>>();
1750+
.collect::<Vec<_>>();
17511751

1752-
let exp_txs = scenario
1752+
let exp_txids = scenario
17531753
.exp_chain_txs
17541754
.iter()
17551755
.map(|txid| *env.tx_name_to_txid.get(txid).expect("txid must exist"))
1756-
.collect::<BTreeSet<_>>();
1756+
.collect::<Vec<_>>();
17571757

17581758
assert_eq!(
1759-
canonical_txs, exp_txs,
1759+
HashSet::<Txid>::from_iter(canonical_txids.clone()),
1760+
HashSet::<Txid>::from_iter(exp_txids.clone()),
17601761
"\n[{}] 'list_canonical_txs' failed",
17611762
scenario.name
17621763
);
17631764

1764-
let canonical_txs = canonical_txs.iter().map(|txid| *txid).collect::<Vec<_>>();
1765-
17661765
assert!(
1767-
is_txs_in_topological_order(canonical_txs, env.tx_graph),
1766+
is_txs_in_topological_order(canonical_txids, env.tx_graph),
17681767
"\n[{}] 'list_canonical_txs' failed to output the txs in topological order",
17691768
scenario.name
17701769
);
17711770
}
17721771
}
17731772

17741773
fn is_txs_in_topological_order(txs: Vec<Txid>, tx_graph: TxGraph<BlockId>) -> bool {
1775-
let enumerated_txs = txs
1776-
.iter()
1777-
.enumerate()
1778-
.map(|(i, txid)| (i, *txid))
1779-
.collect::<Vec<(usize, Txid)>>();
1774+
let mut seen: HashSet<Txid> = HashSet::new();
17801775

1781-
let txid_to_pos = enumerated_txs
1782-
.iter()
1783-
.map(|(i, txid)| (*txid, *i))
1784-
.collect::<HashMap<Txid, usize>>();
1785-
1786-
for (pos, txid) in enumerated_txs {
1787-
let descendants_pos: Vec<(&usize, Txid)> = tx_graph
1788-
.walk_descendants(txid, |_depth, this_txid| {
1789-
let pos = txid_to_pos.get(&this_txid).unwrap();
1790-
Some((pos, this_txid))
1791-
})
1776+
for txid in txs {
1777+
let tx = tx_graph.get_tx(txid).expect("should exist");
1778+
let inputs: Vec<Txid> = tx
1779+
.input
1780+
.iter()
1781+
.map(|txin| txin.previous_output.txid)
17921782
.collect();
17931783

1794-
for (desc_pos, this_txid) in descendants_pos {
1795-
if desc_pos < &pos {
1796-
println!(
1797-
"ancestor: ({:?}, {:?}) , descendant ({:?}, {:?})",
1798-
txid, pos, this_txid, desc_pos
1799-
);
1784+
// assert that all the txin's have been seen already
1785+
for input_txid in inputs {
1786+
if !seen.contains(&input_txid) {
18001787
return false;
18011788
}
18021789
}
1790+
1791+
// Add current transaction to seen set
1792+
seen.insert(txid);
18031793
}
1804-
return true;
1794+
1795+
true
18051796
}

0 commit comments

Comments
 (0)