diff --git a/crates/bitcoind_rpc/examples/filter_iter.rs b/crates/bitcoind_rpc/examples/filter_iter.rs index e79bde672..b7df2a02d 100644 --- a/crates/bitcoind_rpc/examples/filter_iter.rs +++ b/crates/bitcoind_rpc/examples/filter_iter.rs @@ -69,7 +69,9 @@ fn main() -> anyhow::Result<()> { println!("\ntook: {}s", start.elapsed().as_secs()); println!("Local tip: {}", chain.tip().height()); - let canonical_view = graph.canonical_view(&chain, chain.tip().block_id(), Default::default()); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + let canonical_view = chain.canonicalize(task); let unspent: Vec<_> = canonical_view .filter_unspent_outpoints(graph.index.outpoints().clone()) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 6453037e6..ec5cdc69e 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -310,9 +310,12 @@ fn get_balance( ) -> anyhow::Result { let chain_tip = recv_chain.tip().block_id(); let outpoints = recv_graph.index.outpoints().clone(); - let balance = recv_graph - .canonical_view(recv_chain, chain_tip, CanonicalizationParams::default()) - .balance(outpoints, |_, _| true, 1); + let task = recv_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = recv_chain + .canonicalize(task) + .balance(outpoints, |_, _| true, 0); Ok(balance) } @@ -616,8 +619,9 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { let _txid_2 = core.send_raw_transaction(&tx1b)?; // Retrieve the expected unconfirmed txids and spks from the graph. - let exp_spk_txids = graph - .canonical_view(&chain, chain_tip, Default::default()) + let task = graph.canonicalization_task(chain_tip, Default::default()); + let exp_spk_txids = chain + .canonicalize(task) .list_expected_spk_txids(&graph.index, ..) .collect::>(); assert_eq!(exp_spk_txids, vec![(spk, txid_1)]); @@ -632,8 +636,11 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { // Update graph with evicted tx. let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted); - let canonical_txids = graph - .canonical_view(&chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txids = chain + .canonicalize(task) .txs() .map(|tx| tx.txid) .collect::>(); diff --git a/crates/chain/benches/canonicalization.rs b/crates/chain/benches/canonicalization.rs index 3d8d8b295..456ca9b04 100644 --- a/crates/chain/benches/canonicalization.rs +++ b/crates/chain/benches/canonicalization.rs @@ -95,31 +95,31 @@ fn setup(f: F) -> (KeychainTxGraph, Lo } fn run_list_canonical_txs(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_txs: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let txs = view.txs(); assert_eq!(txs.count(), exp_txs); } fn run_filter_chain_txouts(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_txos: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let utxos = view.filter_outpoints(tx_graph.index.outpoints().clone()); assert_eq!(utxos.count(), exp_txos); } fn run_filter_chain_unspents(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_utxos: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let utxos = view.filter_unspent_outpoints(tx_graph.index.outpoints().clone()); assert_eq!(utxos.count(), exp_utxos); } diff --git a/crates/chain/benches/indexer.rs b/crates/chain/benches/indexer.rs index 3caea42d2..7f3cd0dce 100644 --- a/crates/chain/benches/indexer.rs +++ b/crates/chain/benches/indexer.rs @@ -84,9 +84,10 @@ fn do_bench(indexed_tx_graph: &KeychainTxGraph, chain: &LocalChain) { // Check balance let chain_tip = chain.tip().block_id(); let op = graph.index.outpoints().clone(); - let bal = graph - .canonical_view(chain, chain_tip, CanonicalizationParams::default()) - .balance(op, |_, _| false, 1); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let bal = chain.canonicalize(task).balance(op, |_, _| false, 1); assert_eq!(bal.total(), AMOUNT * TX_CT as u64); } diff --git a/crates/chain/src/canonical_iter.rs b/crates/chain/src/canonical_iter.rs deleted file mode 100644 index 204ead451..000000000 --- a/crates/chain/src/canonical_iter.rs +++ /dev/null @@ -1,344 +0,0 @@ -use crate::collections::{HashMap, HashSet, VecDeque}; -use crate::tx_graph::{TxAncestors, TxDescendants}; -use crate::{Anchor, ChainOracle, TxGraph}; -use alloc::boxed::Box; -use alloc::collections::BTreeSet; -use alloc::sync::Arc; -use alloc::vec::Vec; -use bdk_core::BlockId; -use bitcoin::{Transaction, Txid}; - -type CanonicalMap = HashMap, CanonicalReason)>; -type NotCanonicalSet = HashSet; - -/// Modifies the canonicalization algorithm. -#[derive(Debug, Default, Clone)] -pub struct CanonicalizationParams { - /// Transactions that will supercede all other transactions. - /// - /// In case of conflicting transactions within `assume_canonical`, transactions that appear - /// later in the list (have higher index) have precedence. - pub assume_canonical: Vec, -} - -/// Iterates over canonical txs. -pub struct CanonicalIter<'g, A, C> { - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - - unprocessed_assumed_txs: Box)> + 'g>, - unprocessed_anchored_txs: - Box, &'g BTreeSet)> + 'g>, - unprocessed_seen_txs: Box, u64)> + 'g>, - unprocessed_leftover_txs: VecDeque<(Txid, Arc, u32)>, - - canonical: CanonicalMap, - not_canonical: NotCanonicalSet, - - queue: VecDeque, -} - -impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> { - /// Constructs [`CanonicalIter`]. - pub fn new( - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Self { - let anchors = tx_graph.all_anchors(); - let unprocessed_assumed_txs = Box::new( - params - .assume_canonical - .into_iter() - .rev() - .filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))), - ); - let unprocessed_anchored_txs = Box::new( - tx_graph - .txids_by_descending_anchor_height() - .filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))), - ); - let unprocessed_seen_txs = Box::new( - tx_graph - .txids_by_descending_last_seen() - .filter_map(|(last_seen, txid)| Some((txid, tx_graph.get_tx(txid)?, last_seen))), - ); - Self { - tx_graph, - chain, - chain_tip, - unprocessed_assumed_txs, - unprocessed_anchored_txs, - unprocessed_seen_txs, - unprocessed_leftover_txs: VecDeque::new(), - canonical: HashMap::new(), - not_canonical: HashSet::new(), - queue: VecDeque::new(), - } - } - - /// Whether this transaction is already canonicalized. - fn is_canonicalized(&self, txid: Txid) -> bool { - self.canonical.contains_key(&txid) || self.not_canonical.contains(&txid) - } - - /// Mark transaction as canonical if it is anchored in the best chain. - fn scan_anchors( - &mut self, - txid: Txid, - tx: Arc, - anchors: &BTreeSet, - ) -> Result<(), C::Error> { - for anchor in anchors { - let in_chain_opt = self - .chain - .is_block_in_chain(anchor.anchor_block(), self.chain_tip)?; - if in_chain_opt == Some(true) { - self.mark_canonical(txid, tx, CanonicalReason::from_anchor(anchor.clone())); - return Ok(()); - } - } - // cannot determine - self.unprocessed_leftover_txs.push_back(( - txid, - tx, - anchors - .iter() - .last() - .expect( - "tx taken from `unprocessed_txs_with_anchors` so it must atleast have an anchor", - ) - .confirmation_height_upper_bound(), - )); - Ok(()) - } - - /// Marks `tx` and it's ancestors as canonical and mark all conflicts of these as - /// `not_canonical`. - /// - /// The exception is when it is discovered that `tx` double spends itself (i.e. two of it's - /// inputs conflict with each other), then no changes will be made. - /// - /// The logic works by having two loops where one is nested in another. - /// * The outer loop iterates through ancestors of `tx` (including `tx`). We can transitively - /// assume that all ancestors of `tx` are also canonical. - /// * The inner loop loops through conflicts of ancestors of `tx`. Any descendants of conflicts - /// are also conflicts and are transitively considered non-canonical. - /// - /// If the inner loop ends up marking `tx` as non-canonical, then we know that it double spends - /// itself. - fn mark_canonical(&mut self, txid: Txid, tx: Arc, reason: CanonicalReason) { - let starting_txid = txid; - let mut is_starting_tx = true; - - // We keep track of changes made so far so that we can undo it later in case we detect that - // `tx` double spends itself. - let mut detected_self_double_spend = false; - let mut undo_not_canonical = Vec::::new(); - - // `staged_queue` doubles as the `undo_canonical` data. - let staged_queue = TxAncestors::new_include_root( - self.tx_graph, - tx, - |_: usize, tx: Arc| -> Option { - let this_txid = tx.compute_txid(); - let this_reason = if is_starting_tx { - is_starting_tx = false; - reason.clone() - } else { - reason.to_transitive(starting_txid) - }; - - use crate::collections::hash_map::Entry; - let canonical_entry = match self.canonical.entry(this_txid) { - // Already visited tx before, exit early. - Entry::Occupied(_) => return None, - Entry::Vacant(entry) => entry, - }; - - // Any conflicts with a canonical tx can be added to `not_canonical`. Descendants - // of `not_canonical` txs can also be added to `not_canonical`. - for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) { - TxDescendants::new_include_root( - self.tx_graph, - conflict_txid, - |_: usize, txid: Txid| -> Option<()> { - if self.not_canonical.insert(txid) { - undo_not_canonical.push(txid); - Some(()) - } else { - None - } - }, - ) - .run_until_finished() - } - - if self.not_canonical.contains(&this_txid) { - // Early exit if self-double-spend is detected. - detected_self_double_spend = true; - return None; - } - canonical_entry.insert((tx, this_reason)); - Some(this_txid) - }, - ) - .collect::>(); - - if detected_self_double_spend { - for txid in staged_queue { - self.canonical.remove(&txid); - } - for txid in undo_not_canonical { - self.not_canonical.remove(&txid); - } - } else { - self.queue.extend(staged_queue); - } - } -} - -impl Iterator for CanonicalIter<'_, A, C> { - type Item = Result<(Txid, Arc, CanonicalReason), C::Error>; - - fn next(&mut self) -> Option { - loop { - if let Some(txid) = self.queue.pop_front() { - let (tx, reason) = self - .canonical - .get(&txid) - .cloned() - .expect("reason must exist"); - return Some(Ok((txid, tx, reason))); - } - - if let Some((txid, tx)) = self.unprocessed_assumed_txs.next() { - if !self.is_canonicalized(txid) { - self.mark_canonical(txid, tx, CanonicalReason::assumed()); - } - } - - if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() { - if !self.is_canonicalized(txid) { - if let Err(err) = self.scan_anchors(txid, tx, anchors) { - return Some(Err(err)); - } - } - continue; - } - - if let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() { - debug_assert!( - !tx.is_coinbase(), - "Coinbase txs must not have `last_seen` (in mempool) value" - ); - if !self.is_canonicalized(txid) { - let observed_in = ObservedIn::Mempool(last_seen); - self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); - } - continue; - } - - if let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() { - if !self.is_canonicalized(txid) && !tx.is_coinbase() { - let observed_in = ObservedIn::Block(height); - self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); - } - continue; - } - - return None; - } - } -} - -/// Represents when and where a transaction was last observed in. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub enum ObservedIn { - /// The transaction was last observed in a block of height. - Block(u32), - /// The transaction was last observed in the mempool at the given unix timestamp. - Mempool(u64), -} - -/// The reason why a transaction is canonical. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum CanonicalReason { - /// This transaction is explicitly assumed to be canonical by the caller, superceding all other - /// canonicalization rules. - Assumed { - /// Whether it is a descendant that is assumed to be canonical. - descendant: Option, - }, - /// This transaction is anchored in the best chain by `A`, and therefore canonical. - Anchor { - /// The anchor that anchored the transaction in the chain. - anchor: A, - /// Whether the anchor is of the transaction's descendant. - descendant: Option, - }, - /// This transaction does not conflict with any other transaction with a more recent - /// [`ObservedIn`] value or one that is anchored in the best chain. - ObservedIn { - /// The [`ObservedIn`] value of the transaction. - observed_in: ObservedIn, - /// Whether the [`ObservedIn`] value is of the transaction's descendant. - descendant: Option, - }, -} - -impl CanonicalReason { - /// Constructs a [`CanonicalReason`] for a transaction that is assumed to supercede all other - /// transactions. - pub fn assumed() -> Self { - Self::Assumed { descendant: None } - } - - /// Constructs a [`CanonicalReason`] from an `anchor`. - pub fn from_anchor(anchor: A) -> Self { - Self::Anchor { - anchor, - descendant: None, - } - } - - /// Constructs a [`CanonicalReason`] from an `observed_in` value. - pub fn from_observed_in(observed_in: ObservedIn) -> Self { - Self::ObservedIn { - observed_in, - descendant: None, - } - } - - /// Contruct a new [`CanonicalReason`] from the original which is transitive to `descendant`. - /// - /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's - /// descendant, but is transitively relevant. - pub fn to_transitive(&self, descendant: Txid) -> Self { - match self { - CanonicalReason::Assumed { .. } => Self::Assumed { - descendant: Some(descendant), - }, - CanonicalReason::Anchor { anchor, .. } => Self::Anchor { - anchor: anchor.clone(), - descendant: Some(descendant), - }, - CanonicalReason::ObservedIn { observed_in, .. } => Self::ObservedIn { - observed_in: *observed_in, - descendant: Some(descendant), - }, - } - } - - /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's - /// descendant. - pub fn descendant(&self) -> &Option { - match self { - CanonicalReason::Assumed { descendant, .. } => descendant, - CanonicalReason::Anchor { descendant, .. } => descendant, - CanonicalReason::ObservedIn { descendant, .. } => descendant, - } - } -} diff --git a/crates/chain/src/canonical_task.rs b/crates/chain/src/canonical_task.rs new file mode 100644 index 000000000..c41e267b3 --- /dev/null +++ b/crates/chain/src/canonical_task.rs @@ -0,0 +1,644 @@ +use crate::collections::{HashMap, HashSet, VecDeque}; +use crate::tx_graph::{TxAncestors, TxDescendants}; +use crate::{Anchor, CanonicalView, ChainPosition, TxGraph}; +use alloc::boxed::Box; +use alloc::collections::BTreeSet; +use alloc::sync::Arc; +use alloc::vec::Vec; +use bdk_core::{BlockId, ChainQuery, ChainRequest, ChainResponse}; +use bitcoin::{Transaction, Txid}; + +type CanonicalMap = HashMap, CanonicalReason)>; +type NotCanonicalSet = HashSet; + +/// Represents the current stage of canonicalization processing. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CanonicalStage { + /// Processing directly anchored transactions. + AnchoredTxs, + /// Processing transactions seen in mempool. + SeenTxs, + /// Processing leftover transactions. + LeftOverTxs, + /// Processing transitively anchored transactions. + TransitivelyAnchoredTxs, + /// All processing is complete. + Finished, +} + +/// Modifies the canonicalization algorithm. +#[derive(Debug, Default, Clone)] +pub struct CanonicalizationParams { + /// Transactions that will supersede all other transactions. + /// + /// In case of conflicting transactions within `assume_canonical`, transactions that appear + /// later in the list (have higher index) have precedence. + pub assume_canonical: Vec, +} + +/// Manages the canonicalization process without direct I/O operations. +pub struct CanonicalizationTask<'g, A> { + tx_graph: &'g TxGraph, + chain_tip: BlockId, + + unprocessed_assumed_txs: Box)> + 'g>, + unprocessed_anchored_txs: VecDeque<(Txid, Arc, &'g BTreeSet)>, + unprocessed_seen_txs: Box, u64)> + 'g>, + unprocessed_leftover_txs: VecDeque<(Txid, Arc, u32)>, + unprocessed_transitively_anchored_txs: VecDeque<(Txid, Arc, &'g BTreeSet)>, + + canonical: CanonicalMap, + not_canonical: NotCanonicalSet, + + // Store canonical transactions in order + canonical_order: Vec, + + // Track which transactions have direct anchors (not transitive) + direct_anchors: HashMap, + + // Track the current stage of processing + current_stage: CanonicalStage, +} + +impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> { + type Output = CanonicalView; + + fn next_query(&mut self) -> Option { + // Try to advance to the next stage if needed + self.try_advance(); + + match self.current_stage { + CanonicalStage::AnchoredTxs => { + // Process directly anchored transactions first + if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() { + let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect(); + return Some(ChainRequest { + chain_tip: self.chain_tip, + block_ids, + }); + } + None + } + CanonicalStage::TransitivelyAnchoredTxs => { + // Process transitively anchored transactions last + if let Some((_txid, _, anchors)) = + self.unprocessed_transitively_anchored_txs.front() + { + let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect(); + return Some(ChainRequest { + chain_tip: self.chain_tip, + block_ids, + }); + } + None + } + CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => { + // These stages don't need queries + None + } + } + } + + fn resolve_query(&mut self, response: ChainResponse) { + // Only AnchoredTxs and TransitivelyAnchoredTxs stages should receive query + // responses Other stages don't generate queries and thus shouldn't call + // resolve_query + match self.current_stage { + CanonicalStage::AnchoredTxs => { + // Process directly anchored transaction response + if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() { + // Find the anchor that matches the confirmed BlockId + let best_anchor = response.and_then(|block_id| { + anchors + .iter() + .find(|anchor| anchor.anchor_block() == block_id) + .cloned() + }); + + match best_anchor { + Some(best_anchor) => { + // Transaction has a confirmed anchor + self.direct_anchors.insert(txid, best_anchor.clone()); + if !self.is_canonicalized(txid) { + self.mark_canonical( + txid, + tx, + CanonicalReason::from_anchor(best_anchor), + ); + } + } + None => { + // No confirmed anchor found, add to leftover transactions for later + // processing + self.unprocessed_leftover_txs.push_back(( + txid, + tx, + anchors + .iter() + .last() + .expect( + "tx taken from `unprocessed_anchored_txs` so it must have at least one anchor", + ) + .confirmation_height_upper_bound(), + )) + } + } + } + } + CanonicalStage::TransitivelyAnchoredTxs => { + // Process transitively anchored transaction response + if let Some((txid, _tx, anchors)) = + self.unprocessed_transitively_anchored_txs.pop_front() + { + // Find the anchor that matches the confirmed BlockId + let best_anchor = response.and_then(|block_id| { + anchors + .iter() + .find(|anchor| anchor.anchor_block() == block_id) + .cloned() + }); + + if let Some(best_anchor) = best_anchor { + // Found a confirmed anchor for this transitively anchored transaction + self.direct_anchors.insert(txid, best_anchor.clone()); + // Note: We don't re-mark as canonical since it's already marked + // from being transitively anchored by its descendant + } + // If no confirmed anchor, we keep the transitive canonicalization status + } + } + CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => { + // These stages don't generate queries and shouldn't receive responses + debug_assert!( + false, + "resolve_query called for stage {:?} which doesn't generate queries", + self.current_stage + ); + } + } + } + + fn is_finished(&mut self) -> bool { + // Try to advance stages first + self.try_advance(); + // Check if we've reached the Finished stage + self.current_stage == CanonicalStage::Finished + } + + fn finish(self) -> Self::Output { + // Build the canonical view + let mut view_order = Vec::new(); + let mut view_txs = HashMap::new(); + let mut view_spends = HashMap::new(); + + for txid in &self.canonical_order { + if let Some((tx, reason)) = self.canonical.get(txid) { + view_order.push(*txid); + + // Add spends + if !tx.is_coinbase() { + for input in &tx.input { + view_spends.insert(input.previous_output, *txid); + } + } + + // Get transaction node for first_seen/last_seen info + let tx_node = match self.tx_graph.get_tx_node(*txid) { + Some(tx_node) => tx_node, + None => { + debug_assert!(false, "tx node must exist!"); + continue; + } + }; + + // Determine chain position based on reason + let chain_position = match reason { + CanonicalReason::Assumed { descendant } => match descendant { + Some(_) => match self.direct_anchors.get(txid) { + Some(confirmed_anchor) => ChainPosition::Confirmed { + anchor: confirmed_anchor, + transitively: None, + }, + None => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: tx_node.last_seen, + }, + }, + None => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: tx_node.last_seen, + }, + }, + CanonicalReason::Anchor { anchor, descendant } => match descendant { + Some(_) => match self.direct_anchors.get(txid) { + Some(confirmed_anchor) => ChainPosition::Confirmed { + anchor: confirmed_anchor, + transitively: None, + }, + None => ChainPosition::Confirmed { + anchor, + transitively: *descendant, + }, + }, + None => ChainPosition::Confirmed { + anchor, + transitively: None, + }, + }, + CanonicalReason::ObservedIn { observed_in, .. } => match observed_in { + ObservedIn::Mempool(last_seen) => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: Some(*last_seen), + }, + ObservedIn::Block(_) => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: None, + }, + }, + }; + + view_txs.insert(*txid, (tx.clone(), chain_position.cloned())); + } + } + + CanonicalView::new(self.chain_tip, view_order, view_txs, view_spends) + } +} + +impl<'g, A: Anchor> CanonicalizationTask<'g, A> { + /// Try to advance to the next stage if the current stage is complete. + /// The loop continues through stages that process all their transactions at once + /// (SeenTxs and LeftOverTxs) to avoid needing multiple calls. + fn try_advance(&mut self) { + loop { + let advanced = match self.current_stage { + CanonicalStage::AnchoredTxs => { + if self.unprocessed_anchored_txs.is_empty() { + self.current_stage = CanonicalStage::SeenTxs; + true // Continue to process SeenTxs immediately + } else { + false // Still have work, stop advancing + } + } + CanonicalStage::SeenTxs => { + // Process all seen transactions at once + self.process_seen_txs(); + self.current_stage = CanonicalStage::LeftOverTxs; + true // Continue to process LeftOverTxs immediately + } + CanonicalStage::LeftOverTxs => { + // Process all leftover transactions at once + self.process_leftover_txs(); + self.current_stage = CanonicalStage::TransitivelyAnchoredTxs; + false // Stop here - TransitivelyAnchoredTxs need queries + } + CanonicalStage::TransitivelyAnchoredTxs => { + if self.unprocessed_transitively_anchored_txs.is_empty() { + self.current_stage = CanonicalStage::Finished; + } + false // Stop advancing + } + CanonicalStage::Finished => { + false // Already finished, nothing to do + } + }; + + if !advanced { + break; + } + } + } + + /// Creates a new canonicalization task. + pub fn new( + tx_graph: &'g TxGraph, + chain_tip: BlockId, + params: CanonicalizationParams, + ) -> Self { + let anchors = tx_graph.all_anchors(); + let unprocessed_assumed_txs = Box::new( + params + .assume_canonical + .into_iter() + .rev() + .filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))), + ); + let unprocessed_anchored_txs: VecDeque<_> = tx_graph + .txids_by_descending_anchor_height() + .filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))) + .collect(); + let unprocessed_seen_txs = Box::new( + tx_graph + .txids_by_descending_last_seen() + .filter_map(|(last_seen, txid)| Some((txid, tx_graph.get_tx(txid)?, last_seen))), + ); + + let mut task = Self { + tx_graph, + chain_tip, + + unprocessed_assumed_txs, + unprocessed_anchored_txs, + unprocessed_seen_txs, + unprocessed_leftover_txs: VecDeque::new(), + unprocessed_transitively_anchored_txs: VecDeque::new(), + + canonical: HashMap::new(), + not_canonical: HashSet::new(), + + canonical_order: Vec::new(), + direct_anchors: HashMap::new(), + current_stage: CanonicalStage::AnchoredTxs, + }; + + // process assumed transactions first (they don't need queries) + task.process_assumed_txs(); + + task + } + + fn is_canonicalized(&self, txid: Txid) -> bool { + self.canonical.contains_key(&txid) || self.not_canonical.contains(&txid) + } + + fn process_assumed_txs(&mut self) { + while let Some((txid, tx)) = self.unprocessed_assumed_txs.next() { + if !self.is_canonicalized(txid) { + self.mark_canonical(txid, tx, CanonicalReason::assumed()); + } + } + } + + fn process_seen_txs(&mut self) { + while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() { + debug_assert!( + !tx.is_coinbase(), + "Coinbase txs must not have `last_seen` (in mempool) value" + ); + if !self.is_canonicalized(txid) { + let observed_in = ObservedIn::Mempool(last_seen); + self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); + } + } + } + + fn process_leftover_txs(&mut self) { + while let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() { + if !self.is_canonicalized(txid) && !tx.is_coinbase() { + let observed_in = ObservedIn::Block(height); + self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); + } + } + } + + fn mark_canonical(&mut self, txid: Txid, tx: Arc, reason: CanonicalReason) { + let starting_txid = txid; + let mut is_starting_tx = true; + + // We keep track of changes made so far so that we can undo it later in case we detect that + // `tx` double spends itself. + let mut detected_self_double_spend = false; + let mut undo_not_canonical = Vec::::new(); + let mut staged_canonical = Vec::<(Txid, Arc, CanonicalReason)>::new(); + + // Process ancestors + TxAncestors::new_include_root( + self.tx_graph, + tx, + |_: usize, tx: Arc| -> Option { + let this_txid = tx.compute_txid(); + let this_reason = if is_starting_tx { + is_starting_tx = false; + reason.clone() + } else { + // This is an ancestor being marked transitively + // Check if it has its own anchor that needs to be verified later + // We'll check anchors after marking it canonical + reason.to_transitive(starting_txid) + }; + + use crate::collections::hash_map::Entry; + let canonical_entry = match self.canonical.entry(this_txid) { + // Already visited tx before, exit early. + Entry::Occupied(_) => return None, + Entry::Vacant(entry) => entry, + }; + + // Any conflicts with a canonical tx can be added to `not_canonical`. Descendants + // of `not_canonical` txs can also be added to `not_canonical`. + for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) { + TxDescendants::new_include_root( + self.tx_graph, + conflict_txid, + |_: usize, txid: Txid| -> Option<()> { + if self.not_canonical.insert(txid) { + undo_not_canonical.push(txid); + Some(()) + } else { + None + } + }, + ) + .run_until_finished() + } + + if self.not_canonical.contains(&this_txid) { + // Early exit if self-double-spend is detected. + detected_self_double_spend = true; + return None; + } + + staged_canonical.push((this_txid, tx.clone(), this_reason.clone())); + canonical_entry.insert((tx.clone(), this_reason)); + Some(this_txid) + }, + ) + .run_until_finished(); + + if detected_self_double_spend { + // Undo changes + for (txid, _, _) in staged_canonical { + self.canonical.remove(&txid); + } + for txid in undo_not_canonical { + self.not_canonical.remove(&txid); + } + return; + } + + // Add to canonical order + for (txid, tx, reason) in &staged_canonical { + self.canonical_order.push(*txid); + + // ObservedIn transactions don't need anchor verification + if matches!(reason, CanonicalReason::ObservedIn { .. }) { + continue; + } + + // Check if this transaction was marked transitively and needs its own anchors verified + if reason.is_transitive() { + if let Some(anchors) = self.tx_graph.all_anchors().get(txid) { + // only check anchors we haven't already confirmed + if !self.direct_anchors.contains_key(txid) { + self.unprocessed_transitively_anchored_txs.push_back(( + *txid, + tx.clone(), + anchors, + )); + } + } + } + } + } +} + +/// Represents when and where a transaction was last observed in. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum ObservedIn { + /// The transaction was last observed in a block of height. + Block(u32), + /// The transaction was last observed in the mempool at the given unix timestamp. + Mempool(u64), +} + +/// The reason why a transaction is canonical. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CanonicalReason { + /// This transaction is explicitly assumed to be canonical by the caller, superceding all other + /// canonicalization rules. + Assumed { + /// Whether it is a descendant that is assumed to be canonical. + descendant: Option, + }, + /// This transaction is anchored in the best chain by `A`, and therefore canonical. + Anchor { + /// The anchor that anchored the transaction in the chain. + anchor: A, + /// Whether the anchor is of the transaction's descendant. + descendant: Option, + }, + /// This transaction does not conflict with any other transaction with a more recent + /// [`ObservedIn`] value or one that is anchored in the best chain. + ObservedIn { + /// The [`ObservedIn`] value of the transaction. + observed_in: ObservedIn, + /// Whether the [`ObservedIn`] value is of the transaction's descendant. + descendant: Option, + }, +} + +impl CanonicalReason { + /// Constructs a [`CanonicalReason`] for a transaction that is assumed to supercede all other + /// transactions. + pub fn assumed() -> Self { + Self::Assumed { descendant: None } + } + + /// Constructs a [`CanonicalReason`] from an `anchor`. + pub fn from_anchor(anchor: A) -> Self { + Self::Anchor { + anchor, + descendant: None, + } + } + + /// Constructs a [`CanonicalReason`] from an `observed_in` value. + pub fn from_observed_in(observed_in: ObservedIn) -> Self { + Self::ObservedIn { + observed_in, + descendant: None, + } + } + + /// Contruct a new [`CanonicalReason`] from the original which is transitive to `descendant`. + /// + /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's + /// descendant, but is transitively relevant. + pub fn to_transitive(&self, descendant: Txid) -> Self { + match self { + CanonicalReason::Assumed { .. } => Self::Assumed { + descendant: Some(descendant), + }, + CanonicalReason::Anchor { anchor, .. } => Self::Anchor { + anchor: anchor.clone(), + descendant: Some(descendant), + }, + CanonicalReason::ObservedIn { observed_in, .. } => Self::ObservedIn { + observed_in: *observed_in, + descendant: Some(descendant), + }, + } + } + + /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's + /// descendant. + pub fn descendant(&self) -> &Option { + match self { + CanonicalReason::Assumed { descendant, .. } => descendant, + CanonicalReason::Anchor { descendant, .. } => descendant, + CanonicalReason::ObservedIn { descendant, .. } => descendant, + } + } + + /// Returns true if this reason represents a transitive canonicalization + /// (i.e., the transaction is canonical because of its descendant). + pub fn is_transitive(&self) -> bool { + self.descendant().is_some() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::local_chain::LocalChain; + use bitcoin::{hashes::Hash, BlockHash, TxIn, TxOut}; + + #[test] + fn test_canonicalization_task_sans_io() { + // Create a simple chain + let blocks = [ + (0, BlockHash::all_zeros()), + (1, BlockHash::from_byte_array([1; 32])), + (2, BlockHash::from_byte_array([2; 32])), + ]; + let chain = LocalChain::from_blocks(blocks.into_iter().collect()).unwrap(); + let chain_tip = chain.tip().block_id(); + + // Create a simple transaction graph + let mut tx_graph = TxGraph::default(); + + // Add a transaction + let tx = bitcoin::Transaction { + version: bitcoin::transaction::Version::ONE, + lock_time: bitcoin::absolute::LockTime::ZERO, + input: vec![TxIn::default()], + output: vec![TxOut { + value: bitcoin::Amount::from_sat(1000), + script_pubkey: bitcoin::ScriptBuf::new(), + }], + }; + let _ = tx_graph.insert_tx(tx.clone()); + let txid = tx.compute_txid(); + + // Add an anchor at height 1 + let anchor = crate::ConfirmationBlockTime { + block_id: chain.get(1).unwrap().block_id(), + confirmation_time: 12345, + }; + let _ = tx_graph.insert_anchor(txid, anchor); + + // Create canonicalization task and canonicalize using the chain + let params = CanonicalizationParams::default(); + let task = CanonicalizationTask::new(&tx_graph, chain_tip, params); + let canonical_view = chain.canonicalize(task); + + // Should have one canonical transaction + assert_eq!(canonical_view.txs().len(), 1); + let canon_tx = canonical_view.txs().next().unwrap(); + assert_eq!(canon_tx.txid, txid); + assert_eq!(canon_tx.tx.compute_txid(), txid); + + // Should be confirmed (anchored) + assert!(matches!(canon_tx.pos, ChainPosition::Confirmed { .. })); + } +} diff --git a/crates/chain/src/canonical_view.rs b/crates/chain/src/canonical_view.rs index 09b18e50a..7840f236f 100644 --- a/crates/chain/src/canonical_view.rs +++ b/crates/chain/src/canonical_view.rs @@ -6,14 +6,15 @@ //! ## Example //! //! ``` -//! # use bdk_chain::{CanonicalView, TxGraph, CanonicalizationParams, local_chain::LocalChain}; +//! # use bdk_chain::{TxGraph, CanonicalizationParams, CanonicalizationTask, local_chain::LocalChain}; //! # use bdk_core::BlockId; //! # use bitcoin::hashes::Hash; //! # let tx_graph = TxGraph::::default(); //! # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); -//! # let chain_tip = chain.tip().block_id(); +//! let chain_tip = chain.tip().block_id(); //! let params = CanonicalizationParams::default(); -//! let view = CanonicalView::new(&tx_graph, &chain, chain_tip, params).unwrap(); +//! let task = CanonicalizationTask::new(&tx_graph, chain_tip, params); +//! let view = chain.canonicalize(task); //! //! // Iterate over canonical transactions //! for tx in view.txs() { @@ -30,10 +31,7 @@ use alloc::vec::Vec; use bdk_core::BlockId; use bitcoin::{Amount, OutPoint, ScriptBuf, Transaction, Txid}; -use crate::{ - spk_txout::SpkTxOutIndex, tx_graph::TxNode, Anchor, Balance, CanonicalIter, CanonicalReason, - CanonicalizationParams, ChainOracle, ChainPosition, FullTxOut, ObservedIn, TxGraph, -}; +use crate::{spk_txout::SpkTxOutIndex, Anchor, Balance, ChainPosition, FullTxOut}; /// A single canonical transaction with its chain position. /// @@ -76,115 +74,24 @@ pub struct CanonicalView { } impl CanonicalView { - /// Create a new canonical view from a transaction graph. + /// Creates a [`CanonicalView`] from its constituent parts. /// - /// This constructor analyzes the given [`TxGraph`] and creates a canonical view of all - /// transactions, resolving conflicts and ordering them according to their chain position. - /// - /// # Returns - /// - /// Returns `Ok(CanonicalView)` on success, or an error if the chain oracle fails. - pub fn new<'g, C>( - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result - where - C: ChainOracle, - { - fn find_direct_anchor<'g, A: Anchor, C: ChainOracle>( - tx_node: &TxNode<'g, Arc, A>, - chain: &C, - chain_tip: BlockId, - ) -> Result, C::Error> { - tx_node - .anchors - .iter() - .find_map(|a| -> Option> { - match chain.is_block_in_chain(a.anchor_block(), chain_tip) { - Ok(Some(true)) => Some(Ok(a.clone())), - Ok(Some(false)) | Ok(None) => None, - Err(err) => Some(Err(err)), - } - }) - .transpose() + /// This internal constructor is used by [`CanonicalizationTask`] to build the view + /// after completing the canonicalization process. It takes the processed transaction + /// data including the canonical ordering, transaction map with chain positions, and + /// spend information. + pub(crate) fn new( + tip: BlockId, + order: Vec, + txs: HashMap, ChainPosition)>, + spends: HashMap, + ) -> Self { + Self { + tip, + order, + txs, + spends, } - - let mut view = Self { - tip: chain_tip, - order: vec![], - txs: HashMap::new(), - spends: HashMap::new(), - }; - - for r in CanonicalIter::new(tx_graph, chain, chain_tip, params) { - let (txid, tx, why) = r?; - - let tx_node = match tx_graph.get_tx_node(txid) { - Some(tx_node) => tx_node, - None => { - // TODO: Have the `CanonicalIter` return `TxNode`s. - debug_assert!(false, "tx node must exist!"); - continue; - } - }; - - view.order.push(txid); - - if !tx.is_coinbase() { - view.spends - .extend(tx.input.iter().map(|txin| (txin.previous_output, txid))); - } - - let pos = match why { - CanonicalReason::Assumed { descendant } => match descendant { - Some(_) => match find_direct_anchor(&tx_node, chain, chain_tip)? { - Some(anchor) => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - None => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: tx_node.last_seen, - }, - }, - None => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: tx_node.last_seen, - }, - }, - CanonicalReason::Anchor { anchor, descendant } => match descendant { - Some(_) => match find_direct_anchor(&tx_node, chain, chain_tip)? { - Some(anchor) => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - None => ChainPosition::Confirmed { - anchor, - transitively: descendant, - }, - }, - None => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - }, - CanonicalReason::ObservedIn { observed_in, .. } => match observed_in { - ObservedIn::Mempool(last_seen) => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: Some(last_seen), - }, - ObservedIn::Block(_) => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: None, - }, - }, - }; - view.txs.insert(txid, (tx_node.tx, pos)); - } - - Ok(view) } /// Get a single canonical transaction by its transaction ID. @@ -232,12 +139,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// // Iterate over all canonical transactions /// for tx in view.txs() { /// println!("TX {}: {:?}", tx.txid, tx.pos); @@ -265,12 +174,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Get all outputs from an indexer /// for (keychain, txout) in view.filter_outpoints(indexer.outpoints().clone()) { @@ -294,12 +205,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Get unspent outputs (UTXOs) from an indexer /// for (keychain, utxo) in view.filter_unspent_outpoints(indexer.outpoints().clone()) { @@ -340,12 +253,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{CanonicalizationTask, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Calculate balance with 6 confirmations, trusting all outputs /// let balance = view.balance( diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 9adf7ed93..cd4483eb7 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,14 +1,14 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::{convert::Infallible, fmt::Debug}; +use core::fmt::Debug; use alloc::{sync::Arc, vec::Vec}; use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; use crate::{ + canonical_task::CanonicalizationParams, tx_graph::{self, TxGraph}, - Anchor, BlockId, CanonicalView, CanonicalizationParams, ChainOracle, Indexer, Merge, - TxPosInBlock, + Anchor, BlockId, CanonicalizationTask, Indexer, Merge, TxPosInBlock, }; /// A [`TxGraph`] paired with an indexer `I`, enforcing that every insertion into the graph is @@ -423,36 +423,28 @@ where } } +impl AsRef> for IndexedTxGraph { + fn as_ref(&self) -> &TxGraph { + &self.graph + } +} + impl IndexedTxGraph where A: Anchor, { - /// Returns a [`CanonicalView`]. - pub fn try_canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result, C::Error> { - self.graph.try_canonical_view(chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. + /// Creates a `[CanonicalizationTask]` to determine the `[CanonicalView]` of transactions. /// - /// This is the infallible version of [`try_canonical_view`](Self::try_canonical_view). - pub fn canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, + /// This method delegates to the underlying [`TxGraph`] to create a [`CanonicalizationTask`] + /// that can be used to determine which transactions are canonical based on the provided + /// parameters. The task handles the stateless canonicalization logic and can be polled + /// for anchor verification requests. + pub fn canonicalization_task( + &'_ self, chain_tip: BlockId, params: CanonicalizationParams, - ) -> CanonicalView { - self.graph.canonical_view(chain, chain_tip, params) - } -} - -impl AsRef> for IndexedTxGraph { - fn as_ref(&self) -> &TxGraph { - &self.graph + ) -> CanonicalizationTask<'_, A> { + self.graph.canonicalization_task(chain_tip, params) } } diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index be9170b1a..2e0a83c27 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -44,8 +44,8 @@ pub mod tx_graph; pub use tx_graph::TxGraph; mod chain_oracle; pub use chain_oracle::*; -mod canonical_iter; -pub use canonical_iter::*; +mod canonical_task; +pub use canonical_task::*; mod canonical_view; pub use canonical_view::*; diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 81f4a1796..9ed7dab03 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -6,7 +6,7 @@ use core::ops::RangeBounds; use crate::collections::BTreeMap; use crate::{BlockId, ChainOracle, Merge}; -use bdk_core::ToBlockHash; +use bdk_core::{ChainQuery, ToBlockHash}; pub use bdk_core::{CheckPoint, CheckPointIter}; use bitcoin::block::Header; use bitcoin::BlockHash; @@ -96,6 +96,80 @@ impl ChainOracle for LocalChain { // Methods for `LocalChain` impl LocalChain { + // /// Check if a block is in the chain. + // /// + // /// # Arguments + // /// * `block` - The block to check + // /// * `chain_tip` - The chain tip to check against + // /// + // /// # Returns + // /// * `Some(true)` if the block is in the chain + // /// * `Some(false)` if the block is not in the chain + // /// * `None` if it cannot be determined + // pub fn is_block_in_chain(&self, block: BlockId, chain_tip: BlockId) -> Option { + // let chain_tip_cp = match self.tip.get(chain_tip.height) { + // // we can only determine whether `block` is in chain of `chain_tip` if `chain_tip` + // can // be identified in chain + // Some(cp) if cp.hash() == chain_tip.hash => cp, + // _ => return None, + // }; + // chain_tip_cp + // .get(block.height) + // .map(|cp| cp.hash() == block.hash) + // } + + // /// Get the chain tip. + // /// + // /// # Returns + // /// The [`BlockId`] of the chain tip. + // pub fn chain_tip(&self) -> BlockId { + // self.tip.block_id() + // } + + /// Canonicalize a transaction graph using this chain. + /// + /// This method processes any type implementing [`ChainQuery`], handling all its requests + /// to determine which transactions are canonical, and returns the query's output. + /// + /// # Example + /// + /// ``` + /// # use bdk_chain::{CanonicalizationTask, CanonicalizationParams, TxGraph, local_chain::LocalChain}; + /// # use bdk_core::BlockId; + /// # use bitcoin::hashes::Hash; + /// # let tx_graph: TxGraph = TxGraph::default(); + /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); + /// let chain_tip = chain.tip().block_id(); + /// let task = CanonicalizationTask::new(&tx_graph, chain_tip, CanonicalizationParams::default()); + /// let view = chain.canonicalize(task); + /// ``` + pub fn canonicalize(&self, mut task: Q) -> Q::Output + where + Q: ChainQuery, + { + // Process all requests from the task + while let Some(request) = task.next_query() { + let chain_tip = request.chain_tip; + + // Check each block ID and return the first confirmed one + let mut best_block_id = None; + for block_id in &request.block_ids { + if self + .is_block_in_chain(*block_id, chain_tip) + .expect("infallible") + == Some(true) + { + best_block_id = Some(*block_id); + break; + } + } + task.resolve_query(best_block_id); + } + + // Return the finished canonical view + task.finish() + } + /// Update the chain with a given [`Header`] at `height` which you claim is connected to a /// existing block in the chain. /// diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 97d4ecc02..5731f021a 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -21,18 +21,26 @@ //! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called //! canonicalization is required to get a conflict-free view of transactions. //! -//! * [`canonical_iter`](TxGraph::canonical_iter) returns a [`CanonicalIter`] which performs -//! incremental canonicalization. This is useful when you only need to check specific transactions -//! (e.g., verifying whether a few unconfirmed transactions are canonical) without computing the -//! entire canonical view. -//! * [`canonical_view`](TxGraph::canonical_view) returns a [`CanonicalView`] which provides a -//! complete canonical view of the graph. This is required for typical wallet operations like -//! querying balances, listing outputs, transactions, and UTXOs. You must construct this first -//! before performing these operations. +//! The canonicalization process uses a two-step, sans-IO approach: //! -//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a -//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which -//! identifies which blocks exist under a given `chain_tip`. +//! 1. **Create a canonicalization task** using +//! [`canonicalization_task`](TxGraph::canonicalization_task): ```ignore let task = +//! tx_graph.canonicalization_task(params); ``` This creates a [`CanonicalizationTask`] that +//! encapsulates the canonicalization logic without performing any I/O operations. +//! +//! 2. **Execute the task** with a chain oracle to obtain a [`CanonicalView`]: ```ignore let view = +//! chain.canonicalize(task); ``` The chain oracle (such as +//! [`LocalChain`](crate::local_chain::LocalChain)) handles all anchor verification queries from +//! the task. +//! +//! The [`CanonicalView`] provides a complete canonical view of the graph. This is required for +//! typical wallet operations like querying balances, listing outputs, transactions, and UTXOs. +//! You must construct this view before performing these operations. +//! +//! The separation between task creation and execution (sans-IO pattern) enables: +//! * Better testability - tasks can be tested without a real chain +//! * Flexibility - different chain oracle implementations can be used +//! * Clean separation of concerns - canonicalization logic is isolated from I/O //! //! The canonicalization algorithm uses the following associated data to determine which //! transactions have precedence over others: @@ -119,11 +127,9 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::collections::*; -use crate::BlockId; -use crate::CanonicalIter; -use crate::CanonicalView; use crate::CanonicalizationParams; -use crate::{Anchor, ChainOracle, Merge}; +use crate::CanonicalizationTask; +use crate::{Anchor, BlockId, Merge}; use alloc::collections::vec_deque::VecDeque; use alloc::sync::Arc; use alloc::vec::Vec; @@ -131,10 +137,7 @@ use bdk_core::ConfirmationBlockTime; pub use bdk_core::TxUpdate; use bitcoin::{Amount, OutPoint, SignedAmount, Transaction, TxOut, Txid}; use core::fmt::{self, Formatter}; -use core::{ - convert::Infallible, - ops::{Deref, RangeInclusive}, -}; +use core::ops::{Deref, RangeInclusive}; impl From> for TxUpdate { fn from(graph: TxGraph) -> Self { @@ -952,6 +955,20 @@ impl TxGraph { let _ = self.insert_evicted_at(txid, evicted_at); } } + + /// Creates a `[CanonicalizationTask]` to determine the `[CanonicalView]` of transactions. + /// + /// This method delegates to the underlying [`TxGraph`] to create a [`CanonicalizationTask`] + /// that can be used to determine which transactions are canonical based on the provided + /// parameters. The task handles the stateless canonicalization logic and can be polled + /// for anchor verification requests. + pub fn canonicalization_task( + &'_ self, + chain_tip: BlockId, + params: CanonicalizationParams, + ) -> CanonicalizationTask<'_, A> { + CanonicalizationTask::new(self, chain_tip, params) + } } impl TxGraph { @@ -980,36 +997,6 @@ impl TxGraph { }) } - /// Returns a [`CanonicalIter`]. - pub fn canonical_iter<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> CanonicalIter<'a, A, C> { - CanonicalIter::new(self, chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. - pub fn try_canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result, C::Error> { - CanonicalView::new(self, chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. - pub fn canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> CanonicalView { - CanonicalView::new(self, chain, chain_tip, params).expect("infallible") - } - /// Construct a `TxGraph` from a `changeset`. pub fn from_changeset(changeset: ChangeSet) -> Self { let mut graph = Self::default(); diff --git a/crates/chain/tests/test_canonical_view.rs b/crates/chain/tests/test_canonical_view.rs index 3c0d54381..47bab2758 100644 --- a/crates/chain/tests/test_canonical_view.rs +++ b/crates/chain/tests/test_canonical_view.rs @@ -54,8 +54,8 @@ fn test_min_confirmations_parameter() { let _ = tx_graph.insert_anchor(txid, anchor_height_5); let chain_tip = chain.tip().block_id(); - let canonical_view = - tx_graph.canonical_view(&chain, chain_tip, CanonicalizationParams::default()); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test min_confirmations = 1: Should be confirmed (has 6 confirmations) let balance_1_conf = canonical_view.balance( @@ -142,11 +142,9 @@ fn test_min_confirmations_with_untrusted_tx() { }; let _ = tx_graph.insert_anchor(txid, anchor); - let canonical_view = tx_graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test with min_confirmations = 5 and untrusted predicate let balance = canonical_view.balance( @@ -263,11 +261,9 @@ fn test_min_confirmations_multiple_transactions() { ); outpoints.push(((), outpoint2)); - let canonical_view = tx_graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test with min_confirmations = 5 // tx0: 11 confirmations -> confirmed diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 7a2f8ea60..dc3fc5290 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -459,23 +459,30 @@ fn test_list_owned_txouts() { .get(height) .map(|cp| cp.block_id()) .unwrap_or_else(|| panic!("block must exist at {height}")); - let txouts = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let txouts = local_chain + .canonicalize(task) .filter_outpoints(graph.index.outpoints().iter().cloned()) .collect::>(); - let utxos = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let utxos = local_chain + .canonicalize(task) .filter_unspent_outpoints(graph.index.outpoints().iter().cloned()) .collect::>(); - let balance = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) - .balance( - graph.index.outpoints().iter().cloned(), - |_, txout| trusted_spks.contains(&txout.txout.script_pubkey), - 1, - ); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = local_chain.canonicalize(task).balance( + graph.index.outpoints().iter().cloned(), + |_, txout| trusted_spks.contains(&txout.txout.script_pubkey), + 0, + ); let confirmed_txouts_txid = txouts .iter() @@ -778,20 +785,17 @@ fn test_get_chain_position() { } // check chain position - let chain_pos = graph - .canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .find_map(|canon_tx| { - if canon_tx.txid == txid { - Some(canon_tx.pos) - } else { - None - } - }); + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let chain_pos = chain.canonicalize(task).txs().find_map(|canon_tx| { + if canon_tx.txid == txid { + Some(canon_tx.pos) + } else { + None + } + }); assert_eq!(chain_pos, exp_pos, "failed test case: {name}"); } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index b2a359608..2b7ebf847 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -1014,8 +1014,10 @@ fn test_chain_spends() { let build_canonical_spends = |chain: &LocalChain, tx_graph: &TxGraph| -> HashMap { - tx_graph - .canonical_view(chain, tip.block_id(), CanonicalizationParams::default()) + let task = + tx_graph.canonicalization_task(tip.block_id(), CanonicalizationParams::default()); + chain + .canonicalize(task) .filter_outpoints(tx_graph.all_txouts().map(|(op, _)| ((), op))) .filter_map(|(_, full_txo)| Some((full_txo.outpoint, full_txo.spent_by?))) .collect() @@ -1023,8 +1025,10 @@ fn test_chain_spends() { let build_canonical_positions = |chain: &LocalChain, tx_graph: &TxGraph| -> HashMap> { - tx_graph - .canonical_view(chain, tip.block_id(), CanonicalizationParams::default()) + let task = + tx_graph.canonicalization_task(tip.block_id(), CanonicalizationParams::default()); + chain + .canonicalize(task) .txs() .map(|canon_tx| (canon_tx.txid, canon_tx.pos)) .collect() @@ -1197,38 +1201,25 @@ fn transactions_inserted_into_tx_graph_are_not_canonical_until_they_have_an_anch .into_iter() .collect(); let chain = LocalChain::from_blocks(blocks).unwrap(); - let canonical_txs: Vec<_> = graph - .canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .collect(); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txs: Vec<_> = chain.canonicalize(task).txs().collect(); assert!(canonical_txs.is_empty()); // tx0 with seen_at should be returned by canonical txs let _ = graph.insert_seen_at(txids[0], 2); - let canonical_view = graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); let mut canonical_txs = canonical_view.txs(); assert_eq!(canonical_txs.next().map(|tx| tx.txid).unwrap(), txids[0]); drop(canonical_txs); // tx1 with anchor is also canonical let _ = graph.insert_anchor(txids[1], block_id!(2, "B")); - let canonical_txids: Vec<_> = graph - .canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .map(|tx| tx.txid) - .collect(); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txids: Vec<_> = chain.canonicalize(task).txs().map(|tx| tx.txid).collect(); assert!(canonical_txids.contains(&txids[1])); assert!(graph.txs_with_no_anchor_or_last_seen().next().is_none()); } diff --git a/crates/chain/tests/test_tx_graph_conflicts.rs b/crates/chain/tests/test_tx_graph_conflicts.rs index 38f21365c..de6325914 100644 --- a/crates/chain/tests/test_tx_graph_conflicts.rs +++ b/crates/chain/tests/test_tx_graph_conflicts.rs @@ -970,9 +970,11 @@ fn test_tx_conflict_handling() { for scenario in scenarios { let env = init_graph(scenario.tx_templates.iter()); - let txs = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let txs = local_chain + .canonicalize(task) .txs() .map(|tx| tx.txid) .collect::>(); @@ -987,9 +989,11 @@ fn test_tx_conflict_handling() { scenario.name ); - let txouts = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let txouts = local_chain + .canonicalize(task) .filter_outpoints(env.indexer.outpoints().iter().cloned()) .map(|(_, full_txout)| full_txout.outpoint) .collect::>(); @@ -1007,9 +1011,11 @@ fn test_tx_conflict_handling() { scenario.name ); - let utxos = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let utxos = local_chain + .canonicalize(task) .filter_unspent_outpoints(env.indexer.outpoints().iter().cloned()) .map(|(_, full_txout)| full_txout.outpoint) .collect::>(); @@ -1027,18 +1033,18 @@ fn test_tx_conflict_handling() { scenario.name ); - let balance = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) - .balance( - env.indexer.outpoints().iter().cloned(), - |_, txout| { - env.indexer - .index_of_spk(txout.txout.script_pubkey.as_script()) - .is_some() - }, - 0, - ); + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let balance = local_chain.canonicalize(task).balance( + env.indexer.outpoints().iter().cloned(), + |_, txout| { + env.indexer + .index_of_spk(txout.txout.script_pubkey.as_script()) + .is_some() + }, + 0, + ); assert_eq!( balance, scenario.exp_balance, "\n[{}] 'balance' failed", diff --git a/crates/core/src/chain_query.rs b/crates/core/src/chain_query.rs new file mode 100644 index 000000000..9a99d5ea8 --- /dev/null +++ b/crates/core/src/chain_query.rs @@ -0,0 +1,68 @@ +//! Generic trait for query-based operations that require external blockchain data. +//! +//! The [`ChainQuery`] trait provides a standardized interface for implementing +//! algorithms that need to make queries to blockchain sources and process responses +//! in a sans-IO manner. + +use crate::BlockId; +use alloc::vec::Vec; + +/// A request to check which block identifiers are confirmed in the chain. +/// +/// This is used to verify if specific blocks are part of the canonical chain. +/// The generic parameter `B` represents the block identifier type, which defaults to `BlockId`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChainRequest { + /// The chain tip to use as reference for the query. + pub chain_tip: B, + /// The block identifiers to check for confirmation in the chain. + pub block_ids: Vec, +} + +/// Response containing the best confirmed block identifier, if any. +/// +/// Returns `Some(B)` if at least one of the requested blocks +/// is confirmed in the chain, or `None` if none are confirmed. +/// The generic parameter `B` represents the block identifier type, which defaults to `BlockId`. +pub type ChainResponse = Option; + +/// A trait for types that perform query-based operations against blockchain data. +/// +/// This trait enables types to request blockchain information via queries and process +/// responses in a decoupled, sans-IO manner. It's particularly useful for algorithms +/// that need to interact with blockchain oracles, chain sources, or other blockchain +/// data providers without directly performing I/O. +/// +/// # Type Parameters +/// +/// * `B` - The type of block identifier used in queries (defaults to `BlockId`) +pub trait ChainQuery { + /// The final output type produced when the query process is complete. + type Output; + + /// Returns the next query needed, if any. + /// + /// This method should return `Some(request)` if more information is needed, + /// or `None` if no more queries are required. + fn next_query(&mut self) -> Option>; + + /// Resolves a query with the given response. + /// + /// This method processes the response to a previous query request and updates + /// the internal state accordingly. + fn resolve_query(&mut self, response: ChainResponse); + + /// Returns true if the query process is complete and ready to finish. + /// + /// The default implementation returns `true` when there are no more queries needed. + /// Implementors can override this for more specific behavior if needed. + fn is_finished(&mut self) -> bool { + self.next_query().is_none() + } + + /// Completes the query process and returns the final output. + /// + /// This method should be called when `is_finished` returns `true`. + /// It consumes `self` and produces the final output. + fn finish(self) -> Self::Output; +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 95bebe907..33e921687 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -72,3 +72,6 @@ mod merge; pub use merge::*; pub mod spk_client; + +mod chain_query; +pub use chain_query::*; diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index 07979866e..f1351a0ad 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -40,9 +40,12 @@ fn get_balance( ) -> anyhow::Result { let chain_tip = recv_chain.tip().block_id(); let outpoints = recv_graph.index.outpoints().clone(); - let balance = recv_graph - .canonical_view(recv_chain, chain_tip, CanonicalizationParams::default()) - .balance(outpoints, |_, _| true, 1); + let task = recv_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = recv_chain + .canonicalize(task) + .balance(outpoints, |_, _| true, 0); Ok(balance) } @@ -147,9 +150,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( @@ -176,9 +182,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index 3c628c20d..be747a656 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -88,9 +88,12 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1).await?; assert!( @@ -117,9 +120,12 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1).await?; assert!( diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 4d5683e8b..0c33ad166 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -88,9 +88,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1)?; assert!( @@ -117,9 +120,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1)?; assert!( diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 0263c5b0b..61e8b05ea 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -144,15 +144,16 @@ fn main() -> anyhow::Result<()> { &rpc_client, chain.tip(), fallback_height, - graph - .canonical_view( - &*chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .filter(|tx| tx.pos.is_unconfirmed()) - .map(|tx| tx.tx), + { + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain.canonicalize(task) + } + .txs() + .filter(|tx| tx.pos.is_unconfirmed()) + .map(|tx| tx.tx), ) }; let mut db_stage = ChangeSet::default(); @@ -196,17 +197,19 @@ fn main() -> anyhow::Result<()> { last_print = Instant::now(); let synced_to = chain.tip(); let balance = { - graph - .canonical_view( - &*chain, - synced_to.block_id(), + { + let synced_to_block = synced_to.block_id(); + let task = graph.graph().canonicalization_task( + synced_to_block, CanonicalizationParams::default(), - ) - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ) + ); + chain.canonicalize(task) + } + .balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 0, + ) }; println!( "[{:>10}s] synced to {} @ {} | total: {}", @@ -249,15 +252,16 @@ fn main() -> anyhow::Result<()> { rpc_client.clone(), chain.tip(), fallback_height, - graph - .canonical_view( - &*chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .filter(|tx| tx.pos.is_unconfirmed()) - .map(|tx| tx.tx), + { + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain.canonicalize(task) + } + .txs() + .filter(|tx| tx.pos.is_unconfirmed()) + .map(|tx| tx.tx), ) }; @@ -356,17 +360,19 @@ fn main() -> anyhow::Result<()> { last_print = Some(Instant::now()); let synced_to = chain.tip(); let balance = { - graph - .canonical_view( - &*chain, - synced_to.block_id(), + { + let synced_to_block = synced_to.block_id(); + let task = graph.graph().canonicalization_task( + synced_to_block, CanonicalizationParams::default(), - ) - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ) + ); + chain.canonicalize(task) + } + .balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 0, + ) }; println!( "[{:>10}s] synced to {} @ {} / {} | total: {}", diff --git a/examples/example_cli/src/lib.rs b/examples/example_cli/src/lib.rs index baa17e6d7..b883b0df4 100644 --- a/examples/example_cli/src/lib.rs +++ b/examples/example_cli/src/lib.rs @@ -2,6 +2,7 @@ use bdk_chain::keychain_txout::DEFAULT_LOOKAHEAD; use serde_json::json; use std::cmp; use std::collections::HashMap; +use std::convert::Infallible; use std::env; use std::fmt; use std::str::FromStr; @@ -260,18 +261,15 @@ pub struct ChangeInfo { pub index: u32, } -pub fn create_tx( +pub fn create_tx( graph: &mut KeychainTxGraph, - chain: &O, + chain: &LocalChain, assets: &Assets, cs_algorithm: CoinSelectionAlgo, address: Address, value: u64, feerate: f32, -) -> anyhow::Result<(Psbt, Option)> -where - O::Error: std::error::Error + Send + Sync + 'static, -{ +) -> anyhow::Result<(Psbt, Option)> { let mut changeset = keychain_txout::ChangeSet::default(); // get planned utxos @@ -424,15 +422,18 @@ where // Alias the elements of `planned_utxos` pub type PlanUtxo = (Plan, FullTxOut); -pub fn planned_utxos( +pub fn planned_utxos( graph: &KeychainTxGraph, - chain: &O, + chain: &LocalChain, assets: &Assets, -) -> Result, O::Error> { - let chain_tip = chain.get_chain_tip()?; +) -> Result, Infallible> { + let chain_tip = chain.tip().block_id(); let outpoints = graph.index.outpoints(); - graph - .try_canonical_view(chain, chain_tip, CanonicalizationParams::default())? + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain + .canonicalize(task) .filter_unspent_outpoints(outpoints.iter().cloned()) .filter_map(|((k, i), full_txo)| -> Option> { let desc = graph @@ -524,17 +525,15 @@ pub fn handle_commands( } } - let balance = graph - .try_canonical_view( - chain, - chain.get_chain_tip()?, - CanonicalizationParams::default(), - )? - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ); + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = chain.canonicalize(task).balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 1, + ); let confirmed_total = balance.confirmed + balance.immature; let unconfirmed_total = balance.untrusted_pending + balance.trusted_pending; @@ -571,8 +570,11 @@ pub fn handle_commands( confirmed, unconfirmed, } => { - let txouts = graph - .try_canonical_view(chain, chain_tip, CanonicalizationParams::default())? + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let txouts = chain + .canonicalize(task) .filter_outpoints(outpoints.iter().cloned()) .filter(|(_, full_txo)| match (spent, unspent) { (true, false) => full_txo.spent_by.is_some(), @@ -631,7 +633,7 @@ pub fn handle_commands( create_tx( &mut graph, - &*chain, + &chain, &assets, coin_select, address, diff --git a/examples/example_electrum/src/main.rs b/examples/example_electrum/src/main.rs index aa89f07e1..ed2895a1a 100644 --- a/examples/example_electrum/src/main.rs +++ b/examples/example_electrum/src/main.rs @@ -213,11 +213,11 @@ fn main() -> anyhow::Result<()> { eprintln!("[ SCANNING {pc:03.0}% ] {item}"); }); - let canonical_view = graph.canonical_view( - &*chain, - chain_tip.block_id(), - CanonicalizationParams::default(), - ); + let chain_tip_block = chain_tip.block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip_block, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); request = request .expected_spk_txids(canonical_view.list_expected_spk_txids(&graph.index, ..)); diff --git a/examples/example_esplora/src/main.rs b/examples/example_esplora/src/main.rs index 99f72391c..2e74d4f2e 100644 --- a/examples/example_esplora/src/main.rs +++ b/examples/example_esplora/src/main.rs @@ -225,11 +225,11 @@ fn main() -> anyhow::Result<()> { { let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let canonical_view = graph.canonical_view( - &*chain, - local_tip.block_id(), - CanonicalizationParams::default(), - ); + let local_tip_block = local_tip.block_id(); + let task = graph + .graph() + .canonicalization_task(local_tip_block, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); request = request .expected_spk_txids(canonical_view.list_expected_spk_txids(&graph.index, ..));