@@ -11,6 +11,21 @@ use bitcoin::{Transaction, Txid};
11
11
type CanonicalMap < A > = HashMap < Txid , ( Arc < Transaction > , CanonicalReason < A > ) > ;
12
12
type NotCanonicalSet = HashSet < Txid > ;
13
13
14
+ /// Represents the current stage of canonicalization processing.
15
+ #[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
16
+ enum CanonicalStage {
17
+ /// Processing directly anchored transactions.
18
+ AnchoredTxs ,
19
+ /// Processing transactions seen in mempool.
20
+ SeenTxs ,
21
+ /// Processing leftover transactions.
22
+ LeftOverTxs ,
23
+ /// Processing transitively anchored transactions.
24
+ TransitivelyAnchoredTxs ,
25
+ /// All processing is complete.
26
+ Finished ,
27
+ }
28
+
14
29
/// Modifies the canonicalization algorithm.
15
30
#[ derive( Debug , Default , Clone ) ]
16
31
pub struct CanonicalizationParams {
@@ -30,86 +45,147 @@ pub struct CanonicalizationTask<'g, A> {
30
45
unprocessed_anchored_txs : VecDeque < ( Txid , Arc < Transaction > , & ' g BTreeSet < A > ) > ,
31
46
unprocessed_seen_txs : Box < dyn Iterator < Item = ( Txid , Arc < Transaction > , u64 ) > + ' g > ,
32
47
unprocessed_leftover_txs : VecDeque < ( Txid , Arc < Transaction > , u32 ) > ,
48
+ unprocessed_transitively_anchored_txs : VecDeque < ( Txid , Arc < Transaction > , & ' g BTreeSet < A > ) > ,
33
49
34
50
canonical : CanonicalMap < A > ,
35
51
not_canonical : NotCanonicalSet ,
36
52
37
53
// Store canonical transactions in order
38
54
canonical_order : Vec < Txid > ,
39
55
40
- // Track which transactions have confirmed anchors
41
- confirmed_anchors : HashMap < Txid , A > ,
56
+ // Track which transactions have direct anchors (not transitive)
57
+ direct_anchors : HashMap < Txid , A > ,
58
+
59
+ // Track the current stage of processing
60
+ current_stage : CanonicalStage ,
42
61
}
43
62
44
63
impl < ' g , A : Anchor > ChainQuery for CanonicalizationTask < ' g , A > {
45
64
type Output = CanonicalView < A > ;
46
65
47
66
fn next_query ( & mut self ) -> Option < ChainRequest > {
48
- // Find the next non-canonicalized transaction to query
49
- if let Some ( ( _txid, _, anchors) ) = self . unprocessed_anchored_txs . front ( ) {
50
- // if !self.is_canonicalized(*txid) {
51
- // // Build query for this transaction
52
- // let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
53
- // return Some(ChainRequest {
54
- // chain_tip: self.chain_tip,
55
- // block_ids,
56
- // });
57
- // }
58
- // // Skip already canonicalized transaction
59
- // self.unprocessed_anchored_txs.pop_front();
60
- // Build query for this transaction
61
- let block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ;
62
- return Some ( ChainRequest {
63
- chain_tip : self . chain_tip ,
64
- block_ids,
65
- } ) ;
67
+ // Try to advance to the next stage if needed
68
+ self . try_advance ( ) ;
69
+
70
+ match self . current_stage {
71
+ CanonicalStage :: AnchoredTxs => {
72
+ // Process directly anchored transactions first
73
+ if let Some ( ( _txid, _, anchors) ) = self . unprocessed_anchored_txs . front ( ) {
74
+ let block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ;
75
+ return Some ( ChainRequest {
76
+ chain_tip : self . chain_tip ,
77
+ block_ids,
78
+ } ) ;
79
+ }
80
+ None
81
+ }
82
+ CanonicalStage :: TransitivelyAnchoredTxs => {
83
+ // Process transitively anchored transactions last
84
+ if let Some ( ( _txid, _, anchors) ) =
85
+ self . unprocessed_transitively_anchored_txs . front ( )
86
+ {
87
+ let block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ;
88
+ return Some ( ChainRequest {
89
+ chain_tip : self . chain_tip ,
90
+ block_ids,
91
+ } ) ;
92
+ }
93
+ None
94
+ }
95
+ CanonicalStage :: SeenTxs | CanonicalStage :: LeftOverTxs | CanonicalStage :: Finished => {
96
+ // These stages don't need queries
97
+ None
98
+ }
66
99
}
67
- None
68
100
}
69
101
70
102
fn resolve_query ( & mut self , response : ChainResponse ) {
71
- if let Some ( ( txid, tx, anchors) ) = self . unprocessed_anchored_txs . pop_front ( ) {
72
- // Find the anchor that matches the confirmed BlockId
73
- let best_anchor = response. and_then ( |block_id| {
74
- anchors
75
- . iter ( )
76
- . find ( |anchor| anchor. anchor_block ( ) == block_id)
77
- . cloned ( )
78
- } ) ;
79
-
80
- match best_anchor {
81
- Some ( best_anchor) => {
82
- self . confirmed_anchors . insert ( txid, best_anchor. clone ( ) ) ;
83
- if !self . is_canonicalized ( txid) {
84
- self . mark_canonical ( txid, tx, CanonicalReason :: from_anchor ( best_anchor) ) ;
103
+ // Only AnchoredTxs and TransitivelyAnchoredTxs stages should receive query
104
+ // responses Other stages don't generate queries and thus shouldn't call
105
+ // resolve_query
106
+ match self . current_stage {
107
+ CanonicalStage :: AnchoredTxs => {
108
+ // Process directly anchored transaction response
109
+ if let Some ( ( txid, tx, anchors) ) = self . unprocessed_anchored_txs . pop_front ( ) {
110
+ // Find the anchor that matches the confirmed BlockId
111
+ let best_anchor = response. and_then ( |block_id| {
112
+ anchors
113
+ . iter ( )
114
+ . find ( |anchor| anchor. anchor_block ( ) == block_id)
115
+ . cloned ( )
116
+ } ) ;
117
+
118
+ match best_anchor {
119
+ Some ( best_anchor) => {
120
+ // Transaction has a confirmed anchor
121
+ self . direct_anchors . insert ( txid, best_anchor. clone ( ) ) ;
122
+ if !self . is_canonicalized ( txid) {
123
+ self . mark_canonical (
124
+ txid,
125
+ tx,
126
+ CanonicalReason :: from_anchor ( best_anchor) ,
127
+ ) ;
128
+ }
129
+ }
130
+ None => {
131
+ // No confirmed anchor found, add to leftover transactions for later
132
+ // processing
133
+ self . unprocessed_leftover_txs . push_back ( (
134
+ txid,
135
+ tx,
136
+ anchors
137
+ . iter ( )
138
+ . last ( )
139
+ . expect (
140
+ "tx taken from `unprocessed_anchored_txs` so it must have at least one anchor" ,
141
+ )
142
+ . confirmation_height_upper_bound ( ) ,
143
+ ) )
144
+ }
85
145
}
86
146
}
87
- None => {
88
- self . unprocessed_leftover_txs . push_back ( (
89
- txid,
90
- tx,
91
- anchors
92
- . iter ( )
93
- . last ( )
94
- . expect (
95
- "tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor" ,
96
- )
97
- . confirmation_height_upper_bound ( ) ,
98
- ) )
147
+ }
148
+ CanonicalStage :: TransitivelyAnchoredTxs => {
149
+ // Process transitively anchored transaction response
150
+ if let Some ( ( txid, _tx, anchors) ) =
151
+ self . unprocessed_transitively_anchored_txs . pop_front ( )
152
+ {
153
+ // Find the anchor that matches the confirmed BlockId
154
+ let best_anchor = response. and_then ( |block_id| {
155
+ anchors
156
+ . iter ( )
157
+ . find ( |anchor| anchor. anchor_block ( ) == block_id)
158
+ . cloned ( )
159
+ } ) ;
160
+
161
+ if let Some ( best_anchor) = best_anchor {
162
+ // Found a confirmed anchor for this transitively anchored transaction
163
+ self . direct_anchors . insert ( txid, best_anchor. clone ( ) ) ;
164
+ // Note: We don't re-mark as canonical since it's already marked
165
+ // from being transitively anchored by its descendant
166
+ }
167
+ // If no confirmed anchor, we keep the transitive canonicalization status
99
168
}
100
169
}
170
+ CanonicalStage :: SeenTxs | CanonicalStage :: LeftOverTxs | CanonicalStage :: Finished => {
171
+ // These stages don't generate queries and shouldn't receive responses
172
+ debug_assert ! (
173
+ false ,
174
+ "resolve_query called for stage {:?} which doesn't generate queries" ,
175
+ self . current_stage
176
+ ) ;
177
+ }
101
178
}
102
179
}
103
180
104
181
fn is_finished ( & mut self ) -> bool {
105
- self . unprocessed_anchored_txs . is_empty ( )
182
+ // Try to advance stages first
183
+ self . try_advance ( ) ;
184
+ // Check if we've reached the Finished stage
185
+ self . current_stage == CanonicalStage :: Finished
106
186
}
107
187
108
- fn finish ( mut self ) -> Self :: Output {
109
- // Process remaining transactions (seen and leftover)
110
- self . process_seen_txs ( ) ;
111
- self . process_leftover_txs ( ) ;
112
-
188
+ fn finish ( self ) -> Self :: Output {
113
189
// Build the canonical view
114
190
let mut view_order = Vec :: new ( ) ;
115
191
let mut view_txs = HashMap :: new ( ) ;
@@ -138,7 +214,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
138
214
// Determine chain position based on reason
139
215
let chain_position = match reason {
140
216
CanonicalReason :: Assumed { descendant } => match descendant {
141
- Some ( _) => match self . confirmed_anchors . get ( txid) {
217
+ Some ( _) => match self . direct_anchors . get ( txid) {
142
218
Some ( confirmed_anchor) => ChainPosition :: Confirmed {
143
219
anchor : confirmed_anchor,
144
220
transitively : None ,
@@ -154,7 +230,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
154
230
} ,
155
231
} ,
156
232
CanonicalReason :: Anchor { anchor, descendant } => match descendant {
157
- Some ( _) => match self . confirmed_anchors . get ( txid) {
233
+ Some ( _) => match self . direct_anchors . get ( txid) {
158
234
Some ( confirmed_anchor) => ChainPosition :: Confirmed {
159
235
anchor : confirmed_anchor,
160
236
transitively : None ,
@@ -190,6 +266,49 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
190
266
}
191
267
192
268
impl < ' g , A : Anchor > CanonicalizationTask < ' g , A > {
269
+ /// Try to advance to the next stage if the current stage is complete.
270
+ /// The loop continues through stages that process all their transactions at once
271
+ /// (SeenTxs and LeftOverTxs) to avoid needing multiple calls.
272
+ fn try_advance ( & mut self ) {
273
+ loop {
274
+ let advanced = match self . current_stage {
275
+ CanonicalStage :: AnchoredTxs => {
276
+ if self . unprocessed_anchored_txs . is_empty ( ) {
277
+ self . current_stage = CanonicalStage :: SeenTxs ;
278
+ true // Continue to process SeenTxs immediately
279
+ } else {
280
+ false // Still have work, stop advancing
281
+ }
282
+ }
283
+ CanonicalStage :: SeenTxs => {
284
+ // Process all seen transactions at once
285
+ self . process_seen_txs ( ) ;
286
+ self . current_stage = CanonicalStage :: LeftOverTxs ;
287
+ true // Continue to process LeftOverTxs immediately
288
+ }
289
+ CanonicalStage :: LeftOverTxs => {
290
+ // Process all leftover transactions at once
291
+ self . process_leftover_txs ( ) ;
292
+ self . current_stage = CanonicalStage :: TransitivelyAnchoredTxs ;
293
+ false // Stop here - TransitivelyAnchoredTxs need queries
294
+ }
295
+ CanonicalStage :: TransitivelyAnchoredTxs => {
296
+ if self . unprocessed_transitively_anchored_txs . is_empty ( ) {
297
+ self . current_stage = CanonicalStage :: Finished ;
298
+ }
299
+ false // Stop advancing
300
+ }
301
+ CanonicalStage :: Finished => {
302
+ false // Already finished, nothing to do
303
+ }
304
+ } ;
305
+
306
+ if !advanced {
307
+ break ;
308
+ }
309
+ }
310
+ }
311
+
193
312
/// Creates a new canonicalization task.
194
313
pub fn new (
195
314
tx_graph : & ' g TxGraph < A > ,
@@ -222,12 +341,14 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222
341
unprocessed_anchored_txs,
223
342
unprocessed_seen_txs,
224
343
unprocessed_leftover_txs : VecDeque :: new ( ) ,
344
+ unprocessed_transitively_anchored_txs : VecDeque :: new ( ) ,
225
345
226
346
canonical : HashMap :: new ( ) ,
227
347
not_canonical : HashSet :: new ( ) ,
228
348
229
349
canonical_order : Vec :: new ( ) ,
230
- confirmed_anchors : HashMap :: new ( ) ,
350
+ direct_anchors : HashMap :: new ( ) ,
351
+ current_stage : CanonicalStage :: AnchoredTxs ,
231
352
} ;
232
353
233
354
// process assumed transactions first (they don't need queries)
@@ -342,30 +463,28 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
342
463
for txid in undo_not_canonical {
343
464
self . not_canonical . remove ( & txid) ;
344
465
}
345
- } else {
346
- // Add to canonical order
347
- for ( txid, tx, reason) in & staged_canonical {
348
- self . canonical_order . push ( * txid) ;
349
-
350
- // If this was marked transitively, check if it has anchors to verify
351
- let is_transitive = matches ! (
352
- reason,
353
- CanonicalReason :: Anchor {
354
- descendant: Some ( _) ,
355
- ..
356
- } | CanonicalReason :: Assumed {
357
- descendant: Some ( _) ,
358
- ..
359
- }
360
- ) ;
466
+ return ;
467
+ }
361
468
362
- if is_transitive {
363
- if let Some ( anchors) = self . tx_graph . all_anchors ( ) . get ( txid) {
364
- // only check anchors we haven't already confirmed
365
- if !self . confirmed_anchors . contains_key ( txid) {
366
- self . unprocessed_anchored_txs
367
- . push_back ( ( * txid, tx. clone ( ) , anchors) ) ;
368
- }
469
+ // Add to canonical order
470
+ for ( txid, tx, reason) in & staged_canonical {
471
+ self . canonical_order . push ( * txid) ;
472
+
473
+ // ObservedIn transactions don't need anchor verification
474
+ if matches ! ( reason, CanonicalReason :: ObservedIn { .. } ) {
475
+ continue ;
476
+ }
477
+
478
+ // Check if this transaction was marked transitively and needs its own anchors verified
479
+ if reason. is_transitive ( ) {
480
+ if let Some ( anchors) = self . tx_graph . all_anchors ( ) . get ( txid) {
481
+ // only check anchors we haven't already confirmed
482
+ if !self . direct_anchors . contains_key ( txid) {
483
+ self . unprocessed_transitively_anchored_txs . push_back ( (
484
+ * txid,
485
+ tx. clone ( ) ,
486
+ anchors,
487
+ ) ) ;
369
488
}
370
489
}
371
490
}
@@ -460,6 +579,12 @@ impl<A: Clone> CanonicalReason<A> {
460
579
CanonicalReason :: ObservedIn { descendant, .. } => descendant,
461
580
}
462
581
}
582
+
583
+ /// Returns true if this reason represents a transitive canonicalization
584
+ /// (i.e., the transaction is canonical because of its descendant).
585
+ pub fn is_transitive ( & self ) -> bool {
586
+ self . descendant ( ) . is_some ( )
587
+ }
463
588
}
464
589
465
590
#[ cfg( test) ]
0 commit comments