diff --git a/Cargo.toml b/Cargo.toml index c5f2692da..8798269e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,10 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", + "example-crates/example_rpc", "example-crates/wallet_electrum", "example-crates/wallet_esplora", "example-crates/wallet_esplora_async", diff --git a/crates/bdk/Cargo.toml b/crates/bdk/Cargo.toml index a90f1ab0f..d7d19c4bc 100644 --- a/crates/bdk/Cargo.toml +++ b/crates/bdk/Cargo.toml @@ -47,7 +47,7 @@ dev-getrandom-wasm = ["getrandom/js"] lazy_static = "1.4" env_logger = "0.7" # Move back to importing from rust-bitcoin once https://github.com/rust-bitcoin/rust-bitcoin/pull/1342 is released -base64 = "^0.13" +base64 = "^0.21" assert_matches = "1.5.0" [package.metadata.docs.rs] diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index f2f717d9f..39c9fa37d 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -23,7 +23,7 @@ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph::IndexedAdditions, keychain::{KeychainTxOutIndex, LocalChangeSet, LocalUpdate}, - local_chain::{self, LocalChain, UpdateNotConnectedError}, + local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, @@ -32,8 +32,8 @@ use bitcoin::consensus::encode::serialize; use bitcoin::secp256k1::Secp256k1; use bitcoin::util::psbt; use bitcoin::{ - Address, BlockHash, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, - Sequence, Transaction, TxOut, Txid, Witness, + Address, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, Sequence, + Transaction, TxOut, Txid, Witness, }; use core::fmt; use core::ops::Deref; @@ -245,7 +245,7 @@ impl Wallet { }; let changeset = db.load_from_persistence().map_err(NewError::Persist)?; - chain.apply_changeset(changeset.chain_changeset); + chain.apply_changeset(&changeset.chain_changeset); indexed_graph.apply_additions(changeset.indexed_additions); let persist = Persist::new(db); @@ -370,19 +370,19 @@ impl Wallet { .graph() .filter_chain_unspents( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), self.indexed_graph.index.outpoints().iter().cloned(), ) .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) } /// Get all the checkpoints the wallet is currently storing indexed by height. - pub fn checkpoints(&self) -> &BTreeMap { - self.chain.blocks() + pub fn checkpoints(&self) -> CheckPointIter { + self.chain.iter_checkpoints(None) } /// Returns the latest checkpoint. 
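// An illustrative sketch of the reworked checkpoint accessors, assuming `wallet` is a
// synced `bdk::Wallet<D>`; the function name and the generic `D` are placeholders, not
// part of the patched API.
fn print_checkpoints<D>(wallet: &bdk::Wallet<D>) {
    // `latest_checkpoint` now yields the `CheckPoint` tip rather than a bare block id
    if let Some(tip) = wallet.latest_checkpoint() {
        println!("tip: height={} hash={}", tip.height(), tip.hash());
    }
    // `checkpoints` now returns a `CheckPointIter` walking the checkpoint linked list
    // from the tip backwards, instead of a `&BTreeMap<u32, BlockHash>`
    for cp in wallet.checkpoints() {
        println!("checkpoint: height={} hash={}", cp.height(), cp.hash());
    }
}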
- pub fn latest_checkpoint(&self) -> Option { + pub fn latest_checkpoint(&self) -> Option { self.chain.tip() } @@ -420,7 +420,7 @@ impl Wallet { .graph() .filter_chain_unspents( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), core::iter::once((spk_i, op)), ) .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) @@ -437,7 +437,7 @@ impl Wallet { let canonical_tx = CanonicalTx { observed_as: graph.get_chain_position( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), txid, )?, node: graph.get_tx_node(txid)?, @@ -460,7 +460,7 @@ impl Wallet { pub fn insert_checkpoint( &mut self, block_id: BlockId, - ) -> Result + ) -> Result where D: PersistBackend, { @@ -500,18 +500,15 @@ impl Wallet { // anchor tx to checkpoint with lowest height that is >= position's height let anchor = self .chain - .blocks() + .checkpoints() .range(height..) .next() .ok_or(InsertTxError::ConfirmationHeightCannotBeGreaterThanTip { - tip_height: self.chain.tip().map(|b| b.height), + tip_height: self.chain.tip().map(|b| b.height()), tx_height: height, }) - .map(|(&anchor_height, &anchor_hash)| ConfirmationTimeAnchor { - anchor_block: BlockId { - height: anchor_height, - hash: anchor_hash, - }, + .map(|(&_, cp)| ConfirmationTimeAnchor { + anchor_block: cp.block_id(), confirmation_height: height, confirmation_time: time, })?; @@ -531,9 +528,10 @@ impl Wallet { pub fn transactions( &self, ) -> impl Iterator> + '_ { - self.indexed_graph - .graph() - .list_chain_txs(&self.chain, self.chain.tip().unwrap_or_default()) + self.indexed_graph.graph().list_chain_txs( + &self.chain, + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), + ) } /// Return the balance, separated into available, trusted-pending, untrusted-pending and immature @@ -541,7 +539,7 @@ impl Wallet { pub fn get_balance(&self) -> Balance { self.indexed_graph.graph().balance( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), self.indexed_graph.index.outpoints().iter().cloned(), |&(k, _), _| k == KeychainKind::Internal, ) @@ -715,8 +713,7 @@ impl Wallet { None => self .chain .tip() - .and_then(|cp| cp.height.into()) - .map(|height| LockTime::from_height(height).expect("Invalid height")), + .map(|cp| LockTime::from_height(cp.height()).expect("Invalid height")), h => h, }; @@ -1030,7 +1027,7 @@ impl Wallet { ) -> Result, Error> { let graph = self.indexed_graph.graph(); let txout_index = &self.indexed_graph.index; - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); let mut tx = graph .get_tx(txid) @@ -1265,7 +1262,7 @@ impl Wallet { psbt: &mut psbt::PartiallySignedTransaction, sign_options: SignOptions, ) -> Result { - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); let tx = &psbt.unsigned_tx; let mut finished = true; @@ -1288,7 +1285,7 @@ impl Wallet { }); let current_height = sign_options .assume_height - .or(self.chain.tip().map(|b| b.height)); + .or(self.chain.tip().map(|b| b.height())); debug!( "Input #{} - {}, using `confirmation_height` = {:?}, `current_height` = {:?}", @@ -1433,7 +1430,7 @@ impl Wallet { must_only_use_confirmed_tx: bool, current_height: Option, ) -> (Vec, Vec) { - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| 
cp.block_id()).unwrap_or_default(); // must_spend <- manually selected utxos // may_spend <- all other available utxos let mut may_spend = self.get_available_utxos(); @@ -1697,24 +1694,33 @@ impl Wallet { } /// Applies an update to the wallet and stages the changes (but does not [`commit`] them). + /// Returns whether the `update` resulted in any changes. /// - /// This returns whether the `update` resulted in any changes. + /// If `prune` is set, irrelevant transactions are pruned. Relevant transactions change the UTXO + /// set of tracked script pubkeys (script pubkeys derived from tracked descriptors). /// /// Usually you create an `update` by interacting with some blockchain data source and inserting /// transactions related to your wallet into it. /// /// [`commit`]: Self::commit - pub fn apply_update(&mut self, update: Update) -> Result + pub fn apply_update(&mut self, update: Update, prune: bool) -> Result where D: PersistBackend, { - let mut changeset: ChangeSet = self.chain.apply_update(update.chain)?.into(); + let mut changeset = ChangeSet::from(self.chain.update(update.tip)?); let (_, index_additions) = self .indexed_graph .index .reveal_to_target_multi(&update.keychain); changeset.append(ChangeSet::from(IndexedAdditions::from(index_additions))); - changeset.append(self.indexed_graph.apply_update(update.graph).into()); + changeset.append( + if prune { + self.indexed_graph.prune_and_apply_update(update.graph) + } else { + self.indexed_graph.apply_update(update.graph) + } + .into(), + ); let changed = !changeset.is_empty(); self.persist.stage(changeset); diff --git a/crates/bdk/tests/wallet.rs b/crates/bdk/tests/wallet.rs index 282a74fcb..ed014f70a 100644 --- a/crates/bdk/tests/wallet.rs +++ b/crates/bdk/tests/wallet.rs @@ -44,7 +44,10 @@ fn receive_output(wallet: &mut Wallet, value: u64, height: ConfirmationTime) -> fn receive_output_in_latest_block(wallet: &mut Wallet, value: u64) -> OutPoint { let height = match wallet.latest_checkpoint() { - Some(BlockId { height, .. 
}) => ConfirmationTime::Confirmed { height, time: 0 }, + Some(cp) => ConfirmationTime::Confirmed { + height: cp.height(), + time: 0, + }, None => ConfirmationTime::Unconfirmed { last_seen: 0 }, }; receive_output(wallet, value, height) @@ -222,7 +225,7 @@ fn test_create_tx_fee_sniping_locktime_last_sync() { // If there's no current_height we're left with using the last sync height assert_eq!( psbt.unsigned_tx.lock_time.0, - wallet.latest_checkpoint().unwrap().height + wallet.latest_checkpoint().unwrap().height() ); } @@ -1482,7 +1485,7 @@ fn test_bump_fee_drain_wallet() { .insert_tx( tx.clone(), ConfirmationTime::Confirmed { - height: wallet.latest_checkpoint().unwrap().height, + height: wallet.latest_checkpoint().unwrap().height(), time: 42_000, }, ) diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..f849be627 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "bdk_bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../chain", version = "0.4.0", features = ["serde", "miniscript"] } +bitcoincore-rpc = { version = "0.16" } +anyhow = { version = "1" } diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..ccaf780b8 --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,240 @@ +use std::collections::HashSet; + +use bdk_chain::{ + bitcoin::{Block, Transaction, Txid}, + keychain::LocalUpdate, + local_chain::CheckPoint, + BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, +}; +pub use bitcoincore_rpc; +use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; + +#[derive(Debug, Clone)] +pub enum BitcoindRpcItem { + Block { + cp: CheckPoint, + info: Box, + block: Box, + }, + Mempool { + cp: CheckPoint, + txs: Vec<(Transaction, u64)>, + }, +} + +pub fn confirmation_height_anchor( + info: &GetBlockResult, + _txid: Txid, + _tx_pos: usize, +) -> ConfirmationHeightAnchor { + ConfirmationHeightAnchor { + anchor_block: BlockId { + height: info.height as _, + hash: info.hash, + }, + confirmation_height: info.height as _, + } +} + +pub fn confirmation_time_anchor( + info: &GetBlockResult, + _txid: Txid, + _tx_pos: usize, +) -> ConfirmationTimeAnchor { + ConfirmationTimeAnchor { + anchor_block: BlockId { + height: info.height as _, + hash: info.hash, + }, + confirmation_height: info.height as _, + confirmation_time: info.time as _, + } +} + +impl BitcoindRpcItem { + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. 
}) + } + + pub fn into_update(self, anchor: F) -> LocalUpdate + where + A: Clone + Ord + PartialOrd, + F: Fn(&GetBlockResult, Txid, usize) -> A, + { + match self { + BitcoindRpcItem::Block { cp, info, block } => LocalUpdate { + graph: { + let mut g = TxGraph::::new(block.txdata); + for (tx_pos, &txid) in info.tx.iter().enumerate() { + let _ = g.insert_anchor(txid, anchor(&info, txid, tx_pos)); + } + g + }, + ..LocalUpdate::new(cp) + }, + BitcoindRpcItem::Mempool { cp, txs } => LocalUpdate { + graph: { + let mut last_seens = Vec::<(Txid, u64)>::with_capacity(txs.len()); + let mut g = TxGraph::::new(txs.into_iter().map(|(tx, last_seen)| { + last_seens.push((tx.txid(), last_seen)); + tx + })); + for (txid, seen_at) in last_seens { + let _ = g.insert_seen_at(txid, seen_at); + } + g + }, + ..LocalUpdate::new(cp) + }, + } + } +} + +pub struct BitcoindRpcIter<'a> { + client: &'a Client, + fallback_height: u32, + + last_cp: Option, + last_info: Option, + + seen_txids: HashSet, +} + +impl<'a> Iterator for BitcoindRpcIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_emission().transpose() + } +} + +impl<'a> BitcoindRpcIter<'a> { + pub fn new(client: &'a Client, fallback_height: u32, last_cp: Option) -> Self { + Self { + client, + fallback_height, + last_cp, + last_info: None, + seen_txids: HashSet::new(), + } + } + + fn next_emission(&mut self) -> Result, bitcoincore_rpc::Error> { + let client = self.client; + + 'main_loop: loop { + match (&mut self.last_cp, &mut self.last_info) { + (last_cp @ None, last_info @ None) => { + // get first item at fallback_height + let info = client + .get_block_info(&client.get_block_hash(self.fallback_height as _)?)?; + let block = self.client.get_block(&info.hash)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + *last_info = Some(info.clone()); + *last_cp = Some(cp.clone()); + return Ok(Some(BitcoindRpcItem::Block { + cp, + info: Box::new(info), + block: Box::new(block), + })); + } + (last_cp @ Some(_), last_info @ None) => { + 'cp_loop: for cp in last_cp.clone().iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + let info = client.get_block_info(&cp_block.hash)?; + if info.confirmations < 0 { + // block is not in the main chain + continue 'cp_loop; + } + + // agreement + *last_cp = Some(cp); + *last_info = Some(info); + continue 'main_loop; + } + + // no point of agreement found + // next loop will emit block @ fallback height + *last_cp = None; + *last_info = None; + } + (Some(last_cp), last_info @ Some(_)) => { + // find next block + match last_info.as_ref().unwrap().nextblockhash { + Some(next_hash) => { + let info = self.client.get_block_info(&next_hash)?; + + if info.confirmations < 0 { + *last_info = None; + continue 'main_loop; + } + + let block = self.client.get_block(&info.hash)?; + let cp = last_cp + .clone() + .extend(BlockId { + height: info.height as _, + hash: info.hash, + }) + .expect("must extend from checkpoint"); + + *last_cp = cp.clone(); + *last_info = Some(info.clone()); + + return Ok(Some(BitcoindRpcItem::Block { + cp, + info: Box::new(info), + block: Box::new(block), + })); + } + None => { + // emit from mempool! + let mempool_txs = client + .get_raw_mempool()? 
+ .into_iter() + .filter(|&txid| self.seen_txids.insert(txid)) + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + + // remove last info... + *last_info = None; + + return Ok(Some(BitcoindRpcItem::Mempool { + txs: mempool_txs, + cp: last_cp.clone(), + })); + } + } + } + (None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info), + } + } + } +} + +pub trait BitcoindRpcErrorExt { + fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +} diff --git a/crates/chain/Cargo.toml b/crates/chain/Cargo.toml index 0d8123ca7..332dbae32 100644 --- a/crates/chain/Cargo.toml +++ b/crates/chain/Cargo.toml @@ -13,12 +13,13 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# For no-std, remember to enable the bitcoin/no-std feature bitcoin = { version = "0.29", default-features = false } serde_crate = { package = "serde", version = "1", optional = true, features = ["derive"] } # Use hashbrown as a feature flag to have HashSet and HashMap from it. # note version 0.13 breaks outs MSRV. -hashbrown = { version = "0.12", optional = true, features = ["serde"] } +hashbrown = { version = "0.11", optional = true } miniscript = { version = "9.0.0", optional = true, default-features = false } [dev-dependencies] @@ -26,5 +27,5 @@ rand = "0.8" [features] default = ["std"] -std = ["bitcoin/std"] -serde = ["serde_crate", "bitcoin/serde" ] +std = ["bitcoin/std", "miniscript/std"] +serde = ["serde_crate", "bitcoin/serde", "hashbrown/serde"] diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 730b04340..25f193275 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -90,6 +90,38 @@ where } } + /// Apply `update`, but filters out irrelevant transactions. + /// + /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. + pub fn prune_and_apply_update( + &mut self, + update: TxGraph, + ) -> IndexedAdditions { + let mut additions = IndexedAdditions::::default(); + + // index all transactions first + for tx_node in update.full_txs() { + additions + .index_additions + .append(self.index.index_tx(&tx_node)); + } + + let update = update + .full_txs() + .filter(|tx_node| self.index.is_tx_relevant(tx_node)) + .fold(TxGraph::default(), |mut g, tx_node| -> TxGraph { + let _ = g.insert_tx(tx_node.tx.clone()); + for anchor in tx_node.anchors { + let _ = g.insert_anchor(tx_node.txid, anchor.clone()); + } + let _ = g.insert_seen_at(tx_node.txid, tx_node.last_seen_unconfirmed); + g + }); + + additions.append(self.apply_update(update)); + additions + } + /// Insert a floating `txout` of given `outpoint`. pub fn insert_txout( &mut self, @@ -146,14 +178,12 @@ where // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // returns true or not. (in a second loop). 
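// An illustrative sketch of choosing between the two update paths on `IndexedTxGraph`.
// `graph` and `update` are placeholder names, the helper itself is hypothetical, and the
// trait bounds are assumptions mirroring how `IndexedTxGraph` is parameterized elsewhere
// in this patch.
use bdk_chain::{
    indexed_tx_graph::{IndexedAdditions, Indexer},
    Anchor, Append, IndexedTxGraph, TxGraph,
};

fn apply_filtered<A, I>(
    graph: &mut IndexedTxGraph<A, I>,
    update: TxGraph<A>,
    prune: bool,
) -> IndexedAdditions<A, I::Additions>
where
    A: Anchor,
    I: Indexer,
    I::Additions: Default + Append,
{
    if prune {
        // index every transaction of `update`, but only insert the relevant ones
        graph.prune_and_apply_update(update)
    } else {
        // index and insert everything
        graph.apply_update(update)
    }
}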
let mut additions = IndexedAdditions::::default(); - let mut transactions = Vec::new(); - for (tx, anchors) in txs.into_iter() { - additions.index_additions.append(self.index.index_tx(tx)); - transactions.push((tx, anchors)); - } + let txs = txs + .into_iter() + .inspect(|(tx, _)| additions.index_additions.append(self.index.index_tx(tx))) + .collect::>(); additions.append( - transactions - .into_iter() + txs.into_iter() .filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) { true => Some(self.insert_tx(tx, anchors, seen_at)), false => None, diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index f9b2436f2..d83868890 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -13,7 +13,7 @@ use crate::{ collections::BTreeMap, indexed_tx_graph::IndexedAdditions, - local_chain::{self, LocalChain}, + local_chain::{self, CheckPoint}, tx_graph::TxGraph, Anchor, Append, }; @@ -89,8 +89,9 @@ impl AsRef> for DerivationAdditions { } } -/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] -/// atomically. +/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. +/// +/// [`LocalChain`]: local_chain::LocalChain #[derive(Debug, Clone, PartialEq)] pub struct LocalUpdate { /// Last active derivation index per keychain (`K`). @@ -98,15 +99,18 @@ pub struct LocalUpdate { /// Update for the [`TxGraph`]. pub graph: TxGraph, /// Update for the [`LocalChain`]. - pub chain: LocalChain, + /// + /// [`LocalChain`]: local_chain::LocalChain + pub tip: CheckPoint, } -impl Default for LocalUpdate { - fn default() -> Self { +impl LocalUpdate { + /// Construct a [`LocalUpdate`] with a given [`CheckPoint`] tip. + pub fn new(tip: CheckPoint) -> Self { Self { - keychain: Default::default(), - graph: Default::default(), - chain: Default::default(), + keychain: BTreeMap::new(), + graph: TxGraph::default(), + tip, } } } @@ -126,6 +130,8 @@ impl Default for LocalUpdate { )] pub struct LocalChangeSet { /// Changes to the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain pub chain_changeset: local_chain::ChangeSet, /// Additions to [`IndexedTxGraph`]. diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index fe97e3f27..d4d55dae0 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -2,15 +2,124 @@ use core::convert::Infallible; -use alloc::collections::BTreeMap; +use crate::collections::BTreeMap; +use crate::{BlockId, ChainOracle}; +use alloc::sync::Arc; use bitcoin::BlockHash; -use crate::{BlockId, ChainOracle}; +/// A structure that represents changes to [`LocalChain`]. +pub type ChangeSet = BTreeMap>; + +/// Represents a block of [`LocalChain`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct CheckPoint(Arc); + +/// The internal contents of [`CheckPoint`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +struct CPInner { + /// Block id (hash and height). + block: BlockId, + /// Previous checkpoint (if any). + prev: Option>, +} + +impl CheckPoint { + /// Construct a [`CheckPoint`] from a [`BlockId`]. + pub fn new(block: BlockId) -> Self { + Self(Arc::new(CPInner { block, prev: None })) + } + + /// Extends [`CheckPoint`] with `block` and returns the new checkpoint tip. 
+ /// + /// Returns an `Err` of the initial checkpoint + pub fn extend(self, block: BlockId) -> Result { + if self.height() < block.height { + Ok(Self(Arc::new(CPInner { + block, + prev: Some(self.0), + }))) + } else { + Err(self) + } + } + + /// Get the [`BlockId`] of the checkpoint. + pub fn block_id(&self) -> BlockId { + self.0.block + } + + /// Get the height of the checkpoint. + pub fn height(&self) -> u32 { + self.0.block.height + } + + /// Get the block hash of the checkpoint. + pub fn hash(&self) -> BlockHash { + self.0.block.hash + } + + /// Detach this checkpoint from the previous. + pub fn detach(self) -> Self { + Self(Arc::new(CPInner { + block: self.0.block, + prev: None, + })) + } + + /// Get previous checkpoint. + pub fn prev(&self) -> Option { + self.0.prev.clone().map(CheckPoint) + } + + /// Iterate + pub fn iter(&self) -> CheckPointIter { + CheckPointIter { + current: Some(Arc::clone(&self.0)), + } + } +} + +/// A structure that iterates over checkpoints backwards. +pub struct CheckPointIter { + current: Option>, +} + +impl Iterator for CheckPointIter { + type Item = CheckPoint; + + fn next(&mut self) -> Option { + let current = self.current.clone()?; + self.current = current.prev.clone(); + Some(CheckPoint(current)) + } +} /// This is a local implementation of [`ChainOracle`]. #[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct LocalChain { - blocks: BTreeMap, + checkpoints: BTreeMap, +} + +impl From for BTreeMap { + fn from(value: LocalChain) -> Self { + value + .checkpoints + .values() + .map(|cp| (cp.height(), cp.hash())) + .collect() + } +} + +impl From for LocalChain { + fn from(value: ChangeSet) -> Self { + Self::from_changeset(value) + } +} + +impl From> for LocalChain { + fn from(value: BTreeMap) -> Self { + Self::from_blocks(value) + } } impl ChainOracle for LocalChain { @@ -19,18 +128,18 @@ impl ChainOracle for LocalChain { fn is_block_in_chain( &self, block: BlockId, - static_block: BlockId, + chain_tip: BlockId, ) -> Result, Self::Error> { - if block.height > static_block.height { + if block.height > chain_tip.height { return Ok(None); } Ok( match ( - self.blocks.get(&block.height), - self.blocks.get(&static_block.height), + self.checkpoints.get(&block.height), + self.checkpoints.get(&chain_tip.height), ) { - (Some(&hash), Some(&static_hash)) => { - Some(hash == block.hash && static_hash == static_block.hash) + (Some(cp), Some(tip_cp)) => { + Some(cp.hash() == block.hash && tip_cp.hash() == chain_tip.hash) } _ => None, }, @@ -38,196 +147,272 @@ impl ChainOracle for LocalChain { } fn get_chain_tip(&self) -> Result, Self::Error> { - Ok(self.tip()) + Ok(self.checkpoints.values().last().map(CheckPoint::block_id)) } } -impl AsRef> for LocalChain { - fn as_ref(&self) -> &BTreeMap { - &self.blocks - } -} - -impl From for BTreeMap { - fn from(value: LocalChain) -> Self { - value.blocks +impl LocalChain { + /// Construct a [`LocalChain`] from an initial `changeset`. + pub fn from_changeset(changeset: ChangeSet) -> Self { + let mut chain = Self::default(); + chain.apply_changeset(&changeset); + chain } -} -impl From> for LocalChain { - fn from(value: BTreeMap) -> Self { - Self { blocks: value } + /// Construct a [`LocalChain`] from a given `checkpoint` tip. + pub fn from_checkpoint(checkpoint: CheckPoint) -> Self { + Self { + checkpoints: checkpoint.iter().map(|cp| (cp.height(), cp)).collect(), + } } -} -impl LocalChain { - /// Contruct a [`LocalChain`] from a list of [`BlockId`]s. 
- pub fn from_blocks(blocks: B) -> Self - where - B: IntoIterator, - { + /// Constructs a [`LocalChain`] from a [`BTreeMap`] of height to [`BlockHash`]. + /// + /// The [`BTreeMap`] enforces the height order. However, the caller must ensure the blocks are + /// all of the same chain. + pub fn from_blocks(blocks: BTreeMap) -> Self { Self { - blocks: blocks.into_iter().map(|b| (b.height, b.hash)).collect(), + checkpoints: blocks + .into_iter() + .map({ + let mut prev = Option::::None; + move |(height, hash)| { + let cp = match prev.clone() { + Some(prev) => { + prev.extend(BlockId { height, hash }).expect("must extend") + } + None => CheckPoint::new(BlockId { height, hash }), + }; + prev = Some(cp.clone()); + (height, cp) + } + }) + .collect(), } } - /// Get a reference to a map of block height to hash. - pub fn blocks(&self) -> &BTreeMap { - &self.blocks + /// Get the highest checkpoint. + pub fn tip(&self) -> Option { + self.checkpoints.values().last().cloned() } - /// Get the chain tip. - pub fn tip(&self) -> Option { - self.blocks - .iter() - .last() - .map(|(&height, &hash)| BlockId { height, hash }) + /// Returns whether the [`LocalChain`] is empty (has no checkpoints). + pub fn is_empty(&self) -> bool { + self.checkpoints.is_empty() } - /// This is like the sparsechain's logic, expect we must guarantee that all invalidated heights - /// are to be re-filled. - pub fn determine_changeset(&self, update: &Self) -> Result { - let update = update.as_ref(); - let update_tip = match update.keys().last().cloned() { - Some(tip) => tip, - None => return Ok(ChangeSet::default()), - }; - - // this is the latest height where both the update and local chain has the same block hash - let agreement_height = update - .iter() - .rev() - .find(|&(u_height, u_hash)| self.blocks.get(u_height) == Some(u_hash)) - .map(|(&height, _)| height); + /// Updates [`Self`] with the given `new_tip`. + /// + /// The method returns [`ChangeSet`] on success. This represents the applied changes to + /// [`Self`]. + /// + /// To update, the `new_tip` must *connect* with `self`. If `self` and `new_tip` has a mutual + /// checkpoint (same height and hash), it can connect if: + /// * The mutual checkpoint is the tip of `self`. + /// * An ancestor of `new_tip` has a height which is of the checkpoint one higher than the + /// mutual checkpoint from `self`. + /// + /// Additionally: + /// * If `self` is empty, `new_tip` will always connect. + /// * If `self` only has one checkpoint, `new_tip` must have an ancestor checkpoint with the + /// same height as it. + /// + /// To invalidate from a given checkpoint, `new_tip` must contain an ancestor checkpoint with + /// the same height but different hash. + /// + /// # Errors + /// + /// An error will occur if the update does not correctly connect with `self`. + /// + /// Refer to [module-level documentation] for more. 
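// An illustrative sketch of calling the new `update` method from the caller's side,
// assuming `chain` is a `LocalChain` and `new_tip` is a `CheckPoint` assembled from a
// chain source; the helper name is hypothetical.
use bdk_chain::local_chain::{CannotConnectError, ChangeSet, CheckPoint, LocalChain};

fn apply_new_tip(
    chain: &mut LocalChain,
    new_tip: CheckPoint,
) -> Result<ChangeSet, CannotConnectError> {
    // On success the returned `ChangeSet` records exactly the heights that changed and is
    // what a caller would persist; on failure, `CannotConnectError::try_include` names the
    // block the update must also contain before it can connect.
    chain.update(new_tip)
}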
+ /// + /// [module-level documentation]: crate::local_chain + pub fn update(&mut self, new_tip: CheckPoint) -> Result { + let mut updated_cps = BTreeMap::::new(); + let mut agreement_height = Option::::None; + let mut agreement_ptr_matches = false; + + for cp in new_tip.iter() { + let block = cp.block_id(); + + match self.checkpoints.get(&block.height) { + Some(original_cp) if original_cp.block_id() == block => { + let ptr_matches = Arc::as_ptr(&original_cp.0) == Arc::as_ptr(&cp.0); + + // only record the first agreement height + if agreement_height.is_none() && original_cp.block_id() == block { + agreement_height = Some(block.height); + agreement_ptr_matches = ptr_matches; + } + + // break if the internal pointers of the checkpoints are the same + if ptr_matches { + break; + } + } + // only insert into `updated_cps` if cp is actually updated (original cp is `None`, + // or block ids do not match) + _ => { + updated_cps.insert(block.height, cp.clone()); + } + } + } - // the lower bound of the range to invalidate + // Lower bound of the range to invalidate in `self`. let invalidate_lb = match agreement_height { - Some(height) if height == update_tip => u32::MAX, + // if there is no agreement, we invalidate all of the original chain + None => u32::MIN, + // if the agreement is at the update's tip, we don't need to invalidate + Some(height) if height == new_tip.height() => u32::MAX, Some(height) => height + 1, - None => 0, }; - // the first block's height to invalidate in the local chain - let invalidate_from_height = self.blocks.range(invalidate_lb..).next().map(|(&h, _)| h); - - // the first block of height to invalidate (if any) should be represented in the update - if let Some(first_invalid_height) = invalidate_from_height { - if !update.contains_key(&first_invalid_height) { - return Err(UpdateNotConnectedError(first_invalid_height)); - } - } - - let mut changeset: BTreeMap> = match invalidate_from_height { - Some(first_invalid_height) => { - // the first block of height to invalidate should be represented in the update - if !update.contains_key(&first_invalid_height) { - return Err(UpdateNotConnectedError(first_invalid_height)); + let changeset = { + // Construct initial changeset of heights to invalidate in `self`. + let mut changeset = self + .checkpoints + .range(invalidate_lb..) + .map(|(&height, _)| (height, None)) + .collect::(); + + // The height of the first block to invalidate (if any) must be represented in the `update`. + if let Some(first_invalidated_height) = changeset.keys().next() { + if !updated_cps.contains_key(first_invalidated_height) { + return Err(CannotConnectError { + try_include: self + .checkpoints + .get(first_invalidated_height) + .expect("checkpoint already exists") + .block_id(), + }); } - self.blocks - .range(first_invalid_height..) - .map(|(height, _)| (*height, None)) - .collect() } - None => BTreeMap::new(), + + changeset.extend( + updated_cps + .iter() + .map(|(height, cp)| (*height, Some(cp.hash()))), + ); + changeset }; - for (height, update_hash) in update { - let original_hash = self.blocks.get(height); - if Some(update_hash) != original_hash { - changeset.insert(*height, Some(*update_hash)); + + // apply update if `update_cps` is non-empty + if let Some(&start_height) = updated_cps.keys().next() { + self.checkpoints.split_off(&invalidate_lb); + self.checkpoints.append(&mut updated_cps); + + // we never need to fix links if either: + // 1. the original chain is empty + // 2. 
the pointers match at the first point of agreement (where the block ids are equal) + if !(self.is_empty() || agreement_ptr_matches) { + self.fix_links(start_height); } } Ok(changeset) } - /// Applies the given `changeset`. - pub fn apply_changeset(&mut self, changeset: ChangeSet) { - for (height, blockhash) in changeset { - match blockhash { - Some(blockhash) => self.blocks.insert(height, blockhash), - None => self.blocks.remove(&height), - }; + /// Apply the given `changeset`. + pub fn apply_changeset(&mut self, changeset: &ChangeSet) { + if let Some(start_height) = changeset.keys().next().cloned() { + for (&height, &hash) in changeset { + match hash { + Some(hash) => self + .checkpoints + .insert(height, CheckPoint::new(BlockId { height, hash })), + None => self.checkpoints.remove(&height), + }; + } + self.fix_links(start_height); } } - /// Updates [`LocalChain`] with an update [`LocalChain`]. + /// Insert a [`BlockId`]. /// - /// This is equivalent to calling [`determine_changeset`] and [`apply_changeset`] in sequence. + /// # Errors /// - /// [`determine_changeset`]: Self::determine_changeset - /// [`apply_changeset`]: Self::apply_changeset - pub fn apply_update(&mut self, update: Self) -> Result { - let changeset = self.determine_changeset(&update)?; - self.apply_changeset(changeset.clone()); - Ok(changeset) + /// Replacing the block hash of an existing checkpoint will result in an error. + pub fn insert_block(&mut self, block_id: BlockId) -> Result { + use crate::collections::btree_map::Entry; + + match self.checkpoints.entry(block_id.height) { + Entry::Vacant(entry) => { + entry.insert(CheckPoint::new(block_id)); + self.fix_links(block_id.height); + Ok(core::iter::once((block_id.height, Some(block_id.hash))).collect()) + } + Entry::Occupied(entry) => { + let cp = entry.get(); + if cp.block_id() == block_id { + Ok(ChangeSet::default()) + } else { + Err(InsertBlockError { + height: block_id.height, + original_hash: cp.hash(), + update_hash: block_id.hash, + }) + } + } + } } - /// Derives a [`ChangeSet`] that assumes that there are no preceding changesets. - /// - /// The changeset returned will record additions of all blocks included in [`Self`]. - pub fn initial_changeset(&self) -> ChangeSet { - self.blocks - .iter() - .map(|(&height, &hash)| (height, Some(hash))) - .collect() - } + fn fix_links(&mut self, start_height: u32) { + let mut prev = self + .checkpoints + .range(..start_height) + .last() + .map(|(_, cp)| cp.clone()); - /// Insert a block of [`BlockId`] into the [`LocalChain`]. - /// - /// # Error - /// - /// If the insertion height already contains a block, and the block has a different blockhash, - /// this will result in an [`InsertBlockNotMatchingError`]. - pub fn insert_block( - &mut self, - block_id: BlockId, - ) -> Result { - let mut update = Self::from_blocks(self.tip()); - - if let Some(original_hash) = update.blocks.insert(block_id.height, block_id.hash) { - if original_hash != block_id.hash { - return Err(InsertBlockNotMatchingError { - height: block_id.height, - original_hash, - update_hash: block_id.hash, + for (_, cp) in self.checkpoints.range_mut(start_height..) 
{ + if cp.0.prev.as_ref().map(Arc::as_ptr) != prev.as_ref().map(|cp| Arc::as_ptr(&cp.0)) { + cp.0 = Arc::new(CPInner { + block: cp.block_id(), + prev: prev.clone().map(|cp| cp.0), }); } + prev = Some(cp.clone()); } + } - Ok(self.apply_update(update).expect("should always connect")) + /// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to + /// recover the current chain. + pub fn initial_changeset(&self) -> ChangeSet { + self.iter_checkpoints(None) + .map(|cp| (cp.height(), Some(cp.hash()))) + .collect() } -} -/// This is the return value of [`determine_changeset`] and represents changes to [`LocalChain`]. -/// -/// [`determine_changeset`]: LocalChain::determine_changeset -pub type ChangeSet = BTreeMap>; + /// Get checkpoint of `height` (if any). + pub fn checkpoint(&self, height: u32) -> Option { + self.checkpoints.get(&height).cloned() + } -/// Represents an update failure of [`LocalChain`] due to the update not connecting to the original -/// chain. -/// -/// The update cannot be applied to the chain because the chain suffix it represents did not -/// connect to the existing chain. This error case contains the checkpoint height to include so -/// that the chains can connect. -#[derive(Clone, Debug, PartialEq)] -pub struct UpdateNotConnectedError(pub u32); + /// Iterate over checkpoints in decending height order. + /// + /// `height_upper_bound` is inclusive. A value of `None` means there is no bound, so all + /// checkpoints will be traversed. + pub fn iter_checkpoints(&self, height_upper_bound: Option) -> CheckPointIter { + CheckPointIter { + current: match height_upper_bound { + Some(height) => self + .checkpoints + .range(..=height) + .last() + .map(|(_, cp)| cp.0.clone()), + None => self.checkpoints.values().last().map(|cp| cp.0.clone()), + }, + } + } -impl core::fmt::Display for UpdateNotConnectedError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!( - f, - "the update cannot connect with the chain, try include block at height {}", - self.0 - ) + /// Get a reference to the internal checkpoint map. + pub fn checkpoints(&self) -> &BTreeMap { + &self.checkpoints } } -#[cfg(feature = "std")] -impl std::error::Error for UpdateNotConnectedError {} - /// Represents a failure when trying to insert a checkpoint into [`LocalChain`]. #[derive(Clone, Debug, PartialEq)] -pub struct InsertBlockNotMatchingError { +pub struct InsertBlockError { /// The checkpoints' height. pub height: u32, /// Original checkpoint's block hash. @@ -236,7 +421,7 @@ pub struct InsertBlockNotMatchingError { pub update_hash: BlockHash, } -impl core::fmt::Display for InsertBlockNotMatchingError { +impl core::fmt::Display for InsertBlockError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, @@ -247,4 +432,24 @@ impl core::fmt::Display for InsertBlockNotMatchingError { } #[cfg(feature = "std")] -impl std::error::Error for InsertBlockNotMatchingError {} +impl std::error::Error for InsertBlockError {} + +/// Occurs when an update does not have a common checkpoint with the original chain. +#[derive(Clone, Debug, PartialEq)] +pub struct CannotConnectError { + /// The suggested checkpoint to include to connect the two chains. 
+ pub try_include: BlockId, +} + +impl core::fmt::Display for CannotConnectError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "introduced chain cannot connect with the original chain, try include {}:{}", + self.try_include.height, self.try_include.hash, + ) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for CannotConnectError {} diff --git a/crates/chain/tests/common/mod.rs b/crates/chain/tests/common/mod.rs index 7d7288bdf..cd799c03a 100644 --- a/crates/chain/tests/common/mod.rs +++ b/crates/chain/tests/common/mod.rs @@ -9,25 +9,17 @@ macro_rules! h { macro_rules! local_chain { [ $(($height:expr, $block_hash:expr)), * ] => {{ #[allow(unused_mut)] - bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*]) + bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*].into_iter().collect()) }}; } #[allow(unused_macros)] -macro_rules! chain { - ($([$($tt:tt)*]),*) => { chain!( checkpoints: [$([$($tt)*]),*] ) }; - (checkpoints: $($tail:tt)*) => { chain!( index: TxHeight, checkpoints: $($tail)*) }; - (index: $ind:ty, checkpoints: [ $([$height:expr, $block_hash:expr]),* ] $(,txids: [$(($txid:expr, $tx_height:expr)),*])?) => {{ +macro_rules! chain_update { + [ $(($height:expr, $hash:expr)), * ] => {{ #[allow(unused_mut)] - let mut chain = bdk_chain::sparse_chain::SparseChain::<$ind>::from_checkpoints([$(($height, $block_hash).into()),*]); - - $( - $( - let _ = chain.insert_tx($txid, $tx_height).expect("should succeed"); - )* - )? - - chain + bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $hash).into()),*].into_iter().collect()) + .tip() + .expect("must have tip") }}; } diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 2ebd913c2..3319b2594 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -8,7 +8,7 @@ use bdk_chain::{ keychain::{Balance, DerivationAdditions, KeychainTxOutIndex}, local_chain::LocalChain, tx_graph::Additions, - BlockId, ChainPosition, ConfirmationHeightAnchor, + ChainPosition, ConfirmationHeightAnchor, }; use bitcoin::{secp256k1::Secp256k1, BlockHash, OutPoint, Script, Transaction, TxIn, TxOut}; use miniscript::Descriptor; @@ -109,8 +109,8 @@ fn test_list_owned_txouts() { // Create Local chains let local_chain = (0..150) - .map(|i| (i as u32, h!("random"))) - .collect::>(); + .map(|i| (i as u32, Some(h!("random")))) + .collect::>>(); let local_chain = LocalChain::from(local_chain); // Initiate IndexedTxGraph @@ -212,9 +212,8 @@ fn test_list_owned_txouts() { ( *tx, local_chain - .blocks() - .get(&height) - .map(|&hash| BlockId { height, hash }) + .checkpoint(height) + .map(|cp| cp.block_id()) .map(|anchor_block| ConfirmationHeightAnchor { anchor_block, confirmation_height: anchor_block.height, @@ -231,9 +230,8 @@ fn test_list_owned_txouts() { |height: u32, graph: &IndexedTxGraph>| { let chain_tip = local_chain - .blocks() - .get(&height) - .map(|&hash| BlockId { height, hash }) + .checkpoint(height) + .map(|cp| cp.block_id()) .expect("block must exist"); let txouts = graph .graph() diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index 55d8af113..4d6697841 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -1,172 +1,244 @@ -use bdk_chain::local_chain::{ - ChangeSet, InsertBlockNotMatchingError, LocalChain, UpdateNotConnectedError, +use bdk_chain::{ + 
local_chain::{CannotConnectError, ChangeSet, CheckPoint, InsertBlockError, LocalChain}, + BlockId, }; use bitcoin::BlockHash; #[macro_use] mod common; -#[test] -fn add_first_tip() { - let chain = LocalChain::default(); - assert_eq!( - chain.determine_changeset(&local_chain![(0, h!("A"))]), - Ok([(0, Some(h!("A")))].into()), - "add first tip" - ); -} - -#[test] -fn add_second_tip() { - let chain = local_chain![(0, h!("A"))]; - assert_eq!( - chain.determine_changeset(&local_chain![(0, h!("A")), (1, h!("B"))]), - Ok([(1, Some(h!("B")))].into()) - ); -} - -#[test] -fn two_disjoint_chains_cannot_merge() { - let chain1 = local_chain![(0, h!("A"))]; - let chain2 = local_chain![(1, h!("B"))]; - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(0)) - ); -} - -#[test] -fn duplicate_chains_should_merge() { - let chain1 = local_chain![(0, h!("A"))]; - let chain2 = local_chain![(0, h!("A"))]; - assert_eq!(chain1.determine_changeset(&chain2), Ok(Default::default())); -} - -#[test] -fn can_introduce_older_checkpoints() { - let chain1 = local_chain![(2, h!("C")), (3, h!("D"))]; - let chain2 = local_chain![(1, h!("B")), (2, h!("C"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(1, Some(h!("B")))].into()) - ); -} - -#[test] -fn fix_blockhash_before_agreement_point() { - let chain1 = local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))]; - let chain2 = local_chain![(0, h!("fix")), (1, h!("we-agree"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(0, Some(h!("fix")))].into()) - ) -} - -/// B and C are in both chain and update -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C -/// update | A B C D -/// ``` -/// This should succeed with the point of agreement being C and A should be added in addition. -#[test] -fn two_points_of_agreement() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C"))]; - let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(0, Some(h!("A"))), (3, Some(h!("D")))].into()), - ); +#[derive(Debug)] +struct TestLocalChain<'a> { + name: &'static str, + chain: LocalChain, + new_tip: CheckPoint, + exp: ExpectedResult<'a>, } -/// Update and chain does not connect: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C -/// update | A B D -/// ``` -/// This should fail as we cannot figure out whether C & D are on the same chain -#[test] -fn update_and_chain_does_not_connect() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C"))]; - let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(2)), - ); +#[derive(Debug, PartialEq)] +enum ExpectedResult<'a> { + Ok { + changeset: &'a [(u32, Option)], + init_changeset: &'a [(u32, Option)], + }, + Err(CannotConnectError), } -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 | 5 -/// chain | A B C E -/// update | A B' C' D -/// ``` -/// This should succeed and invalidate B,C and E with point of agreement being A. 
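// An illustrative sketch of what the new `chain_update!` test macro boils down to:
// building an update tip by hand with `CheckPoint::new` and `CheckPoint::extend`.
// `hash_a` and `hash_b` are placeholder block hashes.
use bdk_chain::{bitcoin::BlockHash, local_chain::CheckPoint, BlockId};

fn example_update_tip(hash_a: BlockHash, hash_b: BlockHash) -> CheckPoint {
    CheckPoint::new(BlockId {
        height: 0,
        hash: hash_a,
    })
    .extend(BlockId {
        height: 1,
        hash: hash_b,
    })
    .expect("extending to a strictly greater height cannot fail")
}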
-#[test] -fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation() { - let chain1 = local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))]; - let chain2 = local_chain![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([ - (2, Some(h!("B'"))), - (3, Some(h!("C'"))), - (4, Some(h!("D"))), - (5, None), - ] - .into()) - ); -} +impl<'a> TestLocalChain<'a> { + fn run(mut self) { + let got_changeset = match self.chain.update(self.new_tip) { + Ok(changeset) => changeset, + Err(err) => { + assert_eq!(ExpectedResult::Err(err), self.exp); + return; + } + }; -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C E -/// update | B' C' D -/// ``` -/// -/// This should succeed and invalidate B, C and E with no point of agreement -#[test] -fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation_no_point_of_agreement() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))]; - let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([ - (1, Some(h!("B'"))), - (2, Some(h!("C'"))), - (3, Some(h!("D"))), - (4, None) - ] - .into()) - ) + match self.exp { + ExpectedResult::Ok { + changeset, + init_changeset, + } => { + assert_eq!( + got_changeset, + changeset.iter().cloned().collect(), + "{}: unexpected changeset", + self.name + ); + assert_eq!( + self.chain.initial_changeset(), + init_changeset.iter().cloned().collect(), + "{}: unexpected initial changeset", + self.name + ); + } + ExpectedResult::Err(err) => panic!( + "expected error ({}), got non-error result: {:?}", + err, got_changeset + ), + } + } } -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | A B C E -/// update | B' C' D -/// ``` -/// -/// This should fail since although it tells us that B and C are invalid it doesn't tell us whether -/// A was invalid. 
#[test] -fn invalidation_but_no_connection() { - let chain1 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))]; - let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(0)) - ) +fn update() { + [ + TestLocalChain { + name: "add first tip", + chain: local_chain![], + new_tip: chain_update![(0, h!("A"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("A")))], + init_changeset: &[(0, Some(h!("A")))], + }, + }, + TestLocalChain { + name: "add second tip", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B"))], + exp: ExpectedResult::Ok { + changeset: &[(1, Some(h!("B")))], + init_changeset: &[(0, Some(h!("A"))), (1, Some(h!("B")))], + }, + }, + TestLocalChain { + name: "two disjoint chains cannot merge", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(1, h!("B"))], + exp: ExpectedResult::Err(CannotConnectError { + try_include: BlockId { + height: 0, + hash: h!("A"), + }, + }), + }, + TestLocalChain { + name: "duplicate chains should merge", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(0, h!("A"))], + exp: ExpectedResult::Ok { + changeset: &[], + init_changeset: &[(0, Some(h!("A")))], + }, + }, + TestLocalChain { + name: "can introduce older checkpoints", + chain: local_chain![(2, h!("C")), (3, h!("D"))], + new_tip: chain_update![(1, h!("B")), (2, h!("C"))], + exp: ExpectedResult::Ok { + changeset: &[(1, Some(h!("B")))], + init_changeset: &[(1, Some(h!("B"))), (2, Some(h!("C"))), (3, Some(h!("D")))], + }, + }, + TestLocalChain { + name: "fix blockhash before agreement point", + chain: local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))], + new_tip: chain_update![(0, h!("fix")), (1, h!("we-agree"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("fix")))], + init_changeset: &[(0, Some(h!("fix"))), (1, Some(h!("we-agree")))], + }, + }, + // B and C are in both chain and update + // | 0 | 1 | 2 | 3 | 4 + // chain | B C + // update | A B C D + // This should succeed with the point of agreement being C and A should be added in addition. + TestLocalChain { + name: "two points of agreement", + chain: local_chain![(1, h!("B")), (2, h!("C"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("A"))), (3, Some(h!("D")))], + init_changeset: &[ + (0, Some(h!("A"))), + (1, Some(h!("B"))), + (2, Some(h!("C"))), + (3, Some(h!("D"))), + ], + }, + }, + // Update and chain does not connect: + // | 0 | 1 | 2 | 3 | 4 + // chain | B C + // update | A B D + // This should fail as we cannot figure out whether C & D are on the same chain + TestLocalChain { + name: "update and chain does not connect", + chain: local_chain![(1, h!("B")), (2, h!("C"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B")), (3, h!("D"))], + exp: ExpectedResult::Err(CannotConnectError { + try_include: BlockId { + height: 2, + hash: h!("C"), + }, + }), + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 | 5 + // chain | A B C E + // update | A B' C' D + // This should succeed and invalidate B,C and E with point of agreement being A. 
+ TestLocalChain { + name: "transitive invalidation applies to checkpoints higher than invalidation", + chain: local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))], + new_tip: chain_update![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[ + (2, Some(h!("B'"))), + (3, Some(h!("C'"))), + (4, Some(h!("D"))), + (5, None), + ], + init_changeset: &[ + (0, Some(h!("A"))), + (2, Some(h!("B'"))), + (3, Some(h!("C'"))), + (4, Some(h!("D"))), + ], + }, + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 + // chain | B C E + // update | B' C' D + // This should succeed and invalidate B, C and E with no point of agreement + TestLocalChain { + name: "transitive invalidation applies to checkpoints higher than invalidation no point of agreement", + chain: local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))], + new_tip: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[ + (1, Some(h!("B'"))), + (2, Some(h!("C'"))), + (3, Some(h!("D"))), + (4, None) + ], + init_changeset: &[ + (1, Some(h!("B'"))), + (2, Some(h!("C'"))), + (3, Some(h!("D"))), + ], + }, + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 + // chain | A B C E + // update | B' C' D + // This should fail since although it tells us that B and C are invalid it doesn't tell us whether + // A was invalid. + TestLocalChain { + name: "invalidation but no connection", + chain: local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))], + new_tip: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))], + exp: ExpectedResult::Err(CannotConnectError { try_include: BlockId { height: 0, hash: h!("A") } }), + }, + // Introduce blocks between two points of agreement + // | 0 | 1 | 2 | 3 | 4 | 5 + // chain | A B D E + // update | A C E F + TestLocalChain { + name: "introduce blocks between two points of agreement", + chain: local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D")), (4, h!("E"))], + new_tip: chain_update![(0, h!("A")), (2, h!("C")), (4, h!("E")), (5, h!("F"))], + exp: ExpectedResult::Ok { + changeset: &[ + (2, Some(h!("C"))), + (5, Some(h!("F"))), + ], + init_changeset: &[ + (0, Some(h!("A"))), + (1, Some(h!("B"))), + (2, Some(h!("C"))), + (3, Some(h!("D"))), + (4, Some(h!("E"))), + (5, Some(h!("F"))), + ], + }, + } + ] + .into_iter() + .for_each(TestLocalChain::run); } #[test] @@ -174,7 +246,7 @@ fn insert_block() { struct TestCase { original: LocalChain, insert: (u32, BlockHash), - expected_result: Result, + expected_result: Result, expected_final: LocalChain, } @@ -206,7 +278,7 @@ fn insert_block() { TestCase { original: local_chain![(2, h!("K"))], insert: (2, h!("J")), - expected_result: Err(InsertBlockNotMatchingError { + expected_result: Err(InsertBlockError { height: 2, original_hash: h!("K"), update_hash: h!("J"), diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index c272f97aa..bbffdaf31 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -697,7 +697,7 @@ fn test_chain_spends() { let _ = graph.insert_anchor( tx.txid(), ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: *ht, }, ); @@ -705,10 +705,10 @@ fn test_chain_spends() { // Assert that confirmed spends are returned correctly. 
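// An illustrative sketch of the conversion pattern used throughout this change: chain
// queries on `TxGraph` still take a `BlockId` tip, so a `CheckPoint` tip is turned into
// one with `block_id()`. `chain`, `graph`, `txid` and the function name are placeholders.
use bdk_chain::{bitcoin::Txid, local_chain::LocalChain, Anchor, ChainPosition, TxGraph};

fn position_of<'g, A: Anchor>(
    chain: &LocalChain,
    graph: &'g TxGraph<A>,
    txid: Txid,
) -> Option<ChainPosition<&'g A>> {
    // a default `BlockId` stands in for the tip when the chain is still empty
    let chain_tip = chain.tip().map(|cp| cp.block_id()).unwrap_or_default();
    graph.get_chain_position(chain, chain_tip, txid)
}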
assert_eq!( - graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 0)), + graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 0)), Some(( ChainPosition::Confirmed(&ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: 98 }), tx_1.txid(), @@ -717,17 +717,17 @@ fn test_chain_spends() { // Check if chain position is returned correctly. assert_eq!( - graph.get_chain_position(&local_chain, tip, tx_0.txid()), + graph.get_chain_position(&local_chain, tip.block_id(), tx_0.txid()), // Some(ObservedAs::Confirmed(&local_chain.get_block(95).expect("block expected"))), Some(ChainPosition::Confirmed(&ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: 95 })) ); // Even if unconfirmed tx has a last_seen of 0, it can still be part of a chain spend. assert_eq!( - graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)), + graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)), Some((ChainPosition::Unconfirmed(0), tx_2.txid())), ); @@ -737,7 +737,7 @@ fn test_chain_spends() { // Check chain spend returned correctly. assert_eq!( graph - .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)) + .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)) .unwrap(), (ChainPosition::Unconfirmed(1234567), tx_2.txid()) ); @@ -754,7 +754,7 @@ fn test_chain_spends() { // Because this tx conflicts with an already confirmed transaction, chain position should return none. assert!(graph - .get_chain_position(&local_chain, tip, tx_1_conflict.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_1_conflict.txid()) .is_none()); // Another conflicting tx that conflicts with tx_2. @@ -773,7 +773,7 @@ fn test_chain_spends() { // This should return a valid observation with correct last seen. assert_eq!( graph - .get_chain_position(&local_chain, tip, tx_2_conflict.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_2_conflict.txid()) .expect("position expected"), ChainPosition::Unconfirmed(1234568) ); @@ -781,14 +781,14 @@ fn test_chain_spends() { // Chain_spend now catches the new transaction as the spend. 
assert_eq!( graph - .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)) + .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)) .expect("expect observation"), (ChainPosition::Unconfirmed(1234568), tx_2_conflict.txid()) ); // Chain position of the `tx_2` is now none, as it is older than `tx_2_conflict` assert!(graph - .get_chain_position(&local_chain, tip, tx_2.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_2.txid()) .is_none()); } diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index 1ec44d85c..5cd832c4e 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -1,11 +1,11 @@ use bdk_chain::{ bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid}, keychain::LocalUpdate, - local_chain::LocalChain, + local_chain::CheckPoint, tx_graph::{self, TxGraph}, Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, }; -use electrum_client::{Client, ElectrumApi, Error}; +use electrum_client::{Client, ElectrumApi, Error, HeaderNotification}; use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet}, fmt::Debug, @@ -14,21 +14,19 @@ use std::{ #[derive(Debug, Clone)] pub struct ElectrumUpdate { pub graph_update: HashMap>, - pub chain_update: LocalChain, + pub chain_update: CheckPoint, pub keychain_update: BTreeMap, } -impl Default for ElectrumUpdate { - fn default() -> Self { +impl ElectrumUpdate { + pub fn new(cp: CheckPoint) -> Self { Self { - graph_update: Default::default(), - chain_update: Default::default(), - keychain_update: Default::default(), + graph_update: HashMap::new(), + chain_update: cp, + keychain_update: BTreeMap::new(), } } -} -impl ElectrumUpdate { pub fn missing_full_txs(&self, graph: &TxGraph) -> Vec { self.graph_update .keys() @@ -56,7 +54,7 @@ impl ElectrumUpdate { Ok(LocalUpdate { keychain: self.keychain_update, graph: graph_update, - chain: self.chain_update, + tip: self.chain_update, }) } } @@ -128,7 +126,7 @@ impl ElectrumUpdate { graph.apply_additions(graph_additions); graph }, - chain: update.chain, + tip: update.tip, }) } } @@ -138,7 +136,7 @@ pub trait ElectrumExt { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -148,7 +146,7 @@ pub trait ElectrumExt { fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -160,7 +158,7 @@ pub trait ElectrumExt { .map(|(i, spk)| (i as u32, spk)); self.scan( - local_chain, + prev_tip, [((), spk_iter)].into(), txids, outpoints, @@ -179,7 +177,7 @@ impl ElectrumExt for Client { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -196,14 +194,10 @@ impl ElectrumExt for Client { let outpoints = outpoints.into_iter().collect::>(); let update = loop { - let mut update = ElectrumUpdate:: { - chain_update: prepare_chain_update(self, local_chain)?, - ..Default::default() - }; - let anchor_block = update - .chain_update - .tip() - .expect("must have atleast one block"); + let mut update = ElectrumUpdate::::new( + prepare_chain_update(self, prev_tip.clone())?, + ); + let anchor_block = update.chain_update.block_id(); if !request_spks.is_empty() { if !scanned_spks.is_empty() { @@ -268,42 +262,76 @@ impl ElectrumExt for Client { } } -/// Prepare an update "template" based on the 
checkpoints of the `local_chain`. +/// Return a [`CheckPoint`] of the latest tip, that connects with the previous tip. fn prepare_chain_update( client: &Client, - local_chain: &BTreeMap, -) -> Result { - let mut update = LocalChain::default(); - - // Find the local chain block that is still there so our update can connect to the local chain. - for (&existing_height, &existing_hash) in local_chain.iter().rev() { - // TODO: a batch request may be safer, as a reorg that happens when we are obtaining - // `block_header`s will result in inconsistencies - let current_hash = client.block_header(existing_height as usize)?.block_hash(); - let _ = update - .insert_block(BlockId { - height: existing_height, - hash: current_hash, - }) - .expect("This never errors because we are working with a fresh chain"); - - if current_hash == existing_hash { - break; + prev_tip: Option, +) -> Result { + let HeaderNotification { height, mut header } = client.block_headers_subscribe()?; + let mut height = height as u32; + + let (new_blocks, mut last_cp) = 'retry: loop { + // this records new blocks, including blocks that are to be replaced + let mut new_blocks = core::iter::once((height as _, header.block_hash())) + .chain( + height + .checked_sub(1) + .map(|h| (h as _, header.prev_blockhash)), + ) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + // TODO: a batch request may be safer, as a reorg that happens when we are obtaining + // `block_header`s will result in inconsistencies + let hash = client.block_header(cp_block.height as _)?.block_hash(); + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + new_blocks.insert(cp_block.height, hash); } - } - // Insert the new tip so new transactions will be accepted into the sparsechain. - let tip = { - let (height, hash) = crate::get_tip(client)?; - BlockId { height, hash } + // check for tip changes + loop { + match client.block_headers_pop()? { + Some(new_notification) => { + let old_height = height; + height = new_notification.height as u32; + header = new_notification.header; + if height <= old_height { + // we may have a reorg + // reorg-detection logic can be improved (false positives are possible) + continue 'retry; + } + } + None => { + let new_blocks = match &agreement_cp { + // `new_blocks` should only include blocks that are actually new + Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), + None => new_blocks, + }; + + break 'retry (new_blocks, agreement_cp); + } + }; + } }; - if update.insert_block(tip).is_err() { - // There has been a re-org before we even begin scanning addresses. - // Just recursively call (this should never happen). - return prepare_chain_update(client, local_chain); + + // construct checkpoints + for (height, hash) in new_blocks { + let cp = match last_cp.clone() { + Some(last_cp) => last_cp + .extend(BlockId { height, hash }) + .expect("must extend checkpoint"), + None => CheckPoint::new(BlockId { height, hash }), + }; + last_cp = Some(cp); } - Ok(update) + Ok(last_cp.expect("must have atleast one checkpoint")) } fn determine_tx_anchor( diff --git a/crates/electrum/src/lib.rs b/crates/electrum/src/lib.rs index 4826c6dda..ec693fda9 100644 --- a/crates/electrum/src/lib.rs +++ b/crates/electrum/src/lib.rs @@ -15,21 +15,12 @@ //! //! Refer to [`bdk_electrum_example`] for a complete example. //! -//! [`ElectrumClient::scan`]: ElectrumClient::scan +//! 
[`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan //! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs -//! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get +//! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example -use bdk_chain::bitcoin::BlockHash; -use electrum_client::{Client, ElectrumApi, Error}; mod electrum_ext; pub use bdk_chain; pub use electrum_client; pub use electrum_ext::*; - -fn get_tip(client: &Client) -> Result<(u32, BlockHash), Error> { - // TODO: unsubscribe when added to the client, or is there a better call to use here? - client - .block_headers_subscribe() - .map(|data| (data.height as u32, data.header.block_hash())) -} diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index e496e415c..0d07b1520 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -3,13 +3,12 @@ use bdk_chain::{ bitcoin::{BlockHash, OutPoint, Script, Txid}, collections::BTreeMap, keychain::LocalUpdate, - BlockId, ConfirmationTimeAnchor, + local_chain::CheckPoint, + BlockId, ConfirmationTimeAnchor, TxGraph, }; use esplora_client::{Error, OutputStatus, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::map_confirmation_time_anchor; - /// Trait to extend [`esplora_client::AsyncClient`] functionality. /// /// This is the async version of [`EsploraExt`]. Refer to @@ -35,7 +34,7 @@ pub trait EsploraAsyncExt { #[allow(clippy::result_large_err)] // FIXME async fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, @@ -52,14 +51,14 @@ pub trait EsploraAsyncExt { #[allow(clippy::result_large_err)] // FIXME async fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator + Send> + Send, txids: impl IntoIterator + Send> + Send, outpoints: impl IntoIterator + Send> + Send, parallel_requests: usize, ) -> Result, Error> { self.scan( - local_chain, + prev_tip, [( (), misc_spks @@ -83,7 +82,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { #[allow(clippy::result_large_err)] // FIXME async fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, @@ -95,32 +94,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (mut update, tip_at_start) = loop { - let mut update = LocalUpdate::::default(); - - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height).await?, - }; - let _ = update - .chain - .insert_block(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { - break; - } - } - - let tip_at_start = BlockId { - height: self.get_height().await?, - hash: self.get_tip_hash().await?, - }; - - if update.chain.insert_block(tip_at_start).is_ok() { - break (update, tip_at_start); - } - }; + let (tip, _) = construct_update_tip(self, prev_tip).await?; + let mut make_anchor = crate::confirmation_time_anchor_maker(&tip); + let mut update = LocalUpdate::::new(tip); for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); @@ -172,7 +148,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { empty_scripts = 0; } for tx in related_txs { - let anchor = 
map_confirmation_time_anchor(&tx.status, tip_at_start); + let anchor = make_anchor(&tx.status); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { @@ -202,7 +178,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } match self.get_tx_status(&txid).await? { tx_status if tx_status.confirmed => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) { + if let Some(anchor) = make_anchor(&tx_status) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -236,7 +212,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, tip_at_start); + let anchor = make_anchor(&status); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -245,25 +221,106 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } } - if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? { - // A reorg occurred, so let's find out where all the txids we found are now in the chain - let txids_found = update - .graph - .full_txs() - .map(|tx_node| tx_node.txid) - .collect::>(); - update.chain = EsploraAsyncExt::scan_without_keychain( - self, - local_chain, - [], - txids_found, - [], - parallel_requests, - ) - .await? - .chain; + // If a reorg occured during the update, anchors may be wrong. We handle this by scrapping + // all anchors, reconstructing checkpoints and reconstructing anchors. + while self.get_block_hash(update.tip.height()).await? != update.tip.hash() { + let (new_tip, _) = construct_update_tip(self, Some(update.tip.clone())).await?; + make_anchor = crate::confirmation_time_anchor_maker(&new_tip); + + // Reconstruct graph with only transactions (no anchors). + update.graph = TxGraph::new(update.graph.full_txs().map(|n| n.tx.clone())); + update.tip = new_tip; + + // Re-fetch anchors. + let anchors = { + let mut a = Vec::new(); + for n in update.graph.full_txs() { + let status = self.get_tx_status(&n.txid).await?; + if !status.confirmed { + continue; + } + if let Some(anchor) = make_anchor(&status) { + a.push((n.txid, anchor)); + } + } + a + }; + for (txid, anchor) in anchors { + let _ = update.graph.insert_anchor(txid, anchor); + } } Ok(update) } } + +/// Constructs a new checkpoint tip that can "connect" to our previous checkpoint history. We return +/// the new checkpoint tip alongside the height of agreement between the two histories (if any). +#[allow(clippy::result_large_err)] +async fn construct_update_tip( + client: &esplora_client::AsyncClient, + prev_tip: Option, +) -> Result<(CheckPoint, Option), Error> { + let new_tip_height = client.get_height().await?; + + // If esplora returns a tip height that is lower than our previous tip, then checkpoints do not + // need updating. We just return the previous tip and use that as the point of agreement. + if let Some(prev_tip) = prev_tip.as_ref() { + if new_tip_height < prev_tip.height() { + return Ok((prev_tip.clone(), Some(prev_tip.height()))); + } + } + + // Grab latest blocks from esplora atomically first. We assume that deeper blocks cannot be + // reorged. This ensures that our checkpoint history is consistent. + let mut new_blocks = client + .get_blocks(Some(new_tip_height)) + .await? 
+ .into_iter() + .zip((0..new_tip_height).rev()) + .map(|(b, height)| (height, b.id)) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + // We check esplora blocks cached in `new_blocks` first, keeping the checkpoint history + // consistent even during reorgs. + let hash = match new_blocks.get(&cp_block.height) { + Some(&hash) => hash, + None => { + assert!( + new_tip_height >= cp_block.height, + "already checked that esplora's tip cannot be smaller" + ); + let hash = client.get_block_hash(cp_block.height).await?; + new_blocks.insert(cp_block.height, hash); + hash + } + }; + + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + } + + let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); + + let new_tip = new_blocks + .into_iter() + // Prune `new_blocks` to only include blocks that are actually new. + .filter(|(height, _)| Some(*height) > agreement_height) + .map(|(height, hash)| BlockId { height, hash }) + .fold(agreement_cp, |prev_cp, block| { + Some(match prev_cp { + Some(cp) => cp.extend(block).expect("must extend cp"), + None => CheckPoint::new(block), + }) + }) + .expect("must have at least one checkpoint"); + + Ok((new_tip, agreement_height)) +} diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 6e1c61993..456c2c6ca 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,11 +1,10 @@ use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid}; use bdk_chain::collections::BTreeMap; -use bdk_chain::BlockId; +use bdk_chain::local_chain::CheckPoint; use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor}; +use bdk_chain::{BlockId, TxGraph}; use esplora_client::{Error, OutputStatus, TxStatus}; -use crate::map_confirmation_time_anchor; - /// Trait to extend [`esplora_client::BlockingClient`] functionality. /// /// Refer to [crate-level documentation] for more. 
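Both `construct_update_tip` helpers in this patch (the async one ending just above and the blocking one added further down in this file) follow the same recipe: fetch the latest blocks, walk the previous checkpoints until one whose hash the server still agrees with (the point of agreement), then fold only the strictly newer blocks onto that checkpoint. A minimal, self-contained sketch of that fold, assuming simplified stand-ins for `BlockId` and `CheckPoint` (not the real `bdk_chain` types) and blocks already fetched into a `BTreeMap`:

```rust
// Sketch only: toy stand-ins, not the bdk_chain API.
use std::collections::BTreeMap;

type Hash = [u8; 32]; // stand-in for a real block hash

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BlockId {
    height: u32,
    hash: Hash,
}

/// Toy checkpoint: a singly linked list of blocks, newest first.
#[derive(Clone, Debug)]
struct CheckPoint {
    block: BlockId,
    prev: Option<Box<CheckPoint>>,
}

impl CheckPoint {
    fn new(block: BlockId) -> Self {
        Self { block, prev: None }
    }

    /// Only strictly higher blocks may extend the tip.
    fn extend(self, block: BlockId) -> Result<Self, Self> {
        if block.height > self.block.height {
            Ok(Self { block, prev: Some(Box::new(self)) })
        } else {
            Err(self)
        }
    }

    fn height(&self) -> u32 {
        self.block.height
    }
}

/// Connect freshly fetched blocks (`new_blocks`: height -> hash) to the previous
/// checkpoints (`prev_tip`, highest first), keeping only blocks above the point of
/// agreement.
fn construct_update_tip(
    mut new_blocks: BTreeMap<u32, Hash>,
    prev_tip: &[BlockId],
) -> Option<CheckPoint> {
    // 1. Highest previous checkpoint whose hash the server still agrees with.
    let agreement = prev_tip
        .iter()
        .find(|cp| new_blocks.get(&cp.height) == Some(&cp.hash))
        .copied();

    // 2. Drop blocks we already have, then fold the rest onto the agreement point.
    if let Some(agreement) = &agreement {
        new_blocks = new_blocks.split_off(&(agreement.height + 1));
    }
    new_blocks
        .into_iter()
        .map(|(height, hash)| BlockId { height, hash })
        .fold(agreement.map(CheckPoint::new), |cp, block| {
            Some(match cp {
                Some(cp) => cp.extend(block).expect("heights strictly increase"),
                None => CheckPoint::new(block),
            })
        })
}

fn main() {
    let prev_tip = [BlockId { height: 100, hash: [1; 32] }];
    let new_blocks = BTreeMap::from([(100, [1; 32]), (101, [2; 32]), (102, [3; 32])]);
    let tip = construct_update_tip(new_blocks, &prev_tip).expect("at least one block");
    assert_eq!(tip.height(), 102);
    println!("new tip at height {}", tip.height());
}
```

The real implementations additionally guard against reorgs while fetching: the electrum version re-subscribes to header notifications and retries when the tip changes, while the esplora versions fetch the most recent blocks in a single `get_blocks` call on the assumption that deeper blocks will not be reorged.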
@@ -27,7 +26,7 @@ pub trait EsploraExt { #[allow(clippy::result_large_err)] // FIXME fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -41,14 +40,14 @@ pub trait EsploraExt { #[allow(clippy::result_large_err)] // FIXME fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, parallel_requests: usize, ) -> Result, Error> { self.scan( - local_chain, + prev_tip, [( (), misc_spks @@ -68,7 +67,7 @@ pub trait EsploraExt { impl EsploraExt for esplora_client::BlockingClient { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -77,32 +76,9 @@ impl EsploraExt for esplora_client::BlockingClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (mut update, tip_at_start) = loop { - let mut update = LocalUpdate::::default(); - - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height)?, - }; - let _ = update - .chain - .insert_block(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { - break; - } - } - - let tip_at_start = BlockId { - height: self.get_height()?, - hash: self.get_tip_hash()?, - }; - - if update.chain.insert_block(tip_at_start).is_ok() { - break (update, tip_at_start); - } - }; + let (tip, _) = construct_update_tip(self, prev_tip)?; + let mut make_anchor = crate::confirmation_time_anchor_maker(&tip); + let mut update = LocalUpdate::::new(tip); for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); @@ -155,8 +131,7 @@ impl EsploraExt for esplora_client::BlockingClient { empty_scripts = 0; } for tx in related_txs { - let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start); - + let anchor = make_anchor(&tx.status); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { let _ = update.graph.insert_anchor(tx.txid, anchor); @@ -184,10 +159,8 @@ impl EsploraExt for esplora_client::BlockingClient { } } match self.get_tx_status(&txid)? { - tx_status @ TxStatus { - confirmed: true, .. - } => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) { + tx_status if tx_status.confirmed => { + if let Some(anchor) = make_anchor(&tx_status) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -219,7 +192,7 @@ impl EsploraExt for esplora_client::BlockingClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, tip_at_start); + let anchor = make_anchor(&status); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -228,24 +201,101 @@ impl EsploraExt for esplora_client::BlockingClient { } } - if tip_at_start.hash != self.get_block_hash(tip_at_start.height)? { - // A reorg occurred, so let's find out where all the txids we found are now in the chain - let txids_found = update + // If a reorg occured during the update, anchors may be wrong. We handle this by scrapping + // all anchors, reconstructing checkpoints and reconstructing anchors. + while self.get_block_hash(update.tip.height())? 
!= update.tip.hash() { + let (new_tip, _) = construct_update_tip(self, Some(update.tip.clone()))?; + make_anchor = crate::confirmation_time_anchor_maker(&new_tip); + + // Reconstruct graph with only transactions (no anchors). + update.graph = TxGraph::new(update.graph.full_txs().map(|n| n.tx.clone())); + update.tip = new_tip; + + // Re-fetch anchors. + let anchors = update .graph .full_txs() - .map(|tx_node| tx_node.txid) - .collect::>(); - update.chain = EsploraExt::scan_without_keychain( - self, - local_chain, - [], - txids_found, - [], - parallel_requests, - )? - .chain; + .filter_map(|n| match self.get_tx_status(&n.txid) { + Err(err) => Some(Err(err)), + Ok(status) if status.confirmed => make_anchor(&status).map(|a| Ok((n.txid, a))), + _ => None, + }) + .collect::, _>>()?; + for (txid, anchor) in anchors { + let _ = update.graph.insert_anchor(txid, anchor); + } } Ok(update) } } + +/// Constructs a new checkpoint tip that can "connect" to our previous checkpoint history. We return +/// the new checkpoint tip alongside the height of agreement between the two histories (if any). +#[allow(clippy::result_large_err)] +fn construct_update_tip( + client: &esplora_client::BlockingClient, + prev_tip: Option, +) -> Result<(CheckPoint, Option), Error> { + let new_tip_height = client.get_height()?; + + // If esplora returns a tip height that is lower than our previous tip, then checkpoints do not + // need updating. We just return the previous tip and use that as the point of agreement. + if let Some(prev_tip) = prev_tip.as_ref() { + if new_tip_height < prev_tip.height() { + return Ok((prev_tip.clone(), Some(prev_tip.height()))); + } + } + + // Grab latest blocks from esplora atomically first. We assume that deeper blocks cannot be + // reorged. This ensures that our checkpoint history is consistent. + let mut new_blocks = client + .get_blocks(Some(new_tip_height))? + .into_iter() + .zip((0..new_tip_height).rev()) + .map(|(b, height)| (height, b.id)) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + // We check esplora blocks cached in `new_blocks` first, keeping the checkpoint history + // consistent even during reorgs. + let hash = match new_blocks.get(&cp_block.height) { + Some(&hash) => hash, + None => { + assert!( + new_tip_height >= cp_block.height, + "already checked that esplora's tip cannot be smaller" + ); + let hash = client.get_block_hash(cp_block.height)?; + new_blocks.insert(cp_block.height, hash); + hash + } + }; + + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + } + + let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); + + let new_tip = new_blocks + .into_iter() + // Prune `new_blocks` to only include blocks that are actually new. 
+ .filter(|(height, _)| Some(*height) > agreement_height) + .map(|(height, hash)| BlockId { height, hash }) + .fold(agreement_cp, |prev_cp, block| { + Some(match prev_cp { + Some(cp) => cp.extend(block).expect("must extend cp"), + None => CheckPoint::new(block), + }) + }) + .expect("must have at least one checkpoint"); + + Ok((new_tip, agreement_height)) +} diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index d5f8d8af6..07ccdab8f 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,5 +1,7 @@ #![doc = include_str!("../README.md")] -use bdk_chain::{BlockId, ConfirmationTimeAnchor}; +use std::collections::BTreeMap; + +use bdk_chain::{local_chain::CheckPoint, ConfirmationTimeAnchor}; use esplora_client::TxStatus; pub use esplora_client; @@ -14,16 +16,25 @@ mod async_ext; #[cfg(feature = "async")] pub use async_ext::*; -pub(crate) fn map_confirmation_time_anchor( - tx_status: &TxStatus, - tip_at_start: BlockId, -) -> Option { - match (tx_status.block_time, tx_status.block_height) { - (Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor { - anchor_block: tip_at_start, - confirmation_height, - confirmation_time, - }), +pub(crate) fn confirmation_time_anchor_maker( + tip: &CheckPoint, +) -> impl FnMut(&TxStatus) -> Option { + let cache = tip + .iter() + .take(10) + .map(|cp| (cp.height(), cp)) + .collect::>(); + + move |status| match (status.block_time, status.block_height) { + (Some(confirmation_time), Some(confirmation_height)) => { + let (_, anchor_cp) = cache.range(confirmation_height..).next()?; + + Some(ConfirmationTimeAnchor { + anchor_block: anchor_cp.block_id(), + confirmation_height, + confirmation_time, + }) + } _ => None, } } diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 41d394234..89a54b7ef 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -5,7 +5,7 @@ use std::{ }; use bdk_chain::{ - bitcoin::{Address, BlockHash, Network, OutPoint, Txid}, + bitcoin::{Address, Network, OutPoint, Txid}, indexed_tx_graph::{IndexedAdditions, IndexedTxGraph}, keychain::LocalChangeSet, local_chain::LocalChain, @@ -22,8 +22,7 @@ use example_cli::{ }; const DB_MAGIC: &[u8] = b"bdk_example_electrum"; -const DB_PATH: &str = ".bdk_electrum_example.db"; -const ASSUME_FINAL_DEPTH: usize = 10; +const DB_PATH: &str = ".bdk_example_electrum.db"; #[derive(Subcommand, Debug, Clone)] enum ElectrumCommands { @@ -73,11 +72,7 @@ fn main() -> anyhow::Result<()> { graph }); - let chain = Mutex::new({ - let mut chain = LocalChain::default(); - chain.apply_changeset(init_changeset.chain_changeset); - chain - }); + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset)); let electrum_url = match args.network { Network::Bitcoin => "ssl://electrum.blockstream.info:50002", @@ -119,7 +114,7 @@ fn main() -> anyhow::Result<()> { stop_gap, scan_options, } => { - let (keychain_spks, local_chain) = { + let (keychain_spks, tip) = { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); @@ -142,20 +137,13 @@ fn main() -> anyhow::Result<()> { }) .collect::>(); - let c = chain - .blocks() - .iter() - .rev() - .take(ASSUME_FINAL_DEPTH) - .map(|(k, v)| (*k, *v)) - .collect::>(); - - (keychain_spks, c) + let tip = chain.tip(); + (keychain_spks, tip) }; client .scan( - &local_chain, + tip, keychain_spks, core::iter::empty(), core::iter::empty(), @@ -174,7 +162,7 @@ fn main() -> anyhow::Result<()> { // 
Get a short lock on the tracker to get the spks we're interested in let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let chain_tip = chain.tip().unwrap_or_default(); + let chain_tip = chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); if !(all_spks || unused_spks || utxos || unconfirmed) { unused_spks = true; @@ -254,19 +242,13 @@ fn main() -> anyhow::Result<()> { })); } - let c = chain - .blocks() - .iter() - .rev() - .take(ASSUME_FINAL_DEPTH) - .map(|(k, v)| (*k, *v)) - .collect::>(); + let tip = chain.tip(); // drop lock on graph and chain drop((graph, chain)); let update = client - .scan_without_keychain(&c, spks, txids, outpoints, scan_options.batch_size) + .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size) .context("scanning the blockchain")?; ElectrumUpdate { graph_update: update.graph_update, @@ -292,7 +274,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(final_update.chain)?; + let chain_changeset = chain.update(final_update.tip)?; let indexed_additions = { let mut additions = IndexedAdditions::::default(); diff --git a/example-crates/example_rpc/Cargo.toml b/example-crates/example_rpc/Cargo.toml new file mode 100644 index 000000000..c107c49b6 --- /dev/null +++ b/example-crates/example_rpc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_rpc/README.md b/example-crates/example_rpc/README.md new file mode 100644 index 000000000..0eaa71dae --- /dev/null +++ b/example-crates/example_rpc/README.md @@ -0,0 +1,67 @@ +# Example RPC CLI + +### Simple Regtest Test + +1. Start local regtest bitcoind. + ``` + mkdir -p /tmp/regtest/bitcoind + bitcoind -datadir=/tmp/regtest/bitcoind -regtest -server -fallbackfee=0.0002 -rpcallowip=0.0.0.0/0 -rpcbind=0.0.0.0 -blockfilterindex=1 -peerblockfilters=1 -daemon + ``` +2. Create a test bitcoind wallet and set bitcoind env. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest -named createwallet wallet_name="test" + export RPC_URL=127.0.0.1:18443 + export RPC_COOKIE=/tmp/regtest/bitcoind/regtest/.cookie + ``` +3. Get test bitcoind wallet info. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getwalletinfo + ``` +4. Get new test bitcoind wallet address. + ``` + BITCOIND_ADDRESS=$(bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getnewaddress) + echo $BITCOIND_ADDRESS + ``` +5. Generate 101 blocks with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest generatetoaddress 101 $BITCOIND_ADDRESS + ``` +6. Verify test bitcoind wallet balance. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getbalances + ``` +7. Set descriptor env and get address from RPC CLI wallet. + ``` + export DESCRIPTOR="wpkh(tprv8ZgxMBicQKsPfK9BTf82oQkHhawtZv19CorqQKPFeaHDMA4dXYX6eWsJGNJ7VTQXWmoHdrfjCYuDijcRmNFwSKcVhswzqs4fugE8turndGc/1/*)" + cargo run -- --network regtest address next + ``` +8. Send 5 test bitcoin to RPC CLI wallet. 
+ ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest send '[{"<rpc_cli_address>
":5}]' + ``` +9. Scan blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest scan + + ``` +10. Get RPC CLI wallet unconfirmed balances. + ``` + cargo run -- --network regtest balance + ``` +11. Generate 1 block with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest generatetoaddress 10 $BITCOIND_ADDRESS + ``` +12. Scan blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest scan + + ``` +13. Get RPC CLI wallet confirmed balances. + ``` + cargo run -- --network regtest balance + ``` +14. Get RPC CLI wallet transactions. + ``` + cargo run -- --network regtest txout list + ``` \ No newline at end of file diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs new file mode 100644 index 000000000..c6e148958 --- /dev/null +++ b/example-crates/example_rpc/src/main.rs @@ -0,0 +1,299 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + Arc, Mutex, + }, + time::{Duration, Instant, SystemTime}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + confirmation_time_anchor, BitcoindRpcItem, BitcoindRpcIter, +}; +use bdk_chain::{ + bitcoin::{Address, Transaction}, + indexed_tx_graph::IndexedAdditions, + keychain::{LocalChangeSet, LocalUpdate}, + local_chain::LocalChain, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + CoinSelectionAlgo, Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; +const CHANNEL_BOUND: usize = 10; +const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15); + +type ChangeSet = LocalChangeSet; + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant + /// transactions + Scan { + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, + /// Whether to be live! + #[clap(long, default_value = "false")] + live: bool, + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Create and broadcast a transaction. + Tx { + value: u64, + address: Address, + #[clap(short, default_value = "bnb")] + coin_select: CoinSelectionAlgo, + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +impl RpcCommands { + fn rpc_args(&self) -> &RpcArgs { + match self { + RpcCommands::Scan { rpc_args, .. } => rpc_args, + RpcCommands::Tx { rpc_args, .. 
} => rpc_args, + } + } +} + +fn main() -> anyhow::Result<()> { + let sigterm_flag = start_ctrlc_handler(); + + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_additions(init_changeset.indexed_additions); + graph + }); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset)); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |_| Err(anyhow::anyhow!("use `tx` instead")), + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + let rpc_client = { + let a = rpc_cmd.rpc_args(); + Client::new( + &a.url, + match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )? + }; + + match rpc_cmd { + RpcCommands::Scan { + fallback_height, + lookahead, + live, + .. + } => { + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + + let (chan, recv) = sync_channel::<(BitcoindRpcItem, u32)>(CHANNEL_BOUND); + let prev_cp = chain.lock().unwrap().tip(); + + let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { + let mut tip_height = Option::::None; + + for item in BitcoindRpcIter::new(&rpc_client, fallback_height, prev_cp) { + let item = item?; + let is_block = !item.is_mempool(); + let is_mempool = item.is_mempool(); + + if tip_height.is_none() || !is_block { + tip_height = Some(rpc_client.get_block_count()? 
as u32); + } + chan.send((item, tip_height.expect("must have tip height")))?; + + if sigterm_flag.load(Ordering::Acquire) { + return Ok(()); + } + if is_mempool { + if !live { + return Ok(()); + } + if await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) { + return Ok(()); + } + } + } + unreachable!() + }); + + let mut start = Instant::now(); + + for (item, tip_height) in recv { + let is_mempool = item.is_mempool(); + let update: LocalUpdate = + item.into_update(confirmation_time_anchor); + let current_height = update.tip.height(); + + let db_changeset = { + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + + let chain_changeset = chain.update(update.tip)?; + + let mut indexed_additions = + IndexedAdditions::::default(); + let (_, index_additions) = graph.index.reveal_to_target_multi(&update.keychain); + indexed_additions.append(index_additions.into()); + indexed_additions.append(graph.prune_and_apply_update(update.graph)); + + ChangeSet { + indexed_additions, + chain_changeset, + } + }; + + let mut db = db.lock().unwrap(); + db.stage(db_changeset); + + // print stuff every 3 seconds + if start.elapsed() >= Duration::from_secs(3) { + start = Instant::now(); + let balance = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + graph.graph().balance( + &*chain, + chain.tip().map_or(BlockId::default(), |cp| cp.block_id()), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "* scanned_to: {} / {} tip | total: {} sats", + if is_mempool { + "mempool".to_string() + } else { + current_height.to_string() + }, + tip_height, + balance.confirmed + + balance.immature + + balance.trusted_pending + + balance.untrusted_pending + ); + } + } + + db.lock().unwrap().commit()?; + println!("commited to database!"); + + join_handle + .join() + .expect("failed to join chain source thread") + } + RpcCommands::Tx { + value, + address, + coin_select, + .. 
+ } => { + let chain = chain.lock().unwrap(); + let broadcast = move |tx: &Transaction| -> anyhow::Result<()> { + rpc_client.send_raw_transaction(tx)?; + Ok(()) + }; + example_cli::run_send_cmd( + &graph, + &db, + &*chain, + &keymap, + coin_select, + address, + value, + broadcast, + ) + } + } +} + +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = SystemTime::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if SystemTime::now() + .duration_since(start) + .expect("should succeed") + >= duration + { + return false; + } + std::thread::sleep(Duration::from_secs(1)); + } +} diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index db80f106d..32663b2b5 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -35,7 +35,7 @@ fn main() -> Result<(), Box> { print!("Syncing..."); let client = electrum_client::Client::new("ssl://electrum.blockstream.info:60002")?; - let local_chain = wallet.checkpoints(); + let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -52,15 +52,14 @@ fn main() -> Result<(), Box> { }) .collect(); - let electrum_update = - client.scan(local_chain, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; + let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; println!(); let missing = electrum_update.missing_full_txs(wallet.as_ref()); let update = electrum_update.finalize_as_confirmation_time(&client, None, missing)?; - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance(); diff --git a/example-crates/wallet_esplora/src/main.rs b/example-crates/wallet_esplora/src/main.rs index 119d9cbd7..187091ff4 100644 --- a/example-crates/wallet_esplora/src/main.rs +++ b/example-crates/wallet_esplora/src/main.rs @@ -1,7 +1,7 @@ const DB_MAGIC: &str = "bdk_wallet_esplora_example"; const SEND_AMOUNT: u64 = 5000; const STOP_GAP: usize = 50; -const PARALLEL_REQUESTS: usize = 5; +const PARALLEL_REQUESTS: usize = 2; use std::{io::Write, str::FromStr}; @@ -36,7 +36,7 @@ fn main() -> Result<(), Box> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking()?; - let local_chain = wallet.checkpoints(); + let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -53,7 +53,7 @@ fn main() -> Result<(), Box> { }) .collect(); let update = client.scan( - local_chain, + prev_tip, keychain_spks, None, None, @@ -61,7 +61,7 @@ fn main() -> Result<(), Box> { PARALLEL_REQUESTS, )?; println!(); - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance(); diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 7cb218ec2..a3a3399e1 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?; - let local_chain = wallet.checkpoints(); + let prev_cp = wallet.latest_checkpoint(); let 
keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -54,17 +54,10 @@ async fn main() -> Result<(), Box> { }) .collect(); let update = client - .scan( - local_chain, - keychain_spks, - [], - [], - STOP_GAP, - PARALLEL_REQUESTS, - ) + .scan(prev_cp, keychain_spks, [], [], STOP_GAP, PARALLEL_REQUESTS) .await?; println!(); - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance();
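One piece worth calling out from the esplora changes above is `confirmation_time_anchor_maker` (added in `crates/esplora/src/lib.rs`), which replaces `map_confirmation_time_anchor`. Instead of anchoring every confirmed transaction to the tip observed at the start of the scan, it caches the ten most recent checkpoints and anchors each transaction to the lowest cached checkpoint at or above its confirmation height, yielding `None` for unconfirmed transactions or when no suitable checkpoint is cached. A rough standalone sketch of that closure shape, using toy stand-ins for `TxStatus`, `BlockId`, and `ConfirmationTimeAnchor` (not the real `esplora_client`/`bdk_chain` types) and a plain slice in place of the checkpoint iterator:

```rust
// Sketch only: toy stand-ins, not the esplora_client/bdk_chain APIs.
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug)]
struct BlockId {
    height: u32,
    hash: u64, // stand-in hash
}

/// Stand-in for `esplora_client::TxStatus`.
struct TxStatus {
    block_height: Option<u32>,
    block_time: Option<u64>,
}

#[derive(Debug)]
struct ConfirmationTimeAnchor {
    anchor_block: BlockId,
    confirmation_height: u32,
    confirmation_time: u64,
}

/// Build a closure that anchors a confirmed tx to the lowest cached checkpoint at or
/// above its confirmation height. `recent_checkpoints` plays the role of the update
/// tip's most recent checkpoints (newest first).
fn anchor_maker(
    recent_checkpoints: &[BlockId],
) -> impl Fn(&TxStatus) -> Option<ConfirmationTimeAnchor> {
    // Cache only a handful of recent checkpoints, keyed by height.
    let cache: BTreeMap<u32, BlockId> = recent_checkpoints
        .iter()
        .take(10)
        .map(|b| (b.height, *b))
        .collect();

    move |status| match (status.block_time, status.block_height) {
        (Some(confirmation_time), Some(confirmation_height)) => {
            // Lowest cached checkpoint with height >= the confirmation height, if any.
            let (_, &anchor_block) = cache.range(confirmation_height..).next()?;
            Some(ConfirmationTimeAnchor {
                anchor_block,
                confirmation_height,
                confirmation_time,
            })
        }
        // Unconfirmed (or partially reported) statuses get no anchor.
        _ => None,
    }
}

fn main() {
    let checkpoints = [
        BlockId { height: 102, hash: 3 },
        BlockId { height: 100, hash: 1 },
    ];
    let make_anchor = anchor_maker(&checkpoints);

    // Confirmed at height 99: anchored to the checkpoint at height 100.
    let confirmed = TxStatus { block_height: Some(99), block_time: Some(1_234) };
    let anchor = make_anchor(&confirmed).expect("anchor expected");
    println!("anchored to {:?}", anchor);
    assert_eq!(anchor.anchor_block.height, 100);

    // Confirmed above every cached checkpoint: no anchor can be produced.
    let too_new = TxStatus { block_height: Some(103), block_time: Some(1_235) };
    assert!(make_anchor(&too_new).is_none());
}
```

Capturing the cache inside the closure keeps per-transaction lookups cheap, and because the blocking and async `scan` implementations rebuild the maker from the new tip whenever a reorg is detected, the anchors always refer to checkpoints that exist in the update's checkpoint history.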