@@ -27,16 +27,13 @@ pub struct CanonicalizationTask<'g, A> {
27
27
chain_tip : BlockId ,
28
28
29
29
unprocessed_assumed_txs : Box < dyn Iterator < Item = ( Txid , Arc < Transaction > ) > + ' g > ,
30
- unprocessed_anchored_txs :
31
- Box < dyn Iterator < Item = ( Txid , Arc < Transaction > , & ' g BTreeSet < A > ) > + ' g > ,
30
+ unprocessed_anchored_txs : VecDeque < ( Txid , Arc < Transaction > , & ' g BTreeSet < A > ) > ,
32
31
unprocessed_seen_txs : Box < dyn Iterator < Item = ( Txid , Arc < Transaction > , u64 ) > + ' g > ,
33
32
unprocessed_leftover_txs : VecDeque < ( Txid , Arc < Transaction > , u32 ) > ,
34
33
35
34
canonical : CanonicalMap < A > ,
36
35
not_canonical : NotCanonicalSet ,
37
36
38
- pending_anchor_checks : VecDeque < ( Txid , Arc < Transaction > , Vec < A > ) > ,
39
-
40
37
// Store canonical transactions in order
41
38
canonical_order : Vec < Txid > ,
42
39
@@ -48,22 +45,30 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
48
45
type Output = CanonicalView < A > ;
49
46
50
47
fn next_query ( & mut self ) -> Option < ChainRequest > {
51
- // Check if we have pending anchor checks
52
- if let Some ( ( _, _, anchors) ) = self . pending_anchor_checks . front ( ) {
53
- // Convert anchors to BlockIds for the ChainRequest
48
+ // Find the next non-canonicalized transaction to query
49
+ if let Some ( ( _txid, _, anchors) ) = self . unprocessed_anchored_txs . front ( ) {
50
+ // if !self.is_canonicalized(*txid) {
51
+ // // Build query for this transaction
52
+ // let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
53
+ // return Some(ChainRequest {
54
+ // chain_tip: self.chain_tip,
55
+ // block_ids,
56
+ // });
57
+ // }
58
+ // // Skip already canonicalized transaction
59
+ // self.unprocessed_anchored_txs.pop_front();
60
+ // Build query for this transaction
54
61
let block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ;
55
62
return Some ( ChainRequest {
56
63
chain_tip : self . chain_tip ,
57
64
block_ids,
58
65
} ) ;
59
66
}
60
-
61
- // Process more anchored transactions if available
62
- self . process_anchored_txs ( )
67
+ None
63
68
}
64
69
65
70
fn resolve_query ( & mut self , response : ChainResponse ) {
66
- if let Some ( ( txid, tx, anchors) ) = self . pending_anchor_checks . pop_front ( ) {
71
+ if let Some ( ( txid, tx, anchors) ) = self . unprocessed_anchored_txs . pop_front ( ) {
67
72
// Find the anchor that matches the confirmed BlockId
68
73
let best_anchor = response. and_then ( |block_id| {
69
74
anchors
@@ -81,23 +86,23 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
81
86
}
82
87
None => {
83
88
self . unprocessed_leftover_txs . push_back ( (
84
- txid,
85
- tx,
86
- anchors
87
- . iter ( )
88
- . last ( )
89
- . expect (
90
- "tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor" ,
91
- )
92
- . confirmation_height_upper_bound ( ) ,
93
- ) )
89
+ txid,
90
+ tx,
91
+ anchors
92
+ . iter ( )
93
+ . last ( )
94
+ . expect (
95
+ "tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor" ,
96
+ )
97
+ . confirmation_height_upper_bound ( ) ,
98
+ ) )
94
99
}
95
100
}
96
101
}
97
102
}
98
103
99
104
fn is_finished ( & mut self ) -> bool {
100
- self . pending_anchor_checks . is_empty ( ) && self . unprocessed_anchored_txs . size_hint ( ) . 0 == 0
105
+ self . unprocessed_anchored_txs . is_empty ( )
101
106
}
102
107
103
108
fn finish ( mut self ) -> Self :: Output {
@@ -134,8 +139,8 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
134
139
let chain_position = match reason {
135
140
CanonicalReason :: Assumed { descendant } => match descendant {
136
141
Some ( _) => match self . confirmed_anchors . get ( txid) {
137
- Some ( anchor ) => ChainPosition :: Confirmed {
138
- anchor,
142
+ Some ( confirmed_anchor ) => ChainPosition :: Confirmed {
143
+ anchor : confirmed_anchor ,
139
144
transitively : None ,
140
145
} ,
141
146
None => ChainPosition :: Unconfirmed {
@@ -150,8 +155,8 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
150
155
} ,
151
156
CanonicalReason :: Anchor { anchor, descendant } => match descendant {
152
157
Some ( _) => match self . confirmed_anchors . get ( txid) {
153
- Some ( anchor ) => ChainPosition :: Confirmed {
154
- anchor,
158
+ Some ( confirmed_anchor ) => ChainPosition :: Confirmed {
159
+ anchor : confirmed_anchor ,
155
160
transitively : None ,
156
161
} ,
157
162
None => ChainPosition :: Confirmed {
@@ -199,11 +204,10 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
199
204
. rev ( )
200
205
. filter_map ( |txid| Some ( ( txid, tx_graph. get_tx ( txid) ?) ) ) ,
201
206
) ;
202
- let unprocessed_anchored_txs = Box :: new (
203
- tx_graph
204
- . txids_by_descending_anchor_height ( )
205
- . filter_map ( |( _, txid) | Some ( ( txid, tx_graph. get_tx ( txid) ?, anchors. get ( & txid) ?) ) ) ,
206
- ) ;
207
+ let unprocessed_anchored_txs: VecDeque < _ > = tx_graph
208
+ . txids_by_descending_anchor_height ( )
209
+ . filter_map ( |( _, txid) | Some ( ( txid, tx_graph. get_tx ( txid) ?, anchors. get ( & txid) ?) ) )
210
+ . collect ( ) ;
207
211
let unprocessed_seen_txs = Box :: new (
208
212
tx_graph
209
213
. txids_by_descending_last_seen ( )
@@ -222,8 +226,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222
226
canonical : HashMap :: new ( ) ,
223
227
not_canonical : HashSet :: new ( ) ,
224
228
225
- pending_anchor_checks : VecDeque :: new ( ) ,
226
-
227
229
canonical_order : Vec :: new ( ) ,
228
230
confirmed_anchors : HashMap :: new ( ) ,
229
231
} ;
@@ -246,17 +248,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
246
248
}
247
249
}
248
250
249
- fn process_anchored_txs ( & mut self ) -> Option < ChainRequest > {
250
- while let Some ( ( txid, tx, anchors) ) = self . unprocessed_anchored_txs . next ( ) {
251
- if !self . is_canonicalized ( txid) {
252
- self . pending_anchor_checks
253
- . push_back ( ( txid, tx, anchors. iter ( ) . cloned ( ) . collect ( ) ) ) ;
254
- return self . next_query ( ) ;
255
- }
256
- }
257
- None
258
- }
259
-
260
251
fn process_seen_txs ( & mut self ) {
261
252
while let Some ( ( txid, tx, last_seen) ) = self . unprocessed_seen_txs . next ( ) {
262
253
debug_assert ! (
@@ -372,11 +363,8 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
372
363
if let Some ( anchors) = self . tx_graph . all_anchors ( ) . get ( txid) {
373
364
// only check anchors we haven't already confirmed
374
365
if !self . confirmed_anchors . contains_key ( txid) {
375
- self . pending_anchor_checks . push_back ( (
376
- * txid,
377
- tx. clone ( ) ,
378
- anchors. iter ( ) . cloned ( ) . collect ( ) ,
379
- ) ) ;
366
+ self . unprocessed_anchored_txs
367
+ . push_back ( ( * txid, tx. clone ( ) , anchors) ) ;
380
368
}
381
369
}
382
370
}
0 commit comments