From 93716250f0b14299aca1ee347221eb6661656a59 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Mon, 21 Aug 2023 11:20:38 +0300 Subject: [PATCH 01/15] refactor: Edit ElectrumExt not to use WalletUpdate --- crates/electrum/src/electrum_ext.rs | 45 +++++++++------------ example-crates/example_electrum/src/main.rs | 16 ++++---- example-crates/wallet_electrum/src/main.rs | 16 ++++++-- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index c7859bdfe..e81ef1d32 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -1,6 +1,5 @@ use bdk_chain::{ bitcoin::{OutPoint, ScriptBuf, Transaction, Txid}, - keychain::WalletUpdate, local_chain::{self, CheckPoint}, tx_graph::{self, TxGraph}, Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, @@ -15,7 +14,8 @@ use std::{ /// We assume that a block of this depth and deeper cannot be reorged. const ASSUME_FINAL_DEPTH: u32 = 8; -/// Represents an update fetched from an Electrum server, but excludes full transactions. +/// Represents an update fetched from an Electrum server, but excludes full +/// transactions. /// /// To provide a complete update to [`TxGraph`], you'll need to call [`Self::missing_full_txs`] to /// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to fetch @@ -58,7 +58,7 @@ impl ElectrumUpdate { client: &Client, seen_at: Option, missing: Vec, - ) -> Result, Error> { + ) -> Result<(TxGraph, BTreeMap, local_chain::CheckPoint), Error> { let new_txs = client.batch_transaction_get(&missing)?; let mut graph_update = TxGraph::::new(new_txs); for (txid, anchors) in self.graph_update { @@ -69,14 +69,7 @@ impl ElectrumUpdate { let _ = graph_update.insert_anchor(txid, anchor); } } - Ok(WalletUpdate { - last_active_indices: self.keychain_update, - graph: graph_update, - chain: local_chain::Update { - tip: self.new_tip, - introduce_older_blocks: true, - }, - }) + Ok((graph_update, self.keychain_update, self.new_tip)) } } @@ -92,13 +85,19 @@ impl ElectrumUpdate { client: &Client, seen_at: Option, missing: Vec, - ) -> Result, Error> { - let update = self.finalize(client, seen_at, missing)?; + ) -> Result< + ( + TxGraph, + BTreeMap, + local_chain::CheckPoint, + ), + Error, + > { + let (graph, keychain_update, update_tip) = self.finalize(client, seen_at, missing)?; let relevant_heights = { let mut visited_heights = HashSet::new(); - update - .graph + graph .all_anchors() .iter() .map(|(a, _)| a.confirmation_height_upper_bound()) @@ -118,7 +117,7 @@ impl ElectrumUpdate { .collect::>(); let graph_changeset = { - let old_changeset = TxGraph::default().apply_update(update.graph.clone()); + let old_changeset = TxGraph::default().apply_update(graph.clone()); tx_graph::ChangeSet { txs: old_changeset.txs, txouts: old_changeset.txouts, @@ -140,15 +139,10 @@ impl ElectrumUpdate { } }; - Ok(WalletUpdate { - last_active_indices: update.last_active_indices, - graph: { - let mut graph = TxGraph::default(); - graph.apply_changeset(graph_changeset); - graph - }, - chain: update.chain, - }) + let mut update = TxGraph::default(); + update.apply_changeset(graph_changeset); + + Ok((update, keychain_update, update_tip)) } } @@ -457,7 +451,6 @@ fn populate_with_outpoints( }; let anchor = determine_tx_anchor(cps, res.height, res.tx_hash); - let tx_entry = update.graph_update.entry(res.tx_hash).or_default(); if let Some(anchor) = anchor { tx_entry.insert(anchor); diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 2a5c1310c..aa62f0da2 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -8,7 +8,7 @@ use bdk_chain::{ bitcoin::{Address, Network, OutPoint, ScriptBuf, Txid}, indexed_tx_graph::{self, IndexedTxGraph}, keychain::WalletChangeSet, - local_chain::LocalChain, + local_chain::{self, LocalChain}, Append, ConfirmationHeightAnchor, }; use bdk_electrum::{ @@ -269,25 +269,27 @@ fn main() -> anyhow::Result<()> { .expect("must get time") .as_secs(); - let final_update = response.finalize(&client, Some(now), missing_txids)?; + let (graph_update, keychain_update, update_tip) = + response.finalize(&client, Some(now), missing_txids)?; let db_changeset = { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain = chain.apply_update(final_update.chain)?; + let chain = chain.apply_update(local_chain::Update { + tip: update_tip, + introduce_older_blocks: true, + })?; let index_tx_graph = { let mut changeset = indexed_tx_graph::ChangeSet::::default(); - let (_, indexer) = graph - .index - .reveal_to_target_multi(&final_update.last_active_indices); + let (_, indexer) = graph.index.reveal_to_target_multi(&keychain_update); changeset.append(indexed_tx_graph::ChangeSet { indexer, ..Default::default() }); - changeset.append(graph.apply_update(final_update.graph)); + changeset.append(graph.apply_update(graph_update)); changeset }; diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index d53317f8c..b2bdf274f 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -9,6 +9,7 @@ use std::str::FromStr; use bdk::bitcoin::Address; use bdk::SignOptions; use bdk::{bitcoin::Network, Wallet}; +use bdk_electrum::bdk_chain::{keychain::WalletUpdate, local_chain}; use bdk_electrum::electrum_client::{self, ElectrumApi}; use bdk_electrum::ElectrumExt; use bdk_file_store::Store; @@ -57,9 +58,18 @@ fn main() -> Result<(), Box> { println!(); let missing = electrum_update.missing_full_txs(wallet.as_ref()); - let update = electrum_update.finalize_as_confirmation_time(&client, None, missing)?; - - wallet.apply_update(update)?; + let (graph_update, keychain_update, update_tip) = + electrum_update.finalize_as_confirmation_time(&client, None, missing)?; + + let wallet_update = WalletUpdate { + last_active_indices: keychain_update, + graph: graph_update, + chain: local_chain::Update { + tip: update_tip, + introduce_older_blocks: true, + }, + }; + wallet.apply_update(wallet_update)?; wallet.commit()?; let balance = wallet.get_balance(); From d308a618ff1011d222ee441e058df8fa6c394a5c Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Mon, 21 Aug 2023 11:45:42 +0300 Subject: [PATCH 02/15] refactor: Remove ForEachTxout trait --- crates/chain/src/keychain/txout_index.rs | 14 ++++----- crates/chain/src/spk_txout_index.rs | 39 ++++++++---------------- crates/chain/src/tx_data_traits.rs | 33 -------------------- crates/chain/src/tx_graph.rs | 14 +-------- 4 files changed, 20 insertions(+), 80 deletions(-) diff --git a/crates/chain/src/keychain/txout_index.rs b/crates/chain/src/keychain/txout_index.rs index 9b38a7ade..0376473ff 100644 --- a/crates/chain/src/keychain/txout_index.rs +++ b/crates/chain/src/keychain/txout_index.rs @@ -3,7 +3,7 @@ use crate::{ indexed_tx_graph::Indexer, miniscript::{Descriptor, DescriptorPublicKey}, spk_iter::BIP32_MAX_INDEX, - ForEachTxOut, SpkIterator, SpkTxOutIndex, + SpkIterator, SpkTxOutIndex, }; use alloc::vec::Vec; use bitcoin::{OutPoint, Script, TxOut}; @@ -112,7 +112,7 @@ impl Indexer for KeychainTxOutIndex { } impl KeychainTxOutIndex { - /// Scans an object for relevant outpoints, which are stored and indexed internally. + /// Scans a transaction for relevant outpoints, which are stored and indexed internally. /// /// If the matched script pubkey is part of the lookahead, the last stored index is updated for /// the script pubkey's keychain and the [`super::ChangeSet`] returned will reflect the @@ -124,13 +124,11 @@ impl KeychainTxOutIndex { /// your txouts. /// 2. When getting new data from the chain, you usually scan it before incorporating it into /// your chain state (i.e., `SparseChain`, `ChainGraph`). - /// - /// See [`ForEachTxout`] for the types that support this. - /// - /// [`ForEachTxout`]: crate::ForEachTxOut - pub fn scan(&mut self, txouts: &impl ForEachTxOut) -> super::ChangeSet { + pub fn scan(&mut self, tx: &bitcoin::Transaction) -> super::ChangeSet { let mut changeset = super::ChangeSet::::default(); - txouts.for_each_txout(|(op, txout)| changeset.append(self.scan_txout(op, txout))); + for (op, txout) in tx.output.iter().enumerate() { + changeset.append(self.scan_txout(OutPoint::new(tx.txid(), op as u32), txout)); + } changeset } diff --git a/crates/chain/src/spk_txout_index.rs b/crates/chain/src/spk_txout_index.rs index db749f44c..6a8ae27cc 100644 --- a/crates/chain/src/spk_txout_index.rs +++ b/crates/chain/src/spk_txout_index.rs @@ -3,7 +3,6 @@ use core::ops::RangeBounds; use crate::{ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap}, indexed_tx_graph::Indexer, - ForEachTxOut, }; use bitcoin::{self, OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid}; @@ -77,41 +76,23 @@ impl Indexer for SpkTxOutIndex { } } -/// This macro is used instead of a member function of `SpkTxOutIndex`, which would result in a -/// compiler error[E0521]: "borrowed data escapes out of closure" when we attempt to take a -/// reference out of the `ForEachTxOut` closure during scanning. -macro_rules! scan_txout { - ($self:ident, $op:expr, $txout:expr) => {{ - let spk_i = $self.spk_indices.get(&$txout.script_pubkey); - if let Some(spk_i) = spk_i { - $self.txouts.insert($op, (spk_i.clone(), $txout.clone())); - $self.spk_txouts.insert((spk_i.clone(), $op)); - $self.unused.remove(&spk_i); - } - spk_i - }}; -} - impl SpkTxOutIndex { - /// Scans an object containing many txouts. + /// Scans a transaction containing many txouts. /// /// Typically, this is used in two situations: /// /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all /// your txouts. /// 2. When getting new data from the chain, you usually scan it before incorporating it into your chain state. - /// - /// See [`ForEachTxout`] for the types that support this. - /// - /// [`ForEachTxout`]: crate::ForEachTxOut - pub fn scan(&mut self, txouts: &impl ForEachTxOut) -> BTreeSet { + pub fn scan(&mut self, tx: &bitcoin::Transaction) -> BTreeSet { let mut scanned_indices = BTreeSet::new(); - txouts.for_each_txout(|(op, txout)| { - if let Some(spk_i) = scan_txout!(self, op, txout) { + for (i, txout) in tx.output.iter().enumerate() { + let op = OutPoint::new(tx.txid(), i as u32); + if let Some(spk_i) = self.scan_txout(op, txout) { scanned_indices.insert(spk_i.clone()); } - }); + } scanned_indices } @@ -119,7 +100,13 @@ impl SpkTxOutIndex { /// Scan a single `TxOut` for a matching script pubkey and returns the index that matches the /// script pubkey (if any). pub fn scan_txout(&mut self, op: OutPoint, txout: &TxOut) -> Option<&I> { - scan_txout!(self, op, txout) + let spk_i = self.spk_indices.get(&txout.script_pubkey); + if let Some(spk_i) = spk_i { + self.txouts.insert(op, (spk_i.clone(), txout.clone())); + self.spk_txouts.insert((spk_i.clone(), op)); + self.unused.remove(spk_i); + } + spk_i } /// Get a reference to the set of indexed outpoints. diff --git a/crates/chain/src/tx_data_traits.rs b/crates/chain/src/tx_data_traits.rs index 811b1ff41..d0ed67d75 100644 --- a/crates/chain/src/tx_data_traits.rs +++ b/crates/chain/src/tx_data_traits.rs @@ -2,39 +2,6 @@ use crate::collections::BTreeMap; use crate::collections::BTreeSet; use crate::BlockId; use alloc::vec::Vec; -use bitcoin::{Block, OutPoint, Transaction, TxOut}; - -/// Trait to do something with every txout contained in a structure. -/// -/// We would prefer to just work with things that can give us an `Iterator` -/// here, but rust's type system makes it extremely hard to do this (without trait objects). -pub trait ForEachTxOut { - /// The provided closure `f` will be called with each `outpoint/txout` pair. - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))); -} - -impl ForEachTxOut for Block { - fn for_each_txout(&self, mut f: impl FnMut((OutPoint, &TxOut))) { - for tx in self.txdata.iter() { - tx.for_each_txout(&mut f) - } - } -} - -impl ForEachTxOut for Transaction { - fn for_each_txout(&self, mut f: impl FnMut((OutPoint, &TxOut))) { - let txid = self.txid(); - for (i, txout) in self.output.iter().enumerate() { - f(( - OutPoint { - txid, - vout: i as u32, - }, - txout, - )) - } - } -} /// Trait that "anchors" blockchain data to a specific block of height and hash. /// diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index adb84ca22..a741ddb6d 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -52,7 +52,7 @@ use crate::{ collections::*, keychain::Balance, local_chain::LocalChain, Anchor, Append, BlockId, - ChainOracle, ChainPosition, ForEachTxOut, FullTxOut, + ChainOracle, ChainPosition, FullTxOut, }; use alloc::vec::Vec; use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid}; @@ -1072,18 +1072,6 @@ impl AsRef> for TxGraph { } } -impl ForEachTxOut for ChangeSet { - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))) { - self.txouts().for_each(f) - } -} - -impl ForEachTxOut for TxGraph { - fn for_each_txout(&self, f: impl FnMut((OutPoint, &TxOut))) { - self.all_txouts().for_each(f) - } -} - /// An iterator that traverses transaction descendants. /// /// This `struct` is created by the [`walk_descendants`] method of [`TxGraph`]. From f96fca2471e9863558757beece92c8359ba63606 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Mon, 21 Aug 2023 12:13:58 +0300 Subject: [PATCH 03/15] refactor: Move WalletUpdate to wallet module --- crates/bdk/src/wallet/mod.rs | 31 ++++++++++++++++- crates/chain/src/keychain.rs | 33 +------------------ example-crates/wallet_electrum/src/main.rs | 4 +-- .../wallet_esplora_async/src/main.rs | 3 +- .../wallet_esplora_blocking/src/main.rs | 3 +- 5 files changed, 35 insertions(+), 39 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 1ca78a775..7b0ca56a9 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -22,7 +22,7 @@ use alloc::{ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph, - keychain::{KeychainTxOutIndex, WalletChangeSet, WalletUpdate}, + keychain::{KeychainTxOutIndex, WalletChangeSet}, local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, @@ -94,6 +94,35 @@ pub struct Wallet { secp: SecpCtx, } +/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. +/// +/// [`LocalChain`]: local_chain::LocalChain +#[derive(Debug, Clone)] +pub struct WalletUpdate { + /// Contains the last active derivation indices per keychain (`K`), which is used to update the + /// [`KeychainTxOutIndex`]. + pub last_active_indices: BTreeMap, + + /// Update for the [`TxGraph`]. + pub graph: TxGraph, + + /// Update for the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain + pub chain: local_chain::Update, +} + +impl WalletUpdate { + /// Construct a [`WalletUpdate`] with a given [`local_chain::Update`]. + pub fn new(chain_update: local_chain::Update) -> Self { + Self { + last_active_indices: BTreeMap::new(), + graph: TxGraph::default(), + chain: chain_update, + } + } +} + /// The update to a [`Wallet`] used in [`Wallet::apply_update`]. This is usually returned from blockchain data sources. pub type Update = WalletUpdate; diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index 64d68d81e..b1f9c6f92 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -10,9 +10,7 @@ //! //! [`SpkTxOutIndex`]: crate::SpkTxOutIndex -use crate::{ - collections::BTreeMap, indexed_tx_graph, local_chain, tx_graph::TxGraph, Anchor, Append, -}; +use crate::{collections::BTreeMap, indexed_tx_graph, local_chain, Anchor, Append}; #[cfg(feature = "miniscript")] mod txout_index; @@ -82,35 +80,6 @@ impl AsRef> for ChangeSet { } } -/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. -/// -/// [`LocalChain`]: local_chain::LocalChain -#[derive(Debug, Clone)] -pub struct WalletUpdate { - /// Contains the last active derivation indices per keychain (`K`), which is used to update the - /// [`KeychainTxOutIndex`]. - pub last_active_indices: BTreeMap, - - /// Update for the [`TxGraph`]. - pub graph: TxGraph, - - /// Update for the [`LocalChain`]. - /// - /// [`LocalChain`]: local_chain::LocalChain - pub chain: local_chain::Update, -} - -impl WalletUpdate { - /// Construct a [`WalletUpdate`] with a given [`local_chain::Update`]. - pub fn new(chain_update: local_chain::Update) -> Self { - Self { - last_active_indices: BTreeMap::new(), - graph: TxGraph::default(), - chain: chain_update, - } - } -} - /// A structure that records the corresponding changes as result of applying an [`WalletUpdate`]. #[derive(Debug, Clone, PartialEq)] #[cfg_attr( diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index b2bdf274f..5bd4e9ef3 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -8,8 +8,8 @@ use std::str::FromStr; use bdk::bitcoin::Address; use bdk::SignOptions; -use bdk::{bitcoin::Network, Wallet}; -use bdk_electrum::bdk_chain::{keychain::WalletUpdate, local_chain}; +use bdk::{bitcoin::Network, wallet::WalletUpdate, Wallet}; +use bdk_electrum::bdk_chain::local_chain; use bdk_electrum::electrum_client::{self, ElectrumApi}; use bdk_electrum::ElectrumExt; use bdk_file_store::Store; diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 144e1edf5..f32341c47 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -2,8 +2,7 @@ use std::{io::Write, str::FromStr}; use bdk::{ bitcoin::{Address, Network}, - chain::keychain::WalletUpdate, - wallet::AddressIndex, + wallet::{AddressIndex, WalletUpdate}, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraAsyncExt}; diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 02d060430..a4dc1890d 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -7,8 +7,7 @@ use std::{io::Write, str::FromStr}; use bdk::{ bitcoin::{Address, Network}, - chain::keychain::WalletUpdate, - wallet::AddressIndex, + wallet::{AddressIndex, WalletUpdate}, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraExt}; From 642456285fe66a9fd7d8ca9348492fb5f9efe0a2 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Mon, 21 Aug 2023 15:18:16 +0300 Subject: [PATCH 04/15] refactor: move WalletChangeset to wallet module --- crates/bdk/src/wallet/mod.rs | 60 ++++++++++++++++++- crates/chain/src/keychain.rs | 65 +-------------------- crates/chain/src/lib.rs | 8 +++ example-crates/example_electrum/src/main.rs | 16 ++--- 4 files changed, 75 insertions(+), 74 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 7b0ca56a9..c74357495 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -22,10 +22,10 @@ use alloc::{ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph, - keychain::{KeychainTxOutIndex, WalletChangeSet}, + keychain::{self, KeychainTxOutIndex}, local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, tx_graph::{CanonicalTx, TxGraph}, - Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, + Anchor, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, }; use bitcoin::consensus::encode::serialize; @@ -123,6 +123,62 @@ impl WalletUpdate { } } +/// A structure that records the corresponding changes as result of applying an [`WalletUpdate`]. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct WalletChangeSet { + /// Changes to the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain + pub chain: local_chain::ChangeSet, + + /// ChangeSet to [`IndexedTxGraph`]. + /// + /// [`IndexedTxGraph`]: bdk_chain::indexed_tx_graph::IndexedTxGraph + #[serde(bound( + deserialize = "K: Ord + serde::Deserialize<'de>, A: Ord + serde::Deserialize<'de>", + serialize = "K: Ord + serde::Serialize, A: Ord + serde::Serialize", + ))] + pub index_tx_graph: indexed_tx_graph::ChangeSet>, +} + +impl Default for WalletChangeSet { + fn default() -> Self { + Self { + chain: Default::default(), + index_tx_graph: Default::default(), + } + } +} + +impl Append for WalletChangeSet { + fn append(&mut self, other: Self) { + Append::append(&mut self.chain, other.chain); + Append::append(&mut self.index_tx_graph, other.index_tx_graph); + } + + fn is_empty(&self) -> bool { + self.chain.is_empty() && self.index_tx_graph.is_empty() + } +} + +impl From for WalletChangeSet { + fn from(chain: local_chain::ChangeSet) -> Self { + Self { + chain, + ..Default::default() + } + } +} + +impl From>> for WalletChangeSet { + fn from(index_tx_graph: indexed_tx_graph::ChangeSet>) -> Self { + Self { + index_tx_graph, + ..Default::default() + } + } +} + /// The update to a [`Wallet`] used in [`Wallet::apply_update`]. This is usually returned from blockchain data sources. pub type Update = WalletUpdate; diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index b1f9c6f92..63972a0ad 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -10,7 +10,7 @@ //! //! [`SpkTxOutIndex`]: crate::SpkTxOutIndex -use crate::{collections::BTreeMap, indexed_tx_graph, local_chain, Anchor, Append}; +use crate::{collections::BTreeMap, Append}; #[cfg(feature = "miniscript")] mod txout_index; @@ -80,69 +80,6 @@ impl AsRef> for ChangeSet { } } -/// A structure that records the corresponding changes as result of applying an [`WalletUpdate`]. -#[derive(Debug, Clone, PartialEq)] -#[cfg_attr( - feature = "serde", - derive(serde::Deserialize, serde::Serialize), - serde( - crate = "serde_crate", - bound( - deserialize = "K: Ord + serde::Deserialize<'de>, A: Ord + serde::Deserialize<'de>", - serialize = "K: Ord + serde::Serialize, A: Ord + serde::Serialize", - ) - ) -)] -pub struct WalletChangeSet { - /// Changes to the [`LocalChain`]. - /// - /// [`LocalChain`]: local_chain::LocalChain - pub chain: local_chain::ChangeSet, - - /// ChangeSet to [`IndexedTxGraph`]. - /// - /// [`IndexedTxGraph`]: crate::indexed_tx_graph::IndexedTxGraph - pub index_tx_graph: indexed_tx_graph::ChangeSet>, -} - -impl Default for WalletChangeSet { - fn default() -> Self { - Self { - chain: Default::default(), - index_tx_graph: Default::default(), - } - } -} - -impl Append for WalletChangeSet { - fn append(&mut self, other: Self) { - Append::append(&mut self.chain, other.chain); - Append::append(&mut self.index_tx_graph, other.index_tx_graph); - } - - fn is_empty(&self) -> bool { - self.chain.is_empty() && self.index_tx_graph.is_empty() - } -} - -impl From for WalletChangeSet { - fn from(chain: local_chain::ChangeSet) -> Self { - Self { - chain, - ..Default::default() - } - } -} - -impl From>> for WalletChangeSet { - fn from(index_tx_graph: indexed_tx_graph::ChangeSet>) -> Self { - Self { - index_tx_graph, - ..Default::default() - } - } -} - /// Balance, differentiated into various categories. #[derive(Debug, PartialEq, Eq, Clone, Default)] #[cfg_attr( diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index ed167ebf6..f38b7ee53 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -100,3 +100,11 @@ pub mod collections { /// How many confirmations are needed f or a coinbase output to be spent. pub const COINBASE_MATURITY: u32 = 100; + +impl From> + for (local_chain::ChangeSet, indexed_tx_graph::ChangeSet) +{ + fn from(indexed_changeset: indexed_tx_graph::ChangeSet) -> Self { + (local_chain::ChangeSet::default(), indexed_changeset) + } +} diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index aa62f0da2..c2b0a0f10 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -7,7 +7,7 @@ use std::{ use bdk_chain::{ bitcoin::{Address, Network, OutPoint, ScriptBuf, Txid}, indexed_tx_graph::{self, IndexedTxGraph}, - keychain::WalletChangeSet, + keychain, local_chain::{self, LocalChain}, Append, ConfirmationHeightAnchor, }; @@ -60,7 +60,10 @@ pub struct ScanOptions { pub batch_size: usize, } -type ChangeSet = WalletChangeSet; +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); fn main() -> anyhow::Result<()> { let (args, keymap, index, db, init_changeset) = @@ -68,11 +71,11 @@ fn main() -> anyhow::Result<()> { let graph = Mutex::new({ let mut graph = IndexedTxGraph::new(index); - graph.apply_changeset(init_changeset.index_tx_graph); + graph.apply_changeset(init_changeset.1); graph }); - let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain)); + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); let electrum_url = match args.network { Network::Bitcoin => "ssl://electrum.blockstream.info:50002", @@ -293,10 +296,7 @@ fn main() -> anyhow::Result<()> { changeset }; - ChangeSet { - index_tx_graph, - chain, - } + (chain, index_tx_graph) }; let mut db = db.lock().unwrap(); From e78fa2acc0c63304b3fa8d8458ca0b7bd8d0c3d5 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Thu, 24 Aug 2023 16:03:47 +0300 Subject: [PATCH 05/15] refactor: Allow for no chain update --- crates/bdk/src/wallet/mod.rs | 10 +++++++--- example-crates/wallet_electrum/src/main.rs | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index c74357495..d82648861 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -109,7 +109,7 @@ pub struct WalletUpdate { /// Update for the [`LocalChain`]. /// /// [`LocalChain`]: local_chain::LocalChain - pub chain: local_chain::Update, + pub chain: Option, } impl WalletUpdate { @@ -118,7 +118,7 @@ impl WalletUpdate { Self { last_active_indices: BTreeMap::new(), graph: TxGraph::default(), - chain: chain_update, + chain: Some(chain_update), } } } @@ -1804,7 +1804,11 @@ impl Wallet { where D: PersistBackend, { - let mut changeset = ChangeSet::from(self.chain.apply_update(update.chain)?); + let mut changeset = match update.chain { + Some(chain_update) => ChangeSet::from(self.chain.apply_update(chain_update)?), + None => ChangeSet::default(), + }; + let (_, index_changeset) = self .indexed_graph .index diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index 5bd4e9ef3..baaac0b7e 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -64,10 +64,10 @@ fn main() -> Result<(), Box> { let wallet_update = WalletUpdate { last_active_indices: keychain_update, graph: graph_update, - chain: local_chain::Update { + chain: Some(local_chain::Update { tip: update_tip, introduce_older_blocks: true, - }, + }), }; wallet.apply_update(wallet_update)?; wallet.commit()?; From aea1522952147a1909e9f0b8ec8705883778f1dd Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Fri, 25 Aug 2023 12:49:29 +0300 Subject: [PATCH 06/15] refactor: Implement Default for WalletUpdate --- crates/bdk/src/wallet/mod.rs | 11 +++++------ example-crates/wallet_esplora_async/src/main.rs | 2 +- example-crates/wallet_esplora_blocking/src/main.rs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index d82648861..ac3c808ee 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -94,9 +94,9 @@ pub struct Wallet { secp: SecpCtx, } -/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. +/// A structure to update [`Wallet`]. /// -/// [`LocalChain`]: local_chain::LocalChain +/// It updates [`bdk_chain::keychain::KeychainTxOutIndex`], [`bdk_chain::TxGraph`] and [`local_chain::LocalChain`] atomically. #[derive(Debug, Clone)] pub struct WalletUpdate { /// Contains the last active derivation indices per keychain (`K`), which is used to update the @@ -112,13 +112,12 @@ pub struct WalletUpdate { pub chain: Option, } -impl WalletUpdate { - /// Construct a [`WalletUpdate`] with a given [`local_chain::Update`]. - pub fn new(chain_update: local_chain::Update) -> Self { +impl Default for WalletUpdate { + fn default() -> Self { Self { last_active_indices: BTreeMap::new(), graph: TxGraph::default(), - chain: Some(chain_update), + chain: None, } } } diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index f32341c47..435bea87e 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -61,7 +61,7 @@ async fn main() -> Result<(), Box> { let update = WalletUpdate { last_active_indices, graph: update_graph, - ..WalletUpdate::new(chain_update) + chain: Some(chain_update), }; wallet.apply_update(update)?; wallet.commit()?; diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index a4dc1890d..2e5a850e3 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -60,7 +60,7 @@ fn main() -> Result<(), Box> { let update = WalletUpdate { last_active_indices, graph: update_graph, - ..WalletUpdate::new(chain_update) + chain: Some(chain_update), }; wallet.apply_update(update)?; From 05a24a625ea12b22d3488f0ce8ae5c5229153801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 26 Aug 2023 20:29:46 +0800 Subject: [PATCH 07/15] feat(electrum)!: change signature of `ElectrumExt` We remove `ElectrumUpdate` and return tuples instead for `ElectrumExt` methods. We introduce the `IncompleteTxGraph` structure to specifically hodl the incomplete `TxGraph`. This change is motivated by @LLFourn's comment: https://github.com/bitcoindevkit/bdk/pull/1084/commits/794bf37e637d3266b75827678f015e14f827d318#r1305432603 --- crates/electrum/src/electrum_ext.rs | 144 ++++++++++---------- crates/electrum/src/lib.rs | 12 +- example-crates/example_electrum/src/main.rs | 22 ++- example-crates/wallet_electrum/src/main.rs | 18 ++- 4 files changed, 93 insertions(+), 103 deletions(-) diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index e81ef1d32..b74358627 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -14,86 +14,63 @@ use std::{ /// We assume that a block of this depth and deeper cannot be reorged. const ASSUME_FINAL_DEPTH: u32 = 8; -/// Represents an update fetched from an Electrum server, but excludes full -/// transactions. +/// Represents a [`TxGraph`] update fetched from an Electrum server, but excludes full transactions. /// /// To provide a complete update to [`TxGraph`], you'll need to call [`Self::missing_full_txs`] to -/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to fetch -/// the full transactions from Electrum and finalize the update. -#[derive(Debug, Clone)] -pub struct ElectrumUpdate { - /// Map of [`Txid`]s to associated [`Anchor`]s. - pub graph_update: HashMap>, - /// The latest chain tip, as seen by the Electrum server. - pub new_tip: local_chain::CheckPoint, - /// Last-used index update for [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). - pub keychain_update: BTreeMap, -} - -impl ElectrumUpdate { - fn new(new_tip: local_chain::CheckPoint) -> Self { - Self { - new_tip, - graph_update: HashMap::new(), - keychain_update: BTreeMap::new(), - } - } +/// determine the full transactions missing from [`TxGraph`]. Then call [`Self::finalize`] to +/// fetch the full transactions from Electrum and finalize the update. +#[derive(Debug, Default, Clone)] +pub struct IncompleteTxGraph(HashMap>); +impl IncompleteTxGraph { /// Determine the full transactions that are missing from `graph`. /// - /// Refer to [`ElectrumUpdate`]. + /// Refer to [`IncompleteTxGraph`] for more. pub fn missing_full_txs(&self, graph: &TxGraph) -> Vec { - self.graph_update + self.0 .keys() .filter(move |&&txid| graph.as_ref().get_tx(txid).is_none()) .cloned() .collect() } - /// Finalizes update with `missing` txids to fetch from `client`. + /// Finalizes the [`TxGraph`] update by fetching `missing` txids from the `client`. /// - /// Refer to [`ElectrumUpdate`]. + /// Refer to [`IncompleteTxGraph`] for more. pub fn finalize( self, client: &Client, seen_at: Option, missing: Vec, - ) -> Result<(TxGraph, BTreeMap, local_chain::CheckPoint), Error> { + ) -> Result, Error> { let new_txs = client.batch_transaction_get(&missing)?; - let mut graph_update = TxGraph::::new(new_txs); - for (txid, anchors) in self.graph_update { + let mut graph = TxGraph::::new(new_txs); + for (txid, anchors) in self.0 { if let Some(seen_at) = seen_at { - let _ = graph_update.insert_seen_at(txid, seen_at); + let _ = graph.insert_seen_at(txid, seen_at); } for anchor in anchors { - let _ = graph_update.insert_anchor(txid, anchor); + let _ = graph.insert_anchor(txid, anchor); } } - Ok((graph_update, self.keychain_update, self.new_tip)) + Ok(graph) } } -impl ElectrumUpdate { - /// Finalizes the [`ElectrumUpdate`] with `new_txs` and anchors of type +impl IncompleteTxGraph { + /// Finalizes the [`IncompleteTxGraph`] with `new_txs` and anchors of type /// [`ConfirmationTimeAnchor`]. /// /// **Note:** The confirmation time might not be precisely correct if there has been a reorg. /// Electrum's API intends that we use the merkle proof API, we should change `bdk_electrum` to /// use it. - pub fn finalize_as_confirmation_time( + pub fn finalize_with_confirmation_time( self, client: &Client, seen_at: Option, missing: Vec, - ) -> Result< - ( - TxGraph, - BTreeMap, - local_chain::CheckPoint, - ), - Error, - > { - let (graph, keychain_update, update_tip) = self.finalize(client, seen_at, missing)?; + ) -> Result, Error> { + let graph = self.finalize(client, seen_at, missing)?; let relevant_heights = { let mut visited_heights = HashSet::new(); @@ -117,7 +94,7 @@ impl ElectrumUpdate { .collect::>(); let graph_changeset = { - let old_changeset = TxGraph::default().apply_update(graph.clone()); + let old_changeset = TxGraph::default().apply_update(graph); tx_graph::ChangeSet { txs: old_changeset.txs, txouts: old_changeset.txouts, @@ -139,16 +116,16 @@ impl ElectrumUpdate { } }; - let mut update = TxGraph::default(); - update.apply_changeset(graph_changeset); - - Ok((update, keychain_update, update_tip)) + let mut new_graph = TxGraph::default(); + new_graph.apply_changeset(graph_changeset); + Ok(new_graph) } } /// Trait to extend [`Client`] functionality. pub trait ElectrumExt { - /// Scan the blockchain (via electrum) for the data specified and returns a [`ElectrumUpdate`]. + /// Scan the blockchain (via electrum) for the data specified and returns updates for + /// [`bdk_chain`] data structures. /// /// - `prev_tip`: the most recent blockchain tip present locally /// - `keychain_spks`: keychains that we want to scan transactions for @@ -159,6 +136,7 @@ pub trait ElectrumExt { /// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated /// transactions. `batch_size` specifies the max number of script pubkeys to request for in a /// single batch request. + #[allow(clippy::type_complexity)] fn scan( &self, prev_tip: Option, @@ -167,7 +145,7 @@ pub trait ElectrumExt { outpoints: impl IntoIterator, stop_gap: usize, batch_size: usize, - ) -> Result, Error>; + ) -> Result<(local_chain::Update, IncompleteTxGraph, BTreeMap), Error>; /// Convenience method to call [`scan`] without requiring a keychain. /// @@ -179,20 +157,22 @@ pub trait ElectrumExt { txids: impl IntoIterator, outpoints: impl IntoIterator, batch_size: usize, - ) -> Result, Error> { + ) -> Result<(local_chain::Update, IncompleteTxGraph), Error> { let spk_iter = misc_spks .into_iter() .enumerate() .map(|(i, spk)| (i as u32, spk)); - self.scan( + let (chain, graph, _) = self.scan( prev_tip, [((), spk_iter)].into(), txids, outpoints, usize::MAX, batch_size, - ) + )?; + + Ok((chain, graph)) } } @@ -205,7 +185,14 @@ impl ElectrumExt for Client { outpoints: impl IntoIterator, stop_gap: usize, batch_size: usize, - ) -> Result, Error> { + ) -> Result< + ( + local_chain::Update, + IncompleteTxGraph, + BTreeMap, + ), + Error, + > { let mut request_spks = keychain_spks .into_iter() .map(|(k, s)| (k, s.into_iter())) @@ -217,9 +204,8 @@ impl ElectrumExt for Client { let update = loop { let (tip, _) = construct_update_tip(self, prev_tip.clone())?; - let mut update = ElectrumUpdate::::new(tip.clone()); - let cps = update - .new_tip + let mut graph_update = IncompleteTxGraph::::default(); + let cps = tip .iter() .take(10) .map(|cp| (cp.height(), cp)) @@ -230,7 +216,7 @@ impl ElectrumExt for Client { scanned_spks.append(&mut populate_with_spks( self, &cps, - &mut update, + &mut graph_update, &mut scanned_spks .iter() .map(|(i, (spk, _))| (i.clone(), spk.clone())), @@ -243,7 +229,7 @@ impl ElectrumExt for Client { populate_with_spks( self, &cps, - &mut update, + &mut graph_update, keychain_spks, stop_gap, batch_size, @@ -254,10 +240,14 @@ impl ElectrumExt for Client { } } - populate_with_txids(self, &cps, &mut update, &mut txids.iter().cloned())?; + populate_with_txids(self, &cps, &mut graph_update, &mut txids.iter().cloned())?; - let _txs = - populate_with_outpoints(self, &cps, &mut update, &mut outpoints.iter().cloned())?; + let _txs = populate_with_outpoints( + self, + &cps, + &mut graph_update, + &mut outpoints.iter().cloned(), + )?; // check for reorgs during scan process let server_blockhash = self.block_header(tip.height() as usize)?.block_hash(); @@ -265,7 +255,12 @@ impl ElectrumExt for Client { continue; // reorg } - update.keychain_update = request_spks + let chain_update = local_chain::Update { + tip, + introduce_older_blocks: true, + }; + + let keychain_update = request_spks .into_keys() .filter_map(|k| { scanned_spks @@ -275,7 +270,8 @@ impl ElectrumExt for Client { .map(|((_, i), _)| (k, *i)) }) .collect::>(); - break update; + + break (chain_update, graph_update, keychain_update); }; Ok(update) @@ -399,10 +395,10 @@ fn determine_tx_anchor( } } -fn populate_with_outpoints( +fn populate_with_outpoints( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, outpoints: &mut impl Iterator, ) -> Result, Error> { let mut full_txs = HashMap::new(); @@ -451,7 +447,7 @@ fn populate_with_outpoints( }; let anchor = determine_tx_anchor(cps, res.height, res.tx_hash); - let tx_entry = update.graph_update.entry(res.tx_hash).or_default(); + let tx_entry = graph_update.0.entry(res.tx_hash).or_default(); if let Some(anchor) = anchor { tx_entry.insert(anchor); } @@ -460,10 +456,10 @@ fn populate_with_outpoints( Ok(full_txs) } -fn populate_with_txids( +fn populate_with_txids( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, txids: &mut impl Iterator, ) -> Result<(), Error> { for txid in txids { @@ -488,7 +484,7 @@ fn populate_with_txids( None => continue, }; - let tx_entry = update.graph_update.entry(txid).or_default(); + let tx_entry = graph_update.0.entry(txid).or_default(); if let Some(anchor) = anchor { tx_entry.insert(anchor); } @@ -496,10 +492,10 @@ fn populate_with_txids( Ok(()) } -fn populate_with_spks( +fn populate_with_spks( client: &Client, cps: &BTreeMap, - update: &mut ElectrumUpdate, + graph_update: &mut IncompleteTxGraph, spks: &mut impl Iterator, stop_gap: usize, batch_size: usize, @@ -532,7 +528,7 @@ fn populate_with_spks( } for tx in spk_history { - let tx_entry = update.graph_update.entry(tx.tx_hash).or_default(); + let tx_entry = graph_update.0.entry(tx.tx_hash).or_default(); if let Some(anchor) = determine_tx_anchor(cps, tx.height, tx.tx_hash) { tx_entry.insert(anchor); } diff --git a/crates/electrum/src/lib.rs b/crates/electrum/src/lib.rs index 716c4d3f7..097726268 100644 --- a/crates/electrum/src/lib.rs +++ b/crates/electrum/src/lib.rs @@ -1,14 +1,16 @@ //! This crate is used for updating structures of the [`bdk_chain`] crate with data from electrum. //! //! The star of the show is the [`ElectrumExt::scan`] method, which scans for relevant blockchain -//! data (via electrum) and outputs an [`ElectrumUpdate`]. +//! data (via electrum) and outputs updates for [`bdk_chain`] structures as a tuple of form: //! -//! An [`ElectrumUpdate`] only includes `txid`s and no full transactions. The caller is responsible -//! for obtaining full transactions before applying. This can be done with +//! ([`bdk_chain::local_chain::Update`], [`IncompleteTxGraph`], `keychain_update`) +//! +//! An [`IncompleteTxGraph`] only includes `txid`s and no full transactions. The caller is +//! responsible for obtaining full transactions before applying. This can be done with //! these steps: //! //! 1. Determine which full transactions are missing. The method [`missing_full_txs`] of -//! [`ElectrumUpdate`] can be used. +//! [`IncompleteTxGraph`] can be used. //! //! 2. Obtaining the full transactions. To do this via electrum, the method //! [`batch_transaction_get`] can be used. @@ -16,7 +18,7 @@ //! Refer to [`bdk_electrum_example`] for a complete example. //! //! [`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan -//! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs +//! [`missing_full_txs`]: IncompleteTxGraph::missing_full_txs //! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index c2b0a0f10..e8f1730ba 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -13,7 +13,7 @@ use bdk_chain::{ }; use bdk_electrum::{ electrum_client::{self, ElectrumApi}, - ElectrumExt, ElectrumUpdate, + ElectrumExt, }; use example_cli::{ anyhow::{self, Context}, @@ -251,20 +251,18 @@ fn main() -> anyhow::Result<()> { // drop lock on graph and chain drop((graph, chain)); - let update = client + let (chain_update, graph_update) = client .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size) .context("scanning the blockchain")?; - ElectrumUpdate { - graph_update: update.graph_update, - new_tip: update.new_tip, - keychain_update: BTreeMap::new(), - } + (chain_update, graph_update, BTreeMap::new()) } }; + let (chain_update, incomplete_graph_update, keychain_update) = response; + let missing_txids = { let graph = &*graph.lock().unwrap(); - response.missing_full_txs(graph.graph()) + incomplete_graph_update.missing_full_txs(graph.graph()) }; let now = std::time::UNIX_EPOCH @@ -272,17 +270,13 @@ fn main() -> anyhow::Result<()> { .expect("must get time") .as_secs(); - let (graph_update, keychain_update, update_tip) = - response.finalize(&client, Some(now), missing_txids)?; + let graph_update = incomplete_graph_update.finalize(&client, Some(now), missing_txids)?; let db_changeset = { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain = chain.apply_update(local_chain::Update { - tip: update_tip, - introduce_older_blocks: true, - })?; + let chain = chain.apply_update(chain_update)?; let index_tx_graph = { let mut changeset = diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index baaac0b7e..186d5906c 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -7,9 +7,9 @@ use std::io::Write; use std::str::FromStr; use bdk::bitcoin::Address; +use bdk::wallet::WalletUpdate; use bdk::SignOptions; -use bdk::{bitcoin::Network, wallet::WalletUpdate, Wallet}; -use bdk_electrum::bdk_chain::local_chain; +use bdk::{bitcoin::Network, Wallet}; use bdk_electrum::electrum_client::{self, ElectrumApi}; use bdk_electrum::ElectrumExt; use bdk_file_store::Store; @@ -53,21 +53,19 @@ fn main() -> Result<(), Box> { }) .collect(); - let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; + let (chain_update, incomplete_graph_update, keychain_update) = + client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; println!(); - let missing = electrum_update.missing_full_txs(wallet.as_ref()); - let (graph_update, keychain_update, update_tip) = - electrum_update.finalize_as_confirmation_time(&client, None, missing)?; + let missing = incomplete_graph_update.missing_full_txs(wallet.as_ref()); + let graph_update = + incomplete_graph_update.finalize_with_confirmation_time(&client, None, missing)?; let wallet_update = WalletUpdate { last_active_indices: keychain_update, graph: graph_update, - chain: Some(local_chain::Update { - tip: update_tip, - introduce_older_blocks: true, - }), + chain: Some(chain_update), }; wallet.apply_update(wallet_update)?; wallet.commit()?; From 28b0d8f35755710e0f381ff5f2fcb42808eaa714 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Fri, 25 Aug 2023 12:52:09 +0300 Subject: [PATCH 08/15] refactor: Remove `scan` and `scan_txout` from SpkTxoutIndex and KeychainTxoutIndex --- crates/chain/src/indexed_tx_graph.rs | 13 +++- crates/chain/src/keychain/txout_index.rs | 47 ++++--------- crates/chain/src/spk_txout_index.rs | 66 +++++++------------ .../chain/tests/test_keychain_txout_index.rs | 7 +- crates/chain/tests/test_spk_txout_index.rs | 6 +- 5 files changed, 55 insertions(+), 84 deletions(-) diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 6dc2e9943..4643a93ec 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -233,7 +233,18 @@ pub trait Indexer { /// Scan and index the given `outpoint` and `txout`. fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet; - /// Scan and index the given transaction. + /// Scans a transaction for relevant outpoints, which are stored and indexed internally. + /// + /// If the matched script pubkey is part of the lookahead, the last stored index is updated for + /// the script pubkey's keychain and the [`ChangeSet`] returned will reflect the + /// change. + /// + /// Typically, this method is used in two situations: + /// + /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all + /// your txouts. + /// 2. When getting new data from the chain, you usually scan it before incorporating it into + /// your chain state. fn index_tx(&mut self, tx: &Transaction) -> Self::ChangeSet; /// Apply changeset to itself. diff --git a/crates/chain/src/keychain/txout_index.rs b/crates/chain/src/keychain/txout_index.rs index 0376473ff..5996d4d46 100644 --- a/crates/chain/src/keychain/txout_index.rs +++ b/crates/chain/src/keychain/txout_index.rs @@ -91,11 +91,19 @@ impl Indexer for KeychainTxOutIndex { type ChangeSet = super::ChangeSet; fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet { - self.scan_txout(outpoint, txout) + let mut changeset = super::ChangeSet::::default(); + for (keychain, index) in self.inner.index_txout(outpoint, txout) { + changeset.append(self.reveal_to_target(&keychain, index).1); + } + changeset } fn index_tx(&mut self, tx: &bitcoin::Transaction) -> Self::ChangeSet { - self.scan(tx) + let mut changeset = super::ChangeSet::::default(); + for (op, txout) in tx.output.iter().enumerate() { + changeset.append(self.index_txout(OutPoint::new(tx.txid(), op as u32), txout)); + } + changeset } fn initial_changeset(&self) -> Self::ChangeSet { @@ -112,36 +120,6 @@ impl Indexer for KeychainTxOutIndex { } impl KeychainTxOutIndex { - /// Scans a transaction for relevant outpoints, which are stored and indexed internally. - /// - /// If the matched script pubkey is part of the lookahead, the last stored index is updated for - /// the script pubkey's keychain and the [`super::ChangeSet`] returned will reflect the - /// change. - /// - /// Typically, this method is used in two situations: - /// - /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all - /// your txouts. - /// 2. When getting new data from the chain, you usually scan it before incorporating it into - /// your chain state (i.e., `SparseChain`, `ChainGraph`). - pub fn scan(&mut self, tx: &bitcoin::Transaction) -> super::ChangeSet { - let mut changeset = super::ChangeSet::::default(); - for (op, txout) in tx.output.iter().enumerate() { - changeset.append(self.scan_txout(OutPoint::new(tx.txid(), op as u32), txout)); - } - changeset - } - - /// Scan a single outpoint for a matching script pubkey. - /// - /// If it matches, this will store and index it. - pub fn scan_txout(&mut self, op: OutPoint, txout: &TxOut) -> super::ChangeSet { - match self.inner.scan_txout(op, txout).cloned() { - Some((keychain, index)) => self.reveal_to_target(&keychain, index).1, - None => super::ChangeSet::default(), - } - } - /// Return a reference to the internal [`SpkTxOutIndex`]. pub fn inner(&self) -> &SpkTxOutIndex<(K, u32)> { &self.inner @@ -198,14 +176,11 @@ impl KeychainTxOutIndex { /// Set the lookahead count for `keychain`. /// /// The lookahead is the number of scripts to cache ahead of the last stored script index. This - /// is useful during a scan via [`scan`] or [`scan_txout`]. + /// is useful during a scan via [`Indexer::index_tx`] or [`Indexer::index_txout`]. /// /// # Panics /// /// This will panic if the `keychain` does not exist. - /// - /// [`scan`]: Self::scan - /// [`scan_txout`]: Self::scan_txout pub fn set_lookahead(&mut self, keychain: &K, lookahead: u32) { self.lookahead.insert(keychain.clone(), lookahead); self.replenish_lookahead(keychain); diff --git a/crates/chain/src/spk_txout_index.rs b/crates/chain/src/spk_txout_index.rs index 6a8ae27cc..6c69daf31 100644 --- a/crates/chain/src/spk_txout_index.rs +++ b/crates/chain/src/spk_txout_index.rs @@ -9,8 +9,9 @@ use bitcoin::{self, OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid}; /// An index storing [`TxOut`]s that have a script pubkey that matches those in a list. /// /// The basic idea is that you insert script pubkeys you care about into the index with -/// [`insert_spk`] and then when you call [`scan`], the index will look at any txouts you pass in and -/// store and index any txouts matching one of its script pubkeys. +/// [`insert_spk`] and then when you call [`Indexer::index_tx`] or [`Indexer::index_txout`], the +/// index will look at any txouts you pass in and store and index any txouts matching one of its +/// script pubkeys. /// /// Each script pubkey is associated with an application-defined index script index `I`, which must be /// [`Ord`]. Usually, this is used to associate the derivation index of the script pubkey or even a @@ -24,7 +25,6 @@ use bitcoin::{self, OutPoint, Script, ScriptBuf, Transaction, TxOut, Txid}; /// [`TxOut`]: bitcoin::TxOut /// [`insert_spk`]: Self::insert_spk /// [`Ord`]: core::cmp::Ord -/// [`scan`]: Self::scan /// [`TxGraph`]: crate::tx_graph::TxGraph #[derive(Clone, Debug)] pub struct SpkTxOutIndex { @@ -53,19 +53,35 @@ impl Default for SpkTxOutIndex { } impl Indexer for SpkTxOutIndex { - type ChangeSet = (); + type ChangeSet = BTreeSet; fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet { - self.scan_txout(outpoint, txout); - Default::default() + let spk_i = self.spk_indices.get(&txout.script_pubkey); + let mut scanned_indices = BTreeSet::new(); + if let Some(spk_i) = spk_i { + self.txouts.insert(outpoint, (spk_i.clone(), txout.clone())); + self.spk_txouts.insert((spk_i.clone(), outpoint)); + self.unused.remove(spk_i); + scanned_indices.insert(spk_i.clone()); + } + scanned_indices } fn index_tx(&mut self, tx: &Transaction) -> Self::ChangeSet { - self.scan(tx); - Default::default() + let mut scanned_indices = BTreeSet::new(); + + for (i, txout) in tx.output.iter().enumerate() { + let op = OutPoint::new(tx.txid(), i as u32); + let mut txout_indices = self.index_txout(op, txout); + scanned_indices.append(&mut txout_indices); + } + + scanned_indices } - fn initial_changeset(&self) -> Self::ChangeSet {} + fn initial_changeset(&self) -> Self::ChangeSet { + self.spks.keys().cloned().collect() + } fn apply_changeset(&mut self, _changeset: Self::ChangeSet) { // This applies nothing. @@ -77,38 +93,6 @@ impl Indexer for SpkTxOutIndex { } impl SpkTxOutIndex { - /// Scans a transaction containing many txouts. - /// - /// Typically, this is used in two situations: - /// - /// 1. After loading transaction data from the disk, you may scan over all the txouts to restore all - /// your txouts. - /// 2. When getting new data from the chain, you usually scan it before incorporating it into your chain state. - pub fn scan(&mut self, tx: &bitcoin::Transaction) -> BTreeSet { - let mut scanned_indices = BTreeSet::new(); - - for (i, txout) in tx.output.iter().enumerate() { - let op = OutPoint::new(tx.txid(), i as u32); - if let Some(spk_i) = self.scan_txout(op, txout) { - scanned_indices.insert(spk_i.clone()); - } - } - - scanned_indices - } - - /// Scan a single `TxOut` for a matching script pubkey and returns the index that matches the - /// script pubkey (if any). - pub fn scan_txout(&mut self, op: OutPoint, txout: &TxOut) -> Option<&I> { - let spk_i = self.spk_indices.get(&txout.script_pubkey); - if let Some(spk_i) = spk_i { - self.txouts.insert(op, (spk_i.clone(), txout.clone())); - self.spk_txouts.insert((spk_i.clone(), op)); - self.unused.remove(spk_i); - } - spk_i - } - /// Get a reference to the set of indexed outpoints. pub fn outpoints(&self) -> &BTreeSet<(I, OutPoint)> { &self.spk_txouts diff --git a/crates/chain/tests/test_keychain_txout_index.rs b/crates/chain/tests/test_keychain_txout_index.rs index 96a1afd1a..f3886ab6f 100644 --- a/crates/chain/tests/test_keychain_txout_index.rs +++ b/crates/chain/tests/test_keychain_txout_index.rs @@ -4,6 +4,7 @@ mod common; use bdk_chain::{ collections::BTreeMap, + indexed_tx_graph::Indexer, keychain::{self, KeychainTxOutIndex}, Append, }; @@ -194,7 +195,7 @@ fn test_lookahead() { ], ..common::new_tx(external_index) }; - assert_eq!(txout_index.scan(&tx), keychain::ChangeSet::default()); + assert_eq!(txout_index.index_tx(&tx), keychain::ChangeSet::default()); assert_eq!( txout_index.last_revealed_index(&TestKeychain::External), Some(last_external_index) @@ -248,7 +249,7 @@ fn test_scan_with_lookahead() { value: 0, }; - let changeset = txout_index.scan_txout(op, &txout); + let changeset = txout_index.index_txout(op, &txout); assert_eq!( changeset.as_inner(), &[(TestKeychain::External, spk_i)].into() @@ -273,7 +274,7 @@ fn test_scan_with_lookahead() { script_pubkey: spk_41, value: 0, }; - let changeset = txout_index.scan_txout(op, &txout); + let changeset = txout_index.index_txout(op, &txout); assert!(changeset.is_empty()); } diff --git a/crates/chain/tests/test_spk_txout_index.rs b/crates/chain/tests/test_spk_txout_index.rs index 099b4ca88..e8b752146 100644 --- a/crates/chain/tests/test_spk_txout_index.rs +++ b/crates/chain/tests/test_spk_txout_index.rs @@ -1,4 +1,4 @@ -use bdk_chain::SpkTxOutIndex; +use bdk_chain::{indexed_tx_graph::Indexer, SpkTxOutIndex}; use bitcoin::{absolute, OutPoint, ScriptBuf, Transaction, TxIn, TxOut}; #[test] @@ -22,7 +22,7 @@ fn spk_txout_sent_and_received() { assert_eq!(index.sent_and_received(&tx1), (0, 42_000)); assert_eq!(index.net_value(&tx1), 42_000); - index.scan(&tx1); + index.index_tx(&tx1); assert_eq!( index.sent_and_received(&tx1), (0, 42_000), @@ -82,7 +82,7 @@ fn mark_used() { }], }; - spk_index.scan(&tx1); + spk_index.index_tx(&tx1); spk_index.unmark_used(&1); assert!( spk_index.is_used(&1), From d747a630e6151e3346a6bbdf7ac1d1e2c2139aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 28 Aug 2023 14:35:52 +0800 Subject: [PATCH 09/15] fix: cargo MSRV issues with `minreq` Co-authored-by: Steve Myers --- .github/workflows/cont_integration.yml | 1 + README.md | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index f02dcb834..2ec2e0a3a 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -32,6 +32,7 @@ jobs: run: | cargo update -p log --precise "0.4.18" cargo update -p tempfile --precise "3.6.0" + cargo update -p minreq --precise "2.8.1" cargo update -p rustls:0.21.6 --precise "0.21.1" cargo update -p tokio:1.32.0 --precise "1.29.1" cargo update -p flate2:1.0.27 --precise "1.0.26" diff --git a/README.md b/README.md index ae230abbd..a0af8dd7f 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,8 @@ To build with the MSRV you will need to pin dependencies as follows: cargo update -p log --precise "0.4.18" # tempfile 3.7.0 has MSRV 1.63.0+ cargo update -p tempfile --precise "3.6.0" +# minreq 2.9.0 prevents pinning rustls to 0.21.1 +cargo update -p minreq --precise "2.8.1" # rustls 0.21.2 has MSRV 1.60.0+ cargo update -p rustls:0.21.6 --precise "0.21.1" # tokio 1.30 has MSRV 1.63.0+ @@ -79,7 +81,7 @@ cargo update -p flate2:1.0.27 --precise "1.0.26" cargo update -p reqwest --precise "0.11.18" # h2 0.3.21 has MSRV 1.63.0+ cargo update -p h2 --precise "0.3.20" -# rustls-webpki has MSRV 1.60.0+ +# rustls-webpki 0.100.2 has MSRV 1.60.0+ cargo update -p rustls-webpki --precise "0.100.1" ``` From c93ed0104d663973cf22d69858cee6645d3176ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 20 Jul 2023 09:28:38 +0800 Subject: [PATCH 10/15] feat(bitcoind_rpc): introduce `bitcoind_rpc` chain-src crate Also introduce `bitcoind_rpc` cli example. Add tests: * `test_sync_local_chain` ensures that `Emitter::emit_block` emits blocks in order, even after reorg. * `test_into_tx_graph` ensures that `into_tx_graph` behaves appropriately for both mempool and block updates. It should also filter txs and map anchors correctly. --- Cargo.toml | 2 + crates/bitcoind_rpc/Cargo.toml | 14 + crates/bitcoind_rpc/src/lib.rs | 476 ++++++++++++++++++++++ crates/bitcoind_rpc/tests/test_emitter.rs | 332 +++++++++++++++ example-crates/example_rpc/Cargo.toml | 12 + example-crates/example_rpc/src/main.rs | 312 ++++++++++++++ 6 files changed, 1148 insertions(+) create mode 100644 crates/bitcoind_rpc/Cargo.toml create mode 100644 crates/bitcoind_rpc/src/lib.rs create mode 100644 crates/bitcoind_rpc/tests/test_emitter.rs create mode 100644 example-crates/example_rpc/Cargo.toml create mode 100644 example-crates/example_rpc/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 9fafb8b78..b235da3e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,10 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", + "example-crates/example_rpc", "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..2faf16eb1 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "bdk_bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../chain", version = "0.5.0", features = ["serde", "miniscript"] } +bitcoincore-rpc = { version = "0.17.0" } + +[dev-dependencies] +bitcoind = { version = "0.32.0", features = ["25_0"] } +anyhow = { version = "1" } diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..2817cb7df --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,476 @@ +//! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC +//! interface. +//! +//! The main structure is [`Emitter`], which sources blockchain data from +//! [`bitcoincore_rpc::Client`]. +//! +//! To only get block updates (exlude mempool transactions), the caller can use +//! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A +//! separate method, [`Emitter::emit_mempool`] can be used to emit the whole mempool. Another +//! method, [`Emitter::emit_update`] is avaliable, which emits block updates until the block tip is +//! reached, then the next update will be the mempool. +//! +//! # [`IntoIterator`] implementation +//! +//! [`Emitter`] implements [`IntoIterator`] which transforms itself into [`UpdateIter`]. The +//! iterator is implemented in a way that even after a call to [`Iterator::next`] returns [`None`], +//! subsequent calls may resume returning [`Some`]. +//! +//! The iterator initially returns blocks in increasing height order. After the chain tip is +//! reached, the next update is the mempool. After the mempool update is released, the first +//! succeeding call to [`Iterator::next`] will return [`None`]. +//! +//! This logic is useful if the caller wishes to "update once". +//! +//! ```rust,no_run +//! use bdk_bitcoind_rpc::{EmittedUpdate, Emitter}; +//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +//! +//! for r in Emitter::new(&client, 709_632, None) { +//! let update = r.expect("todo: deal with the error properly"); +//! +//! if update.is_block() { +//! let cp = update.checkpoint(); +//! println!("block {}:{}", cp.height(), cp.hash()); +//! } else { +//! println!("mempool!"); +//! } +//! } +//! ``` +//! +//! Alternatively, if the caller wishes to keep [`Emitter`] in a dedicated update-thread, the caller +//! can continue to poll [`Iterator::next`] with a delay. + +#![warn(missing_docs)] + +use bdk_chain::{ + bitcoin::{Block, Transaction}, + indexed_tx_graph::Indexer, + local_chain::CheckPoint, + Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, +}; +pub use bitcoincore_rpc; +use bitcoincore_rpc::{json::GetBlockResult, RpcApi}; +use std::fmt::Debug; + +/// An update emitted from [`Emitter`]. This can either be of a block or a subset of +/// mempool transactions. +#[derive(Debug, Clone)] +pub enum EmittedUpdate { + /// An emitted block. + Block(EmittedBlock), + /// An emitted subset of mempool transactions. + /// + /// [`Emitter`] attempts to avoid re-emitting transactions. + Mempool(EmittedMempool), +} + +impl EmittedUpdate { + /// Returns whether the update is of a subset of the mempool. + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. }) + } + + /// Returns whether the update is of a block. + pub fn is_block(&self) -> bool { + matches!(self, Self::Block { .. }) + } + + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + match self { + EmittedUpdate::Block(e) => e.checkpoint(), + EmittedUpdate::Mempool(e) => e.checkpoint(), + } + } + + /// Transforms the emitted update into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + pub fn into_tx_graph_update(self, tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + match self { + EmittedUpdate::Block(e) => e.into_tx_graph_update(tx_filter, anchor_map), + EmittedUpdate::Mempool(e) => e.into_tx_graph_update(tx_filter), + } + } +} + +/// An emitted block. +#[derive(Debug, Clone)] +pub struct EmittedBlock { + /// The checkpoint constructed from the block's height/hash and connected to the previous block. + pub cp: CheckPoint, + /// The actual block of the chain. + pub block: Block, +} + +impl EmittedBlock { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted update into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + pub fn into_tx_graph_update(self, mut tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self + .block + .txdata + .iter() + .enumerate() + .filter(move |(_, tx)| tx_filter(tx)); + for (tx_pos, tx) in tx_iter { + let txid = tx.txid(); + let _ = tx_graph.insert_anchor(txid, anchor_map(&self.cp, &self.block, tx_pos)); + let _ = tx_graph.insert_tx(tx.clone()); + } + tx_graph + } +} + +/// An emitted subset of mempool transactions. +#[derive(Debug, Clone)] +pub struct EmittedMempool { + /// The checkpoint of the last-seen tip. + pub cp: CheckPoint, + /// Subset of mempool transactions. + pub txs: Vec<(Transaction, u64)>, +} + +impl EmittedMempool { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted mempool into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + pub fn into_tx_graph_update(self, mut tx_filter: F) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self.txs.into_iter().filter(move |(tx, _)| tx_filter(tx)); + for (tx, seen_at) in tx_iter { + let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); + let _ = tx_graph.insert_tx(tx); + } + tx_graph + } +} + +/// Creates a closure that filters transactions based on an [`Indexer`] implementation. +pub fn indexer_filter<'i, I: Indexer>( + indexer: &'i mut I, + changeset: &'i mut I::ChangeSet, +) -> impl FnMut(&Transaction) -> bool + 'i +where + I::ChangeSet: bdk_chain::Append, +{ + |tx| { + changeset.append(indexer.index_tx(tx)); + indexer.is_tx_relevant(tx) + } +} + +/// Returns an empty filter-closure. +pub fn empty_filter() -> impl FnMut(&Transaction) -> bool { + |_| true +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`]. +/// +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. +pub fn confirmation_height_anchor( + cp: &CheckPoint, + _block: &Block, + _tx_pos: usize, +) -> ConfirmationHeightAnchor { + let anchor_block = cp.block_id(); + ConfirmationHeightAnchor { + anchor_block, + confirmation_height: anchor_block.height, + } +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`]. +/// +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. +pub fn confirmation_time_anchor( + cp: &CheckPoint, + block: &Block, + _tx_pos: usize, +) -> ConfirmationTimeAnchor { + let anchor_block = cp.block_id(); + ConfirmationTimeAnchor { + anchor_block, + confirmation_height: anchor_block.height, + confirmation_time: block.header.time as _, + } +} + +/// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from +/// [`bitcoincore_rpc::Client`]. +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct Emitter<'c, C> { + client: &'c C, + fallback_height: u32, + + last_cp: Option, + last_info: Option, +} + +impl<'c, C: RpcApi> IntoIterator for Emitter<'c, C> { + type Item = as Iterator>::Item; + type IntoIter = UpdateIter<'c, C>; + + fn into_iter(self) -> Self::IntoIter { + UpdateIter { + emitter: self, + last_emission_was_mempool: false, + } + } +} + +impl<'c, C: RpcApi> Emitter<'c, C> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// + /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a + /// point of agreement is not found. + /// * `last_cp` is the last known checkpoint to build updates on (if any). + pub fn new(client: &'c C, fallback_height: u32, last_cp: Option) -> Self { + Self { + client, + fallback_height, + last_cp, + last_info: None, + } + } + + /// Emits the whole mempool contents. + pub fn emit_mempool(&self) -> Result { + let txs = self + .client + .get_raw_mempool()? + .into_iter() + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = self + .client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = self.client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + let cp = match &self.last_cp { + Some(cp) => cp.clone(), + None => { + let hash = self.client.get_best_block_hash()?; + let height = self.client.get_block_info(&hash)?.height as u32; + CheckPoint::new(BlockId { height, hash }) + } + }; + Ok(EmittedMempool { cp, txs }) + } + + /// Emits the next block (if any). + pub fn emit_block(&mut self) -> Result, bitcoincore_rpc::Error> { + enum PollResponse { + /// A new block that is in chain is found. Congratulations! + Block { + cp: CheckPoint, + info: GetBlockResult, + }, + /// This either signals that we have reached the tip, or that the blocks ahead are not + /// in the best chain. In either case, we need to find the agreement point again. + NoMoreBlocks, + /// We have exhausted the local checkpoint history and there is no agreement point. We + /// should emit from the fallback height for the next round. + AgreementPointNotFound, + /// We have found an agreement point! Do not emit this one, emit the one higher. + AgreementPointFound { + cp: CheckPoint, + info: GetBlockResult, + }, + } + + fn poll(emitter: &mut Emitter) -> Result + where + C: RpcApi, + { + let client = emitter.client; + + match (&mut emitter.last_cp, &mut emitter.last_info) { + (None, None) => { + let info = client + .get_block_info(&client.get_block_hash(emitter.fallback_height as _)?)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + Ok(PollResponse::Block { cp, info }) + } + (Some(last_cp), None) => { + for cp in last_cp.iter() { + let cp_block = cp.block_id(); + let info = client.get_block_info(&cp_block.hash)?; + if info.confirmations < 0 { + // block is not in the main chain + continue; + } + // agreement point found + return Ok(PollResponse::AgreementPointFound { cp, info }); + } + // no agreement point found + Ok(PollResponse::AgreementPointNotFound) + } + (Some(last_cp), Some(last_info)) => { + let next_hash = match last_info.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + let info = client.get_block_info(&next_hash)?; + if info.confirmations < 0 { + return Ok(PollResponse::NoMoreBlocks); + } + let cp = last_cp + .clone() + .push(BlockId { + height: info.height as _, + hash: info.hash, + }) + .expect("must extend from checkpoint"); + Ok(PollResponse::Block { cp, info }) + } + (None, Some(last_info)) => unreachable!( + "info cannot exist without checkpoint: info={:#?}", + last_info + ), + } + } + + loop { + match poll(self)? { + PollResponse::Block { cp, info } => { + let block = self.client.get_block(&info.hash)?; + self.last_cp = Some(cp.clone()); + self.last_info = Some(info); + return Ok(Some(EmittedBlock { cp, block })); + } + PollResponse::NoMoreBlocks => { + // we have reached the tip, try find agreement point in next round + self.last_info = None; + return Ok(None); + } + PollResponse::AgreementPointNotFound => { + self.last_cp = None; + self.last_info = None; + continue; + } + PollResponse::AgreementPointFound { cp, info } => { + self.last_cp = Some(cp); + self.last_info = Some(info); + continue; + } + } + } + } + + /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. + pub fn emit_update(&mut self) -> Result { + match self.emit_block()? { + Some(emitted_block) => Ok(EmittedUpdate::Block(emitted_block)), + None => self.emit_mempool().map(EmittedUpdate::Mempool), + } + } +} + +/// Extends [`bitcoincore_rpc::Error`]. +pub trait BitcoindRpcErrorExt { + /// Returns whether the error is a "not found" error. + /// + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// [`Iterator::Item`]. + fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +} + +/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of [`EmittedUpdate`]. +/// +/// ```rust,no_run +/// use bdk_bitcoind_rpc::{EmittedUpdate, Emitter, UpdateIter}; +/// use core::iter::{IntoIterator, Iterator}; +/// # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +/// +/// let mut update_iter = Emitter::new(&client, 706_932, None).into_iter(); +/// let update = update_iter.next().expect("must get next update"); +/// println!("got update: {:?}", update); +/// ``` +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct UpdateIter<'c, C> { + emitter: Emitter<'c, C>, + last_emission_was_mempool: bool, +} + +impl<'c, C: RpcApi> Iterator for UpdateIter<'c, C> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.last_emission_was_mempool { + self.last_emission_was_mempool = false; + None + } else { + let update = self.emitter.emit_update(); + if matches!(update, Ok(EmittedUpdate::Mempool(_))) { + self.last_emission_was_mempool = true; + } + Some(update) + } + } +} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs new file mode 100644 index 000000000..7a0c2e6b2 --- /dev/null +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -0,0 +1,332 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use bdk_bitcoind_rpc::Emitter; +use bdk_chain::{ + bitcoin::{Address, Amount, BlockHash, Txid}, + local_chain::LocalChain, + Append, BlockId, ConfirmationHeightAnchor, IndexedTxGraph, SpkTxOutIndex, +}; +use bitcoincore_rpc::RpcApi; + +struct TestEnv { + #[allow(dead_code)] + daemon: bitcoind::BitcoinD, + client: bitcoincore_rpc::Client, +} + +impl TestEnv { + fn new() -> anyhow::Result { + let daemon = match std::env::var_os("TEST_BITCOIND") { + Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path), + None => bitcoind::BitcoinD::from_downloaded(), + }?; + let client = bitcoincore_rpc::Client::new( + &daemon.rpc_url(), + bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()), + )?; + Ok(Self { daemon, client }) + } + + fn mine_blocks( + &self, + count: usize, + address: Option
, + ) -> anyhow::Result> { + let coinbase_address = match address { + Some(address) => address, + None => self.client.get_new_address(None, None)?.assume_checked(), + }; + let block_hashes = self + .client + .generate_to_address(count as _, &coinbase_address)?; + Ok(block_hashes) + } + + fn reorg(&self, count: usize) -> anyhow::Result> { + let start_height = self.client.get_block_count()?; + + let mut hash = self.client.get_best_block_hash()?; + for _ in 0..count { + let prev_hash = self.client.get_block_info(&hash)?.previousblockhash; + self.client.invalidate_block(&hash)?; + match prev_hash { + Some(prev_hash) => hash = prev_hash, + None => break, + } + } + + let res = self.mine_blocks(count, None); + assert_eq!( + self.client.get_block_count()?, + start_height, + "reorg should not result in height change" + ); + res + } +} + +/// Ensure that blocks are emitted in order even after reorg. +/// +/// 1. Mine 101 blocks. +/// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. +/// 3. Reorg highest 6 blocks. +/// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. +#[test] +pub fn test_sync_local_chain() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let mut local_chain = LocalChain::default(); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // mine some blocks and returned the actual block hashes + let exp_hashes = { + let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block + hashes.extend(env.mine_blocks(101, None)?); + hashes + }; + + // see if the emitter outputs the right blocks + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block hash is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + BTreeMap::from([(cp.height(), Some(cp.hash()))]), + "chain update changeset is unexpected", + ); + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected", + ); + + // create new emitter (just for testing sake) + drop(emitter); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // perform reorg + let reorged_blocks = env.reorg(6)?; + let exp_hashes = exp_hashes + .iter() + .take(exp_hashes.len() - reorged_blocks.len()) + .chain(&reorged_blocks) + .cloned() + .collect::>(); + + // see if the emitter outputs the right blocks + let mut exp_height = exp_hashes.len() - reorged_blocks.len(); + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.height(), + exp_height as u32, + "emitted block has unexpected height" + ); + + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + if exp_height == exp_hashes.len() - reorged_blocks.len() { + core::iter::once((cp.height(), Some(cp.hash()))) + .chain((cp.height() + 1..exp_hashes.len() as u32).map(|h| (h, None))) + .collect::() + } else { + BTreeMap::from([(cp.height(), Some(cp.hash()))]) + }, + "chain update changeset is unexpected", + ); + + exp_height += 1; + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected after reorg", + ); + + Ok(()) +} + +/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and +/// block updates. +/// +/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update +#[test] +fn test_into_tx_graph() -> anyhow::Result<()> { + let env = TestEnv::new()?; + + println!("getting new addresses!"); + let addr_0 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_1 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_2 = env.client.get_new_address(None, None)?.assume_checked(); + println!("got new addresses!"); + + println!("mining block!"); + env.mine_blocks(101, None)?; + println!("mined blocks!"); + + let mut chain = LocalChain::default(); + let mut indexed_tx_graph = IndexedTxGraph::::new({ + let mut index = SpkTxOutIndex::::default(); + index.insert_spk(0, addr_0.script_pubkey()); + index.insert_spk(1, addr_1.script_pubkey()); + index.insert_spk(2, addr_2.script_pubkey()); + index + }); + + for r in Emitter::new(&env.client, 0, chain.tip()) { + let update = r?; + + let _ = chain.apply_update(bdk_chain::local_chain::Update { + tip: update.checkpoint(), + introduce_older_blocks: false, + })?; + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!(tx_graph_update.full_txs().count(), 0); + assert_eq!(tx_graph_update.all_txouts().count(), 0); + assert_eq!(tx_graph_update.all_anchors().len(), 0); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert!(indexed_additions.is_empty()); + } + + // send 3 txs to a tracked address, these txs will be in the mempool + let exp_txids = { + let mut txids = BTreeSet::new(); + for _ in 0..3 { + txids.insert(env.client.send_to_address( + &addr_0, + Amount::from_sat(10_000), + None, + None, + None, + None, + None, + None, + )?); + } + txids + }; + + // expect the next update to be a mempool update (with 3 relevant tx) + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_mempool()); + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!( + tx_graph_update + .full_txs() + .map(|tx| tx.txid) + .collect::>(), + exp_txids, + "the mempool update should have 3 relevant transactions", + ); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert_eq!( + indexed_additions + .graph + .txs + .iter() + .map(|tx| tx.txid()) + .collect::>(), + exp_txids, + "changeset should have the 3 mempool transactions", + ); + assert!(indexed_additions.graph.anchors.is_empty()); + } + + // mine a block that confirms the 3 txs + let exp_block_hash = env.mine_blocks(1, None)?[0]; + let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32; + let exp_anchors = exp_txids + .iter() + .map({ + let anchor = ConfirmationHeightAnchor { + anchor_block: BlockId { + height: exp_block_height, + hash: exp_block_hash, + }, + confirmation_height: exp_block_height, + }; + move |&txid| (anchor, txid) + }) + .collect::>(); + + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_block()); + + let _ = chain.apply_update(bdk_chain::local_chain::Update { + tip: update.checkpoint(), + introduce_older_blocks: false, + })?; + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!( + tx_graph_update + .full_txs() + .map(|tx| tx.txid) + .collect::>(), + exp_txids, + "block update should have 3 relevant transactions", + ); + assert_eq!( + tx_graph_update.all_anchors(), + &exp_anchors, + "the block update should introduce anchors", + ); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert!(indexed_additions.graph.txs.is_empty()); + assert!(indexed_additions.graph.txouts.is_empty()); + assert_eq!(indexed_additions.graph.anchors, exp_anchors); + } + + Ok(()) +} diff --git a/example-crates/example_rpc/Cargo.toml b/example-crates/example_rpc/Cargo.toml new file mode 100644 index 000000000..c107c49b6 --- /dev/null +++ b/example-crates/example_rpc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs new file mode 100644 index 000000000..248dd13c0 --- /dev/null +++ b/example-crates/example_rpc/src/main.rs @@ -0,0 +1,312 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + Arc, Mutex, + }, + time::{Duration, Instant, SystemTime}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + EmittedUpdate, Emitter, +}; +use bdk_chain::{ + bitcoin::{address, Address, Transaction}, + indexed_tx_graph, keychain, + local_chain::{self, LocalChain}, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + CoinSelectionAlgo, Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; +const CHANNEL_BOUND: usize = 10; +const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15); + +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant + /// transactions + Scan { + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, + /// Whether to be live! + #[clap(long, default_value = "false")] + live: bool, + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Create and broadcast a transaction. + Tx { + value: u64, + address: Address, + #[clap(short, default_value = "bnb")] + coin_select: CoinSelectionAlgo, + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +impl RpcCommands { + fn rpc_args(&self) -> &RpcArgs { + match self { + RpcCommands::Scan { rpc_args, .. } => rpc_args, + RpcCommands::Tx { rpc_args, .. } => rpc_args, + } + } +} + +fn main() -> anyhow::Result<()> { + let sigterm_flag = start_ctrlc_handler(); + + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_changeset(init_changeset.1); + graph + }); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |_| Err(anyhow::anyhow!("use `tx` instead")), + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + let rpc_client = { + let a = rpc_cmd.rpc_args(); + Client::new( + &a.url, + match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )? + }; + + match rpc_cmd { + RpcCommands::Scan { + fallback_height, + lookahead, + live, + .. + } => { + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + + let (chan, recv) = sync_channel::<(EmittedUpdate, u32)>(CHANNEL_BOUND); + let prev_cp = chain.lock().unwrap().tip(); + + let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { + let mut tip_height = Option::::None; + + let mut emitter = Emitter::new(&rpc_client, fallback_height, prev_cp); + loop { + let item = emitter.emit_update()?; + let is_mempool = item.is_mempool(); + + if tip_height.is_none() || is_mempool { + tip_height = Some(rpc_client.get_block_count()? as u32); + } + chan.send((item, tip_height.expect("must have tip height")))?; + + if !is_mempool { + // break if sigterm is detected + if sigterm_flag.load(Ordering::Acquire) { + break; + } + continue; + } + + // everything after this point is a mempool update + // mempool update is emitted after we reach the chain tip + // if we are are in "sync-once" mode, we break here + // otherwise, we sleep or wait for sigterm + if !live || await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) { + break; + } + } + + Ok(()) + }); + + let mut start = Instant::now(); + + for (item, tip_height) in recv { + let is_mempool = item.is_mempool(); + let tip = item.checkpoint(); + let current_height = tip.height(); + + let db_changeset = { + let mut indexed_changeset = indexed_tx_graph::ChangeSet::default(); + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + + let graph_update = { + let tx_filter = bdk_bitcoind_rpc::indexer_filter( + &mut graph.index, + &mut indexed_changeset.indexer, + ); + let anchor_map = bdk_bitcoind_rpc::confirmation_time_anchor; + item.into_tx_graph_update(tx_filter, anchor_map) + }; + indexed_changeset.append(graph.apply_update(graph_update)); + + let chain_changeset = chain.apply_update(local_chain::Update { + tip, + introduce_older_blocks: false, + })?; + + (chain_changeset, indexed_changeset) + }; + + let mut db = db.lock().unwrap(); + db.stage(db_changeset); + + // print stuff every 3 seconds + if start.elapsed() >= Duration::from_secs(3) { + start = Instant::now(); + let balance = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + graph.graph().balance( + &*chain, + chain.tip().map_or(BlockId::default(), |cp| cp.block_id()), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "* scanned_to: {} / {} tip | total: {} sats", + if is_mempool { + "mempool".to_string() + } else { + current_height.to_string() + }, + tip_height, + balance.confirmed + + balance.immature + + balance.trusted_pending + + balance.untrusted_pending + ); + } + } + + db.lock().unwrap().commit()?; + println!("commited to database!"); + + join_handle + .join() + .expect("failed to join chain source thread") + } + RpcCommands::Tx { + value, + address, + coin_select, + .. + } => { + let chain = chain.lock().unwrap(); + let broadcast = move |tx: &Transaction| -> anyhow::Result<()> { + rpc_client.send_raw_transaction(tx)?; + Ok(()) + }; + example_cli::run_send_cmd( + &graph, + &db, + &*chain, + &keymap, + coin_select, + address + .require_network(args.network) + .expect("address has the wrong network"), + value, + broadcast, + ) + } + } +} + +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = SystemTime::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if SystemTime::now() + .duration_since(start) + .expect("should succeed") + >= duration + { + return false; + } + std::thread::sleep(Duration::from_secs(1)); + } +} From e6f2dbb0ac2e750a123f2a4025d3dc50cdc05290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sun, 27 Aug 2023 00:49:42 +0800 Subject: [PATCH 11/15] feat(bitcoind_rpc)!: rm `cp` field from `EmittedMempool` --- crates/bitcoind_rpc/src/lib.rs | 43 ++++++++++------------- crates/bitcoind_rpc/tests/test_emitter.rs | 14 ++++---- example-crates/example_rpc/src/main.rs | 18 +++++----- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 2817cb7df..51ba7ca39 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -29,11 +29,9 @@ //! for r in Emitter::new(&client, 709_632, None) { //! let update = r.expect("todo: deal with the error properly"); //! -//! if update.is_block() { -//! let cp = update.checkpoint(); -//! println!("block {}:{}", cp.height(), cp.hash()); -//! } else { -//! println!("mempool!"); +//! match update.checkpoint() { +//! Some(cp) => println!("block {}:{}", cp.height(), cp.hash()), +//! None => println!("mempool!"), //! } //! } //! ``` @@ -46,7 +44,7 @@ use bdk_chain::{ bitcoin::{Block, Transaction}, indexed_tx_graph::Indexer, - local_chain::CheckPoint, + local_chain::{self, CheckPoint}, Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, }; pub use bitcoincore_rpc; @@ -77,13 +75,23 @@ impl EmittedUpdate { } /// Get the emission's checkpoint. - pub fn checkpoint(&self) -> CheckPoint { + /// + /// The emission will only have a checkpoint if it is the [`EmittedUpdate::Block`] variant. + pub fn checkpoint(&self) -> Option { match self { - EmittedUpdate::Block(e) => e.checkpoint(), - EmittedUpdate::Mempool(e) => e.checkpoint(), + EmittedUpdate::Block(e) => Some(e.checkpoint()), + EmittedUpdate::Mempool(_) => None, } } + /// Convenience method to get [`local_chain::Update`]. + pub fn chain_update(&self) -> Option { + Some(local_chain::Update { + tip: self.checkpoint()?, + introduce_older_blocks: false, + }) + } + /// Transforms the emitted update into a [`TxGraph`] update. /// /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so @@ -155,18 +163,11 @@ impl EmittedBlock { /// An emitted subset of mempool transactions. #[derive(Debug, Clone)] pub struct EmittedMempool { - /// The checkpoint of the last-seen tip. - pub cp: CheckPoint, /// Subset of mempool transactions. pub txs: Vec<(Transaction, u64)>, } impl EmittedMempool { - /// Get the emission's checkpoint. - pub fn checkpoint(&self) -> CheckPoint { - self.cp.clone() - } - /// Transforms the emitted mempool into a [`TxGraph`] update. /// /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so @@ -295,15 +296,7 @@ impl<'c, C: RpcApi> Emitter<'c, C> { }, ) .collect::, _>>()?; - let cp = match &self.last_cp { - Some(cp) => cp.clone(), - None => { - let hash = self.client.get_best_block_hash()?; - let height = self.client.get_block_info(&hash)?.height as u32; - CheckPoint::new(BlockId { height, hash }) - } - }; - Ok(EmittedMempool { cp, txs }) + Ok(EmittedMempool { txs }) } /// Emits the next block (if any). diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 7a0c2e6b2..f3c800113 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -211,10 +211,9 @@ fn test_into_tx_graph() -> anyhow::Result<()> { for r in Emitter::new(&env.client, 0, chain.tip()) { let update = r?; - let _ = chain.apply_update(bdk_chain::local_chain::Update { - tip: update.checkpoint(), - introduce_older_blocks: false, - })?; + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } let tx_graph_update = update.into_tx_graph_update( bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), @@ -299,10 +298,9 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; assert!(update.is_block()); - let _ = chain.apply_update(bdk_chain::local_chain::Update { - tip: update.checkpoint(), - introduce_older_blocks: false, - })?; + if let Some(chain_update) = update.chain_update() { + let _ = chain.apply_update(chain_update)?; + } let tx_graph_update = update.into_tx_graph_update( bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs index 248dd13c0..53631b434 100644 --- a/example-crates/example_rpc/src/main.rs +++ b/example-crates/example_rpc/src/main.rs @@ -192,9 +192,8 @@ fn main() -> anyhow::Result<()> { let mut start = Instant::now(); for (item, tip_height) in recv { - let is_mempool = item.is_mempool(); + let chain_update = item.chain_update(); let tip = item.checkpoint(); - let current_height = tip.height(); let db_changeset = { let mut indexed_changeset = indexed_tx_graph::ChangeSet::default(); @@ -211,10 +210,10 @@ fn main() -> anyhow::Result<()> { }; indexed_changeset.append(graph.apply_update(graph_update)); - let chain_changeset = chain.apply_update(local_chain::Update { - tip, - introduce_older_blocks: false, - })?; + let chain_changeset = match chain_update { + Some(update) => chain.apply_update(update)?, + None => local_chain::ChangeSet::default(), + }; (chain_changeset, indexed_changeset) }; @@ -237,10 +236,9 @@ fn main() -> anyhow::Result<()> { }; println!( "* scanned_to: {} / {} tip | total: {} sats", - if is_mempool { - "mempool".to_string() - } else { - current_height.to_string() + match tip { + Some(cp) => cp.height().to_string(), + None => "mempool".to_string(), }, tip_height, balance.confirmed From 105d6c6013a7043d09833a832c6921a516915f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sun, 27 Aug 2023 02:55:14 +0800 Subject: [PATCH 12/15] feat(bitcoind_rpc)!: simplified tx graph update Replaced `into_tx_graph_update` with `indexed_tx_graph_update`. The latter returns a vec of `(tx, anchors, last_seen)` to be passed into `IndexedTxGraph::insert_relevant_txs`. --- crates/bitcoind_rpc/src/lib.rs | 114 ++++++++------------ crates/bitcoind_rpc/tests/test_emitter.rs | 48 ++------- crates/chain/src/indexed_tx_graph.rs | 20 ++-- crates/chain/tests/test_indexed_tx_graph.rs | 13 ++- example-crates/example_rpc/src/main.rs | 22 ++-- 5 files changed, 78 insertions(+), 139 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 51ba7ca39..c2e29ca78 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -1,8 +1,7 @@ //! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC -//! interface. +//! interface (excluding the RPC wallet API). //! -//! The main structure is [`Emitter`], which sources blockchain data from -//! [`bitcoincore_rpc::Client`]. +//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. //! //! To only get block updates (exlude mempool transactions), the caller can use //! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A @@ -43,9 +42,9 @@ use bdk_chain::{ bitcoin::{Block, Transaction}, - indexed_tx_graph::Indexer, + indexed_tx_graph::TxItem, local_chain::{self, CheckPoint}, - Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, + BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, }; pub use bitcoincore_rpc; use bitcoincore_rpc::{json::GetBlockResult, RpcApi}; @@ -92,24 +91,21 @@ impl EmittedUpdate { }) } - /// Transforms the emitted update into a [`TxGraph`] update. - /// - /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so - /// they do not get included in the [`TxGraph`] update. We have provided two closures; - /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. /// /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. - pub fn into_tx_graph_update(self, tx_filter: F, anchor_map: M) -> TxGraph + /// + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update(&self, anchor_map: M) -> Vec>> where - F: FnMut(&Transaction) -> bool, M: Fn(&CheckPoint, &Block, usize) -> A, - A: Clone + Ord + PartialOrd, + A: Clone + Ord + PartialEq, { match self { - EmittedUpdate::Block(e) => e.into_tx_graph_update(tx_filter, anchor_map), - EmittedUpdate::Mempool(e) => e.into_tx_graph_update(tx_filter), + EmittedUpdate::Block(e) => e.indexed_tx_graph_update(anchor_map).collect(), + EmittedUpdate::Mempool(e) => e.indexed_tx_graph_update().collect(), } } } @@ -129,87 +125,63 @@ impl EmittedBlock { self.cp.clone() } - /// Transforms the emitted update into a [`TxGraph`] update. + /// Convenience method to get [`local_chain::Update`]. + pub fn chain_update(&self) -> local_chain::Update { + local_chain::Update { + tip: self.cp.clone(), + introduce_older_blocks: false, + } + } + + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. /// - /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so - /// they do not get included in the [`TxGraph`] update. We have provided two closures; - /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more. /// - /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. - /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create - /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. - pub fn into_tx_graph_update(self, mut tx_filter: F, anchor_map: M) -> TxGraph + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update( + &self, + anchor_map: M, + ) -> impl Iterator>> where - F: FnMut(&Transaction) -> bool, M: Fn(&CheckPoint, &Block, usize) -> A, - A: Clone + Ord + PartialOrd, + A: Clone + Ord + PartialEq, { - let mut tx_graph = TxGraph::default(); - let tx_iter = self - .block + self.block .txdata .iter() .enumerate() - .filter(move |(_, tx)| tx_filter(tx)); - for (tx_pos, tx) in tx_iter { - let txid = tx.txid(); - let _ = tx_graph.insert_anchor(txid, anchor_map(&self.cp, &self.block, tx_pos)); - let _ = tx_graph.insert_tx(tx.clone()); - } - tx_graph + .map(move |(i, tx)| (tx, Some(anchor_map(&self.cp, &self.block, i)), None)) } } /// An emitted subset of mempool transactions. #[derive(Debug, Clone)] pub struct EmittedMempool { - /// Subset of mempool transactions. + /// Subset of mempool transactions as tuples of `(tx, seen_at)`. + /// + /// `seen_at` is the unix timestamp of when the transaction was first seen in the mempool. pub txs: Vec<(Transaction, u64)>, } impl EmittedMempool { - /// Transforms the emitted mempool into a [`TxGraph`] update. + /// Return transaction items to be consumed by [`IndexedTxGraph::insert_relevant_txs`]. + /// + /// Refer to [`EmittedUpdate::indexed_tx_graph_update`] for more. /// - /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so - /// they do not get included in the [`TxGraph`] update. We have provided two closures; - /// [`empty_filter`] and [`indexer_filter`] for this purpose. - pub fn into_tx_graph_update(self, mut tx_filter: F) -> TxGraph + /// [`IndexedTxGraph::insert_relevant_txs`]: bdk_chain::IndexedTxGraph::insert_relevant_txs + pub fn indexed_tx_graph_update(&self) -> impl Iterator>> where - F: FnMut(&Transaction) -> bool, - A: Clone + Ord + PartialOrd, + A: Clone + Ord + PartialEq, { - let mut tx_graph = TxGraph::default(); - let tx_iter = self.txs.into_iter().filter(move |(tx, _)| tx_filter(tx)); - for (tx, seen_at) in tx_iter { - let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); - let _ = tx_graph.insert_tx(tx); - } - tx_graph - } -} - -/// Creates a closure that filters transactions based on an [`Indexer`] implementation. -pub fn indexer_filter<'i, I: Indexer>( - indexer: &'i mut I, - changeset: &'i mut I::ChangeSet, -) -> impl FnMut(&Transaction) -> bool + 'i -where - I::ChangeSet: bdk_chain::Append, -{ - |tx| { - changeset.append(indexer.index_tx(tx)); - indexer.is_tx_relevant(tx) + self.txs + .iter() + .map(|(tx, seen_at)| (tx, None, Some(*seen_at))) } } -/// Returns an empty filter-closure. -pub fn empty_filter() -> impl FnMut(&Transaction) -> bool { - |_| true -} - /// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`]. /// -/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. +/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`]. pub fn confirmation_height_anchor( cp: &CheckPoint, _block: &Block, @@ -224,7 +196,7 @@ pub fn confirmation_height_anchor( /// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`]. /// -/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. +/// This is to be used as an input to [`EmittedUpdate::indexed_tx_graph_update`]. pub fn confirmation_time_anchor( cp: &CheckPoint, block: &Block, diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index f3c800113..d5dc4c6bd 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -215,15 +215,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let _ = chain.apply_update(chain_update)?; } - let tx_graph_update = update.into_tx_graph_update( - bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), - bdk_bitcoind_rpc::confirmation_height_anchor, - ); - assert_eq!(tx_graph_update.full_txs().count(), 0); - assert_eq!(tx_graph_update.all_txouts().count(), 0); - assert_eq!(tx_graph_update.all_anchors().len(), 0); + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); - let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); assert!(indexed_additions.is_empty()); } @@ -250,20 +245,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; assert!(update.is_mempool()); - let tx_graph_update = update.into_tx_graph_update( - bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), - bdk_bitcoind_rpc::confirmation_height_anchor, - ); - assert_eq!( - tx_graph_update - .full_txs() - .map(|tx| tx.txid) - .collect::>(), - exp_txids, - "the mempool update should have 3 relevant transactions", - ); + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); - let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); assert_eq!( indexed_additions .graph @@ -302,25 +287,10 @@ fn test_into_tx_graph() -> anyhow::Result<()> { let _ = chain.apply_update(chain_update)?; } - let tx_graph_update = update.into_tx_graph_update( - bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut BTreeSet::new()), - bdk_bitcoind_rpc::confirmation_height_anchor, - ); - assert_eq!( - tx_graph_update - .full_txs() - .map(|tx| tx.txid) - .collect::>(), - exp_txids, - "block update should have 3 relevant transactions", - ); - assert_eq!( - tx_graph_update.all_anchors(), - &exp_anchors, - "the block update should introduce anchors", - ); + let tx_graph_update = + update.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_height_anchor); - let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + let indexed_additions = indexed_tx_graph.insert_relevant_txs(tx_graph_update); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); assert_eq!(indexed_additions.graph.anchors, exp_anchors); diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 4643a93ec..169c4ad36 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -135,8 +135,7 @@ where /// timestamp of when the transactions are last seen. pub fn insert_relevant_txs<'t>( &mut self, - txs: impl IntoIterator)>, - seen_at: Option, + txs: impl IntoIterator>>, ) -> ChangeSet { // The algorithm below allows for non-topologically ordered transactions by using two loops. // This is achieved by: @@ -146,17 +145,19 @@ where // returns true or not. (in a second loop). let mut changeset = ChangeSet::::default(); let mut transactions = Vec::new(); - for (tx, anchors) in txs.into_iter() { + for (tx, anchors, seen_at) in txs.into_iter() { changeset.indexer.append(self.index.index_tx(tx)); - transactions.push((tx, anchors)); + transactions.push((tx, anchors, seen_at)); } changeset.append( transactions .into_iter() - .filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) { - true => Some(self.insert_tx(tx, anchors, seen_at)), - false => None, - }) + .filter_map( + |(tx, anchors, seen_at)| match self.index.is_tx_relevant(tx) { + true => Some(self.insert_tx(tx, anchors, seen_at)), + false => None, + }, + ) .fold(Default::default(), |mut acc, other| { acc.append(other); acc @@ -166,6 +167,9 @@ where } } +/// Represents a single transaction update. +pub type TxItem<'t, A> = (&'t Transaction, A, Option); + /// A structure that represents changes to an [`IndexedTxGraph`]. #[derive(Clone, Debug, PartialEq)] #[cfg_attr( diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 84506ec11..312096979 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -74,7 +74,7 @@ fn insert_relevant_txs() { }; assert_eq!( - graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None), + graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None, None))), changeset, ); @@ -211,8 +211,8 @@ fn test_list_owned_txouts() { // Insert transactions into graph with respective anchors // For unconfirmed txs we pass in `None`. - let _ = graph.insert_relevant_txs( - [&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { + let _ = + graph.insert_relevant_txs([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { let height = i as u32; ( *tx, @@ -225,12 +225,11 @@ fn test_list_owned_txouts() { anchor_block, confirmation_height: anchor_block.height, }), + None, ) - }), - None, - ); + })); - let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100)); + let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); // A helper lambda to extract and filter data from the graph. let fetch = diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs index 53631b434..3a80e131c 100644 --- a/example-crates/example_rpc/src/main.rs +++ b/example-crates/example_rpc/src/main.rs @@ -64,9 +64,9 @@ impl From for Auth { #[derive(Subcommand, Debug, Clone)] enum RpcCommands { - /// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant - /// transactions - Scan { + /// Syncs local state with remote state via RPC (starting from last point of agreement) and + /// stores/indexes relevant transactions + Sync { /// Starting block height to fallback to if no point of agreement if found #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] fallback_height: u32, @@ -93,7 +93,7 @@ enum RpcCommands { impl RpcCommands { fn rpc_args(&self) -> &RpcArgs { match self { - RpcCommands::Scan { rpc_args, .. } => rpc_args, + RpcCommands::Sync { rpc_args, .. } => rpc_args, RpcCommands::Tx { rpc_args, .. } => rpc_args, } } @@ -145,7 +145,7 @@ fn main() -> anyhow::Result<()> { }; match rpc_cmd { - RpcCommands::Scan { + RpcCommands::Sync { fallback_height, lookahead, live, @@ -200,15 +200,9 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let graph_update = { - let tx_filter = bdk_bitcoind_rpc::indexer_filter( - &mut graph.index, - &mut indexed_changeset.indexer, - ); - let anchor_map = bdk_bitcoind_rpc::confirmation_time_anchor; - item.into_tx_graph_update(tx_filter, anchor_map) - }; - indexed_changeset.append(graph.apply_update(graph_update)); + let graph_update = + item.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_time_anchor); + indexed_changeset.append(graph.insert_relevant_txs(graph_update)); let chain_changeset = match chain_update { Some(update) => chain.apply_update(update)?, From fd4e71e8ca922de38eb0f2b2e5e83bf6c04775fd Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Wed, 30 Aug 2023 09:00:45 +0300 Subject: [PATCH 13/15] feat: create RPC wallet project --- Cargo.toml | 1 + example-crates/wallet_rpc/Cargo.toml | 8 ++++++++ example-crates/wallet_rpc/src/main.rs | 3 +++ 3 files changed, 12 insertions(+) create mode 100644 example-crates/wallet_rpc/Cargo.toml create mode 100644 example-crates/wallet_rpc/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index b235da3e7..8a7838800 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", + "example-crates/wallet_rpc", "nursery/tmp_plan", "nursery/coin_select" ] diff --git a/example-crates/wallet_rpc/Cargo.toml b/example-crates/wallet_rpc/Cargo.toml new file mode 100644 index 000000000..816c249de --- /dev/null +++ b/example-crates/wallet_rpc/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "wallet_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs new file mode 100644 index 000000000..e7a11a969 --- /dev/null +++ b/example-crates/wallet_rpc/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} From b69c61f24e3262e5ae487f07714393b0f9600568 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Thu, 31 Aug 2023 11:46:13 +0300 Subject: [PATCH 14/15] feat: Stage WalletChangeSet and mutable local_chain and indexed_tx_graph --- crates/bdk/src/wallet/mod.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index ac3c808ee..526442afb 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -1846,11 +1846,25 @@ impl Wallet { self.persist.staged() } + /// Stages wallet changes in memory for later commit. + /// + /// To commit changes to the persistence backend, call [`commit`]. + /// + /// [`commit`]: Self::commit + pub fn stage(&mut self, changeset: ChangeSet) where D: PersistBackend { + self.persist.stage(changeset); + } + /// Get a reference to the inner [`TxGraph`]. pub fn tx_graph(&self) -> &TxGraph { self.indexed_graph.graph() } + /// Get a mutable reference to the inner [`IndexedTxGraph`]. + pub fn mut_indexed_tx_graph(&mut self) -> &mut IndexedTxGraph> { + &mut self.indexed_graph + } + /// Get a reference to the inner [`KeychainTxOutIndex`]. pub fn spk_index(&self) -> &KeychainTxOutIndex { &self.indexed_graph.index @@ -1860,6 +1874,11 @@ impl Wallet { pub fn local_chain(&self) -> &LocalChain { &self.chain } + + /// Get a mutable reference to the inner [`LocalChain`]. + pub fn mut_local_chain(&mut self) -> &mut LocalChain { + &mut self.chain + } } impl AsRef> for Wallet { From 7120eaf8edb73369fec47bd0409e99c26784d0d6 Mon Sep 17 00:00:00 2001 From: Vladimir Fomene Date: Thu, 31 Aug 2023 12:01:34 +0300 Subject: [PATCH 15/15] feat: implement RPC wallet example --- example-crates/wallet_rpc/Cargo.toml | 4 + example-crates/wallet_rpc/src/main.rs | 133 +++++++++++++++++++++++++- 2 files changed, 135 insertions(+), 2 deletions(-) diff --git a/example-crates/wallet_rpc/Cargo.toml b/example-crates/wallet_rpc/Cargo.toml index 816c249de..5f731be1f 100644 --- a/example-crates/wallet_rpc/Cargo.toml +++ b/example-crates/wallet_rpc/Cargo.toml @@ -6,3 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bdk = { path = "../../crates/bdk" } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +bdk_file_store = { path = "../../crates/file_store" } +anyhow = "1" \ No newline at end of file diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs index e7a11a969..102a75502 100644 --- a/example-crates/wallet_rpc/src/main.rs +++ b/example-crates/wallet_rpc/src/main.rs @@ -1,3 +1,132 @@ -fn main() { - println!("Hello, world!"); +use bdk::{ + bitcoin::{ Network, Address }, + chain::{indexed_tx_graph, local_chain, Append}, + wallet::{AddressIndex, WalletChangeSet}, + Wallet, + SignOptions, +}; +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + EmittedUpdate, Emitter, +}; +use bdk_file_store::Store; +use std::sync::mpsc::sync_channel; +use std::str::FromStr; + +const DB_MAGIC: &str = "bdk-rpc-example"; +const FALLBACK_HEIGHT: u32 = 2476300; +const CHANNEL_BOUND: usize = 100; +const SEND_AMOUNT: u64 = 5000; + +fn main() -> Result<(), Box> { + let db_path = std::env::temp_dir().join("bdk-rpc-example"); + let db = Store::::new_from_path(DB_MAGIC.as_bytes(), db_path)?; + let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)"; + let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/1/*)"; + + let mut wallet = Wallet::new( + external_descriptor, + Some(internal_descriptor), + db, + Network::Testnet, + )?; + + let address = wallet.get_address(AddressIndex::New); + println!("Generated Address: {}", address); + + let balance = wallet.get_balance(); + println!("Wallet balance before syncing: {} sats", balance.total()); + + + print!("Syncing..."); + + let client = Client::new( + "127.0.0.1:18332", + Auth::UserPass("bitcoin".to_string(), "password".to_string()), + )?; + + println!("Connected to Bitcoin Core RPC at {:?}", client.get_blockchain_info().unwrap()); + + wallet.mut_indexed_tx_graph().index.set_lookahead_for_all(20); + + let (chan, recv) = sync_channel::<(EmittedUpdate, u32)>(CHANNEL_BOUND); + let prev_cp = wallet.latest_checkpoint(); + + let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { + let mut tip_height = Option::::None; + + let mut emitter = Emitter::new(&client, FALLBACK_HEIGHT, prev_cp); + loop { + let item = emitter.emit_update()?; + let is_mempool = item.is_mempool(); + + if tip_height.is_none() || is_mempool { + tip_height = Some(client.get_block_count()? as u32); + } + chan.send((item, tip_height.expect("must have tip height")))?; + + if !is_mempool { + continue; + } else { + break; + } + } + + Ok(()) + }); + + for (item, _) in recv { + let chain_update = item.chain_update(); + + let mut indexed_changeset = indexed_tx_graph::ChangeSet::default(); + + let graph_update = item.indexed_tx_graph_update(bdk_bitcoind_rpc::confirmation_time_anchor); + indexed_changeset.append(wallet.mut_indexed_tx_graph().insert_relevant_txs(graph_update)); + + let chain_changeset = match chain_update { + Some(update) => wallet.mut_local_chain().apply_update(update)?, + None => local_chain::ChangeSet::default(), + }; + + let mut wallet_changeset = WalletChangeSet::from(chain_changeset); + wallet_changeset.append(WalletChangeSet::from(indexed_changeset)); + wallet.stage(wallet_changeset); + println!("scanning ..."); + wallet.commit()?; + } + + let _ = join_handle.join().expect("failed to join chain source thread"); + + let balance = wallet.get_balance(); + println!("Wallet balance after syncing: {} sats", balance.total()); + + if balance.total() < SEND_AMOUNT { + println!( + "Please send at least {} sats to the receiving address", + SEND_AMOUNT + ); + std::process::exit(0); + } + + let faucet_address = Address::from_str("tb1qw2c3lxufxqe2x9s4rdzh65tpf4d7fssjgh8nv6")? + .require_network(Network::Testnet)?; + + let mut tx_builder = wallet.build_tx(); + tx_builder + .add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT) + .enable_rbf(); + + let (mut psbt, _) = tx_builder.finish()?; + let finalized = wallet.sign(&mut psbt, SignOptions::default())?; + assert!(finalized); + + let tx = psbt.extract_tx(); + let client = Client::new( + "127.0.0.1:18332", + Auth::UserPass("bitcoin".to_string(), "password".to_string()), + )?; + client.send_raw_transaction(&tx)?; + println!("Tx broadcasted! Txid: {}", tx.txid()); + + Ok(()) }