diff --git a/Cargo.toml b/Cargo.toml index d505c1a0a..bc40f4df6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "examples/example_electrum", "examples/example_esplora", "examples/example_bitcoind_rpc_polling", + "examples/example_chain_query", ] [workspace.package] diff --git a/crates/bitcoind_rpc/examples/filter_iter.rs b/crates/bitcoind_rpc/examples/filter_iter.rs index e79bde672..b7df2a02d 100644 --- a/crates/bitcoind_rpc/examples/filter_iter.rs +++ b/crates/bitcoind_rpc/examples/filter_iter.rs @@ -69,7 +69,9 @@ fn main() -> anyhow::Result<()> { println!("\ntook: {}s", start.elapsed().as_secs()); println!("Local tip: {}", chain.tip().height()); - let canonical_view = graph.canonical_view(&chain, chain.tip().block_id(), Default::default()); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + let canonical_view = chain.canonicalize(task); let unspent: Vec<_> = canonical_view .filter_unspent_outpoints(graph.index.outpoints().clone()) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 6453037e6..ec5cdc69e 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -310,9 +310,12 @@ fn get_balance( ) -> anyhow::Result { let chain_tip = recv_chain.tip().block_id(); let outpoints = recv_graph.index.outpoints().clone(); - let balance = recv_graph - .canonical_view(recv_chain, chain_tip, CanonicalizationParams::default()) - .balance(outpoints, |_, _| true, 1); + let task = recv_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = recv_chain + .canonicalize(task) + .balance(outpoints, |_, _| true, 0); Ok(balance) } @@ -616,8 +619,9 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { let _txid_2 = core.send_raw_transaction(&tx1b)?; // Retrieve the expected unconfirmed txids and spks from the graph. - let exp_spk_txids = graph - .canonical_view(&chain, chain_tip, Default::default()) + let task = graph.canonicalization_task(chain_tip, Default::default()); + let exp_spk_txids = chain + .canonicalize(task) .list_expected_spk_txids(&graph.index, ..) .collect::>(); assert_eq!(exp_spk_txids, vec![(spk, txid_1)]); @@ -632,8 +636,11 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { // Update graph with evicted tx. let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted); - let canonical_txids = graph - .canonical_view(&chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txids = chain + .canonicalize(task) .txs() .map(|tx| tx.txid) .collect::>(); diff --git a/crates/chain/benches/canonicalization.rs b/crates/chain/benches/canonicalization.rs index 3d8d8b295..456ca9b04 100644 --- a/crates/chain/benches/canonicalization.rs +++ b/crates/chain/benches/canonicalization.rs @@ -95,31 +95,31 @@ fn setup(f: F) -> (KeychainTxGraph, Lo } fn run_list_canonical_txs(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_txs: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let txs = view.txs(); assert_eq!(txs.count(), exp_txs); } fn run_filter_chain_txouts(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_txos: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let utxos = view.filter_outpoints(tx_graph.index.outpoints().clone()); assert_eq!(utxos.count(), exp_txos); } fn run_filter_chain_unspents(tx_graph: &KeychainTxGraph, chain: &LocalChain, exp_utxos: usize) { - let view = tx_graph.canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let view = chain.canonicalize(task); let utxos = view.filter_unspent_outpoints(tx_graph.index.outpoints().clone()); assert_eq!(utxos.count(), exp_utxos); } diff --git a/crates/chain/benches/indexer.rs b/crates/chain/benches/indexer.rs index 3caea42d2..7f3cd0dce 100644 --- a/crates/chain/benches/indexer.rs +++ b/crates/chain/benches/indexer.rs @@ -84,9 +84,10 @@ fn do_bench(indexed_tx_graph: &KeychainTxGraph, chain: &LocalChain) { // Check balance let chain_tip = chain.tip().block_id(); let op = graph.index.outpoints().clone(); - let bal = graph - .canonical_view(chain, chain_tip, CanonicalizationParams::default()) - .balance(op, |_, _| false, 1); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let bal = chain.canonicalize(task).balance(op, |_, _| false, 1); assert_eq!(bal.total(), AMOUNT * TX_CT as u64); } diff --git a/crates/chain/src/canonical_iter.rs b/crates/chain/src/canonical_iter.rs deleted file mode 100644 index 204ead451..000000000 --- a/crates/chain/src/canonical_iter.rs +++ /dev/null @@ -1,344 +0,0 @@ -use crate::collections::{HashMap, HashSet, VecDeque}; -use crate::tx_graph::{TxAncestors, TxDescendants}; -use crate::{Anchor, ChainOracle, TxGraph}; -use alloc::boxed::Box; -use alloc::collections::BTreeSet; -use alloc::sync::Arc; -use alloc::vec::Vec; -use bdk_core::BlockId; -use bitcoin::{Transaction, Txid}; - -type CanonicalMap = HashMap, CanonicalReason)>; -type NotCanonicalSet = HashSet; - -/// Modifies the canonicalization algorithm. -#[derive(Debug, Default, Clone)] -pub struct CanonicalizationParams { - /// Transactions that will supercede all other transactions. - /// - /// In case of conflicting transactions within `assume_canonical`, transactions that appear - /// later in the list (have higher index) have precedence. - pub assume_canonical: Vec, -} - -/// Iterates over canonical txs. -pub struct CanonicalIter<'g, A, C> { - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - - unprocessed_assumed_txs: Box)> + 'g>, - unprocessed_anchored_txs: - Box, &'g BTreeSet)> + 'g>, - unprocessed_seen_txs: Box, u64)> + 'g>, - unprocessed_leftover_txs: VecDeque<(Txid, Arc, u32)>, - - canonical: CanonicalMap, - not_canonical: NotCanonicalSet, - - queue: VecDeque, -} - -impl<'g, A: Anchor, C: ChainOracle> CanonicalIter<'g, A, C> { - /// Constructs [`CanonicalIter`]. - pub fn new( - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Self { - let anchors = tx_graph.all_anchors(); - let unprocessed_assumed_txs = Box::new( - params - .assume_canonical - .into_iter() - .rev() - .filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))), - ); - let unprocessed_anchored_txs = Box::new( - tx_graph - .txids_by_descending_anchor_height() - .filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))), - ); - let unprocessed_seen_txs = Box::new( - tx_graph - .txids_by_descending_last_seen() - .filter_map(|(last_seen, txid)| Some((txid, tx_graph.get_tx(txid)?, last_seen))), - ); - Self { - tx_graph, - chain, - chain_tip, - unprocessed_assumed_txs, - unprocessed_anchored_txs, - unprocessed_seen_txs, - unprocessed_leftover_txs: VecDeque::new(), - canonical: HashMap::new(), - not_canonical: HashSet::new(), - queue: VecDeque::new(), - } - } - - /// Whether this transaction is already canonicalized. - fn is_canonicalized(&self, txid: Txid) -> bool { - self.canonical.contains_key(&txid) || self.not_canonical.contains(&txid) - } - - /// Mark transaction as canonical if it is anchored in the best chain. - fn scan_anchors( - &mut self, - txid: Txid, - tx: Arc, - anchors: &BTreeSet, - ) -> Result<(), C::Error> { - for anchor in anchors { - let in_chain_opt = self - .chain - .is_block_in_chain(anchor.anchor_block(), self.chain_tip)?; - if in_chain_opt == Some(true) { - self.mark_canonical(txid, tx, CanonicalReason::from_anchor(anchor.clone())); - return Ok(()); - } - } - // cannot determine - self.unprocessed_leftover_txs.push_back(( - txid, - tx, - anchors - .iter() - .last() - .expect( - "tx taken from `unprocessed_txs_with_anchors` so it must atleast have an anchor", - ) - .confirmation_height_upper_bound(), - )); - Ok(()) - } - - /// Marks `tx` and it's ancestors as canonical and mark all conflicts of these as - /// `not_canonical`. - /// - /// The exception is when it is discovered that `tx` double spends itself (i.e. two of it's - /// inputs conflict with each other), then no changes will be made. - /// - /// The logic works by having two loops where one is nested in another. - /// * The outer loop iterates through ancestors of `tx` (including `tx`). We can transitively - /// assume that all ancestors of `tx` are also canonical. - /// * The inner loop loops through conflicts of ancestors of `tx`. Any descendants of conflicts - /// are also conflicts and are transitively considered non-canonical. - /// - /// If the inner loop ends up marking `tx` as non-canonical, then we know that it double spends - /// itself. - fn mark_canonical(&mut self, txid: Txid, tx: Arc, reason: CanonicalReason) { - let starting_txid = txid; - let mut is_starting_tx = true; - - // We keep track of changes made so far so that we can undo it later in case we detect that - // `tx` double spends itself. - let mut detected_self_double_spend = false; - let mut undo_not_canonical = Vec::::new(); - - // `staged_queue` doubles as the `undo_canonical` data. - let staged_queue = TxAncestors::new_include_root( - self.tx_graph, - tx, - |_: usize, tx: Arc| -> Option { - let this_txid = tx.compute_txid(); - let this_reason = if is_starting_tx { - is_starting_tx = false; - reason.clone() - } else { - reason.to_transitive(starting_txid) - }; - - use crate::collections::hash_map::Entry; - let canonical_entry = match self.canonical.entry(this_txid) { - // Already visited tx before, exit early. - Entry::Occupied(_) => return None, - Entry::Vacant(entry) => entry, - }; - - // Any conflicts with a canonical tx can be added to `not_canonical`. Descendants - // of `not_canonical` txs can also be added to `not_canonical`. - for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) { - TxDescendants::new_include_root( - self.tx_graph, - conflict_txid, - |_: usize, txid: Txid| -> Option<()> { - if self.not_canonical.insert(txid) { - undo_not_canonical.push(txid); - Some(()) - } else { - None - } - }, - ) - .run_until_finished() - } - - if self.not_canonical.contains(&this_txid) { - // Early exit if self-double-spend is detected. - detected_self_double_spend = true; - return None; - } - canonical_entry.insert((tx, this_reason)); - Some(this_txid) - }, - ) - .collect::>(); - - if detected_self_double_spend { - for txid in staged_queue { - self.canonical.remove(&txid); - } - for txid in undo_not_canonical { - self.not_canonical.remove(&txid); - } - } else { - self.queue.extend(staged_queue); - } - } -} - -impl Iterator for CanonicalIter<'_, A, C> { - type Item = Result<(Txid, Arc, CanonicalReason), C::Error>; - - fn next(&mut self) -> Option { - loop { - if let Some(txid) = self.queue.pop_front() { - let (tx, reason) = self - .canonical - .get(&txid) - .cloned() - .expect("reason must exist"); - return Some(Ok((txid, tx, reason))); - } - - if let Some((txid, tx)) = self.unprocessed_assumed_txs.next() { - if !self.is_canonicalized(txid) { - self.mark_canonical(txid, tx, CanonicalReason::assumed()); - } - } - - if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() { - if !self.is_canonicalized(txid) { - if let Err(err) = self.scan_anchors(txid, tx, anchors) { - return Some(Err(err)); - } - } - continue; - } - - if let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() { - debug_assert!( - !tx.is_coinbase(), - "Coinbase txs must not have `last_seen` (in mempool) value" - ); - if !self.is_canonicalized(txid) { - let observed_in = ObservedIn::Mempool(last_seen); - self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); - } - continue; - } - - if let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() { - if !self.is_canonicalized(txid) && !tx.is_coinbase() { - let observed_in = ObservedIn::Block(height); - self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); - } - continue; - } - - return None; - } - } -} - -/// Represents when and where a transaction was last observed in. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub enum ObservedIn { - /// The transaction was last observed in a block of height. - Block(u32), - /// The transaction was last observed in the mempool at the given unix timestamp. - Mempool(u64), -} - -/// The reason why a transaction is canonical. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum CanonicalReason { - /// This transaction is explicitly assumed to be canonical by the caller, superceding all other - /// canonicalization rules. - Assumed { - /// Whether it is a descendant that is assumed to be canonical. - descendant: Option, - }, - /// This transaction is anchored in the best chain by `A`, and therefore canonical. - Anchor { - /// The anchor that anchored the transaction in the chain. - anchor: A, - /// Whether the anchor is of the transaction's descendant. - descendant: Option, - }, - /// This transaction does not conflict with any other transaction with a more recent - /// [`ObservedIn`] value or one that is anchored in the best chain. - ObservedIn { - /// The [`ObservedIn`] value of the transaction. - observed_in: ObservedIn, - /// Whether the [`ObservedIn`] value is of the transaction's descendant. - descendant: Option, - }, -} - -impl CanonicalReason { - /// Constructs a [`CanonicalReason`] for a transaction that is assumed to supercede all other - /// transactions. - pub fn assumed() -> Self { - Self::Assumed { descendant: None } - } - - /// Constructs a [`CanonicalReason`] from an `anchor`. - pub fn from_anchor(anchor: A) -> Self { - Self::Anchor { - anchor, - descendant: None, - } - } - - /// Constructs a [`CanonicalReason`] from an `observed_in` value. - pub fn from_observed_in(observed_in: ObservedIn) -> Self { - Self::ObservedIn { - observed_in, - descendant: None, - } - } - - /// Contruct a new [`CanonicalReason`] from the original which is transitive to `descendant`. - /// - /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's - /// descendant, but is transitively relevant. - pub fn to_transitive(&self, descendant: Txid) -> Self { - match self { - CanonicalReason::Assumed { .. } => Self::Assumed { - descendant: Some(descendant), - }, - CanonicalReason::Anchor { anchor, .. } => Self::Anchor { - anchor: anchor.clone(), - descendant: Some(descendant), - }, - CanonicalReason::ObservedIn { observed_in, .. } => Self::ObservedIn { - observed_in: *observed_in, - descendant: Some(descendant), - }, - } - } - - /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's - /// descendant. - pub fn descendant(&self) -> &Option { - match self { - CanonicalReason::Assumed { descendant, .. } => descendant, - CanonicalReason::Anchor { descendant, .. } => descendant, - CanonicalReason::ObservedIn { descendant, .. } => descendant, - } - } -} diff --git a/crates/chain/src/canonical_task.rs b/crates/chain/src/canonical_task.rs new file mode 100644 index 000000000..986599620 --- /dev/null +++ b/crates/chain/src/canonical_task.rs @@ -0,0 +1,519 @@ +use crate::collections::{HashMap, HashSet, VecDeque}; +use crate::tx_graph::{TxAncestors, TxDescendants}; +use crate::{Anchor, CanonicalView, ChainPosition, TxGraph}; +use alloc::boxed::Box; +use alloc::collections::BTreeSet; +use alloc::sync::Arc; +use alloc::vec::Vec; +use bdk_core::{BlockId, ChainQuery, ChainRequest, ChainResponse}; +use bitcoin::{Transaction, Txid}; + +type CanonicalMap = HashMap, CanonicalReason)>; +type NotCanonicalSet = HashSet; + +/// Modifies the canonicalization algorithm. +#[derive(Debug, Default, Clone)] +pub struct CanonicalizationParams { + /// Transactions that will supersede all other transactions. + /// + /// In case of conflicting transactions within `assume_canonical`, transactions that appear + /// later in the list (have higher index) have precedence. + pub assume_canonical: Vec, +} + +/// Manages the canonicalization process without direct I/O operations. +pub struct CanonicalizationTask<'g, A> { + tx_graph: &'g TxGraph, + chain_tip: BlockId, + + unprocessed_assumed_txs: Box)> + 'g>, + unprocessed_anchored_txs: VecDeque<(Txid, Arc, &'g BTreeSet)>, + unprocessed_seen_txs: Box, u64)> + 'g>, + unprocessed_leftover_txs: VecDeque<(Txid, Arc, u32)>, + + canonical: CanonicalMap, + not_canonical: NotCanonicalSet, + + // Store canonical transactions in order + canonical_order: Vec, + + // Track which transactions have confirmed anchors + confirmed_anchors: HashMap, +} + +impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> { + type Output = CanonicalView; + + fn next_query(&mut self) -> Option { + // Find the next non-canonicalized transaction to query + if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() { + // if !self.is_canonicalized(*txid) { + // // Build query for this transaction + // let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect(); + // return Some(ChainRequest { + // chain_tip: self.chain_tip, + // block_ids, + // }); + // } + // // Skip already canonicalized transaction + // self.unprocessed_anchored_txs.pop_front(); + // Build query for this transaction + let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect(); + return Some(ChainRequest { + chain_tip: self.chain_tip, + block_ids, + }); + } + None + } + + fn resolve_query(&mut self, response: ChainResponse) { + if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() { + // Find the anchor that matches the confirmed BlockId + let best_anchor = response.and_then(|block_id| { + anchors + .iter() + .find(|anchor| anchor.anchor_block() == block_id) + .cloned() + }); + + match best_anchor { + Some(best_anchor) => { + self.confirmed_anchors.insert(txid, best_anchor.clone()); + if !self.is_canonicalized(txid) { + self.mark_canonical(txid, tx, CanonicalReason::from_anchor(best_anchor)); + } + } + None => { + self.unprocessed_leftover_txs.push_back(( + txid, + tx, + anchors + .iter() + .last() + .expect( + "tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor", + ) + .confirmation_height_upper_bound(), + )) + } + } + } + } + + fn is_finished(&mut self) -> bool { + self.unprocessed_anchored_txs.is_empty() + } + + fn finish(mut self) -> Self::Output { + // Process remaining transactions (seen and leftover) + self.process_seen_txs(); + self.process_leftover_txs(); + + // Build the canonical view + let mut view_order = Vec::new(); + let mut view_txs = HashMap::new(); + let mut view_spends = HashMap::new(); + + for txid in &self.canonical_order { + if let Some((tx, reason)) = self.canonical.get(txid) { + view_order.push(*txid); + + // Add spends + if !tx.is_coinbase() { + for input in &tx.input { + view_spends.insert(input.previous_output, *txid); + } + } + + // Get transaction node for first_seen/last_seen info + let tx_node = match self.tx_graph.get_tx_node(*txid) { + Some(tx_node) => tx_node, + None => { + debug_assert!(false, "tx node must exist!"); + continue; + } + }; + + // Determine chain position based on reason + let chain_position = match reason { + CanonicalReason::Assumed { descendant } => match descendant { + Some(_) => match self.confirmed_anchors.get(txid) { + Some(confirmed_anchor) => ChainPosition::Confirmed { + anchor: confirmed_anchor, + transitively: None, + }, + None => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: tx_node.last_seen, + }, + }, + None => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: tx_node.last_seen, + }, + }, + CanonicalReason::Anchor { anchor, descendant } => match descendant { + Some(_) => match self.confirmed_anchors.get(txid) { + Some(confirmed_anchor) => ChainPosition::Confirmed { + anchor: confirmed_anchor, + transitively: None, + }, + None => ChainPosition::Confirmed { + anchor, + transitively: *descendant, + }, + }, + None => ChainPosition::Confirmed { + anchor, + transitively: None, + }, + }, + CanonicalReason::ObservedIn { observed_in, .. } => match observed_in { + ObservedIn::Mempool(last_seen) => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: Some(*last_seen), + }, + ObservedIn::Block(_) => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: None, + }, + }, + }; + + view_txs.insert(*txid, (tx.clone(), chain_position.cloned())); + } + } + + CanonicalView::new(self.chain_tip, view_order, view_txs, view_spends) + } +} + +impl<'g, A: Anchor> CanonicalizationTask<'g, A> { + /// Creates a new canonicalization task. + pub fn new( + tx_graph: &'g TxGraph, + chain_tip: BlockId, + params: CanonicalizationParams, + ) -> Self { + let anchors = tx_graph.all_anchors(); + let unprocessed_assumed_txs = Box::new( + params + .assume_canonical + .into_iter() + .rev() + .filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))), + ); + let unprocessed_anchored_txs: VecDeque<_> = tx_graph + .txids_by_descending_anchor_height() + .filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))) + .collect(); + let unprocessed_seen_txs = Box::new( + tx_graph + .txids_by_descending_last_seen() + .filter_map(|(last_seen, txid)| Some((txid, tx_graph.get_tx(txid)?, last_seen))), + ); + + let mut task = Self { + tx_graph, + chain_tip, + + unprocessed_assumed_txs, + unprocessed_anchored_txs, + unprocessed_seen_txs, + unprocessed_leftover_txs: VecDeque::new(), + + canonical: HashMap::new(), + not_canonical: HashSet::new(), + + canonical_order: Vec::new(), + confirmed_anchors: HashMap::new(), + }; + + // process assumed transactions first (they don't need queries) + task.process_assumed_txs(); + + task + } + + fn is_canonicalized(&self, txid: Txid) -> bool { + self.canonical.contains_key(&txid) || self.not_canonical.contains(&txid) + } + + fn process_assumed_txs(&mut self) { + while let Some((txid, tx)) = self.unprocessed_assumed_txs.next() { + if !self.is_canonicalized(txid) { + self.mark_canonical(txid, tx, CanonicalReason::assumed()); + } + } + } + + fn process_seen_txs(&mut self) { + while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() { + debug_assert!( + !tx.is_coinbase(), + "Coinbase txs must not have `last_seen` (in mempool) value" + ); + if !self.is_canonicalized(txid) { + let observed_in = ObservedIn::Mempool(last_seen); + self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); + } + } + } + + fn process_leftover_txs(&mut self) { + while let Some((txid, tx, height)) = self.unprocessed_leftover_txs.pop_front() { + if !self.is_canonicalized(txid) && !tx.is_coinbase() { + let observed_in = ObservedIn::Block(height); + self.mark_canonical(txid, tx, CanonicalReason::from_observed_in(observed_in)); + } + } + } + + fn mark_canonical(&mut self, txid: Txid, tx: Arc, reason: CanonicalReason) { + let starting_txid = txid; + let mut is_starting_tx = true; + + // We keep track of changes made so far so that we can undo it later in case we detect that + // `tx` double spends itself. + let mut detected_self_double_spend = false; + let mut undo_not_canonical = Vec::::new(); + let mut staged_canonical = Vec::<(Txid, Arc, CanonicalReason)>::new(); + + // Process ancestors + TxAncestors::new_include_root( + self.tx_graph, + tx, + |_: usize, tx: Arc| -> Option { + let this_txid = tx.compute_txid(); + let this_reason = if is_starting_tx { + is_starting_tx = false; + reason.clone() + } else { + // This is an ancestor being marked transitively + // Check if it has its own anchor that needs to be verified later + // We'll check anchors after marking it canonical + reason.to_transitive(starting_txid) + }; + + use crate::collections::hash_map::Entry; + let canonical_entry = match self.canonical.entry(this_txid) { + // Already visited tx before, exit early. + Entry::Occupied(_) => return None, + Entry::Vacant(entry) => entry, + }; + + // Any conflicts with a canonical tx can be added to `not_canonical`. Descendants + // of `not_canonical` txs can also be added to `not_canonical`. + for (_, conflict_txid) in self.tx_graph.direct_conflicts(&tx) { + TxDescendants::new_include_root( + self.tx_graph, + conflict_txid, + |_: usize, txid: Txid| -> Option<()> { + if self.not_canonical.insert(txid) { + undo_not_canonical.push(txid); + Some(()) + } else { + None + } + }, + ) + .run_until_finished() + } + + if self.not_canonical.contains(&this_txid) { + // Early exit if self-double-spend is detected. + detected_self_double_spend = true; + return None; + } + + staged_canonical.push((this_txid, tx.clone(), this_reason.clone())); + canonical_entry.insert((tx.clone(), this_reason)); + Some(this_txid) + }, + ) + .run_until_finished(); + + if detected_self_double_spend { + // Undo changes + for (txid, _, _) in staged_canonical { + self.canonical.remove(&txid); + } + for txid in undo_not_canonical { + self.not_canonical.remove(&txid); + } + } else { + // Add to canonical order + for (txid, tx, reason) in &staged_canonical { + self.canonical_order.push(*txid); + + // If this was marked transitively, check if it has anchors to verify + let is_transitive = matches!( + reason, + CanonicalReason::Anchor { + descendant: Some(_), + .. + } | CanonicalReason::Assumed { + descendant: Some(_), + .. + } + ); + + if is_transitive { + if let Some(anchors) = self.tx_graph.all_anchors().get(txid) { + // only check anchors we haven't already confirmed + if !self.confirmed_anchors.contains_key(txid) { + self.unprocessed_anchored_txs + .push_back((*txid, tx.clone(), anchors)); + } + } + } + } + } + } +} + +/// Represents when and where a transaction was last observed in. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum ObservedIn { + /// The transaction was last observed in a block of height. + Block(u32), + /// The transaction was last observed in the mempool at the given unix timestamp. + Mempool(u64), +} + +/// The reason why a transaction is canonical. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CanonicalReason { + /// This transaction is explicitly assumed to be canonical by the caller, superceding all other + /// canonicalization rules. + Assumed { + /// Whether it is a descendant that is assumed to be canonical. + descendant: Option, + }, + /// This transaction is anchored in the best chain by `A`, and therefore canonical. + Anchor { + /// The anchor that anchored the transaction in the chain. + anchor: A, + /// Whether the anchor is of the transaction's descendant. + descendant: Option, + }, + /// This transaction does not conflict with any other transaction with a more recent + /// [`ObservedIn`] value or one that is anchored in the best chain. + ObservedIn { + /// The [`ObservedIn`] value of the transaction. + observed_in: ObservedIn, + /// Whether the [`ObservedIn`] value is of the transaction's descendant. + descendant: Option, + }, +} + +impl CanonicalReason { + /// Constructs a [`CanonicalReason`] for a transaction that is assumed to supercede all other + /// transactions. + pub fn assumed() -> Self { + Self::Assumed { descendant: None } + } + + /// Constructs a [`CanonicalReason`] from an `anchor`. + pub fn from_anchor(anchor: A) -> Self { + Self::Anchor { + anchor, + descendant: None, + } + } + + /// Constructs a [`CanonicalReason`] from an `observed_in` value. + pub fn from_observed_in(observed_in: ObservedIn) -> Self { + Self::ObservedIn { + observed_in, + descendant: None, + } + } + + /// Contruct a new [`CanonicalReason`] from the original which is transitive to `descendant`. + /// + /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's + /// descendant, but is transitively relevant. + pub fn to_transitive(&self, descendant: Txid) -> Self { + match self { + CanonicalReason::Assumed { .. } => Self::Assumed { + descendant: Some(descendant), + }, + CanonicalReason::Anchor { anchor, .. } => Self::Anchor { + anchor: anchor.clone(), + descendant: Some(descendant), + }, + CanonicalReason::ObservedIn { observed_in, .. } => Self::ObservedIn { + observed_in: *observed_in, + descendant: Some(descendant), + }, + } + } + + /// This signals that either the [`ObservedIn`] or [`Anchor`] value belongs to the transaction's + /// descendant. + pub fn descendant(&self) -> &Option { + match self { + CanonicalReason::Assumed { descendant, .. } => descendant, + CanonicalReason::Anchor { descendant, .. } => descendant, + CanonicalReason::ObservedIn { descendant, .. } => descendant, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::local_chain::LocalChain; + use bitcoin::{hashes::Hash, BlockHash, TxIn, TxOut}; + + #[test] + fn test_canonicalization_task_sans_io() { + // Create a simple chain + let blocks = [ + (0, BlockHash::all_zeros()), + (1, BlockHash::from_byte_array([1; 32])), + (2, BlockHash::from_byte_array([2; 32])), + ]; + let chain = LocalChain::from_blocks(blocks.into_iter().collect()).unwrap(); + let chain_tip = chain.tip().block_id(); + + // Create a simple transaction graph + let mut tx_graph = TxGraph::default(); + + // Add a transaction + let tx = bitcoin::Transaction { + version: bitcoin::transaction::Version::ONE, + lock_time: bitcoin::absolute::LockTime::ZERO, + input: vec![TxIn::default()], + output: vec![TxOut { + value: bitcoin::Amount::from_sat(1000), + script_pubkey: bitcoin::ScriptBuf::new(), + }], + }; + let _ = tx_graph.insert_tx(tx.clone()); + let txid = tx.compute_txid(); + + // Add an anchor at height 1 + let anchor = crate::ConfirmationBlockTime { + block_id: chain.get(1).unwrap().block_id(), + confirmation_time: 12345, + }; + let _ = tx_graph.insert_anchor(txid, anchor); + + // Create canonicalization task and canonicalize using the chain + let params = CanonicalizationParams::default(); + let task = CanonicalizationTask::new(&tx_graph, chain_tip, params); + let canonical_view = chain.canonicalize(task); + + // Should have one canonical transaction + assert_eq!(canonical_view.txs().len(), 1); + let canon_tx = canonical_view.txs().next().unwrap(); + assert_eq!(canon_tx.txid, txid); + assert_eq!(canon_tx.tx.compute_txid(), txid); + + // Should be confirmed (anchored) + assert!(matches!(canon_tx.pos, ChainPosition::Confirmed { .. })); + } +} diff --git a/crates/chain/src/canonical_view.rs b/crates/chain/src/canonical_view.rs index 09b18e50a..7840f236f 100644 --- a/crates/chain/src/canonical_view.rs +++ b/crates/chain/src/canonical_view.rs @@ -6,14 +6,15 @@ //! ## Example //! //! ``` -//! # use bdk_chain::{CanonicalView, TxGraph, CanonicalizationParams, local_chain::LocalChain}; +//! # use bdk_chain::{TxGraph, CanonicalizationParams, CanonicalizationTask, local_chain::LocalChain}; //! # use bdk_core::BlockId; //! # use bitcoin::hashes::Hash; //! # let tx_graph = TxGraph::::default(); //! # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); -//! # let chain_tip = chain.tip().block_id(); +//! let chain_tip = chain.tip().block_id(); //! let params = CanonicalizationParams::default(); -//! let view = CanonicalView::new(&tx_graph, &chain, chain_tip, params).unwrap(); +//! let task = CanonicalizationTask::new(&tx_graph, chain_tip, params); +//! let view = chain.canonicalize(task); //! //! // Iterate over canonical transactions //! for tx in view.txs() { @@ -30,10 +31,7 @@ use alloc::vec::Vec; use bdk_core::BlockId; use bitcoin::{Amount, OutPoint, ScriptBuf, Transaction, Txid}; -use crate::{ - spk_txout::SpkTxOutIndex, tx_graph::TxNode, Anchor, Balance, CanonicalIter, CanonicalReason, - CanonicalizationParams, ChainOracle, ChainPosition, FullTxOut, ObservedIn, TxGraph, -}; +use crate::{spk_txout::SpkTxOutIndex, Anchor, Balance, ChainPosition, FullTxOut}; /// A single canonical transaction with its chain position. /// @@ -76,115 +74,24 @@ pub struct CanonicalView { } impl CanonicalView { - /// Create a new canonical view from a transaction graph. + /// Creates a [`CanonicalView`] from its constituent parts. /// - /// This constructor analyzes the given [`TxGraph`] and creates a canonical view of all - /// transactions, resolving conflicts and ordering them according to their chain position. - /// - /// # Returns - /// - /// Returns `Ok(CanonicalView)` on success, or an error if the chain oracle fails. - pub fn new<'g, C>( - tx_graph: &'g TxGraph, - chain: &'g C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result - where - C: ChainOracle, - { - fn find_direct_anchor<'g, A: Anchor, C: ChainOracle>( - tx_node: &TxNode<'g, Arc, A>, - chain: &C, - chain_tip: BlockId, - ) -> Result, C::Error> { - tx_node - .anchors - .iter() - .find_map(|a| -> Option> { - match chain.is_block_in_chain(a.anchor_block(), chain_tip) { - Ok(Some(true)) => Some(Ok(a.clone())), - Ok(Some(false)) | Ok(None) => None, - Err(err) => Some(Err(err)), - } - }) - .transpose() + /// This internal constructor is used by [`CanonicalizationTask`] to build the view + /// after completing the canonicalization process. It takes the processed transaction + /// data including the canonical ordering, transaction map with chain positions, and + /// spend information. + pub(crate) fn new( + tip: BlockId, + order: Vec, + txs: HashMap, ChainPosition)>, + spends: HashMap, + ) -> Self { + Self { + tip, + order, + txs, + spends, } - - let mut view = Self { - tip: chain_tip, - order: vec![], - txs: HashMap::new(), - spends: HashMap::new(), - }; - - for r in CanonicalIter::new(tx_graph, chain, chain_tip, params) { - let (txid, tx, why) = r?; - - let tx_node = match tx_graph.get_tx_node(txid) { - Some(tx_node) => tx_node, - None => { - // TODO: Have the `CanonicalIter` return `TxNode`s. - debug_assert!(false, "tx node must exist!"); - continue; - } - }; - - view.order.push(txid); - - if !tx.is_coinbase() { - view.spends - .extend(tx.input.iter().map(|txin| (txin.previous_output, txid))); - } - - let pos = match why { - CanonicalReason::Assumed { descendant } => match descendant { - Some(_) => match find_direct_anchor(&tx_node, chain, chain_tip)? { - Some(anchor) => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - None => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: tx_node.last_seen, - }, - }, - None => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: tx_node.last_seen, - }, - }, - CanonicalReason::Anchor { anchor, descendant } => match descendant { - Some(_) => match find_direct_anchor(&tx_node, chain, chain_tip)? { - Some(anchor) => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - None => ChainPosition::Confirmed { - anchor, - transitively: descendant, - }, - }, - None => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - }, - CanonicalReason::ObservedIn { observed_in, .. } => match observed_in { - ObservedIn::Mempool(last_seen) => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: Some(last_seen), - }, - ObservedIn::Block(_) => ChainPosition::Unconfirmed { - first_seen: tx_node.first_seen, - last_seen: None, - }, - }, - }; - view.txs.insert(txid, (tx_node.tx, pos)); - } - - Ok(view) } /// Get a single canonical transaction by its transaction ID. @@ -232,12 +139,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// // Iterate over all canonical transactions /// for tx in view.txs() { /// println!("TX {}: {:?}", tx.txid, tx.pos); @@ -265,12 +174,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Get all outputs from an indexer /// for (keychain, txout) in view.filter_outpoints(indexer.outpoints().clone()) { @@ -294,12 +205,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{TxGraph, CanonicalizationTask, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Get unspent outputs (UTXOs) from an indexer /// for (keychain, utxo) in view.filter_unspent_outpoints(indexer.outpoints().clone()) { @@ -340,12 +253,14 @@ impl CanonicalView { /// # Example /// /// ``` - /// # use bdk_chain::{CanonicalView, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; + /// # use bdk_chain::{CanonicalizationTask, TxGraph, local_chain::LocalChain, keychain_txout::KeychainTxOutIndex}; /// # use bdk_core::BlockId; /// # use bitcoin::hashes::Hash; /// # let tx_graph = TxGraph::::default(); /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); - /// # let view = CanonicalView::new(&tx_graph, &chain, chain.tip().block_id(), Default::default()).unwrap(); + /// # let chain_tip = chain.tip().block_id(); + /// # let task = CanonicalizationTask::new(&tx_graph, chain_tip, Default::default()); + /// # let view = chain.canonicalize(task); /// # let indexer = KeychainTxOutIndex::<&str>::default(); /// // Calculate balance with 6 confirmations, trusting all outputs /// let balance = view.balance( diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 9adf7ed93..cd4483eb7 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,14 +1,14 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::{convert::Infallible, fmt::Debug}; +use core::fmt::Debug; use alloc::{sync::Arc, vec::Vec}; use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; use crate::{ + canonical_task::CanonicalizationParams, tx_graph::{self, TxGraph}, - Anchor, BlockId, CanonicalView, CanonicalizationParams, ChainOracle, Indexer, Merge, - TxPosInBlock, + Anchor, BlockId, CanonicalizationTask, Indexer, Merge, TxPosInBlock, }; /// A [`TxGraph`] paired with an indexer `I`, enforcing that every insertion into the graph is @@ -423,36 +423,28 @@ where } } +impl AsRef> for IndexedTxGraph { + fn as_ref(&self) -> &TxGraph { + &self.graph + } +} + impl IndexedTxGraph where A: Anchor, { - /// Returns a [`CanonicalView`]. - pub fn try_canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result, C::Error> { - self.graph.try_canonical_view(chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. + /// Creates a `[CanonicalizationTask]` to determine the `[CanonicalView]` of transactions. /// - /// This is the infallible version of [`try_canonical_view`](Self::try_canonical_view). - pub fn canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, + /// This method delegates to the underlying [`TxGraph`] to create a [`CanonicalizationTask`] + /// that can be used to determine which transactions are canonical based on the provided + /// parameters. The task handles the stateless canonicalization logic and can be polled + /// for anchor verification requests. + pub fn canonicalization_task( + &'_ self, chain_tip: BlockId, params: CanonicalizationParams, - ) -> CanonicalView { - self.graph.canonical_view(chain, chain_tip, params) - } -} - -impl AsRef> for IndexedTxGraph { - fn as_ref(&self) -> &TxGraph { - &self.graph + ) -> CanonicalizationTask<'_, A> { + self.graph.canonicalization_task(chain_tip, params) } } diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index be9170b1a..2e0a83c27 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -44,8 +44,8 @@ pub mod tx_graph; pub use tx_graph::TxGraph; mod chain_oracle; pub use chain_oracle::*; -mod canonical_iter; -pub use canonical_iter::*; +mod canonical_task; +pub use canonical_task::*; mod canonical_view; pub use canonical_view::*; diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 81f4a1796..9ed7dab03 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -6,7 +6,7 @@ use core::ops::RangeBounds; use crate::collections::BTreeMap; use crate::{BlockId, ChainOracle, Merge}; -use bdk_core::ToBlockHash; +use bdk_core::{ChainQuery, ToBlockHash}; pub use bdk_core::{CheckPoint, CheckPointIter}; use bitcoin::block::Header; use bitcoin::BlockHash; @@ -96,6 +96,80 @@ impl ChainOracle for LocalChain { // Methods for `LocalChain` impl LocalChain { + // /// Check if a block is in the chain. + // /// + // /// # Arguments + // /// * `block` - The block to check + // /// * `chain_tip` - The chain tip to check against + // /// + // /// # Returns + // /// * `Some(true)` if the block is in the chain + // /// * `Some(false)` if the block is not in the chain + // /// * `None` if it cannot be determined + // pub fn is_block_in_chain(&self, block: BlockId, chain_tip: BlockId) -> Option { + // let chain_tip_cp = match self.tip.get(chain_tip.height) { + // // we can only determine whether `block` is in chain of `chain_tip` if `chain_tip` + // can // be identified in chain + // Some(cp) if cp.hash() == chain_tip.hash => cp, + // _ => return None, + // }; + // chain_tip_cp + // .get(block.height) + // .map(|cp| cp.hash() == block.hash) + // } + + // /// Get the chain tip. + // /// + // /// # Returns + // /// The [`BlockId`] of the chain tip. + // pub fn chain_tip(&self) -> BlockId { + // self.tip.block_id() + // } + + /// Canonicalize a transaction graph using this chain. + /// + /// This method processes any type implementing [`ChainQuery`], handling all its requests + /// to determine which transactions are canonical, and returns the query's output. + /// + /// # Example + /// + /// ``` + /// # use bdk_chain::{CanonicalizationTask, CanonicalizationParams, TxGraph, local_chain::LocalChain}; + /// # use bdk_core::BlockId; + /// # use bitcoin::hashes::Hash; + /// # let tx_graph: TxGraph = TxGraph::default(); + /// # let chain = LocalChain::from_blocks([(0, bitcoin::BlockHash::all_zeros())].into_iter().collect()).unwrap(); + /// let chain_tip = chain.tip().block_id(); + /// let task = CanonicalizationTask::new(&tx_graph, chain_tip, CanonicalizationParams::default()); + /// let view = chain.canonicalize(task); + /// ``` + pub fn canonicalize(&self, mut task: Q) -> Q::Output + where + Q: ChainQuery, + { + // Process all requests from the task + while let Some(request) = task.next_query() { + let chain_tip = request.chain_tip; + + // Check each block ID and return the first confirmed one + let mut best_block_id = None; + for block_id in &request.block_ids { + if self + .is_block_in_chain(*block_id, chain_tip) + .expect("infallible") + == Some(true) + { + best_block_id = Some(*block_id); + break; + } + } + task.resolve_query(best_block_id); + } + + // Return the finished canonical view + task.finish() + } + /// Update the chain with a given [`Header`] at `height` which you claim is connected to a /// existing block in the chain. /// diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 97d4ecc02..5731f021a 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -21,18 +21,26 @@ //! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called //! canonicalization is required to get a conflict-free view of transactions. //! -//! * [`canonical_iter`](TxGraph::canonical_iter) returns a [`CanonicalIter`] which performs -//! incremental canonicalization. This is useful when you only need to check specific transactions -//! (e.g., verifying whether a few unconfirmed transactions are canonical) without computing the -//! entire canonical view. -//! * [`canonical_view`](TxGraph::canonical_view) returns a [`CanonicalView`] which provides a -//! complete canonical view of the graph. This is required for typical wallet operations like -//! querying balances, listing outputs, transactions, and UTXOs. You must construct this first -//! before performing these operations. +//! The canonicalization process uses a two-step, sans-IO approach: //! -//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a -//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which -//! identifies which blocks exist under a given `chain_tip`. +//! 1. **Create a canonicalization task** using +//! [`canonicalization_task`](TxGraph::canonicalization_task): ```ignore let task = +//! tx_graph.canonicalization_task(params); ``` This creates a [`CanonicalizationTask`] that +//! encapsulates the canonicalization logic without performing any I/O operations. +//! +//! 2. **Execute the task** with a chain oracle to obtain a [`CanonicalView`]: ```ignore let view = +//! chain.canonicalize(task); ``` The chain oracle (such as +//! [`LocalChain`](crate::local_chain::LocalChain)) handles all anchor verification queries from +//! the task. +//! +//! The [`CanonicalView`] provides a complete canonical view of the graph. This is required for +//! typical wallet operations like querying balances, listing outputs, transactions, and UTXOs. +//! You must construct this view before performing these operations. +//! +//! The separation between task creation and execution (sans-IO pattern) enables: +//! * Better testability - tasks can be tested without a real chain +//! * Flexibility - different chain oracle implementations can be used +//! * Clean separation of concerns - canonicalization logic is isolated from I/O //! //! The canonicalization algorithm uses the following associated data to determine which //! transactions have precedence over others: @@ -119,11 +127,9 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::collections::*; -use crate::BlockId; -use crate::CanonicalIter; -use crate::CanonicalView; use crate::CanonicalizationParams; -use crate::{Anchor, ChainOracle, Merge}; +use crate::CanonicalizationTask; +use crate::{Anchor, BlockId, Merge}; use alloc::collections::vec_deque::VecDeque; use alloc::sync::Arc; use alloc::vec::Vec; @@ -131,10 +137,7 @@ use bdk_core::ConfirmationBlockTime; pub use bdk_core::TxUpdate; use bitcoin::{Amount, OutPoint, SignedAmount, Transaction, TxOut, Txid}; use core::fmt::{self, Formatter}; -use core::{ - convert::Infallible, - ops::{Deref, RangeInclusive}, -}; +use core::ops::{Deref, RangeInclusive}; impl From> for TxUpdate { fn from(graph: TxGraph) -> Self { @@ -952,6 +955,20 @@ impl TxGraph { let _ = self.insert_evicted_at(txid, evicted_at); } } + + /// Creates a `[CanonicalizationTask]` to determine the `[CanonicalView]` of transactions. + /// + /// This method delegates to the underlying [`TxGraph`] to create a [`CanonicalizationTask`] + /// that can be used to determine which transactions are canonical based on the provided + /// parameters. The task handles the stateless canonicalization logic and can be polled + /// for anchor verification requests. + pub fn canonicalization_task( + &'_ self, + chain_tip: BlockId, + params: CanonicalizationParams, + ) -> CanonicalizationTask<'_, A> { + CanonicalizationTask::new(self, chain_tip, params) + } } impl TxGraph { @@ -980,36 +997,6 @@ impl TxGraph { }) } - /// Returns a [`CanonicalIter`]. - pub fn canonical_iter<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> CanonicalIter<'a, A, C> { - CanonicalIter::new(self, chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. - pub fn try_canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> Result, C::Error> { - CanonicalView::new(self, chain, chain_tip, params) - } - - /// Returns a [`CanonicalView`]. - pub fn canonical_view<'a, C: ChainOracle>( - &'a self, - chain: &'a C, - chain_tip: BlockId, - params: CanonicalizationParams, - ) -> CanonicalView { - CanonicalView::new(self, chain, chain_tip, params).expect("infallible") - } - /// Construct a `TxGraph` from a `changeset`. pub fn from_changeset(changeset: ChangeSet) -> Self { let mut graph = Self::default(); diff --git a/crates/chain/tests/test_canonical_view.rs b/crates/chain/tests/test_canonical_view.rs index 3c0d54381..47bab2758 100644 --- a/crates/chain/tests/test_canonical_view.rs +++ b/crates/chain/tests/test_canonical_view.rs @@ -54,8 +54,8 @@ fn test_min_confirmations_parameter() { let _ = tx_graph.insert_anchor(txid, anchor_height_5); let chain_tip = chain.tip().block_id(); - let canonical_view = - tx_graph.canonical_view(&chain, chain_tip, CanonicalizationParams::default()); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test min_confirmations = 1: Should be confirmed (has 6 confirmations) let balance_1_conf = canonical_view.balance( @@ -142,11 +142,9 @@ fn test_min_confirmations_with_untrusted_tx() { }; let _ = tx_graph.insert_anchor(txid, anchor); - let canonical_view = tx_graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test with min_confirmations = 5 and untrusted predicate let balance = canonical_view.balance( @@ -263,11 +261,9 @@ fn test_min_confirmations_multiple_transactions() { ); outpoints.push(((), outpoint2)); - let canonical_view = tx_graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = tx_graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); // Test with min_confirmations = 5 // tx0: 11 confirmations -> confirmed diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 7a2f8ea60..dc3fc5290 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -459,23 +459,30 @@ fn test_list_owned_txouts() { .get(height) .map(|cp| cp.block_id()) .unwrap_or_else(|| panic!("block must exist at {height}")); - let txouts = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let txouts = local_chain + .canonicalize(task) .filter_outpoints(graph.index.outpoints().iter().cloned()) .collect::>(); - let utxos = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let utxos = local_chain + .canonicalize(task) .filter_unspent_outpoints(graph.index.outpoints().iter().cloned()) .collect::>(); - let balance = graph - .canonical_view(&local_chain, chain_tip, CanonicalizationParams::default()) - .balance( - graph.index.outpoints().iter().cloned(), - |_, txout| trusted_spks.contains(&txout.txout.script_pubkey), - 1, - ); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = local_chain.canonicalize(task).balance( + graph.index.outpoints().iter().cloned(), + |_, txout| trusted_spks.contains(&txout.txout.script_pubkey), + 0, + ); let confirmed_txouts_txid = txouts .iter() @@ -778,20 +785,17 @@ fn test_get_chain_position() { } // check chain position - let chain_pos = graph - .canonical_view( - chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .find_map(|canon_tx| { - if canon_tx.txid == txid { - Some(canon_tx.pos) - } else { - None - } - }); + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let chain_pos = chain.canonicalize(task).txs().find_map(|canon_tx| { + if canon_tx.txid == txid { + Some(canon_tx.pos) + } else { + None + } + }); assert_eq!(chain_pos, exp_pos, "failed test case: {name}"); } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index b2a359608..2b7ebf847 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -1014,8 +1014,10 @@ fn test_chain_spends() { let build_canonical_spends = |chain: &LocalChain, tx_graph: &TxGraph| -> HashMap { - tx_graph - .canonical_view(chain, tip.block_id(), CanonicalizationParams::default()) + let task = + tx_graph.canonicalization_task(tip.block_id(), CanonicalizationParams::default()); + chain + .canonicalize(task) .filter_outpoints(tx_graph.all_txouts().map(|(op, _)| ((), op))) .filter_map(|(_, full_txo)| Some((full_txo.outpoint, full_txo.spent_by?))) .collect() @@ -1023,8 +1025,10 @@ fn test_chain_spends() { let build_canonical_positions = |chain: &LocalChain, tx_graph: &TxGraph| -> HashMap> { - tx_graph - .canonical_view(chain, tip.block_id(), CanonicalizationParams::default()) + let task = + tx_graph.canonicalization_task(tip.block_id(), CanonicalizationParams::default()); + chain + .canonicalize(task) .txs() .map(|canon_tx| (canon_tx.txid, canon_tx.pos)) .collect() @@ -1197,38 +1201,25 @@ fn transactions_inserted_into_tx_graph_are_not_canonical_until_they_have_an_anch .into_iter() .collect(); let chain = LocalChain::from_blocks(blocks).unwrap(); - let canonical_txs: Vec<_> = graph - .canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .collect(); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txs: Vec<_> = chain.canonicalize(task).txs().collect(); assert!(canonical_txs.is_empty()); // tx0 with seen_at should be returned by canonical txs let _ = graph.insert_seen_at(txids[0], 2); - let canonical_view = graph.canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); let mut canonical_txs = canonical_view.txs(); assert_eq!(canonical_txs.next().map(|tx| tx.txid).unwrap(), txids[0]); drop(canonical_txs); // tx1 with anchor is also canonical let _ = graph.insert_anchor(txids[1], block_id!(2, "B")); - let canonical_txids: Vec<_> = graph - .canonical_view( - &chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .map(|tx| tx.txid) - .collect(); + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_txids: Vec<_> = chain.canonicalize(task).txs().map(|tx| tx.txid).collect(); assert!(canonical_txids.contains(&txids[1])); assert!(graph.txs_with_no_anchor_or_last_seen().next().is_none()); } diff --git a/crates/chain/tests/test_tx_graph_conflicts.rs b/crates/chain/tests/test_tx_graph_conflicts.rs index 70dc01884..aef1e2d43 100644 --- a/crates/chain/tests/test_tx_graph_conflicts.rs +++ b/crates/chain/tests/test_tx_graph_conflicts.rs @@ -970,9 +970,11 @@ fn test_tx_conflict_handling() { for scenario in scenarios { let env = init_graph(scenario.tx_templates.iter()); - let txs = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let txs = local_chain + .canonicalize(task) .txs() .map(|tx| tx.txid) .collect::>(); @@ -987,9 +989,11 @@ fn test_tx_conflict_handling() { scenario.name ); - let txouts = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let txouts = local_chain + .canonicalize(task) .filter_outpoints(env.indexer.outpoints().iter().cloned()) .map(|(_, full_txout)| full_txout.outpoint) .collect::>(); @@ -1007,9 +1011,11 @@ fn test_tx_conflict_handling() { scenario.name ); - let utxos = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let utxos = local_chain + .canonicalize(task) .filter_unspent_outpoints(env.indexer.outpoints().iter().cloned()) .map(|(_, full_txout)| full_txout.outpoint) .collect::>(); @@ -1027,18 +1033,18 @@ fn test_tx_conflict_handling() { scenario.name ); - let balance = env + let task = env .tx_graph - .canonical_view(&local_chain, chain_tip, env.canonicalization_params.clone()) - .balance( - env.indexer.outpoints().iter().cloned(), - |_, txout| { - env.indexer - .index_of_spk(txout.txout.script_pubkey.clone()) - .is_some() - }, - 0, - ); + .canonicalization_task(chain_tip, env.canonicalization_params.clone()); + let balance = local_chain.canonicalize(task).balance( + env.indexer.outpoints().iter().cloned(), + |_, txout| { + env.indexer + .index_of_spk(txout.txout.script_pubkey.clone()) + .is_some() + }, + 0, + ); assert_eq!( balance, scenario.exp_balance, "\n[{}] 'balance' failed", diff --git a/crates/core/src/chain_query.rs b/crates/core/src/chain_query.rs new file mode 100644 index 000000000..9a99d5ea8 --- /dev/null +++ b/crates/core/src/chain_query.rs @@ -0,0 +1,68 @@ +//! Generic trait for query-based operations that require external blockchain data. +//! +//! The [`ChainQuery`] trait provides a standardized interface for implementing +//! algorithms that need to make queries to blockchain sources and process responses +//! in a sans-IO manner. + +use crate::BlockId; +use alloc::vec::Vec; + +/// A request to check which block identifiers are confirmed in the chain. +/// +/// This is used to verify if specific blocks are part of the canonical chain. +/// The generic parameter `B` represents the block identifier type, which defaults to `BlockId`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChainRequest { + /// The chain tip to use as reference for the query. + pub chain_tip: B, + /// The block identifiers to check for confirmation in the chain. + pub block_ids: Vec, +} + +/// Response containing the best confirmed block identifier, if any. +/// +/// Returns `Some(B)` if at least one of the requested blocks +/// is confirmed in the chain, or `None` if none are confirmed. +/// The generic parameter `B` represents the block identifier type, which defaults to `BlockId`. +pub type ChainResponse = Option; + +/// A trait for types that perform query-based operations against blockchain data. +/// +/// This trait enables types to request blockchain information via queries and process +/// responses in a decoupled, sans-IO manner. It's particularly useful for algorithms +/// that need to interact with blockchain oracles, chain sources, or other blockchain +/// data providers without directly performing I/O. +/// +/// # Type Parameters +/// +/// * `B` - The type of block identifier used in queries (defaults to `BlockId`) +pub trait ChainQuery { + /// The final output type produced when the query process is complete. + type Output; + + /// Returns the next query needed, if any. + /// + /// This method should return `Some(request)` if more information is needed, + /// or `None` if no more queries are required. + fn next_query(&mut self) -> Option>; + + /// Resolves a query with the given response. + /// + /// This method processes the response to a previous query request and updates + /// the internal state accordingly. + fn resolve_query(&mut self, response: ChainResponse); + + /// Returns true if the query process is complete and ready to finish. + /// + /// The default implementation returns `true` when there are no more queries needed. + /// Implementors can override this for more specific behavior if needed. + fn is_finished(&mut self) -> bool { + self.next_query().is_none() + } + + /// Completes the query process and returns the final output. + /// + /// This method should be called when `is_finished` returns `true`. + /// It consumes `self` and produces the final output. + fn finish(self) -> Self::Output; +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 95bebe907..33e921687 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -72,3 +72,6 @@ mod merge; pub use merge::*; pub mod spk_client; + +mod chain_query; +pub use chain_query::*; diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index befeeb4cc..1845dc602 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -40,9 +40,12 @@ fn get_balance( ) -> anyhow::Result { let chain_tip = recv_chain.tip().block_id(); let outpoints = recv_graph.index.outpoints().clone(); - let balance = recv_graph - .canonical_view(recv_chain, chain_tip, CanonicalizationParams::default()) - .balance(outpoints, |_, _| true, 1); + let task = recv_graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = recv_chain + .canonicalize(task) + .balance(outpoints, |_, _| true, 0); Ok(balance) } @@ -147,9 +150,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( @@ -176,9 +182,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; assert!( diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index 3c628c20d..be747a656 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -88,9 +88,12 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1).await?; assert!( @@ -117,9 +120,12 @@ pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1).await?; assert!( diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 4d5683e8b..0c33ad166 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -88,9 +88,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1)?; assert!( @@ -117,9 +120,12 @@ pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { .chain_tip(chain.tip()) .spks_with_indexes(graph.index.all_spks().clone()) .expected_spk_txids( - graph - .canonical_view(&chain, chain.tip().block_id(), Default::default()) - .list_expected_spk_txids(&graph.index, ..), + { + let chain_tip = chain.tip().block_id(); + let task = graph.canonicalization_task(chain_tip, Default::default()); + chain.canonicalize(task) + } + .list_expected_spk_txids(&graph.index, ..), ); let sync_response = client.sync(sync_request, 1)?; assert!( diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 0263c5b0b..61e8b05ea 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -144,15 +144,16 @@ fn main() -> anyhow::Result<()> { &rpc_client, chain.tip(), fallback_height, - graph - .canonical_view( - &*chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .filter(|tx| tx.pos.is_unconfirmed()) - .map(|tx| tx.tx), + { + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain.canonicalize(task) + } + .txs() + .filter(|tx| tx.pos.is_unconfirmed()) + .map(|tx| tx.tx), ) }; let mut db_stage = ChangeSet::default(); @@ -196,17 +197,19 @@ fn main() -> anyhow::Result<()> { last_print = Instant::now(); let synced_to = chain.tip(); let balance = { - graph - .canonical_view( - &*chain, - synced_to.block_id(), + { + let synced_to_block = synced_to.block_id(); + let task = graph.graph().canonicalization_task( + synced_to_block, CanonicalizationParams::default(), - ) - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ) + ); + chain.canonicalize(task) + } + .balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 0, + ) }; println!( "[{:>10}s] synced to {} @ {} | total: {}", @@ -249,15 +252,16 @@ fn main() -> anyhow::Result<()> { rpc_client.clone(), chain.tip(), fallback_height, - graph - .canonical_view( - &*chain, - chain.tip().block_id(), - CanonicalizationParams::default(), - ) - .txs() - .filter(|tx| tx.pos.is_unconfirmed()) - .map(|tx| tx.tx), + { + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain.canonicalize(task) + } + .txs() + .filter(|tx| tx.pos.is_unconfirmed()) + .map(|tx| tx.tx), ) }; @@ -356,17 +360,19 @@ fn main() -> anyhow::Result<()> { last_print = Some(Instant::now()); let synced_to = chain.tip(); let balance = { - graph - .canonical_view( - &*chain, - synced_to.block_id(), + { + let synced_to_block = synced_to.block_id(); + let task = graph.graph().canonicalization_task( + synced_to_block, CanonicalizationParams::default(), - ) - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ) + ); + chain.canonicalize(task) + } + .balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 0, + ) }; println!( "[{:>10}s] synced to {} @ {} / {} | total: {}", diff --git a/examples/example_chain_query/Cargo.toml b/examples/example_chain_query/Cargo.toml new file mode 100644 index 000000000..85d39f66f --- /dev/null +++ b/examples/example_chain_query/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "example_chain_query" +version = "0.1.0" +edition = "2021" +rust-version = "1.84" + +[dependencies] +bdk_chain = { path = "../../crates/chain" } +bdk_core = { path = "../../crates/core" } +bitcoin = { version = "0.32.0" } +bitcoincore-rpc = { version = "0.19.0" } +bip157 = { version = "0.3.2" } +tokio = { version = "1.19", features = ["rt-multi-thread", "macros"] } +anyhow = "1" +tracing = "0.1" +tracing-subscriber = "0.3" + +[[bin]] +name = "kyoto_oracle" +path = "bin/kyoto_oracle.rs" + +[[bin]] +name = "bitcoind_rpc_oracle" +path = "bin/bitcoind_rpc_oracle.rs" \ No newline at end of file diff --git a/examples/example_chain_query/README.md b/examples/example_chain_query/README.md new file mode 100644 index 000000000..b3d2bfacd --- /dev/null +++ b/examples/example_chain_query/README.md @@ -0,0 +1,48 @@ +# ChainQuery Examples + +This directory contains examples demonstrating the use of BDK's `ChainQuery` trait for transaction canonicalization without requiring a full local chain store. + +## Examples + +### bitcoind_rpc_oracle +Uses Bitcoin Core RPC with the `ChainOracle` trait implementation to perform on-demand block verification during canonicalization. + +#### Setup for Signet + +1. Start local signet bitcoind (~8 GB space required): + ```bash + mkdir -p /tmp/signet/bitcoind + bitcoind -signet -server -fallbackfee=0.0002 -blockfilterindex -datadir=/tmp/signet/bitcoind -daemon + tail -f /tmp/signet/bitcoind/signet/debug.log + ``` + Watch debug.log and wait for bitcoind to finish syncing. + +2. Set bitcoind environment variables: + ```bash + export RPC_URL=127.0.0.1:38332 + export RPC_COOKIE=/tmp/signet/bitcoind/signet/.cookie + ``` + +3. Run the example: + ```bash + cargo run --bin bitcoind_rpc_oracle + ``` + +### kyoto_oracle +Uses Kyoto (BIP157/158 compact block filters) with async on-demand block fetching for canonicalization. Connects to Signet network peers. + +To run: +```bash +cargo run --bin kyoto_oracle +``` + +## Key Concepts + +Both examples demonstrate: +- Using `CanonicalizationTask` with the `ChainQuery` trait +- On-demand chain data fetching instead of storing all headers locally +- Processing transaction graphs without a full `LocalChain` + +The main difference is the backend: +- `bitcoind_rpc_oracle`: Synchronous RPC calls to Bitcoin Core +- `kyoto_oracle`: Async P2P network communication using compact block filters \ No newline at end of file diff --git a/examples/example_chain_query/bin/bitcoind_rpc_oracle.rs b/examples/example_chain_query/bin/bitcoind_rpc_oracle.rs new file mode 100644 index 000000000..49d9d5cad --- /dev/null +++ b/examples/example_chain_query/bin/bitcoind_rpc_oracle.rs @@ -0,0 +1,301 @@ +#![allow(clippy::print_stdout, clippy::print_stderr)] +use std::time::Instant; + +use anyhow::Context; +use bdk_chain::bitcoin::{bip158::BlockFilter, secp256k1::Secp256k1, Block, ScriptBuf}; +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; +use bdk_chain::miniscript::Descriptor; +use bdk_chain::{ + Anchor, BlockId, CanonicalizationParams, CanonicalizationTask, ChainOracle, ChainQuery, + ConfirmationBlockTime, IndexedTxGraph, SpkIterator, +}; +use bitcoincore_rpc::json::GetBlockHeaderResult; +use bitcoincore_rpc::{Client, RpcApi}; + +// This example shows how to use a CoreOracle that implements ChainOracle trait +// to handle canonicalization with bitcoind RPC, without needing LocalChain. + +const EXTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/0/*)"; +const INTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/1/*)"; +const SPK_COUNT: u32 = 25; + +const START_HEIGHT: u32 = 205_000; + +/// Error types for CoreOracle and FilterIterV2 +#[derive(Debug)] +pub enum Error { + /// RPC error + Rpc(bitcoincore_rpc::Error), + /// `bitcoin::bip158` error + Bip158(bdk_chain::bitcoin::bip158::Error), + /// Max reorg depth exceeded + ReorgDepthExceeded, + /// Error converting an integer + TryFromInt(core::num::TryFromIntError), +} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Rpc(e) => write!(f, "{e}"), + Self::Bip158(e) => write!(f, "{e}"), + Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"), + Self::TryFromInt(e) => write!(f, "{e}"), + } + } +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(e: bitcoincore_rpc::Error) -> Self { + Self::Rpc(e) + } +} + +impl From for Error { + fn from(e: core::num::TryFromIntError) -> Self { + Self::TryFromInt(e) + } +} + +impl From for Error { + fn from(e: bdk_chain::bitcoin::bip158::Error) -> Self { + Self::Bip158(e) + } +} + +/// Whether the RPC error is a "not found" error (code: `-5`) +fn is_not_found(e: &bitcoincore_rpc::Error) -> bool { + matches!( + e, + bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(e)) + if e.code == -5 + ) +} + +/// CoreOracle implements ChainOracle using bitcoind RPC +pub struct CoreOracle { + client: Client, +} + +impl CoreOracle { + pub fn new(client: Client) -> Self { + Self { client } + } + + /// Canonicalize a transaction graph using this oracle + pub fn canonicalize( + &self, + mut task: CanonicalizationTask<'_, A>, + chain_tip: BlockId, + ) -> bdk_chain::CanonicalView { + // Process all queries from the task + while let Some(request) = task.next_query() { + // Check each block_id against the chain to find the best one + let mut best_block = None; + + for block_id in &request.block_ids { + // Check if block is in chain + match self.is_block_in_chain(*block_id, chain_tip) { + Ok(Some(true)) => { + best_block = Some(*block_id); + break; // Found a confirmed block + } + _ => continue, // Not confirmed or error, check next + } + } + + task.resolve_query(best_block); + } + + // Finish and return the canonical view + task.finish() + } +} + +impl ChainOracle for CoreOracle { + type Error = Error; + + fn is_block_in_chain( + &self, + block: BlockId, + chain_tip: BlockId, + ) -> Result, Self::Error> { + // Check if the requested block height is within range + if block.height > chain_tip.height { + return Ok(Some(false)); + } + + // Get the block hash at the requested height + match self.client.get_block_hash(block.height as u64) { + Ok(hash_at_height) => Ok(Some(hash_at_height == block.hash)), + Err(e) if is_not_found(&e) => Ok(Some(false)), + Err(_) => Ok(None), // Can't determine, return None + } + } + + fn get_chain_tip(&self) -> Result { + let height = self.client.get_block_count()? as u32; + let hash = self.client.get_block_hash(height as u64)?; + Ok(BlockId { height, hash }) + } +} + +/// FilterIterV2: Similar to FilterIter but doesn't manage CheckPoints +pub struct FilterIterV2<'a> { + client: &'a Client, + spks: Vec, + current_height: u32, + header: Option, +} + +impl<'a> FilterIterV2<'a> { + pub fn new( + client: &'a Client, + start_height: u32, + spks: impl IntoIterator, + ) -> Self { + Self { + client, + spks: spks.into_iter().collect(), + current_height: start_height, + header: None, + } + } + + /// Find the starting point for iteration + fn find_base(&self) -> Result { + let hash = self.client.get_block_hash(self.current_height as u64)?; + Ok(self.client.get_block_header_info(&hash)?) + } +} + +/// Event returned by FilterIterV2 - contains a block that matches the filter +#[derive(Debug, Clone)] +pub struct EventV2 { + pub block: Option, + pub height: u32, +} + +impl Iterator for FilterIterV2<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + let result = (|| -> Result, Error> { + let header = match self.header.take() { + Some(header) => header, + None => self.find_base()?, + }; + + let next_hash = match header.next_block_hash { + Some(hash) => hash, + None => return Ok(None), // Reached chain tip + }; + + let mut next_header = self.client.get_block_header_info(&next_hash)?; + + // Handle reorgs + while next_header.confirmations < 0 { + let prev_hash = next_header + .previous_block_hash + .ok_or(Error::ReorgDepthExceeded)?; + next_header = self.client.get_block_header_info(&prev_hash)?; + } + + let height = next_header.height.try_into()?; + let hash = next_header.hash; + + // Check if block matches our filters + let mut block = None; + let filter = BlockFilter::new(self.client.get_block_filter(&hash)?.filter.as_slice()); + + if filter.match_any(&hash, self.spks.iter().map(ScriptBuf::as_ref))? { + block = Some(self.client.get_block(&hash)?); + } + + // Update state + self.current_height = height; + self.header = Some(next_header); + + Ok(Some(EventV2 { block, height })) + })(); + + result.transpose() + } +} + +fn main() -> anyhow::Result<()> { + // Setup descriptors and graph + let secp = Secp256k1::new(); + let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?; + let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?; + + let mut graph = IndexedTxGraph::>::new({ + let mut index = KeychainTxOutIndex::default(); + index.insert_descriptor("external", descriptor.clone())?; + index.insert_descriptor("internal", change_descriptor.clone())?; + index + }); + + // Configure RPC client + let url = std::env::var("RPC_URL").context("must set RPC_URL")?; + let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?; + let rpc_client = Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?; + + // Initialize `FilterIter` + let mut spks = vec![]; + for (_, desc) in graph.index.keychains() { + spks.extend(SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, s)| s)); + } + let iter = FilterIterV2::new(&rpc_client, START_HEIGHT, spks); + + let start = Instant::now(); + + for res in iter { + let event = res?; + + if let Some(block) = event.block { + let _ = graph.apply_block_relevant(&block, event.height); + println!("Matched block {}", event.height); + } + } + + println!("\ntook: {}s", start.elapsed().as_secs()); + + // Create `CoreOracle` + let oracle = CoreOracle::new(rpc_client); + + // Get current chain tip from `CoreOracle` + let chain_tip = oracle.get_chain_tip()?; + println!( + "chain tip: height={}, hash={}", + chain_tip.height, chain_tip.hash + ); + + // Canonicalize TxGraph with `CoreCoracle` + println!("\nPerforming canonicalization using CoreOracle..."); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = oracle.canonicalize(task, chain_tip); + + // Display unspent outputs + let unspent: Vec<_> = canonical_view + .filter_unspent_outpoints(graph.index.outpoints().clone()) + .collect(); + + if !unspent.is_empty() { + println!("\nUnspent"); + for (index, utxo) in unspent { + // (k, index) | value | outpoint | + println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint); + } + } + + for canon_tx in canonical_view.txs() { + if !canon_tx.pos.is_confirmed() { + eprintln!("ERROR: canonical tx should be confirmed {}", canon_tx.txid); + } + } + + Ok(()) +} diff --git a/examples/example_chain_query/bin/kyoto_oracle.rs b/examples/example_chain_query/bin/kyoto_oracle.rs new file mode 100644 index 000000000..33461f0c9 --- /dev/null +++ b/examples/example_chain_query/bin/kyoto_oracle.rs @@ -0,0 +1,276 @@ +use std::collections::HashSet; +use std::str::FromStr; +use std::time::Instant; + +use anyhow::Context; +use bdk_chain::bitcoin::{secp256k1::Secp256k1, Network}; +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; +use bdk_chain::miniscript::Descriptor; +use bdk_chain::{ + Anchor, BlockId, CanonicalizationParams, CanonicalizationTask, ChainQuery, + ConfirmationBlockTime, IndexedTxGraph, SpkIterator, +}; +use bip157::chain::{BlockHeaderChanges, ChainState}; +use bip157::messages::Event; +use bip157::{error::FetchBlockError, Builder, Client, HeaderCheckpoint, Requester}; +use tracing::{debug, error, info, warn}; + +// This example shows how to use Kyoto (BIP157/158) with ChainOracle +// to handle canonicalization without storing all chain data locally. + +const EXTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/0/*)"; +const INTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/1/*)"; +const SPK_COUNT: u32 = 25; + +const NETWORK: Network = Network::Signet; +const START_HEIGHT: u32 = 201_000; +const START_HASH: &str = "0000002238d05b522875f9edc4c9f418dd89ccfde7e4c305e8448a87a5dc71b7"; + +/// `KyotoOracle`` uses Kyoto's requester for on-demand chain queries +/// It doesn't implement `ChainOracle` trait since that's synchronous and we need async +pub struct KyotoOracle { + /// Requester to fetch blocks on-demand + requester: Requester, + /// Current chain tip + chain_tip: BlockId, +} + +impl KyotoOracle { + pub fn new(requester: Requester, chain_tip: BlockId) -> Self { + Self { + requester, + chain_tip, + } + } + + /// Get the current chain tip + pub fn get_chain_tip(&self) -> BlockId { + self.chain_tip + } + + /// Canonicalize a transaction graph using async on-demand queries to Kyoto + pub async fn canonicalize( + &self, + mut task: CanonicalizationTask<'_, A>, + ) -> bdk_chain::CanonicalView { + // Process all queries from the task + while let Some(request) = task.next_query() { + // Check each block_id against the chain to find the best one + let mut best_block = None; + + for block_id in &request.block_ids { + // Check if block is in chain by fetching it on-demand + match self.is_block_in_chain(*block_id).await { + Ok(true) => { + best_block = Some(*block_id); + break; // Found a confirmed block + } + Ok(false) => continue, // Not in chain, check next + Err(_) => continue, // Error fetching, skip this one + } + } + + task.resolve_query(best_block); + } + + // Finish and return the canonical view + task.finish() + } + + /// Check if a block is in the chain by fetching it on-demand from Kyoto + async fn is_block_in_chain(&self, block: BlockId) -> Result { + // Check if the requested block height is within range + if block.height > self.chain_tip.height { + return Ok(false); + } + + // Try to fetch the block by its hash + // If it exists and the height matches, it's in the chain + match self.requester.get_block(block.hash).await { + Ok(indexed_block) => { + // Verify the height matches what we expect + Ok(indexed_block.height == block.height) + } + Err(FetchBlockError::UnknownHash) => Ok(false), + Err(e) => Err(e), + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Initialize tracing + tracing_subscriber::fmt::init(); + + // Setup descriptors and graph + let secp = Secp256k1::new(); + let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?; + let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?; + + let mut graph = IndexedTxGraph::>::new({ + let mut index = KeychainTxOutIndex::default(); + index.insert_descriptor("external", descriptor.clone())?; + index.insert_descriptor("internal", change_descriptor.clone())?; + index + }); + + // Collect scripts to watch + let mut spks = HashSet::new(); + for (_, desc) in graph.index.keychains() { + spks.extend(SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, s)| s)); + } + + // Build Kyoto node with checkpoint + let checkpoint = HeaderCheckpoint::new( + START_HEIGHT, + bitcoin::BlockHash::from_str(START_HASH).context("invalid checkpoint hash")?, + ); + + let builder = Builder::new(NETWORK); + let (node, client) = builder + .chain_state(ChainState::Checkpoint(checkpoint)) + .required_peers(1) + .build(); + + // Run the node in background + tokio::task::spawn(async move { node.run().await }); + + let Client { + requester, + mut info_rx, + mut warn_rx, + mut event_rx, + } = client; + + let start = Instant::now(); + #[allow(unused_assignments)] + let mut chain_tip = BlockId { + height: 0, + hash: bitcoin::constants::genesis_block(bitcoin::Network::Signet).block_hash(), + }; + let mut matched_blocks_count = 0; + + info!("Starting sync with Kyoto..."); + + // Event loop to process filters and apply matching blocks immediately + #[allow(unused_assignments)] + #[allow(clippy::incompatible_msrv)] + loop { + tokio::select! { + info_msg = info_rx.recv() => { + if let Some(info_msg) = info_msg { + info!("Kyoto: {}", info_msg); + } + } + warn_msg = warn_rx.recv() => { + if let Some(warn_msg) = warn_msg { + warn!("Kyoto: {}", warn_msg); + } + } + event = event_rx.recv() => { + if let Some(event) = event { + match event { + Event::IndexedFilter(filter) => { + let height = filter.height(); + if filter.contains_any(spks.iter()) { + let hash = filter.block_hash(); + info!("Matched filter at height {}", height); + match requester.get_block(hash).await { + Ok(indexed_block) => { + // Apply block immediately to the graph + let _ = graph.apply_block_relevant(&indexed_block.block, indexed_block.height); + matched_blocks_count += 1; + debug!("Applied block at height {}", indexed_block.height); + } + Err(e) => { + error!("Failed to fetch block {}: {}", hash, e); + } + } + } + }, + Event::ChainUpdate(changes) => { + match &changes { + BlockHeaderChanges::Connected(header) => { + // Update chain tip on each new header + chain_tip = BlockId { + height: header.height, + hash: header.block_hash(), + }; + if header.height % 1000 == 0 { + info!("Synced to height {}", header.height); + } + } + BlockHeaderChanges::Reorganized { accepted, .. } => { + // On reorg, update to the new tip (last in accepted) + if let Some(header) = accepted.last() { + chain_tip = BlockId { + height: header.height, + hash: header.block_hash(), + }; + warn!("Reorg to height {}", header.height); + } + } + BlockHeaderChanges::ForkAdded(_) => { + // Ignore forks that are not on the main chain + debug!("Fork detected, ignoring"); + } + } + } + Event::FiltersSynced(sync_update) => { + let tip = sync_update.tip(); + chain_tip = BlockId { + height: tip.height, + hash: tip.hash, + }; + info!("Filters synced! Tip: height={}, hash={}", tip.height, tip.hash); + break; + } + _ => (), + } + } + } + } + } + + info!("Sync completed in {}s", start.elapsed().as_secs()); + info!("Found and applied {} matching blocks", matched_blocks_count); + + info!( + "Chain tip: height={}, hash={}", + chain_tip.height, chain_tip.hash + ); + + // Create KyotoOracle with requester for on-demand queries + let oracle = KyotoOracle::new(requester.clone(), chain_tip); + + // Canonicalize TxGraph with KyotoOracle + info!("Performing canonicalization using KyotoOracle..."); + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); + let canonical_view = oracle.canonicalize(task).await; + + // Display unspent outputs + let unspent: Vec<_> = canonical_view + .filter_unspent_outpoints(graph.index.outpoints().clone()) + .collect(); + + if !unspent.is_empty() { + info!("Found {} unspent outputs:", unspent.len()); + for (index, utxo) in unspent { + info!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint); + } + } else { + info!("No unspent outputs found"); + } + + // Verify all canonical transactions are confirmed + for canon_tx in canonical_view.txs() { + if !canon_tx.pos.is_confirmed() { + error!("Canonical tx should be confirmed: {}", canon_tx.txid); + } + } + + let _ = requester.shutdown(); + info!("Shutdown complete"); + + Ok(()) +} diff --git a/examples/example_cli/src/lib.rs b/examples/example_cli/src/lib.rs index f130c1adf..3e165c3b4 100644 --- a/examples/example_cli/src/lib.rs +++ b/examples/example_cli/src/lib.rs @@ -2,6 +2,7 @@ use bdk_chain::keychain_txout::DEFAULT_LOOKAHEAD; use serde_json::json; use std::cmp; use std::collections::HashMap; +use std::convert::Infallible; use std::env; use std::fmt; use std::str::FromStr; @@ -260,18 +261,15 @@ pub struct ChangeInfo { pub index: u32, } -pub fn create_tx( +pub fn create_tx( graph: &mut KeychainTxGraph, - chain: &O, + chain: &LocalChain, assets: &Assets, cs_algorithm: CoinSelectionAlgo, address: Address, value: u64, feerate: f32, -) -> anyhow::Result<(Psbt, Option)> -where - O::Error: std::error::Error + Send + Sync + 'static, -{ +) -> anyhow::Result<(Psbt, Option)> { let mut changeset = keychain_txout::ChangeSet::default(); // get planned utxos @@ -424,15 +422,18 @@ where // Alias the elements of `planned_utxos` pub type PlanUtxo = (Plan, FullTxOut); -pub fn planned_utxos( +pub fn planned_utxos( graph: &KeychainTxGraph, - chain: &O, + chain: &LocalChain, assets: &Assets, -) -> Result, O::Error> { - let chain_tip = chain.get_chain_tip()?; +) -> Result, Infallible> { + let chain_tip = chain.tip().block_id(); let outpoints = graph.index.outpoints(); - graph - .try_canonical_view(chain, chain_tip, CanonicalizationParams::default())? + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + chain + .canonicalize(task) .filter_unspent_outpoints(outpoints.iter().cloned()) .filter_map(|((k, i), full_txo)| -> Option> { let desc = graph @@ -524,17 +525,15 @@ pub fn handle_commands( } } - let balance = graph - .try_canonical_view( - chain, - chain.get_chain_tip()?, - CanonicalizationParams::default(), - )? - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ); + let chain_tip = chain.tip().block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let balance = chain.canonicalize(task).balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 1, + ); let confirmed_total = balance.confirmed + balance.immature; let unconfirmed_total = balance.untrusted_pending + balance.trusted_pending; @@ -571,8 +570,11 @@ pub fn handle_commands( confirmed, unconfirmed, } => { - let txouts = graph - .try_canonical_view(chain, chain_tip, CanonicalizationParams::default())? + let task = graph + .graph() + .canonicalization_task(chain_tip, CanonicalizationParams::default()); + let txouts = chain + .canonicalize(task) .filter_outpoints(outpoints.iter().cloned()) .filter(|(_, full_txo)| match (spent, unspent) { (true, false) => full_txo.spent_by.is_some(), @@ -631,7 +633,7 @@ pub fn handle_commands( create_tx( &mut graph, - &*chain, + &chain, &assets, coin_select, address, diff --git a/examples/example_electrum/src/main.rs b/examples/example_electrum/src/main.rs index aa89f07e1..ed2895a1a 100644 --- a/examples/example_electrum/src/main.rs +++ b/examples/example_electrum/src/main.rs @@ -213,11 +213,11 @@ fn main() -> anyhow::Result<()> { eprintln!("[ SCANNING {pc:03.0}% ] {item}"); }); - let canonical_view = graph.canonical_view( - &*chain, - chain_tip.block_id(), - CanonicalizationParams::default(), - ); + let chain_tip_block = chain_tip.block_id(); + let task = graph + .graph() + .canonicalization_task(chain_tip_block, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); request = request .expected_spk_txids(canonical_view.list_expected_spk_txids(&graph.index, ..)); diff --git a/examples/example_esplora/src/main.rs b/examples/example_esplora/src/main.rs index 99f72391c..2e74d4f2e 100644 --- a/examples/example_esplora/src/main.rs +++ b/examples/example_esplora/src/main.rs @@ -225,11 +225,11 @@ fn main() -> anyhow::Result<()> { { let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let canonical_view = graph.canonical_view( - &*chain, - local_tip.block_id(), - CanonicalizationParams::default(), - ); + let local_tip_block = local_tip.block_id(); + let task = graph + .graph() + .canonicalization_task(local_tip_block, CanonicalizationParams::default()); + let canonical_view = chain.canonicalize(task); request = request .expected_spk_txids(canonical_view.list_expected_spk_txids(&graph.index, ..));