Skip to content

Commit d3a4bae

Browse files
committed
refactor(chain): use single queue for anchored txs in canonicalization
- convert `unprocessed_anchored_txs` from iterator to `VecDeque` - remove `pending_anchor_checks` queue entirely - collect anchored transactions upfront instead of lazy iteration - make `LocalChain::canonicalize()` generic over `ChainQuery` trait
1 parent f6c8b02 commit d3a4bae

File tree

1 file changed

+37
-49
lines changed

1 file changed

+37
-49
lines changed

crates/chain/src/canonical_task.rs

Lines changed: 37 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@ pub struct CanonicalizationTask<'g, A> {
2727
chain_tip: BlockId,
2828

2929
unprocessed_assumed_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>)> + 'g>,
30-
unprocessed_anchored_txs:
31-
Box<dyn Iterator<Item = (Txid, Arc<Transaction>, &'g BTreeSet<A>)> + 'g>,
30+
unprocessed_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3231
unprocessed_seen_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>, u64)> + 'g>,
3332
unprocessed_leftover_txs: VecDeque<(Txid, Arc<Transaction>, u32)>,
3433

3534
canonical: CanonicalMap<A>,
3635
not_canonical: NotCanonicalSet,
3736

38-
pending_anchor_checks: VecDeque<(Txid, Arc<Transaction>, Vec<A>)>,
39-
4037
// Store canonical transactions in order
4138
canonical_order: Vec<Txid>,
4239

@@ -48,22 +45,30 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
4845
type Output = CanonicalView<A>;
4946

5047
fn next_query(&mut self) -> Option<ChainRequest> {
51-
// Check if we have pending anchor checks
52-
if let Some((_, _, anchors)) = self.pending_anchor_checks.front() {
53-
// Convert anchors to BlockIds for the ChainRequest
48+
// Find the next non-canonicalized transaction to query
49+
if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() {
50+
// if !self.is_canonicalized(*txid) {
51+
// // Build query for this transaction
52+
// let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
53+
// return Some(ChainRequest {
54+
// chain_tip: self.chain_tip,
55+
// block_ids,
56+
// });
57+
// }
58+
// // Skip already canonicalized transaction
59+
// self.unprocessed_anchored_txs.pop_front();
60+
// Build query for this transaction
5461
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
5562
return Some(ChainRequest {
5663
chain_tip: self.chain_tip,
5764
block_ids,
5865
});
5966
}
60-
61-
// Process more anchored transactions if available
62-
self.process_anchored_txs()
67+
None
6368
}
6469

6570
fn resolve_query(&mut self, response: ChainResponse) {
66-
if let Some((txid, tx, anchors)) = self.pending_anchor_checks.pop_front() {
71+
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
6772
// Find the anchor that matches the confirmed BlockId
6873
let best_anchor = response.and_then(|block_id| {
6974
anchors
@@ -81,23 +86,23 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
8186
}
8287
None => {
8388
self.unprocessed_leftover_txs.push_back((
84-
txid,
85-
tx,
86-
anchors
87-
.iter()
88-
.last()
89-
.expect(
90-
"tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor",
91-
)
92-
.confirmation_height_upper_bound(),
93-
))
89+
txid,
90+
tx,
91+
anchors
92+
.iter()
93+
.last()
94+
.expect(
95+
"tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor",
96+
)
97+
.confirmation_height_upper_bound(),
98+
))
9499
}
95100
}
96101
}
97102
}
98103

99104
fn is_finished(&mut self) -> bool {
100-
self.pending_anchor_checks.is_empty() && self.unprocessed_anchored_txs.size_hint().0 == 0
105+
self.unprocessed_anchored_txs.is_empty()
101106
}
102107

103108
fn finish(mut self) -> Self::Output {
@@ -134,8 +139,8 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
134139
let chain_position = match reason {
135140
CanonicalReason::Assumed { descendant } => match descendant {
136141
Some(_) => match self.confirmed_anchors.get(txid) {
137-
Some(anchor) => ChainPosition::Confirmed {
138-
anchor,
142+
Some(confirmed_anchor) => ChainPosition::Confirmed {
143+
anchor: confirmed_anchor,
139144
transitively: None,
140145
},
141146
None => ChainPosition::Unconfirmed {
@@ -150,8 +155,8 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
150155
},
151156
CanonicalReason::Anchor { anchor, descendant } => match descendant {
152157
Some(_) => match self.confirmed_anchors.get(txid) {
153-
Some(anchor) => ChainPosition::Confirmed {
154-
anchor,
158+
Some(confirmed_anchor) => ChainPosition::Confirmed {
159+
anchor: confirmed_anchor,
155160
transitively: None,
156161
},
157162
None => ChainPosition::Confirmed {
@@ -199,11 +204,10 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
199204
.rev()
200205
.filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))),
201206
);
202-
let unprocessed_anchored_txs = Box::new(
203-
tx_graph
204-
.txids_by_descending_anchor_height()
205-
.filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))),
206-
);
207+
let unprocessed_anchored_txs: VecDeque<_> = tx_graph
208+
.txids_by_descending_anchor_height()
209+
.filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?)))
210+
.collect();
207211
let unprocessed_seen_txs = Box::new(
208212
tx_graph
209213
.txids_by_descending_last_seen()
@@ -222,8 +226,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222226
canonical: HashMap::new(),
223227
not_canonical: HashSet::new(),
224228

225-
pending_anchor_checks: VecDeque::new(),
226-
227229
canonical_order: Vec::new(),
228230
confirmed_anchors: HashMap::new(),
229231
};
@@ -246,17 +248,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
246248
}
247249
}
248250

249-
fn process_anchored_txs(&mut self) -> Option<ChainRequest> {
250-
while let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
251-
if !self.is_canonicalized(txid) {
252-
self.pending_anchor_checks
253-
.push_back((txid, tx, anchors.iter().cloned().collect()));
254-
return self.next_query();
255-
}
256-
}
257-
None
258-
}
259-
260251
fn process_seen_txs(&mut self) {
261252
while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
262253
debug_assert!(
@@ -372,11 +363,8 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
372363
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
373364
// only check anchors we haven't already confirmed
374365
if !self.confirmed_anchors.contains_key(txid) {
375-
self.pending_anchor_checks.push_back((
376-
*txid,
377-
tx.clone(),
378-
anchors.iter().cloned().collect(),
379-
));
366+
self.unprocessed_anchored_txs
367+
.push_back((*txid, tx.clone(), anchors));
380368
}
381369
}
382370
}

0 commit comments

Comments
 (0)