Skip to content

Commit 88a99ba

Browse files
oleonardolimaclaude
andcommitted
refactor(chain): restructure canonicalization with staged processing
- add private `CanonicalStage` enum to track processing phases - add private `try_advance()` for automatic stage progression - add `is_transitive()` helper to `CanonicalReason` - rename internal `confirmed_anchors` to `direct_anchors` for clarity - update `resolve_query()` with stage-specific logic Co-authored-by: Claude <[email protected]> Signed-off-by: Leonardo Lima <[email protected]>
1 parent bc2884b commit 88a99ba

File tree

1 file changed

+204
-79
lines changed

1 file changed

+204
-79
lines changed

crates/chain/src/canonical_task.rs

Lines changed: 204 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,21 @@ use bitcoin::{Transaction, Txid};
1111
type CanonicalMap<A> = HashMap<Txid, (Arc<Transaction>, CanonicalReason<A>)>;
1212
type NotCanonicalSet = HashSet<Txid>;
1313

14+
/// Represents the current stage of canonicalization processing.
15+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16+
enum CanonicalStage {
17+
/// Processing directly anchored transactions.
18+
AnchoredTxs,
19+
/// Processing transactions seen in mempool.
20+
SeenTxs,
21+
/// Processing leftover transactions.
22+
LeftOverTxs,
23+
/// Processing transitively anchored transactions.
24+
TransitivelyAnchoredTxs,
25+
/// All processing is complete.
26+
Finished,
27+
}
28+
1429
/// Modifies the canonicalization algorithm.
1530
#[derive(Debug, Default, Clone)]
1631
pub struct CanonicalizationParams {
@@ -30,86 +45,147 @@ pub struct CanonicalizationTask<'g, A> {
3045
unprocessed_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3146
unprocessed_seen_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>, u64)> + 'g>,
3247
unprocessed_leftover_txs: VecDeque<(Txid, Arc<Transaction>, u32)>,
48+
unprocessed_transitively_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3349

3450
canonical: CanonicalMap<A>,
3551
not_canonical: NotCanonicalSet,
3652

3753
// Store canonical transactions in order
3854
canonical_order: Vec<Txid>,
3955

40-
// Track which transactions have confirmed anchors
41-
confirmed_anchors: HashMap<Txid, A>,
56+
// Track which transactions have direct anchors (not transitive)
57+
direct_anchors: HashMap<Txid, A>,
58+
59+
// Track the current stage of processing
60+
current_stage: CanonicalStage,
4261
}
4362

4463
impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
4564
type Output = CanonicalView<A>;
4665

4766
fn next_query(&mut self) -> Option<ChainRequest> {
48-
// Find the next non-canonicalized transaction to query
49-
if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() {
50-
// if !self.is_canonicalized(*txid) {
51-
// // Build query for this transaction
52-
// let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
53-
// return Some(ChainRequest {
54-
// chain_tip: self.chain_tip,
55-
// block_ids,
56-
// });
57-
// }
58-
// // Skip already canonicalized transaction
59-
// self.unprocessed_anchored_txs.pop_front();
60-
// Build query for this transaction
61-
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
62-
return Some(ChainRequest {
63-
chain_tip: self.chain_tip,
64-
block_ids,
65-
});
67+
// Try to advance to the next stage if needed
68+
self.try_advance();
69+
70+
match self.current_stage {
71+
CanonicalStage::AnchoredTxs => {
72+
// Process directly anchored transactions first
73+
if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() {
74+
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
75+
return Some(ChainRequest {
76+
chain_tip: self.chain_tip,
77+
block_ids,
78+
});
79+
}
80+
None
81+
}
82+
CanonicalStage::TransitivelyAnchoredTxs => {
83+
// Process transitively anchored transactions last
84+
if let Some((_txid, _, anchors)) =
85+
self.unprocessed_transitively_anchored_txs.front()
86+
{
87+
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
88+
return Some(ChainRequest {
89+
chain_tip: self.chain_tip,
90+
block_ids,
91+
});
92+
}
93+
None
94+
}
95+
CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => {
96+
// These stages don't need queries
97+
None
98+
}
6699
}
67-
None
68100
}
69101

70102
fn resolve_query(&mut self, response: ChainResponse) {
71-
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
72-
// Find the anchor that matches the confirmed BlockId
73-
let best_anchor = response.and_then(|block_id| {
74-
anchors
75-
.iter()
76-
.find(|anchor| anchor.anchor_block() == block_id)
77-
.cloned()
78-
});
79-
80-
match best_anchor {
81-
Some(best_anchor) => {
82-
self.confirmed_anchors.insert(txid, best_anchor.clone());
83-
if !self.is_canonicalized(txid) {
84-
self.mark_canonical(txid, tx, CanonicalReason::from_anchor(best_anchor));
103+
// Only AnchoredTxs and TransitivelyAnchoredTxs stages should receive query
104+
// responses Other stages don't generate queries and thus shouldn't call
105+
// resolve_query
106+
match self.current_stage {
107+
CanonicalStage::AnchoredTxs => {
108+
// Process directly anchored transaction response
109+
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
110+
// Find the anchor that matches the confirmed BlockId
111+
let best_anchor = response.and_then(|block_id| {
112+
anchors
113+
.iter()
114+
.find(|anchor| anchor.anchor_block() == block_id)
115+
.cloned()
116+
});
117+
118+
match best_anchor {
119+
Some(best_anchor) => {
120+
// Transaction has a confirmed anchor
121+
self.direct_anchors.insert(txid, best_anchor.clone());
122+
if !self.is_canonicalized(txid) {
123+
self.mark_canonical(
124+
txid,
125+
tx,
126+
CanonicalReason::from_anchor(best_anchor),
127+
);
128+
}
129+
}
130+
None => {
131+
// No confirmed anchor found, add to leftover transactions for later
132+
// processing
133+
self.unprocessed_leftover_txs.push_back((
134+
txid,
135+
tx,
136+
anchors
137+
.iter()
138+
.last()
139+
.expect(
140+
"tx taken from `unprocessed_anchored_txs` so it must have at least one anchor",
141+
)
142+
.confirmation_height_upper_bound(),
143+
))
144+
}
85145
}
86146
}
87-
None => {
88-
self.unprocessed_leftover_txs.push_back((
89-
txid,
90-
tx,
91-
anchors
92-
.iter()
93-
.last()
94-
.expect(
95-
"tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor",
96-
)
97-
.confirmation_height_upper_bound(),
98-
))
147+
}
148+
CanonicalStage::TransitivelyAnchoredTxs => {
149+
// Process transitively anchored transaction response
150+
if let Some((txid, _tx, anchors)) =
151+
self.unprocessed_transitively_anchored_txs.pop_front()
152+
{
153+
// Find the anchor that matches the confirmed BlockId
154+
let best_anchor = response.and_then(|block_id| {
155+
anchors
156+
.iter()
157+
.find(|anchor| anchor.anchor_block() == block_id)
158+
.cloned()
159+
});
160+
161+
if let Some(best_anchor) = best_anchor {
162+
// Found a confirmed anchor for this transitively anchored transaction
163+
self.direct_anchors.insert(txid, best_anchor.clone());
164+
// Note: We don't re-mark as canonical since it's already marked
165+
// from being transitively anchored by its descendant
166+
}
167+
// If no confirmed anchor, we keep the transitive canonicalization status
99168
}
100169
}
170+
CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => {
171+
// These stages don't generate queries and shouldn't receive responses
172+
debug_assert!(
173+
false,
174+
"resolve_query called for stage {:?} which doesn't generate queries",
175+
self.current_stage
176+
);
177+
}
101178
}
102179
}
103180

104181
fn is_finished(&mut self) -> bool {
105-
self.unprocessed_anchored_txs.is_empty()
182+
// Try to advance stages first
183+
self.try_advance();
184+
// Check if we've reached the Finished stage
185+
self.current_stage == CanonicalStage::Finished
106186
}
107187

108-
fn finish(mut self) -> Self::Output {
109-
// Process remaining transactions (seen and leftover)
110-
self.process_seen_txs();
111-
self.process_leftover_txs();
112-
188+
fn finish(self) -> Self::Output {
113189
// Build the canonical view
114190
let mut view_order = Vec::new();
115191
let mut view_txs = HashMap::new();
@@ -138,7 +214,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
138214
// Determine chain position based on reason
139215
let chain_position = match reason {
140216
CanonicalReason::Assumed { descendant } => match descendant {
141-
Some(_) => match self.confirmed_anchors.get(txid) {
217+
Some(_) => match self.direct_anchors.get(txid) {
142218
Some(confirmed_anchor) => ChainPosition::Confirmed {
143219
anchor: confirmed_anchor,
144220
transitively: None,
@@ -154,7 +230,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
154230
},
155231
},
156232
CanonicalReason::Anchor { anchor, descendant } => match descendant {
157-
Some(_) => match self.confirmed_anchors.get(txid) {
233+
Some(_) => match self.direct_anchors.get(txid) {
158234
Some(confirmed_anchor) => ChainPosition::Confirmed {
159235
anchor: confirmed_anchor,
160236
transitively: None,
@@ -190,6 +266,49 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
190266
}
191267

192268
impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
269+
/// Try to advance to the next stage if the current stage is complete.
270+
/// The loop continues through stages that process all their transactions at once
271+
/// (SeenTxs and LeftOverTxs) to avoid needing multiple calls.
272+
fn try_advance(&mut self) {
273+
loop {
274+
let advanced = match self.current_stage {
275+
CanonicalStage::AnchoredTxs => {
276+
if self.unprocessed_anchored_txs.is_empty() {
277+
self.current_stage = CanonicalStage::SeenTxs;
278+
true // Continue to process SeenTxs immediately
279+
} else {
280+
false // Still have work, stop advancing
281+
}
282+
}
283+
CanonicalStage::SeenTxs => {
284+
// Process all seen transactions at once
285+
self.process_seen_txs();
286+
self.current_stage = CanonicalStage::LeftOverTxs;
287+
true // Continue to process LeftOverTxs immediately
288+
}
289+
CanonicalStage::LeftOverTxs => {
290+
// Process all leftover transactions at once
291+
self.process_leftover_txs();
292+
self.current_stage = CanonicalStage::TransitivelyAnchoredTxs;
293+
false // Stop here - TransitivelyAnchoredTxs need queries
294+
}
295+
CanonicalStage::TransitivelyAnchoredTxs => {
296+
if self.unprocessed_transitively_anchored_txs.is_empty() {
297+
self.current_stage = CanonicalStage::Finished;
298+
}
299+
false // Stop advancing
300+
}
301+
CanonicalStage::Finished => {
302+
false // Already finished, nothing to do
303+
}
304+
};
305+
306+
if !advanced {
307+
break;
308+
}
309+
}
310+
}
311+
193312
/// Creates a new canonicalization task.
194313
pub fn new(
195314
tx_graph: &'g TxGraph<A>,
@@ -222,12 +341,14 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222341
unprocessed_anchored_txs,
223342
unprocessed_seen_txs,
224343
unprocessed_leftover_txs: VecDeque::new(),
344+
unprocessed_transitively_anchored_txs: VecDeque::new(),
225345

226346
canonical: HashMap::new(),
227347
not_canonical: HashSet::new(),
228348

229349
canonical_order: Vec::new(),
230-
confirmed_anchors: HashMap::new(),
350+
direct_anchors: HashMap::new(),
351+
current_stage: CanonicalStage::AnchoredTxs,
231352
};
232353

233354
// process assumed transactions first (they don't need queries)
@@ -342,30 +463,28 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
342463
for txid in undo_not_canonical {
343464
self.not_canonical.remove(&txid);
344465
}
345-
} else {
346-
// Add to canonical order
347-
for (txid, tx, reason) in &staged_canonical {
348-
self.canonical_order.push(*txid);
349-
350-
// If this was marked transitively, check if it has anchors to verify
351-
let is_transitive = matches!(
352-
reason,
353-
CanonicalReason::Anchor {
354-
descendant: Some(_),
355-
..
356-
} | CanonicalReason::Assumed {
357-
descendant: Some(_),
358-
..
359-
}
360-
);
466+
return;
467+
}
361468

362-
if is_transitive {
363-
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
364-
// only check anchors we haven't already confirmed
365-
if !self.confirmed_anchors.contains_key(txid) {
366-
self.unprocessed_anchored_txs
367-
.push_back((*txid, tx.clone(), anchors));
368-
}
469+
// Add to canonical order
470+
for (txid, tx, reason) in &staged_canonical {
471+
self.canonical_order.push(*txid);
472+
473+
// ObservedIn transactions don't need anchor verification
474+
if matches!(reason, CanonicalReason::ObservedIn { .. }) {
475+
continue;
476+
}
477+
478+
// Check if this transaction was marked transitively and needs its own anchors verified
479+
if reason.is_transitive() {
480+
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
481+
// only check anchors we haven't already confirmed
482+
if !self.direct_anchors.contains_key(txid) {
483+
self.unprocessed_transitively_anchored_txs.push_back((
484+
*txid,
485+
tx.clone(),
486+
anchors,
487+
));
369488
}
370489
}
371490
}
@@ -460,6 +579,12 @@ impl<A: Clone> CanonicalReason<A> {
460579
CanonicalReason::ObservedIn { descendant, .. } => descendant,
461580
}
462581
}
582+
583+
/// Returns true if this reason represents a transitive canonicalization
584+
/// (i.e., the transaction is canonical because of its descendant).
585+
pub fn is_transitive(&self) -> bool {
586+
self.descendant().is_some()
587+
}
463588
}
464589

465590
#[cfg(test)]

0 commit comments

Comments
 (0)