From 1f16155cf00313efc8c24cf0061d9188a2a6838a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 26 Jun 2023 08:22:26 +0800 Subject: [PATCH 1/9] Fix cargo features * `bdk_chain/std` should also enable `miniscript/std` * `hashbrown/serde` should only be enabled when `bdk_chain/serde` is enabled * use the version of `hashbrown` that `bitcoin` and `miniscript` is using --- crates/chain/Cargo.toml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/chain/Cargo.toml b/crates/chain/Cargo.toml index 0d8123ca7..332dbae32 100644 --- a/crates/chain/Cargo.toml +++ b/crates/chain/Cargo.toml @@ -13,12 +13,13 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# For no-std, remember to enable the bitcoin/no-std feature bitcoin = { version = "0.29", default-features = false } serde_crate = { package = "serde", version = "1", optional = true, features = ["derive"] } # Use hashbrown as a feature flag to have HashSet and HashMap from it. # note version 0.13 breaks outs MSRV. -hashbrown = { version = "0.12", optional = true, features = ["serde"] } +hashbrown = { version = "0.11", optional = true } miniscript = { version = "9.0.0", optional = true, default-features = false } [dev-dependencies] @@ -26,5 +27,5 @@ rand = "0.8" [features] default = ["std"] -std = ["bitcoin/std"] -serde = ["serde_crate", "bitcoin/serde" ] +std = ["bitcoin/std", "miniscript/std"] +serde = ["serde_crate", "bitcoin/serde", "hashbrown/serde"] From af75d3f7324bd11f5f5d7c66fabe7c95bb0f83d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sun, 4 Jun 2023 03:12:42 +0800 Subject: [PATCH 2/9] [local_chain] Implement `LocalChain` with linked list This allows the data-source thread to hold a reference to checkpoints without a lock on `LocalChain` itself. Introduce `LocalChain::update` that replaces `determine_changeset` and `apply_update`. This returns a closure that actually updates `self` when called. This method allows for efficient and elegant updating while being able to "preview" the update before applying. The `LocalChain` update/determine_changeset tests have been updated to also check for the final state after applying the update (not just looking at the changeset). Update `keychain::LocalUpdate` struct to use `CheckPoint` Instead of containing a complete `LocalChain`, the update uses `CheckPoint`. This simplifies the API since updating a `LocalChain` only requires a `CheckPoint` now. The examples and chain source `..Ext` implementations have all been updated to use the new API. Additionally, `..Ext` implementations didn't 100% guarantee consistency of the updates, the logic has been changed to enforce better guarantees. --- crates/bdk/Cargo.toml | 2 +- crates/bdk/src/wallet/mod.rs | 59 +- crates/bdk/tests/wallet.rs | 9 +- crates/chain/src/keychain.rs | 24 +- crates/chain/src/local_chain.rs | 533 +++++++++++++----- crates/chain/tests/common/mod.rs | 20 +- crates/chain/tests/test_indexed_tx_graph.rs | 16 +- crates/chain/tests/test_local_chain.rs | 370 ++++++------ crates/chain/tests/test_tx_graph.rs | 22 +- crates/electrum/src/electrum_ext.rs | 129 +++-- crates/electrum/src/lib.rs | 13 +- crates/esplora/src/async_ext.rs | 99 +++- crates/esplora/src/blocking_ext.rs | 92 +-- crates/esplora/src/lib.rs | 6 +- example-crates/example_electrum/src/main.rs | 39 +- example-crates/wallet_electrum/src/main.rs | 5 +- example-crates/wallet_esplora/src/main.rs | 4 +- .../wallet_esplora_async/src/main.rs | 11 +- 18 files changed, 901 insertions(+), 552 deletions(-) diff --git a/crates/bdk/Cargo.toml b/crates/bdk/Cargo.toml index a90f1ab0f..d7d19c4bc 100644 --- a/crates/bdk/Cargo.toml +++ b/crates/bdk/Cargo.toml @@ -47,7 +47,7 @@ dev-getrandom-wasm = ["getrandom/js"] lazy_static = "1.4" env_logger = "0.7" # Move back to importing from rust-bitcoin once https://github.com/rust-bitcoin/rust-bitcoin/pull/1342 is released -base64 = "^0.13" +base64 = "^0.21" assert_matches = "1.5.0" [package.metadata.docs.rs] diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index f2f717d9f..af818e1ff 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -23,7 +23,7 @@ pub use bdk_chain::keychain::Balance; use bdk_chain::{ indexed_tx_graph::IndexedAdditions, keychain::{KeychainTxOutIndex, LocalChangeSet, LocalUpdate}, - local_chain::{self, LocalChain, UpdateNotConnectedError}, + local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, @@ -32,8 +32,8 @@ use bitcoin::consensus::encode::serialize; use bitcoin::secp256k1::Secp256k1; use bitcoin::util::psbt; use bitcoin::{ - Address, BlockHash, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, - Sequence, Transaction, TxOut, Txid, Witness, + Address, EcdsaSighashType, LockTime, Network, OutPoint, SchnorrSighashType, Script, Sequence, + Transaction, TxOut, Txid, Witness, }; use core::fmt; use core::ops::Deref; @@ -245,7 +245,7 @@ impl Wallet { }; let changeset = db.load_from_persistence().map_err(NewError::Persist)?; - chain.apply_changeset(changeset.chain_changeset); + chain.apply_changeset(&changeset.chain_changeset); indexed_graph.apply_additions(changeset.indexed_additions); let persist = Persist::new(db); @@ -370,19 +370,19 @@ impl Wallet { .graph() .filter_chain_unspents( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), self.indexed_graph.index.outpoints().iter().cloned(), ) .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) } /// Get all the checkpoints the wallet is currently storing indexed by height. - pub fn checkpoints(&self) -> &BTreeMap { - self.chain.blocks() + pub fn checkpoints(&self) -> CheckPointIter { + self.chain.iter_checkpoints(None) } /// Returns the latest checkpoint. - pub fn latest_checkpoint(&self) -> Option { + pub fn latest_checkpoint(&self) -> Option { self.chain.tip() } @@ -420,7 +420,7 @@ impl Wallet { .graph() .filter_chain_unspents( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), core::iter::once((spk_i, op)), ) .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) @@ -437,7 +437,7 @@ impl Wallet { let canonical_tx = CanonicalTx { observed_as: graph.get_chain_position( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), txid, )?, node: graph.get_tx_node(txid)?, @@ -460,11 +460,11 @@ impl Wallet { pub fn insert_checkpoint( &mut self, block_id: BlockId, - ) -> Result + ) -> Result where D: PersistBackend, { - let changeset = self.chain.insert_block(block_id)?; + let (_, changeset) = self.chain.get_or_insert(block_id)?; let changed = !changeset.is_empty(); self.persist.stage(changeset.into()); Ok(changed) @@ -500,18 +500,15 @@ impl Wallet { // anchor tx to checkpoint with lowest height that is >= position's height let anchor = self .chain - .blocks() + .checkpoints() .range(height..) .next() .ok_or(InsertTxError::ConfirmationHeightCannotBeGreaterThanTip { - tip_height: self.chain.tip().map(|b| b.height), + tip_height: self.chain.tip().map(|b| b.height()), tx_height: height, }) - .map(|(&anchor_height, &anchor_hash)| ConfirmationTimeAnchor { - anchor_block: BlockId { - height: anchor_height, - hash: anchor_hash, - }, + .map(|(&_, cp)| ConfirmationTimeAnchor { + anchor_block: cp.block_id(), confirmation_height: height, confirmation_time: time, })?; @@ -531,9 +528,10 @@ impl Wallet { pub fn transactions( &self, ) -> impl Iterator> + '_ { - self.indexed_graph - .graph() - .list_chain_txs(&self.chain, self.chain.tip().unwrap_or_default()) + self.indexed_graph.graph().list_chain_txs( + &self.chain, + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), + ) } /// Return the balance, separated into available, trusted-pending, untrusted-pending and immature @@ -541,7 +539,7 @@ impl Wallet { pub fn get_balance(&self) -> Balance { self.indexed_graph.graph().balance( &self.chain, - self.chain.tip().unwrap_or_default(), + self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(), self.indexed_graph.index.outpoints().iter().cloned(), |&(k, _), _| k == KeychainKind::Internal, ) @@ -715,8 +713,7 @@ impl Wallet { None => self .chain .tip() - .and_then(|cp| cp.height.into()) - .map(|height| LockTime::from_height(height).expect("Invalid height")), + .map(|cp| LockTime::from_height(cp.height()).expect("Invalid height")), h => h, }; @@ -1030,7 +1027,7 @@ impl Wallet { ) -> Result, Error> { let graph = self.indexed_graph.graph(); let txout_index = &self.indexed_graph.index; - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); let mut tx = graph .get_tx(txid) @@ -1265,7 +1262,7 @@ impl Wallet { psbt: &mut psbt::PartiallySignedTransaction, sign_options: SignOptions, ) -> Result { - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); let tx = &psbt.unsigned_tx; let mut finished = true; @@ -1288,7 +1285,7 @@ impl Wallet { }); let current_height = sign_options .assume_height - .or(self.chain.tip().map(|b| b.height)); + .or(self.chain.tip().map(|b| b.height())); debug!( "Input #{} - {}, using `confirmation_height` = {:?}, `current_height` = {:?}", @@ -1433,7 +1430,7 @@ impl Wallet { must_only_use_confirmed_tx: bool, current_height: Option, ) -> (Vec, Vec) { - let chain_tip = self.chain.tip().unwrap_or_default(); + let chain_tip = self.chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); // must_spend <- manually selected utxos // may_spend <- all other available utxos let mut may_spend = self.get_available_utxos(); @@ -1704,11 +1701,11 @@ impl Wallet { /// transactions related to your wallet into it. /// /// [`commit`]: Self::commit - pub fn apply_update(&mut self, update: Update) -> Result + pub fn apply_update(&mut self, update: Update) -> Result where D: PersistBackend, { - let mut changeset: ChangeSet = self.chain.apply_update(update.chain)?.into(); + let mut changeset = ChangeSet::from(self.chain.apply_update(update.tip)?); let (_, index_additions) = self .indexed_graph .index diff --git a/crates/bdk/tests/wallet.rs b/crates/bdk/tests/wallet.rs index 282a74fcb..ed014f70a 100644 --- a/crates/bdk/tests/wallet.rs +++ b/crates/bdk/tests/wallet.rs @@ -44,7 +44,10 @@ fn receive_output(wallet: &mut Wallet, value: u64, height: ConfirmationTime) -> fn receive_output_in_latest_block(wallet: &mut Wallet, value: u64) -> OutPoint { let height = match wallet.latest_checkpoint() { - Some(BlockId { height, .. }) => ConfirmationTime::Confirmed { height, time: 0 }, + Some(cp) => ConfirmationTime::Confirmed { + height: cp.height(), + time: 0, + }, None => ConfirmationTime::Unconfirmed { last_seen: 0 }, }; receive_output(wallet, value, height) @@ -222,7 +225,7 @@ fn test_create_tx_fee_sniping_locktime_last_sync() { // If there's no current_height we're left with using the last sync height assert_eq!( psbt.unsigned_tx.lock_time.0, - wallet.latest_checkpoint().unwrap().height + wallet.latest_checkpoint().unwrap().height() ); } @@ -1482,7 +1485,7 @@ fn test_bump_fee_drain_wallet() { .insert_tx( tx.clone(), ConfirmationTime::Confirmed { - height: wallet.latest_checkpoint().unwrap().height, + height: wallet.latest_checkpoint().unwrap().height(), time: 42_000, }, ) diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index f9b2436f2..d83868890 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -13,7 +13,7 @@ use crate::{ collections::BTreeMap, indexed_tx_graph::IndexedAdditions, - local_chain::{self, LocalChain}, + local_chain::{self, CheckPoint}, tx_graph::TxGraph, Anchor, Append, }; @@ -89,8 +89,9 @@ impl AsRef> for DerivationAdditions { } } -/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] -/// atomically. +/// A structure to update [`KeychainTxOutIndex`], [`TxGraph`] and [`LocalChain`] atomically. +/// +/// [`LocalChain`]: local_chain::LocalChain #[derive(Debug, Clone, PartialEq)] pub struct LocalUpdate { /// Last active derivation index per keychain (`K`). @@ -98,15 +99,18 @@ pub struct LocalUpdate { /// Update for the [`TxGraph`]. pub graph: TxGraph, /// Update for the [`LocalChain`]. - pub chain: LocalChain, + /// + /// [`LocalChain`]: local_chain::LocalChain + pub tip: CheckPoint, } -impl Default for LocalUpdate { - fn default() -> Self { +impl LocalUpdate { + /// Construct a [`LocalUpdate`] with a given [`CheckPoint`] tip. + pub fn new(tip: CheckPoint) -> Self { Self { - keychain: Default::default(), - graph: Default::default(), - chain: Default::default(), + keychain: BTreeMap::new(), + graph: TxGraph::default(), + tip, } } } @@ -126,6 +130,8 @@ impl Default for LocalUpdate { )] pub struct LocalChangeSet { /// Changes to the [`LocalChain`]. + /// + /// [`LocalChain`]: local_chain::LocalChain pub chain_changeset: local_chain::ChangeSet, /// Additions to [`IndexedTxGraph`]. diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index fe97e3f27..361dcff01 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -2,15 +2,148 @@ use core::convert::Infallible; -use alloc::collections::BTreeMap; +use crate::collections::BTreeMap; +use crate::{BlockId, ChainOracle}; +use alloc::sync::Arc; use bitcoin::BlockHash; -use crate::{BlockId, ChainOracle}; +/// A structure that represents changes to [`LocalChain`]. +pub type ChangeSet = BTreeMap>; + +/// Represents a block of [`LocalChain`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct CheckPoint(Arc); + +/// The internal contents of [`CheckPoint`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +struct CPInner { + /// Block id (hash and height). + block: BlockId, + /// Previous checkpoint (if any). + prev: Option>, +} + +/// Occurs when the caller contructs a [`CheckPoint`] with a height that is not higher than the +/// previous checkpoint it points to. +#[derive(Debug, Clone, PartialEq)] +pub struct NewCheckPointError { + /// The height of the new checkpoint. + pub new_height: u32, + /// The height of the previous checkpoint. + pub prev_height: u32, +} + +impl core::fmt::Display for NewCheckPointError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "cannot construct checkpoint with a height ({}) that is not higher than the previous checkpoint ({})", self.new_height, self.prev_height) + } +} + +impl std::error::Error for NewCheckPointError {} + +impl CheckPoint { + /// Construct a [`CheckPoint`] from a [`BlockId`]. + pub fn new(block: BlockId) -> Self { + Self(Arc::new(CPInner { block, prev: None })) + } + + /// Construct a [`CheckPoint`] of `block` with a previous checkpoint. + pub fn new_with_prev( + block: BlockId, + prev: Option, + ) -> Result { + if let Some(prev_cp) = &prev { + if prev_cp.height() >= block.height { + return Err(NewCheckPointError { + new_height: block.height, + prev_height: prev_cp.height(), + }); + } + } + + Ok(Self(Arc::new(CPInner { + block, + prev: prev.map(|cp| cp.0), + }))) + } + + /// Get the [`BlockId`] of the checkpoint. + pub fn block_id(&self) -> BlockId { + self.0.block + } + + /// Get the height of the checkpoint. + pub fn height(&self) -> u32 { + self.0.block.height + } + + /// Get the block hash of the checkpoint. + pub fn hash(&self) -> BlockHash { + self.0.block.hash + } + + /// Detach this checkpoint from the previous. + pub fn detach(self) -> Self { + Self(Arc::new(CPInner { + block: self.0.block, + prev: None, + })) + } + + /// Get previous checkpoint. + pub fn prev(&self) -> Option { + self.0.prev.clone().map(CheckPoint) + } + + /// Iterate + pub fn iter(&self) -> CheckPointIter { + CheckPointIter { + current: Some(Arc::clone(&self.0)), + } + } +} + +/// A structure that iterates over checkpoints backwards. +pub struct CheckPointIter { + current: Option>, +} + +impl Iterator for CheckPointIter { + type Item = CheckPoint; + + fn next(&mut self) -> Option { + let current = self.current.clone()?; + self.current = current.prev.clone(); + Some(CheckPoint(current)) + } +} /// This is a local implementation of [`ChainOracle`]. #[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct LocalChain { - blocks: BTreeMap, + checkpoints: BTreeMap, +} + +impl From for BTreeMap { + fn from(value: LocalChain) -> Self { + value + .checkpoints + .values() + .map(|cp| (cp.height(), cp.hash())) + .collect() + } +} + +impl From for LocalChain { + fn from(value: ChangeSet) -> Self { + Self::from_changeset(value) + } +} + +impl From> for LocalChain { + fn from(value: BTreeMap) -> Self { + Self::from_blocks(value) + } } impl ChainOracle for LocalChain { @@ -19,18 +152,18 @@ impl ChainOracle for LocalChain { fn is_block_in_chain( &self, block: BlockId, - static_block: BlockId, + chain_tip: BlockId, ) -> Result, Self::Error> { - if block.height > static_block.height { + if block.height > chain_tip.height { return Ok(None); } Ok( match ( - self.blocks.get(&block.height), - self.blocks.get(&static_block.height), + self.checkpoints.get(&block.height), + self.checkpoints.get(&chain_tip.height), ) { - (Some(&hash), Some(&static_hash)) => { - Some(hash == block.hash && static_hash == static_block.hash) + (Some(cp), Some(tip_cp)) => { + Some(cp.hash() == block.hash && tip_cp.hash() == chain_tip.hash) } _ => None, }, @@ -38,196 +171,282 @@ impl ChainOracle for LocalChain { } fn get_chain_tip(&self) -> Result, Self::Error> { - Ok(self.tip()) + Ok(self.checkpoints.values().last().map(CheckPoint::block_id)) } } -impl AsRef> for LocalChain { - fn as_ref(&self) -> &BTreeMap { - &self.blocks - } -} - -impl From for BTreeMap { - fn from(value: LocalChain) -> Self { - value.blocks +impl LocalChain { + /// Construct a [`LocalChain`] from an initial `changeset`. + pub fn from_changeset(changeset: ChangeSet) -> Self { + let mut chain = Self::default(); + chain.apply_changeset(&changeset); + chain } -} -impl From> for LocalChain { - fn from(value: BTreeMap) -> Self { - Self { blocks: value } + /// Construct a [`LocalChain`] from a given `checkpoint` tip. + pub fn from_checkpoint(checkpoint: CheckPoint) -> Self { + Self { + checkpoints: checkpoint.iter().map(|cp| (cp.height(), cp)).collect(), + } } -} -impl LocalChain { - /// Contruct a [`LocalChain`] from a list of [`BlockId`]s. - pub fn from_blocks(blocks: B) -> Self - where - B: IntoIterator, - { + /// Constructs a [`LocalChain`] from a [`BTreeMap`] of height to [`BlockHash`]. + /// + /// The [`BTreeMap`] enforces the height order. However, the caller must ensure the blocks are + /// all of the same chain. + pub fn from_blocks(blocks: BTreeMap) -> Self { Self { - blocks: blocks.into_iter().map(|b| (b.height, b.hash)).collect(), + checkpoints: blocks + .into_iter() + .map({ + let mut prev = None; + move |(height, hash)| { + let cp = CheckPoint::new_with_prev(BlockId { height, hash }, prev.clone()) + .expect("must not fail"); + prev = Some(cp.clone()); + (height, cp) + } + }) + .collect(), } } - /// Get a reference to a map of block height to hash. - pub fn blocks(&self) -> &BTreeMap { - &self.blocks + /// Get the highest checkpoint. + pub fn tip(&self) -> Option { + self.checkpoints.values().last().cloned() } - /// Get the chain tip. - pub fn tip(&self) -> Option { - self.blocks - .iter() - .last() - .map(|(&height, &hash)| BlockId { height, hash }) + /// Returns whether the [`LocalChain`] is empty (has no checkpoints). + pub fn is_empty(&self) -> bool { + self.checkpoints.is_empty() } - /// This is like the sparsechain's logic, expect we must guarantee that all invalidated heights - /// are to be re-filled. - pub fn determine_changeset(&self, update: &Self) -> Result { - let update = update.as_ref(); - let update_tip = match update.keys().last().cloned() { - Some(tip) => tip, - None => return Ok(ChangeSet::default()), - }; + /// Previews, and optionally applies updates to [`Self`] with the given `new_tip`. + /// + /// The method returns `(apply_update, changeset)` if [`Ok`]. `apply_update` is a closure that + /// can be called to apply the changes represented in `changeset. + /// + /// To update, the `new_tip` must *connect* with `self`. If `self` and `new_tip` has a mutual + /// checkpoint (same height and hash), it can connect if: + /// * The mutual checkpoint is the tip of `self`. + /// * An ancestor of `new_tip` has a height which is of the checkpoint one higher than the + /// mutual checkpoint from `self`. + /// + /// Additionally: + /// * If `self` is empty, `new_tip` will always connect. + /// * If `self` only has one checkpoint, `new_tip` must have an ancestor checkpoint with the + /// same height as it. + /// + /// To invalidate from a given checkpoint, `new_tip` must contain an ancestor checkpoint with + /// the same height but different hash. + /// + /// # Errors + /// + /// An error will occur if the update does not correctly connect with `self`. + /// + /// Refer to [module-level documentation] for more. + /// + /// [module-level documentation]: crate::local_chain + pub fn update( + &mut self, + new_tip: CheckPoint, + ) -> Result<(impl FnOnce() + '_, ChangeSet), CannotConnectError> { + let mut updated_cps = BTreeMap::::new(); + let mut agreement_height = Option::::None; + let mut complete_match = false; + + for cp in new_tip.iter() { + let block = cp.block_id(); + let original_cp = self.checkpoints.get(&block.height); + + // if original block of height does not exist, or if the hash does not match we will + // need to update the original checkpoint at that height + if original_cp.map(CheckPoint::block_id) != Some(block) { + updated_cps.insert(block.height, cp.clone()); + } - // this is the latest height where both the update and local chain has the same block hash - let agreement_height = update - .iter() - .rev() - .find(|&(u_height, u_hash)| self.blocks.get(u_height) == Some(u_hash)) - .map(|(&height, _)| height); + if let Some(original_cp) = original_cp { + // record the first agreement height + if agreement_height.is_none() && original_cp.block_id() == block { + agreement_height = Some(block.height); + } + // break if the internal pointers of the checkpoints are the same + if Arc::as_ptr(&original_cp.0) == Arc::as_ptr(&cp.0) { + complete_match = true; + break; + } + } + } - // the lower bound of the range to invalidate + // Lower bound of the range to invalidate in `self`. let invalidate_lb = match agreement_height { - Some(height) if height == update_tip => u32::MAX, + // if there is no agreement, we invalidate all of the original chain + None => u32::MIN, + // if the agreement is at the update's tip, we don't need to invalidate + Some(height) if height == new_tip.height() => u32::MAX, Some(height) => height + 1, - None => 0, }; - // the first block's height to invalidate in the local chain - let invalidate_from_height = self.blocks.range(invalidate_lb..).next().map(|(&h, _)| h); - - // the first block of height to invalidate (if any) should be represented in the update - if let Some(first_invalid_height) = invalidate_from_height { - if !update.contains_key(&first_invalid_height) { - return Err(UpdateNotConnectedError(first_invalid_height)); + let changeset = { + // Construct initial changeset of heights to invalidate in `self`. + let mut changeset = self + .checkpoints + .range(invalidate_lb..) + .map(|(&height, _)| (height, None)) + .collect::(); + + // The height of the first block to invalidate (if any) must be represented in the `update`. + if let Some(first_invalidated_height) = changeset.keys().next() { + if !updated_cps.contains_key(first_invalidated_height) { + return Err(CannotConnectError { + try_include: self + .checkpoints + .get(first_invalidated_height) + .expect("checkpoint already exists") + .block_id(), + }); + } } - } - let mut changeset: BTreeMap> = match invalidate_from_height { - Some(first_invalid_height) => { - // the first block of height to invalidate should be represented in the update - if !update.contains_key(&first_invalid_height) { - return Err(UpdateNotConnectedError(first_invalid_height)); + changeset.extend( + updated_cps + .iter() + .map(|(height, cp)| (*height, Some(cp.hash()))), + ); + changeset + }; + + let apply_update = move || { + if let Some(&start_height) = updated_cps.keys().next() { + self.checkpoints.split_off(&invalidate_lb); + self.checkpoints.append(&mut updated_cps); + if !self.is_empty() && !complete_match { + self.fix_links(start_height); } - self.blocks - .range(first_invalid_height..) - .map(|(height, _)| (*height, None)) - .collect() } - None => BTreeMap::new(), }; - for (height, update_hash) in update { - let original_hash = self.blocks.get(height); - if Some(update_hash) != original_hash { - changeset.insert(*height, Some(*update_hash)); - } - } - Ok(changeset) + Ok((apply_update, changeset)) } - /// Applies the given `changeset`. - pub fn apply_changeset(&mut self, changeset: ChangeSet) { - for (height, blockhash) in changeset { - match blockhash { - Some(blockhash) => self.blocks.insert(height, blockhash), - None => self.blocks.remove(&height), - }; + /// Apply the given `changeset`. + pub fn apply_changeset(&mut self, changeset: &ChangeSet) { + if let Some(start_height) = changeset.keys().next().cloned() { + for (&height, &hash) in changeset { + match hash { + Some(hash) => self + .checkpoints + .insert(height, CheckPoint::new(BlockId { height, hash })), + None => self.checkpoints.remove(&height), + }; + } + self.fix_links(start_height); } } - /// Updates [`LocalChain`] with an update [`LocalChain`]. + /// Update [`LocalChain`]. /// - /// This is equivalent to calling [`determine_changeset`] and [`apply_changeset`] in sequence. + /// This is equivalent to calling [`update`] and applying the update in sequence. /// - /// [`determine_changeset`]: Self::determine_changeset - /// [`apply_changeset`]: Self::apply_changeset - pub fn apply_update(&mut self, update: Self) -> Result { - let changeset = self.determine_changeset(&update)?; - self.apply_changeset(changeset.clone()); + /// [`update`]: Self::update + pub fn apply_update(&mut self, new_tip: CheckPoint) -> Result { + let (apply, changeset) = self.update(new_tip)?; + apply(); Ok(changeset) } - /// Derives a [`ChangeSet`] that assumes that there are no preceding changesets. + /// Get or insert a `block_id`. /// - /// The changeset returned will record additions of all blocks included in [`Self`]. - pub fn initial_changeset(&self) -> ChangeSet { - self.blocks - .iter() - .map(|(&height, &hash)| (height, Some(hash))) - .collect() - } - - /// Insert a block of [`BlockId`] into the [`LocalChain`]. + /// # Errors /// - /// # Error - /// - /// If the insertion height already contains a block, and the block has a different blockhash, - /// this will result in an [`InsertBlockNotMatchingError`]. - pub fn insert_block( + /// Replacing the block hash of an existing checkpoint will result in an error. + pub fn get_or_insert( &mut self, block_id: BlockId, - ) -> Result { - let mut update = Self::from_blocks(self.tip()); - - if let Some(original_hash) = update.blocks.insert(block_id.height, block_id.hash) { - if original_hash != block_id.hash { - return Err(InsertBlockNotMatchingError { - height: block_id.height, - original_hash, - update_hash: block_id.hash, + ) -> Result<(CheckPoint, ChangeSet), InsertBlockError> { + use crate::collections::btree_map::Entry; + + match self.checkpoints.entry(block_id.height) { + Entry::Vacant(entry) => { + entry.insert(CheckPoint::new(block_id)); + self.fix_links(block_id.height); + let cp = self.checkpoint(block_id.height).expect("must be inserted"); + let changeset = + core::iter::once((block_id.height, Some(block_id.hash))).collect::(); + Ok((cp, changeset)) + } + Entry::Occupied(entry) => { + let cp = entry.get(); + if cp.block_id() == block_id { + Ok((cp.clone(), ChangeSet::default())) + } else { + Err(InsertBlockError { + height: block_id.height, + original_hash: cp.hash(), + update_hash: block_id.hash, + }) + } + } + } + } + + fn fix_links(&mut self, start_height: u32) { + let mut prev = self + .checkpoints + .range(..start_height) + .last() + .map(|(_, cp)| cp.clone()); + + for (_, cp) in self.checkpoints.range_mut(start_height..) { + if cp.0.prev.as_ref().map(Arc::as_ptr) != prev.as_ref().map(|cp| Arc::as_ptr(&cp.0)) { + cp.0 = Arc::new(CPInner { + block: cp.block_id(), + prev: prev.clone().map(|cp| cp.0), }); } + prev = Some(cp.clone()); } + } - Ok(self.apply_update(update).expect("should always connect")) + /// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to + /// recover the current chain. + pub fn initial_changeset(&self) -> ChangeSet { + self.iter_checkpoints(None) + .map(|cp| (cp.height(), Some(cp.hash()))) + .collect() } -} -/// This is the return value of [`determine_changeset`] and represents changes to [`LocalChain`]. -/// -/// [`determine_changeset`]: LocalChain::determine_changeset -pub type ChangeSet = BTreeMap>; + /// Get checkpoint of `height` (if any). + pub fn checkpoint(&self, height: u32) -> Option { + self.checkpoints.get(&height).cloned() + } -/// Represents an update failure of [`LocalChain`] due to the update not connecting to the original -/// chain. -/// -/// The update cannot be applied to the chain because the chain suffix it represents did not -/// connect to the existing chain. This error case contains the checkpoint height to include so -/// that the chains can connect. -#[derive(Clone, Debug, PartialEq)] -pub struct UpdateNotConnectedError(pub u32); + /// Iterate over checkpoints in decending height order. + /// + /// `height_upper_bound` is inclusive. A value of `None` means there is no bound, so all + /// checkpoints will be traversed. + pub fn iter_checkpoints(&self, height_upper_bound: Option) -> CheckPointIter { + CheckPointIter { + current: match height_upper_bound { + Some(height) => self + .checkpoints + .range(..=height) + .last() + .map(|(_, cp)| cp.0.clone()), + None => self.checkpoints.values().last().map(|cp| cp.0.clone()), + }, + } + } -impl core::fmt::Display for UpdateNotConnectedError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!( - f, - "the update cannot connect with the chain, try include block at height {}", - self.0 - ) + /// Get a reference to the internal checkpoint map. + pub fn checkpoints(&self) -> &BTreeMap { + &self.checkpoints } } -#[cfg(feature = "std")] -impl std::error::Error for UpdateNotConnectedError {} - /// Represents a failure when trying to insert a checkpoint into [`LocalChain`]. #[derive(Clone, Debug, PartialEq)] -pub struct InsertBlockNotMatchingError { +pub struct InsertBlockError { /// The checkpoints' height. pub height: u32, /// Original checkpoint's block hash. @@ -236,7 +455,7 @@ pub struct InsertBlockNotMatchingError { pub update_hash: BlockHash, } -impl core::fmt::Display for InsertBlockNotMatchingError { +impl core::fmt::Display for InsertBlockError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, @@ -247,4 +466,24 @@ impl core::fmt::Display for InsertBlockNotMatchingError { } #[cfg(feature = "std")] -impl std::error::Error for InsertBlockNotMatchingError {} +impl std::error::Error for InsertBlockError {} + +/// Occurs when an update does not have a common checkpoint with the original chain. +#[derive(Clone, Debug, PartialEq)] +pub struct CannotConnectError { + /// The suggested checkpoint to include to connect the two chains. + pub try_include: BlockId, +} + +impl core::fmt::Display for CannotConnectError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "introduced chain cannot connect with the original chain, try include {}:{}", + self.try_include.height, self.try_include.hash, + ) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for CannotConnectError {} diff --git a/crates/chain/tests/common/mod.rs b/crates/chain/tests/common/mod.rs index 7d7288bdf..cd799c03a 100644 --- a/crates/chain/tests/common/mod.rs +++ b/crates/chain/tests/common/mod.rs @@ -9,25 +9,17 @@ macro_rules! h { macro_rules! local_chain { [ $(($height:expr, $block_hash:expr)), * ] => {{ #[allow(unused_mut)] - bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*]) + bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*].into_iter().collect()) }}; } #[allow(unused_macros)] -macro_rules! chain { - ($([$($tt:tt)*]),*) => { chain!( checkpoints: [$([$($tt)*]),*] ) }; - (checkpoints: $($tail:tt)*) => { chain!( index: TxHeight, checkpoints: $($tail)*) }; - (index: $ind:ty, checkpoints: [ $([$height:expr, $block_hash:expr]),* ] $(,txids: [$(($txid:expr, $tx_height:expr)),*])?) => {{ +macro_rules! chain_update { + [ $(($height:expr, $hash:expr)), * ] => {{ #[allow(unused_mut)] - let mut chain = bdk_chain::sparse_chain::SparseChain::<$ind>::from_checkpoints([$(($height, $block_hash).into()),*]); - - $( - $( - let _ = chain.insert_tx($txid, $tx_height).expect("should succeed"); - )* - )? - - chain + bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $hash).into()),*].into_iter().collect()) + .tip() + .expect("must have tip") }}; } diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 2ebd913c2..3319b2594 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -8,7 +8,7 @@ use bdk_chain::{ keychain::{Balance, DerivationAdditions, KeychainTxOutIndex}, local_chain::LocalChain, tx_graph::Additions, - BlockId, ChainPosition, ConfirmationHeightAnchor, + ChainPosition, ConfirmationHeightAnchor, }; use bitcoin::{secp256k1::Secp256k1, BlockHash, OutPoint, Script, Transaction, TxIn, TxOut}; use miniscript::Descriptor; @@ -109,8 +109,8 @@ fn test_list_owned_txouts() { // Create Local chains let local_chain = (0..150) - .map(|i| (i as u32, h!("random"))) - .collect::>(); + .map(|i| (i as u32, Some(h!("random")))) + .collect::>>(); let local_chain = LocalChain::from(local_chain); // Initiate IndexedTxGraph @@ -212,9 +212,8 @@ fn test_list_owned_txouts() { ( *tx, local_chain - .blocks() - .get(&height) - .map(|&hash| BlockId { height, hash }) + .checkpoint(height) + .map(|cp| cp.block_id()) .map(|anchor_block| ConfirmationHeightAnchor { anchor_block, confirmation_height: anchor_block.height, @@ -231,9 +230,8 @@ fn test_list_owned_txouts() { |height: u32, graph: &IndexedTxGraph>| { let chain_tip = local_chain - .blocks() - .get(&height) - .map(|&hash| BlockId { height, hash }) + .checkpoint(height) + .map(|cp| cp.block_id()) .expect("block must exist"); let txouts = graph .graph() diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index 55d8af113..fc0a9a4ab 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -1,172 +1,224 @@ -use bdk_chain::local_chain::{ - ChangeSet, InsertBlockNotMatchingError, LocalChain, UpdateNotConnectedError, +use bdk_chain::{ + local_chain::{CannotConnectError, ChangeSet, CheckPoint, InsertBlockError, LocalChain}, + BlockId, }; use bitcoin::BlockHash; #[macro_use] mod common; -#[test] -fn add_first_tip() { - let chain = LocalChain::default(); - assert_eq!( - chain.determine_changeset(&local_chain![(0, h!("A"))]), - Ok([(0, Some(h!("A")))].into()), - "add first tip" - ); -} - -#[test] -fn add_second_tip() { - let chain = local_chain![(0, h!("A"))]; - assert_eq!( - chain.determine_changeset(&local_chain![(0, h!("A")), (1, h!("B"))]), - Ok([(1, Some(h!("B")))].into()) - ); -} - -#[test] -fn two_disjoint_chains_cannot_merge() { - let chain1 = local_chain![(0, h!("A"))]; - let chain2 = local_chain![(1, h!("B"))]; - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(0)) - ); -} - -#[test] -fn duplicate_chains_should_merge() { - let chain1 = local_chain![(0, h!("A"))]; - let chain2 = local_chain![(0, h!("A"))]; - assert_eq!(chain1.determine_changeset(&chain2), Ok(Default::default())); -} - -#[test] -fn can_introduce_older_checkpoints() { - let chain1 = local_chain![(2, h!("C")), (3, h!("D"))]; - let chain2 = local_chain![(1, h!("B")), (2, h!("C"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(1, Some(h!("B")))].into()) - ); -} - -#[test] -fn fix_blockhash_before_agreement_point() { - let chain1 = local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))]; - let chain2 = local_chain![(0, h!("fix")), (1, h!("we-agree"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(0, Some(h!("fix")))].into()) - ) -} - -/// B and C are in both chain and update -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C -/// update | A B C D -/// ``` -/// This should succeed with the point of agreement being C and A should be added in addition. -#[test] -fn two_points_of_agreement() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C"))]; - let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([(0, Some(h!("A"))), (3, Some(h!("D")))].into()), - ); +#[derive(Debug)] +struct TestLocalChain<'a> { + name: &'static str, + chain: LocalChain, + new_tip: CheckPoint, + exp: ExpectedResult<'a>, } -/// Update and chain does not connect: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C -/// update | A B D -/// ``` -/// This should fail as we cannot figure out whether C & D are on the same chain -#[test] -fn update_and_chain_does_not_connect() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C"))]; - let chain2 = local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(2)), - ); +#[derive(Debug, PartialEq)] +enum ExpectedResult<'a> { + Ok { + changeset: &'a [(u32, Option)], + init_changeset: &'a [(u32, Option)], + }, + Err(CannotConnectError), } -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 | 5 -/// chain | A B C E -/// update | A B' C' D -/// ``` -/// This should succeed and invalidate B,C and E with point of agreement being A. -#[test] -fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation() { - let chain1 = local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))]; - let chain2 = local_chain![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([ - (2, Some(h!("B'"))), - (3, Some(h!("C'"))), - (4, Some(h!("D"))), - (5, None), - ] - .into()) - ); -} +impl<'a> TestLocalChain<'a> { + fn run(mut self) { + let got_changeset = match self.chain.update(self.new_tip) { + Ok((apply, changeset)) => { + apply(); + changeset + } + Err(err) => { + assert_eq!(ExpectedResult::Err(err), self.exp); + return; + } + }; -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | B C E -/// update | B' C' D -/// ``` -/// -/// This should succeed and invalidate B, C and E with no point of agreement -#[test] -fn transitive_invalidation_applies_to_checkpoints_higher_than_invalidation_no_point_of_agreement() { - let chain1 = local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))]; - let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Ok([ - (1, Some(h!("B'"))), - (2, Some(h!("C'"))), - (3, Some(h!("D"))), - (4, None) - ] - .into()) - ) + match self.exp { + ExpectedResult::Ok { + changeset, + init_changeset, + } => { + assert_eq!( + got_changeset, + changeset.iter().cloned().collect(), + "{}: unexpected changeset", + self.name + ); + assert_eq!( + self.chain.initial_changeset(), + init_changeset.iter().cloned().collect(), + "{}: unexpected initial changeset", + self.name + ); + } + ExpectedResult::Err(err) => panic!( + "expected error ({}), got non-error result: {:?}", + err, got_changeset + ), + } + } } -/// Transient invalidation: -/// ``` -/// | 0 | 1 | 2 | 3 | 4 -/// chain | A B C E -/// update | B' C' D -/// ``` -/// -/// This should fail since although it tells us that B and C are invalid it doesn't tell us whether -/// A was invalid. #[test] -fn invalidation_but_no_connection() { - let chain1 = local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))]; - let chain2 = local_chain![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))]; - - assert_eq!( - chain1.determine_changeset(&chain2), - Err(UpdateNotConnectedError(0)) - ) +fn update() { + [ + TestLocalChain { + name: "add first tip", + chain: local_chain![], + new_tip: chain_update![(0, h!("A"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("A")))], + init_changeset: &[(0, Some(h!("A")))], + }, + }, + TestLocalChain { + name: "add second tip", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B"))], + exp: ExpectedResult::Ok { + changeset: &[(1, Some(h!("B")))], + init_changeset: &[(0, Some(h!("A"))), (1, Some(h!("B")))], + }, + }, + TestLocalChain { + name: "two disjoint chains cannot merge", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(1, h!("B"))], + exp: ExpectedResult::Err(CannotConnectError { + try_include: BlockId { + height: 0, + hash: h!("A"), + }, + }), + }, + TestLocalChain { + name: "duplicate chains should merge", + chain: local_chain![(0, h!("A"))], + new_tip: chain_update![(0, h!("A"))], + exp: ExpectedResult::Ok { + changeset: &[], + init_changeset: &[(0, Some(h!("A")))], + }, + }, + TestLocalChain { + name: "can introduce older checkpoints", + chain: local_chain![(2, h!("C")), (3, h!("D"))], + new_tip: chain_update![(1, h!("B")), (2, h!("C"))], + exp: ExpectedResult::Ok { + changeset: &[(1, Some(h!("B")))], + init_changeset: &[(1, Some(h!("B"))), (2, Some(h!("C"))), (3, Some(h!("D")))], + }, + }, + TestLocalChain { + name: "fix blockhash before agreement point", + chain: local_chain![(0, h!("im-wrong")), (1, h!("we-agree"))], + new_tip: chain_update![(0, h!("fix")), (1, h!("we-agree"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("fix")))], + init_changeset: &[(0, Some(h!("fix"))), (1, Some(h!("we-agree")))], + }, + }, + // B and C are in both chain and update + // | 0 | 1 | 2 | 3 | 4 + // chain | B C + // update | A B C D + // This should succeed with the point of agreement being C and A should be added in addition. + TestLocalChain { + name: "two points of agreement", + chain: local_chain![(1, h!("B")), (2, h!("C"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B")), (2, h!("C")), (3, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[(0, Some(h!("A"))), (3, Some(h!("D")))], + init_changeset: &[ + (0, Some(h!("A"))), + (1, Some(h!("B"))), + (2, Some(h!("C"))), + (3, Some(h!("D"))), + ], + }, + }, + // Update and chain does not connect: + // | 0 | 1 | 2 | 3 | 4 + // chain | B C + // update | A B D + // This should fail as we cannot figure out whether C & D are on the same chain + TestLocalChain { + name: "update and chain does not connect", + chain: local_chain![(1, h!("B")), (2, h!("C"))], + new_tip: chain_update![(0, h!("A")), (1, h!("B")), (3, h!("D"))], + exp: ExpectedResult::Err(CannotConnectError { + try_include: BlockId { + height: 2, + hash: h!("C"), + }, + }), + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 | 5 + // chain | A B C E + // update | A B' C' D + // This should succeed and invalidate B,C and E with point of agreement being A. + TestLocalChain { + name: "transitive invalidation applies to checkpoints higher than invalidation", + chain: local_chain![(0, h!("A")), (2, h!("B")), (3, h!("C")), (5, h!("E"))], + new_tip: chain_update![(0, h!("A")), (2, h!("B'")), (3, h!("C'")), (4, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[ + (2, Some(h!("B'"))), + (3, Some(h!("C'"))), + (4, Some(h!("D"))), + (5, None), + ], + init_changeset: &[ + (0, Some(h!("A"))), + (2, Some(h!("B'"))), + (3, Some(h!("C'"))), + (4, Some(h!("D"))), + ], + }, + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 + // chain | B C E + // update | B' C' D + // This should succeed and invalidate B, C and E with no point of agreement + TestLocalChain { + name: "transitive invalidation applies to checkpoints higher than invalidation no point of agreement", + chain: local_chain![(1, h!("B")), (2, h!("C")), (4, h!("E"))], + new_tip: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))], + exp: ExpectedResult::Ok { + changeset: &[ + (1, Some(h!("B'"))), + (2, Some(h!("C'"))), + (3, Some(h!("D"))), + (4, None) + ], + init_changeset: &[ + (1, Some(h!("B'"))), + (2, Some(h!("C'"))), + (3, Some(h!("D"))), + ], + }, + }, + // Transient invalidation: + // | 0 | 1 | 2 | 3 | 4 + // chain | A B C E + // update | B' C' D + // This should fail since although it tells us that B and C are invalid it doesn't tell us whether + // A was invalid. + TestLocalChain { + name: "invalidation but no connection", + chain: local_chain![(0, h!("A")), (1, h!("B")), (2, h!("C")), (4, h!("E"))], + new_tip: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))], + exp: ExpectedResult::Err(CannotConnectError { try_include: BlockId { height: 0, hash: h!("A") } }), + }, + ] + .into_iter() + .for_each(TestLocalChain::run); } #[test] @@ -174,7 +226,7 @@ fn insert_block() { struct TestCase { original: LocalChain, insert: (u32, BlockHash), - expected_result: Result, + expected_result: Result, expected_final: LocalChain, } @@ -206,7 +258,7 @@ fn insert_block() { TestCase { original: local_chain![(2, h!("K"))], insert: (2, h!("J")), - expected_result: Err(InsertBlockNotMatchingError { + expected_result: Err(InsertBlockError { height: 2, original_hash: h!("K"), update_hash: h!("J"), @@ -218,7 +270,9 @@ fn insert_block() { for (i, t) in test_cases.into_iter().enumerate() { let mut chain = t.original; assert_eq!( - chain.insert_block(t.insert.into()), + chain + .get_or_insert(t.insert.into()) + .map(|(_, changeset)| changeset), t.expected_result, "[{}] unexpected result when inserting block", i, diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index c272f97aa..bbffdaf31 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -697,7 +697,7 @@ fn test_chain_spends() { let _ = graph.insert_anchor( tx.txid(), ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: *ht, }, ); @@ -705,10 +705,10 @@ fn test_chain_spends() { // Assert that confirmed spends are returned correctly. assert_eq!( - graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 0)), + graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 0)), Some(( ChainPosition::Confirmed(&ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: 98 }), tx_1.txid(), @@ -717,17 +717,17 @@ fn test_chain_spends() { // Check if chain position is returned correctly. assert_eq!( - graph.get_chain_position(&local_chain, tip, tx_0.txid()), + graph.get_chain_position(&local_chain, tip.block_id(), tx_0.txid()), // Some(ObservedAs::Confirmed(&local_chain.get_block(95).expect("block expected"))), Some(ChainPosition::Confirmed(&ConfirmationHeightAnchor { - anchor_block: tip, + anchor_block: tip.block_id(), confirmation_height: 95 })) ); // Even if unconfirmed tx has a last_seen of 0, it can still be part of a chain spend. assert_eq!( - graph.get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)), + graph.get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)), Some((ChainPosition::Unconfirmed(0), tx_2.txid())), ); @@ -737,7 +737,7 @@ fn test_chain_spends() { // Check chain spend returned correctly. assert_eq!( graph - .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)) + .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)) .unwrap(), (ChainPosition::Unconfirmed(1234567), tx_2.txid()) ); @@ -754,7 +754,7 @@ fn test_chain_spends() { // Because this tx conflicts with an already confirmed transaction, chain position should return none. assert!(graph - .get_chain_position(&local_chain, tip, tx_1_conflict.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_1_conflict.txid()) .is_none()); // Another conflicting tx that conflicts with tx_2. @@ -773,7 +773,7 @@ fn test_chain_spends() { // This should return a valid observation with correct last seen. assert_eq!( graph - .get_chain_position(&local_chain, tip, tx_2_conflict.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_2_conflict.txid()) .expect("position expected"), ChainPosition::Unconfirmed(1234568) ); @@ -781,14 +781,14 @@ fn test_chain_spends() { // Chain_spend now catches the new transaction as the spend. assert_eq!( graph - .get_chain_spend(&local_chain, tip, OutPoint::new(tx_0.txid(), 1)) + .get_chain_spend(&local_chain, tip.block_id(), OutPoint::new(tx_0.txid(), 1)) .expect("expect observation"), (ChainPosition::Unconfirmed(1234568), tx_2_conflict.txid()) ); // Chain position of the `tx_2` is now none, as it is older than `tx_2_conflict` assert!(graph - .get_chain_position(&local_chain, tip, tx_2.txid()) + .get_chain_position(&local_chain, tip.block_id(), tx_2.txid()) .is_none()); } diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index 1ec44d85c..5dc6a8b35 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -1,7 +1,7 @@ use bdk_chain::{ bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid}, keychain::LocalUpdate, - local_chain::LocalChain, + local_chain::CheckPoint, tx_graph::{self, TxGraph}, Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, }; @@ -14,21 +14,19 @@ use std::{ #[derive(Debug, Clone)] pub struct ElectrumUpdate { pub graph_update: HashMap>, - pub chain_update: LocalChain, + pub chain_update: CheckPoint, pub keychain_update: BTreeMap, } -impl Default for ElectrumUpdate { - fn default() -> Self { +impl ElectrumUpdate { + pub fn new(cp: CheckPoint) -> Self { Self { - graph_update: Default::default(), - chain_update: Default::default(), - keychain_update: Default::default(), + graph_update: HashMap::new(), + chain_update: cp, + keychain_update: BTreeMap::new(), } } -} -impl ElectrumUpdate { pub fn missing_full_txs(&self, graph: &TxGraph) -> Vec { self.graph_update .keys() @@ -56,7 +54,7 @@ impl ElectrumUpdate { Ok(LocalUpdate { keychain: self.keychain_update, graph: graph_update, - chain: self.chain_update, + tip: self.chain_update, }) } } @@ -128,7 +126,7 @@ impl ElectrumUpdate { graph.apply_additions(graph_additions); graph }, - chain: update.chain, + tip: update.tip, }) } } @@ -138,7 +136,7 @@ pub trait ElectrumExt { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -148,7 +146,7 @@ pub trait ElectrumExt { fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -160,7 +158,7 @@ pub trait ElectrumExt { .map(|(i, spk)| (i as u32, spk)); self.scan( - local_chain, + prev_tip, [((), spk_iter)].into(), txids, outpoints, @@ -179,7 +177,7 @@ impl ElectrumExt for Client { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -196,14 +194,10 @@ impl ElectrumExt for Client { let outpoints = outpoints.into_iter().collect::>(); let update = loop { - let mut update = ElectrumUpdate:: { - chain_update: prepare_chain_update(self, local_chain)?, - ..Default::default() - }; - let anchor_block = update - .chain_update - .tip() - .expect("must have atleast one block"); + let mut update = ElectrumUpdate::::new( + prepare_chain_update(self, prev_tip.clone())?, + ); + let anchor_block = update.chain_update.block_id(); if !request_spks.is_empty() { if !scanned_spks.is_empty() { @@ -271,39 +265,72 @@ impl ElectrumExt for Client { /// Prepare an update "template" based on the checkpoints of the `local_chain`. fn prepare_chain_update( client: &Client, - local_chain: &BTreeMap, -) -> Result { - let mut update = LocalChain::default(); - - // Find the local chain block that is still there so our update can connect to the local chain. - for (&existing_height, &existing_hash) in local_chain.iter().rev() { - // TODO: a batch request may be safer, as a reorg that happens when we are obtaining - // `block_header`s will result in inconsistencies - let current_hash = client.block_header(existing_height as usize)?.block_hash(); - let _ = update - .insert_block(BlockId { - height: existing_height, - hash: current_hash, - }) - .expect("This never errors because we are working with a fresh chain"); - - if current_hash == existing_hash { - break; + prev_tip: Option, +) -> Result { + let mut header_notification = client.block_headers_subscribe()?; + + let (new_blocks, mut last_cp) = 'retry: loop { + let tip = BlockId { + height: header_notification.height as _, + hash: header_notification.header.block_hash(), + }; + let tip_parent = BlockId { + height: (header_notification.height - 1) as _, + hash: header_notification.header.prev_blockhash, + }; + + // this records new blocks, including blocks that are to be replaced + let mut new_blocks = [tip_parent, tip] + .into_iter() + .map(|b| (b.height, b.hash)) + .collect::>(); + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + // TODO: a batch request may be safer, as a reorg that happens when we are obtaining + // `block_header`s will result in inconsistencies + let hash = client.block_header(cp_block.height as _)?.block_hash(); + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + new_blocks.insert(cp_block.height, hash); } - } - // Insert the new tip so new transactions will be accepted into the sparsechain. - let tip = { - let (height, hash) = crate::get_tip(client)?; - BlockId { height, hash } + // check for tip changes + loop { + match client.block_headers_pop()? { + Some(new_notification) => { + let new_height = new_notification.height; + header_notification = new_notification; + if new_height as u32 <= tip.height { + // we may have a reorg + // reorg-detection logic can be improved (false positives are possible) + continue 'retry; + } + } + None => { + let new_blocks = match &agreement_cp { + // `new_blocks` should only include blocks that are actually new + Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), + None => new_blocks, + }; + + break 'retry (new_blocks, agreement_cp); + } + }; + } }; - if update.insert_block(tip).is_err() { - // There has been a re-org before we even begin scanning addresses. - // Just recursively call (this should never happen). - return prepare_chain_update(client, local_chain); + + // construct checkpoints + for (height, hash) in new_blocks { + let cp = CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) + .expect("heights should not conflict"); + last_cp = Some(cp); } - Ok(update) + Ok(last_cp.expect("must have atleast one checkpoint")) } fn determine_tx_anchor( diff --git a/crates/electrum/src/lib.rs b/crates/electrum/src/lib.rs index 4826c6dda..ec693fda9 100644 --- a/crates/electrum/src/lib.rs +++ b/crates/electrum/src/lib.rs @@ -15,21 +15,12 @@ //! //! Refer to [`bdk_electrum_example`] for a complete example. //! -//! [`ElectrumClient::scan`]: ElectrumClient::scan +//! [`ElectrumClient::scan`]: electrum_client::ElectrumClient::scan //! [`missing_full_txs`]: ElectrumUpdate::missing_full_txs -//! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get +//! [`batch_transaction_get`]: electrum_client::ElectrumApi::batch_transaction_get //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example -use bdk_chain::bitcoin::BlockHash; -use electrum_client::{Client, ElectrumApi, Error}; mod electrum_ext; pub use bdk_chain; pub use electrum_client; pub use electrum_ext::*; - -fn get_tip(client: &Client) -> Result<(u32, BlockHash), Error> { - // TODO: unsubscribe when added to the client, or is there a better call to use here? - client - .block_headers_subscribe() - .map(|data| (data.height as u32, data.header.block_hash())) -} diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index e496e415c..8023b7422 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -3,6 +3,7 @@ use bdk_chain::{ bitcoin::{BlockHash, OutPoint, Script, Txid}, collections::BTreeMap, keychain::LocalUpdate, + local_chain::CheckPoint, BlockId, ConfirmationTimeAnchor, }; use esplora_client::{Error, OutputStatus, TxStatus}; @@ -35,7 +36,7 @@ pub trait EsploraAsyncExt { #[allow(clippy::result_large_err)] // FIXME async fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, @@ -52,14 +53,14 @@ pub trait EsploraAsyncExt { #[allow(clippy::result_large_err)] // FIXME async fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator + Send> + Send, txids: impl IntoIterator + Send> + Send, outpoints: impl IntoIterator + Send> + Send, parallel_requests: usize, ) -> Result, Error> { self.scan( - local_chain, + prev_tip, [( (), misc_spks @@ -83,7 +84,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { #[allow(clippy::result_large_err)] // FIXME async fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, @@ -95,33 +96,60 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (mut update, tip_at_start) = loop { - let mut update = LocalUpdate::::default(); + let (new_blocks, mut last_cp) = 'retry: loop { + let new_tip = loop { + let hash = self.get_tip_hash().await?; + let status = self.get_block_status(&hash).await?; + if status.in_best_chain && status.next_best.is_none() { + break BlockId { + height: status.height.expect("must have height"), + hash, + }; + } + }; + + let mut new_blocks = core::iter::once((new_tip.height, new_tip.hash)) + .collect::>(); - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height).await?, - }; - let _ = update - .chain - .insert_block(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + let hash = self.get_block_hash(cp_block.height).await?; + if hash == cp_block.hash { + agreement_cp = Some(cp); break; } + new_blocks.insert(cp_block.height, hash); } - let tip_at_start = BlockId { - height: self.get_height().await?, - hash: self.get_tip_hash().await?, - }; + // check for tip changes + // retry if there are changes to the tip + let status = self.get_block_status(&new_tip.hash).await?; - if update.chain.insert_block(tip_at_start).is_ok() { - break (update, tip_at_start); + if !status.in_best_chain || status.next_best.is_some() { + continue 'retry; } + + // `new_blocks` should only include blocks that are actually new + let new_blocks = match &agreement_cp { + Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), + None => new_blocks, + }; + break 'retry (new_blocks, agreement_cp); }; + // construct checkpoints + for (&height, &hash) in new_blocks.iter() { + last_cp = Some( + CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) + .expect("heights should not conflict"), + ); + } + + let tip = last_cp.expect("must have atleast one checkpoint"); + let mut update = LocalUpdate::::new(tip.clone()); + for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); let mut last_active_index = None; @@ -172,7 +200,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { empty_scripts = 0; } for tx in related_txs { - let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start); + let anchor = map_confirmation_time_anchor(&tx.status, &tip); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { @@ -202,7 +230,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } match self.get_tx_status(&txid).await? { tx_status if tx_status.confirmed => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) { + if let Some(anchor) = map_confirmation_time_anchor(&tx_status, &tip) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -236,7 +264,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, tip_at_start); + let anchor = map_confirmation_time_anchor(&status, &tip); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -245,23 +273,34 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } } - if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? { + if tip.hash() != self.get_block_hash(tip.height()).await? { // A reorg occurred, so let's find out where all the txids we found are now in the chain let txids_found = update .graph .full_txs() .map(|tx_node| tx_node.txid) .collect::>(); - update.chain = EsploraAsyncExt::scan_without_keychain( + let new_update = EsploraAsyncExt::scan_without_keychain( self, - local_chain, + Some(tip), [], txids_found, [], parallel_requests, ) - .await? - .chain; + .await?; + update.tip = new_update.tip; + update.graph = new_update.graph; + // update.chain = EsploraAsyncExt::scan_without_keychain( + // self, + // local_chain, + // [], + // txids_found, + // [], + // parallel_requests, + // ) + // .await? + // .chain; } Ok(update) diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 6e1c61993..2c5ddc6e9 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,5 +1,6 @@ use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid}; use bdk_chain::collections::BTreeMap; +use bdk_chain::local_chain::CheckPoint; use bdk_chain::BlockId; use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor}; use esplora_client::{Error, OutputStatus, TxStatus}; @@ -27,7 +28,7 @@ pub trait EsploraExt { #[allow(clippy::result_large_err)] // FIXME fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -41,14 +42,14 @@ pub trait EsploraExt { #[allow(clippy::result_large_err)] // FIXME fn scan_without_keychain( &self, - local_chain: &BTreeMap, + prev_tip: Option, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, parallel_requests: usize, ) -> Result, Error> { self.scan( - local_chain, + prev_tip, [( (), misc_spks @@ -68,7 +69,7 @@ pub trait EsploraExt { impl EsploraExt for esplora_client::BlockingClient { fn scan( &self, - local_chain: &BTreeMap, + prev_tip: Option, keychain_spks: BTreeMap>, txids: impl IntoIterator, outpoints: impl IntoIterator, @@ -77,33 +78,59 @@ impl EsploraExt for esplora_client::BlockingClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (mut update, tip_at_start) = loop { - let mut update = LocalUpdate::::default(); - - for (&height, &original_hash) in local_chain.iter().rev() { - let update_block_id = BlockId { - height, - hash: self.get_block_hash(height)?, - }; - let _ = update - .chain - .insert_block(update_block_id) - .expect("cannot repeat height here"); - if update_block_id.hash == original_hash { + let (new_blocks, mut last_cp) = 'retry: loop { + let new_tip = loop { + let hash = self.get_tip_hash()?; + let status = self.get_block_status(&hash)?; + if status.in_best_chain && status.next_best.is_none() { + break BlockId { + height: status.height.expect("must have height"), + hash, + }; + } + }; + + let mut new_blocks = core::iter::once((new_tip.height, new_tip.hash)) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + let hash = self.get_block_hash(cp_block.height)?; + if hash == cp_block.hash { + agreement_cp = Some(cp); break; } + new_blocks.insert(cp_block.height, hash); } - let tip_at_start = BlockId { - height: self.get_height()?, - hash: self.get_tip_hash()?, - }; - - if update.chain.insert_block(tip_at_start).is_ok() { - break (update, tip_at_start); + // check for tip changes + // retry if there are changes to the tip + let status = self.get_block_status(&new_tip.hash)?; + if !status.in_best_chain || status.next_best.is_some() { + continue 'retry; } + + // `new_blocks` should only include blocks that are actually new + let new_blocks = match &agreement_cp { + Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), + None => new_blocks, + }; + break 'retry (new_blocks, agreement_cp); }; + // construct checkpoints + for (&height, &hash) in new_blocks.iter() { + last_cp = Some( + CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) + .expect("heights should not conflict"), + ); + } + + let tip = last_cp.expect("must have atleast one checkpoint"); + let mut update = LocalUpdate::::new(tip.clone()); + for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); let mut last_active_index = None; @@ -155,7 +182,7 @@ impl EsploraExt for esplora_client::BlockingClient { empty_scripts = 0; } for tx in related_txs { - let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start); + let anchor = map_confirmation_time_anchor(&tx.status, &tip); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { @@ -187,7 +214,7 @@ impl EsploraExt for esplora_client::BlockingClient { tx_status @ TxStatus { confirmed: true, .. } => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) { + if let Some(anchor) = map_confirmation_time_anchor(&tx_status, &tip) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -219,7 +246,7 @@ impl EsploraExt for esplora_client::BlockingClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, tip_at_start); + let anchor = map_confirmation_time_anchor(&status, &tip); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -228,22 +255,23 @@ impl EsploraExt for esplora_client::BlockingClient { } } - if tip_at_start.hash != self.get_block_hash(tip_at_start.height)? { + if tip.hash() != self.get_block_hash(tip.height())? { // A reorg occurred, so let's find out where all the txids we found are now in the chain let txids_found = update .graph .full_txs() .map(|tx_node| tx_node.txid) .collect::>(); - update.chain = EsploraExt::scan_without_keychain( + let new_update = EsploraExt::scan_without_keychain( self, - local_chain, + Some(tip), [], txids_found, [], parallel_requests, - )? - .chain; + )?; + update.tip = new_update.tip; + update.graph = new_update.graph; } Ok(update) diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index d5f8d8af6..d1c68e81f 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,5 +1,5 @@ #![doc = include_str!("../README.md")] -use bdk_chain::{BlockId, ConfirmationTimeAnchor}; +use bdk_chain::{local_chain::CheckPoint, ConfirmationTimeAnchor}; use esplora_client::TxStatus; pub use esplora_client; @@ -16,11 +16,11 @@ pub use async_ext::*; pub(crate) fn map_confirmation_time_anchor( tx_status: &TxStatus, - tip_at_start: BlockId, + tip: &CheckPoint, ) -> Option { match (tx_status.block_time, tx_status.block_height) { (Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor { - anchor_block: tip_at_start, + anchor_block: tip.block_id(), confirmation_height, confirmation_time, }), diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 41d394234..b5ca8c2a7 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -5,7 +5,7 @@ use std::{ }; use bdk_chain::{ - bitcoin::{Address, BlockHash, Network, OutPoint, Txid}, + bitcoin::{Address, Network, OutPoint, Txid}, indexed_tx_graph::{IndexedAdditions, IndexedTxGraph}, keychain::LocalChangeSet, local_chain::LocalChain, @@ -23,7 +23,7 @@ use example_cli::{ const DB_MAGIC: &[u8] = b"bdk_example_electrum"; const DB_PATH: &str = ".bdk_electrum_example.db"; -const ASSUME_FINAL_DEPTH: usize = 10; +// const ASSUME_FINAL_DEPTH: usize = 10; #[derive(Subcommand, Debug, Clone)] enum ElectrumCommands { @@ -73,11 +73,7 @@ fn main() -> anyhow::Result<()> { graph }); - let chain = Mutex::new({ - let mut chain = LocalChain::default(); - chain.apply_changeset(init_changeset.chain_changeset); - chain - }); + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset)); let electrum_url = match args.network { Network::Bitcoin => "ssl://electrum.blockstream.info:50002", @@ -119,7 +115,7 @@ fn main() -> anyhow::Result<()> { stop_gap, scan_options, } => { - let (keychain_spks, local_chain) = { + let (keychain_spks, tip) = { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); @@ -142,20 +138,13 @@ fn main() -> anyhow::Result<()> { }) .collect::>(); - let c = chain - .blocks() - .iter() - .rev() - .take(ASSUME_FINAL_DEPTH) - .map(|(k, v)| (*k, *v)) - .collect::>(); - - (keychain_spks, c) + let tip = chain.tip(); + (keychain_spks, tip) }; client .scan( - &local_chain, + tip, keychain_spks, core::iter::empty(), core::iter::empty(), @@ -174,7 +163,7 @@ fn main() -> anyhow::Result<()> { // Get a short lock on the tracker to get the spks we're interested in let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let chain_tip = chain.tip().unwrap_or_default(); + let chain_tip = chain.tip().map(|cp| cp.block_id()).unwrap_or_default(); if !(all_spks || unused_spks || utxos || unconfirmed) { unused_spks = true; @@ -254,19 +243,13 @@ fn main() -> anyhow::Result<()> { })); } - let c = chain - .blocks() - .iter() - .rev() - .take(ASSUME_FINAL_DEPTH) - .map(|(k, v)| (*k, *v)) - .collect::>(); + let tip = chain.tip(); // drop lock on graph and chain drop((graph, chain)); let update = client - .scan_without_keychain(&c, spks, txids, outpoints, scan_options.batch_size) + .scan_without_keychain(tip, spks, txids, outpoints, scan_options.batch_size) .context("scanning the blockchain")?; ElectrumUpdate { graph_update: update.graph_update, @@ -292,7 +275,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(final_update.chain)?; + let chain_changeset = chain.apply_update(final_update.tip)?; let indexed_additions = { let mut additions = IndexedAdditions::::default(); diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index db80f106d..2355a6fb0 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -35,7 +35,7 @@ fn main() -> Result<(), Box> { print!("Syncing..."); let client = electrum_client::Client::new("ssl://electrum.blockstream.info:60002")?; - let local_chain = wallet.checkpoints(); + let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -52,8 +52,7 @@ fn main() -> Result<(), Box> { }) .collect(); - let electrum_update = - client.scan(local_chain, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; + let electrum_update = client.scan(prev_tip, keychain_spks, None, None, STOP_GAP, BATCH_SIZE)?; println!(); diff --git a/example-crates/wallet_esplora/src/main.rs b/example-crates/wallet_esplora/src/main.rs index 119d9cbd7..8ae042a96 100644 --- a/example-crates/wallet_esplora/src/main.rs +++ b/example-crates/wallet_esplora/src/main.rs @@ -36,7 +36,7 @@ fn main() -> Result<(), Box> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking()?; - let local_chain = wallet.checkpoints(); + let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -53,7 +53,7 @@ fn main() -> Result<(), Box> { }) .collect(); let update = client.scan( - local_chain, + prev_tip, keychain_spks, None, None, diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 7cb218ec2..afe751b73 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?; - let local_chain = wallet.checkpoints(); + let prev_cp = wallet.latest_checkpoint(); let keychain_spks = wallet .spks_of_all_keychains() .into_iter() @@ -54,14 +54,7 @@ async fn main() -> Result<(), Box> { }) .collect(); let update = client - .scan( - local_chain, - keychain_spks, - [], - [], - STOP_GAP, - PARALLEL_REQUESTS, - ) + .scan(prev_cp, keychain_spks, [], [], STOP_GAP, PARALLEL_REQUESTS) .await?; println!(); wallet.apply_update(update)?; From ca63c97de8070bb1bba8828032d91d206dcaefd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 19 Jun 2023 05:22:49 +0200 Subject: [PATCH 3/9] [bitcoind_rpc] Initial work on `BitcoindRpcIter` --- Cargo.toml | 1 + crates/bitcoind_rpc/Cargo.toml | 11 +++ crates/bitcoind_rpc/src/lib.rs | 164 +++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 crates/bitcoind_rpc/Cargo.toml create mode 100644 crates/bitcoind_rpc/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index c5f2692da..adfc16c5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", "example-crates/wallet_electrum", diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..22c6514e2 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../chain", version = "0.4.0", features = ["serde", "miniscript"] } +bitcoincore-rpc = { version = "0.16" } +anyhow = { version = "1" } diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..c8357fc56 --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,164 @@ +use std::collections::HashSet; + +use bdk_chain::{ + bitcoin::{Transaction, Txid}, + local_chain::CheckPoint, + BlockId, +}; +use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; + +#[derive(Debug, Clone)] +pub enum BitcoindRpcItem { + Block { + cp: CheckPoint, + info: Box, + }, + Mempool { + cp: CheckPoint, + txs: Vec<(Transaction, u64)>, + }, +} + +pub struct BitcoindRpcIter<'a> { + client: &'a Client, + fallback_height: u32, + + last_cp: Option, + last_info: Option, + + seen_txids: HashSet, +} + +impl<'a> Iterator for BitcoindRpcIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_emission().transpose() + } +} + +impl<'a> BitcoindRpcIter<'a> { + pub fn new(client: &'a Client, fallback_height: u32, last_cp: Option) -> Self { + Self { + client, + fallback_height, + last_cp, + last_info: None, + seen_txids: HashSet::new(), + } + } + + fn next_emission(&mut self) -> Result, bitcoincore_rpc::Error> { + let client = self.client; + + 'main_loop: loop { + match (&mut self.last_cp, &mut self.last_info) { + (last_cp @ None, last_info @ None) => { + // get first item at fallback_height + let info = client + .get_block_info(&client.get_block_hash(self.fallback_height as _)?)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + *last_info = Some(info.clone()); + *last_cp = Some(cp.clone()); + return Ok(Some(BitcoindRpcItem::Block { + cp, + info: Box::new(info), + })); + } + (last_cp @ Some(_), last_info @ None) => { + 'cp_loop: for cp in last_cp.clone().iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + let info = client.get_block_info(&cp_block.hash)?; + if info.confirmations < 0 { + // block is not in the main chain + continue 'cp_loop; + } + // agreement + // next loop + *last_cp = Some(cp); + *last_info = Some(info); + } + + // no point of agreement found + // next loop will emit block @ fallback height + *last_cp = None; + } + (Some(last_cp), last_info @ Some(_)) => { + // find next block + match last_info.as_ref().unwrap().nextblockhash { + Some(next_hash) => { + let info = self.client.get_block_info(&next_hash)?; + + if info.confirmations < 0 { + *last_info = None; + continue 'main_loop; + } + + let cp = CheckPoint::new_with_prev( + BlockId { + height: info.height as _, + hash: info.hash, + }, + Some(last_cp.clone()), + ) + .expect("must create valid checkpoint"); + + *last_cp = cp.clone(); + *last_info = Some(info.clone()); + + return Ok(Some(BitcoindRpcItem::Block { + cp, + info: Box::new(info), + })); + } + None => { + // emit from mempool! + let mempool_txs = client + .get_raw_mempool()? + .into_iter() + .filter(|&txid| self.seen_txids.insert(txid)) + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + + // remove last info... + *last_info = None; + + return Ok(Some(BitcoindRpcItem::Mempool { + txs: mempool_txs, + cp: last_cp.clone(), + })); + } + } + } + (None, Some(_)) => unreachable!(), + } + } + } +} + +pub trait BitcoindRpcErrorExt { + fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +} From 2188b6c3a431a1dc54c99f72395ac5a8b5ec0403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 19 Jun 2023 15:58:04 +0800 Subject: [PATCH 4/9] [bitcoind_rpc] Introduce `prune_and_apply_update` for `IndexedTxGraph` `prune_and_apply_update` first scans all txs contained in `update` through the index, then filters out txs using `I::is_tx_relevant` before applying the update. This is useful for block-by-block syncing. `Wallet::apply_update` now has a second input; `prune: bool`. If `prune` is set, irrelevant transactions of `update` will not be included. --- crates/bdk/src/wallet/mod.rs | 15 +++- crates/bitcoind_rpc/src/lib.rs | 74 ++++++++++++++++++- crates/chain/src/indexed_tx_graph.rs | 44 +++++++++-- example-crates/wallet_electrum/src/main.rs | 2 +- example-crates/wallet_esplora/src/main.rs | 2 +- .../wallet_esplora_async/src/main.rs | 2 +- 6 files changed, 124 insertions(+), 15 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index af818e1ff..15e5fad5c 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -1694,14 +1694,16 @@ impl Wallet { } /// Applies an update to the wallet and stages the changes (but does not [`commit`] them). + /// Returns whether the `update` resulted in any changes. /// - /// This returns whether the `update` resulted in any changes. + /// If `prune` is set, irrelevant transactions are pruned. Relevant transactions change the UTXO + /// set of tracked script pubkeys (script pubkeys derived from tracked descriptors). /// /// Usually you create an `update` by interacting with some blockchain data source and inserting /// transactions related to your wallet into it. /// /// [`commit`]: Self::commit - pub fn apply_update(&mut self, update: Update) -> Result + pub fn apply_update(&mut self, update: Update, prune: bool) -> Result where D: PersistBackend, { @@ -1711,7 +1713,14 @@ impl Wallet { .index .reveal_to_target_multi(&update.keychain); changeset.append(ChangeSet::from(IndexedAdditions::from(index_additions))); - changeset.append(self.indexed_graph.apply_update(update.graph).into()); + changeset.append( + if prune { + self.indexed_graph.prune_and_apply_update(update.graph) + } else { + self.indexed_graph.apply_update(update.graph) + } + .into(), + ); let changed = !changeset.is_empty(); self.persist.stage(changeset); diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index c8357fc56..70abd7a0f 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -1,9 +1,10 @@ use std::collections::HashSet; use bdk_chain::{ - bitcoin::{Transaction, Txid}, + bitcoin::{Block, Transaction, Txid}, + keychain::LocalUpdate, local_chain::CheckPoint, - BlockId, + BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, }; use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; @@ -12,6 +13,7 @@ pub enum BitcoindRpcItem { Block { cp: CheckPoint, info: Box, + block: Box, }, Mempool { cp: CheckPoint, @@ -19,6 +21,70 @@ pub enum BitcoindRpcItem { }, } +pub fn confirmation_height_anchor( + info: &GetBlockResult, + _txid: Txid, + _tx_pos: usize, +) -> ConfirmationHeightAnchor { + ConfirmationHeightAnchor { + anchor_block: BlockId { + height: info.height as _, + hash: info.hash, + }, + confirmation_height: info.height as _, + } +} + +pub fn confirmation_time_anchor( + info: &GetBlockResult, + _txid: Txid, + _tx_pos: usize, +) -> ConfirmationTimeAnchor { + ConfirmationTimeAnchor { + anchor_block: BlockId { + height: info.height as _, + hash: info.hash, + }, + confirmation_height: info.height as _, + confirmation_time: info.time as _, + } +} + +impl BitcoindRpcItem { + pub fn into_update(self, anchor: F) -> LocalUpdate + where + A: Clone + Ord + PartialOrd, + F: Fn(&GetBlockResult, Txid, usize) -> A, + { + match self { + BitcoindRpcItem::Block { cp, info, block } => LocalUpdate { + graph: { + let mut g = TxGraph::::new(block.txdata); + for (tx_pos, &txid) in info.tx.iter().enumerate() { + let _ = g.insert_anchor(txid, anchor(&info, txid, tx_pos)); + } + g + }, + ..LocalUpdate::new(cp) + }, + BitcoindRpcItem::Mempool { cp, txs } => LocalUpdate { + graph: { + let mut last_seens = Vec::<(Txid, u64)>::with_capacity(txs.len()); + let mut g = TxGraph::::new(txs.into_iter().map(|(tx, last_seen)| { + last_seens.push((tx.txid(), last_seen)); + tx + })); + for (txid, seen_at) in last_seens { + let _ = g.insert_seen_at(txid, seen_at); + } + g + }, + ..LocalUpdate::new(cp) + }, + } + } +} + pub struct BitcoindRpcIter<'a> { client: &'a Client, fallback_height: u32, @@ -57,6 +123,7 @@ impl<'a> BitcoindRpcIter<'a> { // get first item at fallback_height let info = client .get_block_info(&client.get_block_hash(self.fallback_height as _)?)?; + let block = self.client.get_block(&info.hash)?; let cp = CheckPoint::new(BlockId { height: info.height as _, hash: info.hash, @@ -66,6 +133,7 @@ impl<'a> BitcoindRpcIter<'a> { return Ok(Some(BitcoindRpcItem::Block { cp, info: Box::new(info), + block: Box::new(block), })); } (last_cp @ Some(_), last_info @ None) => { @@ -98,6 +166,7 @@ impl<'a> BitcoindRpcIter<'a> { continue 'main_loop; } + let block = self.client.get_block(&info.hash)?; let cp = CheckPoint::new_with_prev( BlockId { height: info.height as _, @@ -113,6 +182,7 @@ impl<'a> BitcoindRpcIter<'a> { return Ok(Some(BitcoindRpcItem::Block { cp, info: Box::new(info), + block: Box::new(block), })); } None => { diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 730b04340..25f193275 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -90,6 +90,38 @@ where } } + /// Apply `update`, but filters out irrelevant transactions. + /// + /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. + pub fn prune_and_apply_update( + &mut self, + update: TxGraph, + ) -> IndexedAdditions { + let mut additions = IndexedAdditions::::default(); + + // index all transactions first + for tx_node in update.full_txs() { + additions + .index_additions + .append(self.index.index_tx(&tx_node)); + } + + let update = update + .full_txs() + .filter(|tx_node| self.index.is_tx_relevant(tx_node)) + .fold(TxGraph::default(), |mut g, tx_node| -> TxGraph { + let _ = g.insert_tx(tx_node.tx.clone()); + for anchor in tx_node.anchors { + let _ = g.insert_anchor(tx_node.txid, anchor.clone()); + } + let _ = g.insert_seen_at(tx_node.txid, tx_node.last_seen_unconfirmed); + g + }); + + additions.append(self.apply_update(update)); + additions + } + /// Insert a floating `txout` of given `outpoint`. pub fn insert_txout( &mut self, @@ -146,14 +178,12 @@ where // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // returns true or not. (in a second loop). let mut additions = IndexedAdditions::::default(); - let mut transactions = Vec::new(); - for (tx, anchors) in txs.into_iter() { - additions.index_additions.append(self.index.index_tx(tx)); - transactions.push((tx, anchors)); - } + let txs = txs + .into_iter() + .inspect(|(tx, _)| additions.index_additions.append(self.index.index_tx(tx))) + .collect::>(); additions.append( - transactions - .into_iter() + txs.into_iter() .filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) { true => Some(self.insert_tx(tx, anchors, seen_at)), false => None, diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index 2355a6fb0..32663b2b5 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -59,7 +59,7 @@ fn main() -> Result<(), Box> { let missing = electrum_update.missing_full_txs(wallet.as_ref()); let update = electrum_update.finalize_as_confirmation_time(&client, None, missing)?; - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance(); diff --git a/example-crates/wallet_esplora/src/main.rs b/example-crates/wallet_esplora/src/main.rs index 8ae042a96..4e0476398 100644 --- a/example-crates/wallet_esplora/src/main.rs +++ b/example-crates/wallet_esplora/src/main.rs @@ -61,7 +61,7 @@ fn main() -> Result<(), Box> { PARALLEL_REQUESTS, )?; println!(); - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance(); diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index afe751b73..a3a3399e1 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { .scan(prev_cp, keychain_spks, [], [], STOP_GAP, PARALLEL_REQUESTS) .await?; println!(); - wallet.apply_update(update)?; + wallet.apply_update(update, false)?; wallet.commit()?; let balance = wallet.get_balance(); From ebef3b036e8d2cb2810dcf9b8790f9f7cb29b893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 19 Jun 2023 20:20:09 +0800 Subject: [PATCH 5/9] [bitcoind_rpc] Initial work on RPC example --- Cargo.toml | 1 + crates/bitcoind_rpc/Cargo.toml | 2 +- crates/bitcoind_rpc/src/lib.rs | 11 +- example-crates/example_electrum/src/main.rs | 3 +- example-crates/example_rpc/Cargo.toml | 12 + example-crates/example_rpc/src/main.rs | 299 ++++++++++++++++++++ 6 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 example-crates/example_rpc/Cargo.toml create mode 100644 example-crates/example_rpc/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index adfc16c5d..8798269e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", + "example-crates/example_rpc", "example-crates/wallet_electrum", "example-crates/wallet_esplora", "example-crates/wallet_esplora_async", diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index 22c6514e2..f849be627 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "bitcoind_rpc" +name = "bdk_bitcoind_rpc" version = "0.1.0" edition = "2021" diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 70abd7a0f..3786bb148 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -6,6 +6,7 @@ use bdk_chain::{ local_chain::CheckPoint, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, }; +pub use bitcoincore_rpc; use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; #[derive(Debug, Clone)] @@ -51,6 +52,10 @@ pub fn confirmation_time_anchor( } impl BitcoindRpcItem { + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. }) + } + pub fn into_update(self, anchor: F) -> LocalUpdate where A: Clone + Ord + PartialOrd, @@ -145,15 +150,17 @@ impl<'a> BitcoindRpcIter<'a> { // block is not in the main chain continue 'cp_loop; } + // agreement - // next loop *last_cp = Some(cp); *last_info = Some(info); + continue 'main_loop; } // no point of agreement found // next loop will emit block @ fallback height *last_cp = None; + *last_info = None; } (Some(last_cp), last_info @ Some(_)) => { // find next block @@ -212,7 +219,7 @@ impl<'a> BitcoindRpcIter<'a> { } } } - (None, Some(_)) => unreachable!(), + (None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info), } } } diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index b5ca8c2a7..243141d7c 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -22,8 +22,7 @@ use example_cli::{ }; const DB_MAGIC: &[u8] = b"bdk_example_electrum"; -const DB_PATH: &str = ".bdk_electrum_example.db"; -// const ASSUME_FINAL_DEPTH: usize = 10; +const DB_PATH: &str = ".bdk_example_electrum.db"; #[derive(Subcommand, Debug, Clone)] enum ElectrumCommands { diff --git a/example-crates/example_rpc/Cargo.toml b/example-crates/example_rpc/Cargo.toml new file mode 100644 index 000000000..c107c49b6 --- /dev/null +++ b/example-crates/example_rpc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs new file mode 100644 index 000000000..92064882b --- /dev/null +++ b/example-crates/example_rpc/src/main.rs @@ -0,0 +1,299 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + Arc, Mutex, + }, + time::{Duration, Instant, SystemTime}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + confirmation_time_anchor, BitcoindRpcItem, BitcoindRpcIter, +}; +use bdk_chain::{ + bitcoin::{Address, Transaction}, + indexed_tx_graph::IndexedAdditions, + keychain::{LocalChangeSet, LocalUpdate}, + local_chain::LocalChain, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + CoinSelectionAlgo, Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; +const CHANNEL_BOUND: usize = 10; +const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15); + +type ChangeSet = LocalChangeSet; + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant + /// transactions + Scan { + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, + /// Whether to be live! + #[clap(long, default_value = "false")] + live: bool, + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Create and broadcast a transaction. + Tx { + value: u64, + address: Address, + #[clap(short, default_value = "bnb")] + coin_select: CoinSelectionAlgo, + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +impl RpcCommands { + fn rpc_args(&self) -> &RpcArgs { + match self { + RpcCommands::Scan { rpc_args, .. } => rpc_args, + RpcCommands::Tx { rpc_args, .. } => rpc_args, + } + } +} + +fn main() -> anyhow::Result<()> { + let sigterm_flag = start_ctrlc_handler(); + + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_additions(init_changeset.indexed_additions); + graph + }); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset)); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |_| Err(anyhow::anyhow!("use `tx` instead")), + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + let rpc_client = { + let a = rpc_cmd.rpc_args(); + Client::new( + &a.url, + match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )? + }; + + match rpc_cmd { + RpcCommands::Scan { + fallback_height, + lookahead, + live, + .. + } => { + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + + let (chan, recv) = sync_channel::<(BitcoindRpcItem, u32)>(CHANNEL_BOUND); + let prev_cp = chain.lock().unwrap().tip(); + + let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { + let mut tip_height = Option::::None; + + for item in BitcoindRpcIter::new(&rpc_client, fallback_height, prev_cp) { + let item = item?; + let is_block = !item.is_mempool(); + let is_mempool = item.is_mempool(); + + if tip_height.is_none() || !is_block { + tip_height = Some(rpc_client.get_block_count()? as u32); + } + chan.send((item, tip_height.expect("must have tip height")))?; + + if sigterm_flag.load(Ordering::Acquire) { + return Ok(()); + } + if is_mempool { + if !live { + return Ok(()); + } + if await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) { + return Ok(()); + } + } + } + unreachable!() + }); + + let mut start = Instant::now(); + + for (item, tip_height) in recv { + let is_mempool = item.is_mempool(); + let update: LocalUpdate = + item.into_update(confirmation_time_anchor); + let current_height = update.tip.height(); + + let db_changeset = { + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + + let chain_changeset = chain.apply_update(update.tip)?; + + let mut indexed_additions = + IndexedAdditions::::default(); + let (_, index_additions) = graph.index.reveal_to_target_multi(&update.keychain); + indexed_additions.append(index_additions.into()); + indexed_additions.append(graph.prune_and_apply_update(update.graph)); + + ChangeSet { + indexed_additions, + chain_changeset, + } + }; + + let mut db = db.lock().unwrap(); + db.stage(db_changeset); + + // print stuff every 3 seconds + if start.elapsed() >= Duration::from_secs(3) { + start = Instant::now(); + let balance = { + let chain = chain.lock().unwrap(); + let graph = graph.lock().unwrap(); + graph.graph().balance( + &*chain, + chain.tip().map_or(BlockId::default(), |cp| cp.block_id()), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "* scanned_to: {} / {} tip | total: {} sats", + if is_mempool { + "mempool".to_string() + } else { + current_height.to_string() + }, + tip_height, + balance.confirmed + + balance.immature + + balance.trusted_pending + + balance.untrusted_pending + ); + } + } + + db.lock().unwrap().commit()?; + println!("commited to database!"); + + join_handle + .join() + .expect("failed to join chain source thread") + } + RpcCommands::Tx { + value, + address, + coin_select, + .. + } => { + let chain = chain.lock().unwrap(); + let broadcast = move |tx: &Transaction| -> anyhow::Result<()> { + rpc_client.send_raw_transaction(tx)?; + Ok(()) + }; + example_cli::run_send_cmd( + &graph, + &db, + &*chain, + &keymap, + coin_select, + address, + value, + broadcast, + ) + } + } +} + +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = SystemTime::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if SystemTime::now() + .duration_since(start) + .expect("should succeed") + >= duration + { + return false; + } + std::thread::sleep(Duration::from_secs(1)); + } +} From ec1eb56c51fc8fb7f3e02ae0183bf72f6495b88b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 20 Jun 2023 14:59:52 +0800 Subject: [PATCH 6/9] [local_chain] API simplifications Revert `get_or_insert` back as `insert_block`. Method `update` now mutates `LocalChain` directly, instead of mutating via a second call. `CheckPoint::new_with_prev` is replaced with `CheckPoint::extend`. --- crates/bdk/src/wallet/mod.rs | 4 +- crates/bitcoind_rpc/src/lib.rs | 11 ++- crates/chain/src/local_chain.rs | 91 ++++++++------------- crates/chain/tests/test_local_chain.rs | 9 +- crates/electrum/src/electrum_ext.rs | 8 +- crates/esplora/src/async_ext.rs | 10 ++- crates/esplora/src/blocking_ext.rs | 10 ++- example-crates/example_electrum/src/main.rs | 2 +- example-crates/example_rpc/src/main.rs | 2 +- 9 files changed, 63 insertions(+), 84 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 15e5fad5c..39c9fa37d 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -464,7 +464,7 @@ impl Wallet { where D: PersistBackend, { - let (_, changeset) = self.chain.get_or_insert(block_id)?; + let changeset = self.chain.insert_block(block_id)?; let changed = !changeset.is_empty(); self.persist.stage(changeset.into()); Ok(changed) @@ -1707,7 +1707,7 @@ impl Wallet { where D: PersistBackend, { - let mut changeset = ChangeSet::from(self.chain.apply_update(update.tip)?); + let mut changeset = ChangeSet::from(self.chain.update(update.tip)?); let (_, index_additions) = self .indexed_graph .index diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3786bb148..ccaf780b8 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -174,14 +174,13 @@ impl<'a> BitcoindRpcIter<'a> { } let block = self.client.get_block(&info.hash)?; - let cp = CheckPoint::new_with_prev( - BlockId { + let cp = last_cp + .clone() + .extend(BlockId { height: info.height as _, hash: info.hash, - }, - Some(last_cp.clone()), - ) - .expect("must create valid checkpoint"); + }) + .expect("must extend from checkpoint"); *last_cp = cp.clone(); *last_info = Some(info.clone()); diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 361dcff01..23286c93f 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -47,24 +47,18 @@ impl CheckPoint { Self(Arc::new(CPInner { block, prev: None })) } - /// Construct a [`CheckPoint`] of `block` with a previous checkpoint. - pub fn new_with_prev( - block: BlockId, - prev: Option, - ) -> Result { - if let Some(prev_cp) = &prev { - if prev_cp.height() >= block.height { - return Err(NewCheckPointError { - new_height: block.height, - prev_height: prev_cp.height(), - }); - } + /// Extends [`CheckPoint`] with `block` and returns the new checkpoint tip. + /// + /// Returns an `Err` of the initial checkpoint + pub fn extend(self, block: BlockId) -> Result { + if self.height() < block.height { + Ok(Self(Arc::new(CPInner { + block, + prev: Some(self.0), + }))) + } else { + Err(self) } - - Ok(Self(Arc::new(CPInner { - block, - prev: prev.map(|cp| cp.0), - }))) } /// Get the [`BlockId`] of the checkpoint. @@ -199,10 +193,14 @@ impl LocalChain { checkpoints: blocks .into_iter() .map({ - let mut prev = None; + let mut prev = Option::::None; move |(height, hash)| { - let cp = CheckPoint::new_with_prev(BlockId { height, hash }, prev.clone()) - .expect("must not fail"); + let cp = match prev.clone() { + Some(prev) => { + prev.extend(BlockId { height, hash }).expect("must extend") + } + None => CheckPoint::new(BlockId { height, hash }), + }; prev = Some(cp.clone()); (height, cp) } @@ -221,10 +219,10 @@ impl LocalChain { self.checkpoints.is_empty() } - /// Previews, and optionally applies updates to [`Self`] with the given `new_tip`. + /// Updates [`Self`] with the given `new_tip`. /// - /// The method returns `(apply_update, changeset)` if [`Ok`]. `apply_update` is a closure that - /// can be called to apply the changes represented in `changeset. + /// The method returns [`ChangeSet`] on success. This represents the applied changes to + /// [`Self`]. /// /// To update, the `new_tip` must *connect* with `self`. If `self` and `new_tip` has a mutual /// checkpoint (same height and hash), it can connect if: @@ -247,10 +245,7 @@ impl LocalChain { /// Refer to [module-level documentation] for more. /// /// [module-level documentation]: crate::local_chain - pub fn update( - &mut self, - new_tip: CheckPoint, - ) -> Result<(impl FnOnce() + '_, ChangeSet), CannotConnectError> { + pub fn update(&mut self, new_tip: CheckPoint) -> Result { let mut updated_cps = BTreeMap::::new(); let mut agreement_height = Option::::None; let mut complete_match = false; @@ -316,17 +311,16 @@ impl LocalChain { changeset }; - let apply_update = move || { - if let Some(&start_height) = updated_cps.keys().next() { - self.checkpoints.split_off(&invalidate_lb); - self.checkpoints.append(&mut updated_cps); - if !self.is_empty() && !complete_match { - self.fix_links(start_height); - } + // apply update if `update_cps` is non-empty + if let Some(&start_height) = updated_cps.keys().next() { + self.checkpoints.split_off(&invalidate_lb); + self.checkpoints.append(&mut updated_cps); + if !self.is_empty() && !complete_match { + self.fix_links(start_height); } - }; + } - Ok((apply_update, changeset)) + Ok(changeset) } /// Apply the given `changeset`. @@ -344,41 +338,24 @@ impl LocalChain { } } - /// Update [`LocalChain`]. - /// - /// This is equivalent to calling [`update`] and applying the update in sequence. - /// - /// [`update`]: Self::update - pub fn apply_update(&mut self, new_tip: CheckPoint) -> Result { - let (apply, changeset) = self.update(new_tip)?; - apply(); - Ok(changeset) - } - - /// Get or insert a `block_id`. + /// Insert a [`BlockId`]. /// /// # Errors /// /// Replacing the block hash of an existing checkpoint will result in an error. - pub fn get_or_insert( - &mut self, - block_id: BlockId, - ) -> Result<(CheckPoint, ChangeSet), InsertBlockError> { + pub fn insert_block(&mut self, block_id: BlockId) -> Result { use crate::collections::btree_map::Entry; match self.checkpoints.entry(block_id.height) { Entry::Vacant(entry) => { entry.insert(CheckPoint::new(block_id)); self.fix_links(block_id.height); - let cp = self.checkpoint(block_id.height).expect("must be inserted"); - let changeset = - core::iter::once((block_id.height, Some(block_id.hash))).collect::(); - Ok((cp, changeset)) + Ok(core::iter::once((block_id.height, Some(block_id.hash))).collect()) } Entry::Occupied(entry) => { let cp = entry.get(); if cp.block_id() == block_id { - Ok((cp.clone(), ChangeSet::default())) + Ok(ChangeSet::default()) } else { Err(InsertBlockError { height: block_id.height, diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index fc0a9a4ab..101a7cf32 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -27,10 +27,7 @@ enum ExpectedResult<'a> { impl<'a> TestLocalChain<'a> { fn run(mut self) { let got_changeset = match self.chain.update(self.new_tip) { - Ok((apply, changeset)) => { - apply(); - changeset - } + Ok(changeset) => changeset, Err(err) => { assert_eq!(ExpectedResult::Err(err), self.exp); return; @@ -270,9 +267,7 @@ fn insert_block() { for (i, t) in test_cases.into_iter().enumerate() { let mut chain = t.original; assert_eq!( - chain - .get_or_insert(t.insert.into()) - .map(|(_, changeset)| changeset), + chain.insert_block(t.insert.into()), t.expected_result, "[{}] unexpected result when inserting block", i, diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index 5dc6a8b35..49fe236cb 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -325,8 +325,12 @@ fn prepare_chain_update( // construct checkpoints for (height, hash) in new_blocks { - let cp = CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) - .expect("heights should not conflict"); + let cp = match last_cp.clone() { + Some(last_cp) => last_cp + .extend(BlockId { height, hash }) + .expect("must extend checkpoint"), + None => CheckPoint::new(BlockId { height, hash }), + }; last_cp = Some(cp); } diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 8023b7422..7fc08e822 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -141,10 +141,12 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { // construct checkpoints for (&height, &hash) in new_blocks.iter() { - last_cp = Some( - CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) - .expect("heights should not conflict"), - ); + last_cp = Some(match last_cp { + Some(last_cp) => last_cp + .extend(BlockId { height, hash }) + .expect("must extend checkpoint"), + None => CheckPoint::new(BlockId { height, hash }), + }); } let tip = last_cp.expect("must have atleast one checkpoint"); diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 2c5ddc6e9..3649d7b21 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -122,10 +122,12 @@ impl EsploraExt for esplora_client::BlockingClient { // construct checkpoints for (&height, &hash) in new_blocks.iter() { - last_cp = Some( - CheckPoint::new_with_prev(BlockId { height, hash }, last_cp) - .expect("heights should not conflict"), - ); + last_cp = Some(match last_cp { + Some(last_cp) => last_cp + .extend(BlockId { height, hash }) + .expect("must extend checkpoint"), + None => CheckPoint::new(BlockId { height, hash }), + }); } let tip = last_cp.expect("must have atleast one checkpoint"); diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 243141d7c..89a54b7ef 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -274,7 +274,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(final_update.tip)?; + let chain_changeset = chain.update(final_update.tip)?; let indexed_additions = { let mut additions = IndexedAdditions::::default(); diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs index 92064882b..c6e148958 100644 --- a/example-crates/example_rpc/src/main.rs +++ b/example-crates/example_rpc/src/main.rs @@ -194,7 +194,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(update.tip)?; + let chain_changeset = chain.update(update.tip)?; let mut indexed_additions = IndexedAdditions::::default(); From c6c59e14c255aca4bd338278363145ae05ffced5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 23 Jun 2023 13:37:09 +0800 Subject: [PATCH 7/9] [local_chain] Fix incorrect optimisation logic in `update()` Within `update()`, it is not always necessary to call `fix_links()`. The logic to detect this was wrong previously. Add test that would fail with the previous logic. --- crates/chain/src/local_chain.rs | 57 +++++++++++--------------- crates/chain/tests/test_local_chain.rs | 23 +++++++++++ 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 23286c93f..d4d55dae0 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -23,24 +23,6 @@ struct CPInner { prev: Option>, } -/// Occurs when the caller contructs a [`CheckPoint`] with a height that is not higher than the -/// previous checkpoint it points to. -#[derive(Debug, Clone, PartialEq)] -pub struct NewCheckPointError { - /// The height of the new checkpoint. - pub new_height: u32, - /// The height of the previous checkpoint. - pub prev_height: u32, -} - -impl core::fmt::Display for NewCheckPointError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - write!(f, "cannot construct checkpoint with a height ({}) that is not higher than the previous checkpoint ({})", self.new_height, self.prev_height) - } -} - -impl std::error::Error for NewCheckPointError {} - impl CheckPoint { /// Construct a [`CheckPoint`] from a [`BlockId`]. pub fn new(block: BlockId) -> Self { @@ -248,27 +230,30 @@ impl LocalChain { pub fn update(&mut self, new_tip: CheckPoint) -> Result { let mut updated_cps = BTreeMap::::new(); let mut agreement_height = Option::::None; - let mut complete_match = false; + let mut agreement_ptr_matches = false; for cp in new_tip.iter() { let block = cp.block_id(); - let original_cp = self.checkpoints.get(&block.height); - // if original block of height does not exist, or if the hash does not match we will - // need to update the original checkpoint at that height - if original_cp.map(CheckPoint::block_id) != Some(block) { - updated_cps.insert(block.height, cp.clone()); - } + match self.checkpoints.get(&block.height) { + Some(original_cp) if original_cp.block_id() == block => { + let ptr_matches = Arc::as_ptr(&original_cp.0) == Arc::as_ptr(&cp.0); + + // only record the first agreement height + if agreement_height.is_none() && original_cp.block_id() == block { + agreement_height = Some(block.height); + agreement_ptr_matches = ptr_matches; + } - if let Some(original_cp) = original_cp { - // record the first agreement height - if agreement_height.is_none() && original_cp.block_id() == block { - agreement_height = Some(block.height); + // break if the internal pointers of the checkpoints are the same + if ptr_matches { + break; + } } - // break if the internal pointers of the checkpoints are the same - if Arc::as_ptr(&original_cp.0) == Arc::as_ptr(&cp.0) { - complete_match = true; - break; + // only insert into `updated_cps` if cp is actually updated (original cp is `None`, + // or block ids do not match) + _ => { + updated_cps.insert(block.height, cp.clone()); } } } @@ -315,7 +300,11 @@ impl LocalChain { if let Some(&start_height) = updated_cps.keys().next() { self.checkpoints.split_off(&invalidate_lb); self.checkpoints.append(&mut updated_cps); - if !self.is_empty() && !complete_match { + + // we never need to fix links if either: + // 1. the original chain is empty + // 2. the pointers match at the first point of agreement (where the block ids are equal) + if !(self.is_empty() || agreement_ptr_matches) { self.fix_links(start_height); } } diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index 101a7cf32..4d6697841 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -213,6 +213,29 @@ fn update() { new_tip: chain_update![(1, h!("B'")), (2, h!("C'")), (3, h!("D"))], exp: ExpectedResult::Err(CannotConnectError { try_include: BlockId { height: 0, hash: h!("A") } }), }, + // Introduce blocks between two points of agreement + // | 0 | 1 | 2 | 3 | 4 | 5 + // chain | A B D E + // update | A C E F + TestLocalChain { + name: "introduce blocks between two points of agreement", + chain: local_chain![(0, h!("A")), (1, h!("B")), (3, h!("D")), (4, h!("E"))], + new_tip: chain_update![(0, h!("A")), (2, h!("C")), (4, h!("E")), (5, h!("F"))], + exp: ExpectedResult::Ok { + changeset: &[ + (2, Some(h!("C"))), + (5, Some(h!("F"))), + ], + init_changeset: &[ + (0, Some(h!("A"))), + (1, Some(h!("B"))), + (2, Some(h!("C"))), + (3, Some(h!("D"))), + (4, Some(h!("E"))), + (5, Some(h!("F"))), + ], + }, + } ] .into_iter() .for_each(TestLocalChain::run); From ee098951d4edbe43c1e7d0057f7c52143dd91a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 24 Jun 2023 10:11:23 +0800 Subject: [PATCH 8/9] [esplora] TODO --- crates/electrum/src/electrum_ext.rs | 33 ++-- crates/esplora/src/async_ext.rs | 194 ++++++++++++---------- crates/esplora/src/blocking_ext.rs | 178 +++++++++++--------- crates/esplora/src/lib.rs | 29 +++- example-crates/wallet_esplora/src/main.rs | 2 +- 5 files changed, 240 insertions(+), 196 deletions(-) diff --git a/crates/electrum/src/electrum_ext.rs b/crates/electrum/src/electrum_ext.rs index 49fe236cb..5cd832c4e 100644 --- a/crates/electrum/src/electrum_ext.rs +++ b/crates/electrum/src/electrum_ext.rs @@ -5,7 +5,7 @@ use bdk_chain::{ tx_graph::{self, TxGraph}, Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, }; -use electrum_client::{Client, ElectrumApi, Error}; +use electrum_client::{Client, ElectrumApi, Error, HeaderNotification}; use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet}, fmt::Debug, @@ -262,28 +262,24 @@ impl ElectrumExt for Client { } } -/// Prepare an update "template" based on the checkpoints of the `local_chain`. +/// Return a [`CheckPoint`] of the latest tip, that connects with the previous tip. fn prepare_chain_update( client: &Client, prev_tip: Option, ) -> Result { - let mut header_notification = client.block_headers_subscribe()?; + let HeaderNotification { height, mut header } = client.block_headers_subscribe()?; + let mut height = height as u32; let (new_blocks, mut last_cp) = 'retry: loop { - let tip = BlockId { - height: header_notification.height as _, - hash: header_notification.header.block_hash(), - }; - let tip_parent = BlockId { - height: (header_notification.height - 1) as _, - hash: header_notification.header.prev_blockhash, - }; - // this records new blocks, including blocks that are to be replaced - let mut new_blocks = [tip_parent, tip] - .into_iter() - .map(|b| (b.height, b.hash)) + let mut new_blocks = core::iter::once((height as _, header.block_hash())) + .chain( + height + .checked_sub(1) + .map(|h| (h as _, header.prev_blockhash)), + ) .collect::>(); + let mut agreement_cp = Option::::None; for cp in prev_tip.iter().flat_map(CheckPoint::iter) { @@ -302,9 +298,10 @@ fn prepare_chain_update( loop { match client.block_headers_pop()? { Some(new_notification) => { - let new_height = new_notification.height; - header_notification = new_notification; - if new_height as u32 <= tip.height { + let old_height = height; + height = new_notification.height as u32; + header = new_notification.header; + if height <= old_height { // we may have a reorg // reorg-detection logic can be improved (false positives are possible) continue 'retry; diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 7fc08e822..0d07b1520 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -4,13 +4,11 @@ use bdk_chain::{ collections::BTreeMap, keychain::LocalUpdate, local_chain::CheckPoint, - BlockId, ConfirmationTimeAnchor, + BlockId, ConfirmationTimeAnchor, TxGraph, }; use esplora_client::{Error, OutputStatus, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::map_confirmation_time_anchor; - /// Trait to extend [`esplora_client::AsyncClient`] functionality. /// /// This is the async version of [`EsploraExt`]. Refer to @@ -96,61 +94,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (new_blocks, mut last_cp) = 'retry: loop { - let new_tip = loop { - let hash = self.get_tip_hash().await?; - let status = self.get_block_status(&hash).await?; - if status.in_best_chain && status.next_best.is_none() { - break BlockId { - height: status.height.expect("must have height"), - hash, - }; - } - }; - - let mut new_blocks = core::iter::once((new_tip.height, new_tip.hash)) - .collect::>(); - - let mut agreement_cp = Option::::None; - - for cp in prev_tip.iter().flat_map(CheckPoint::iter) { - let cp_block = cp.block_id(); - let hash = self.get_block_hash(cp_block.height).await?; - if hash == cp_block.hash { - agreement_cp = Some(cp); - break; - } - new_blocks.insert(cp_block.height, hash); - } - - // check for tip changes - // retry if there are changes to the tip - let status = self.get_block_status(&new_tip.hash).await?; - - if !status.in_best_chain || status.next_best.is_some() { - continue 'retry; - } - - // `new_blocks` should only include blocks that are actually new - let new_blocks = match &agreement_cp { - Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), - None => new_blocks, - }; - break 'retry (new_blocks, agreement_cp); - }; - - // construct checkpoints - for (&height, &hash) in new_blocks.iter() { - last_cp = Some(match last_cp { - Some(last_cp) => last_cp - .extend(BlockId { height, hash }) - .expect("must extend checkpoint"), - None => CheckPoint::new(BlockId { height, hash }), - }); - } - - let tip = last_cp.expect("must have atleast one checkpoint"); - let mut update = LocalUpdate::::new(tip.clone()); + let (tip, _) = construct_update_tip(self, prev_tip).await?; + let mut make_anchor = crate::confirmation_time_anchor_maker(&tip); + let mut update = LocalUpdate::::new(tip); for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); @@ -202,7 +148,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { empty_scripts = 0; } for tx in related_txs { - let anchor = map_confirmation_time_anchor(&tx.status, &tip); + let anchor = make_anchor(&tx.status); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { @@ -232,7 +178,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } match self.get_tx_status(&txid).await? { tx_status if tx_status.confirmed => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, &tip) { + if let Some(anchor) = make_anchor(&tx_status) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -266,7 +212,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, &tip); + let anchor = make_anchor(&status); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -275,36 +221,106 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } } - if tip.hash() != self.get_block_hash(tip.height()).await? { - // A reorg occurred, so let's find out where all the txids we found are now in the chain - let txids_found = update - .graph - .full_txs() - .map(|tx_node| tx_node.txid) - .collect::>(); - let new_update = EsploraAsyncExt::scan_without_keychain( - self, - Some(tip), - [], - txids_found, - [], - parallel_requests, - ) - .await?; - update.tip = new_update.tip; - update.graph = new_update.graph; - // update.chain = EsploraAsyncExt::scan_without_keychain( - // self, - // local_chain, - // [], - // txids_found, - // [], - // parallel_requests, - // ) - // .await? - // .chain; + // If a reorg occured during the update, anchors may be wrong. We handle this by scrapping + // all anchors, reconstructing checkpoints and reconstructing anchors. + while self.get_block_hash(update.tip.height()).await? != update.tip.hash() { + let (new_tip, _) = construct_update_tip(self, Some(update.tip.clone())).await?; + make_anchor = crate::confirmation_time_anchor_maker(&new_tip); + + // Reconstruct graph with only transactions (no anchors). + update.graph = TxGraph::new(update.graph.full_txs().map(|n| n.tx.clone())); + update.tip = new_tip; + + // Re-fetch anchors. + let anchors = { + let mut a = Vec::new(); + for n in update.graph.full_txs() { + let status = self.get_tx_status(&n.txid).await?; + if !status.confirmed { + continue; + } + if let Some(anchor) = make_anchor(&status) { + a.push((n.txid, anchor)); + } + } + a + }; + for (txid, anchor) in anchors { + let _ = update.graph.insert_anchor(txid, anchor); + } } Ok(update) } } + +/// Constructs a new checkpoint tip that can "connect" to our previous checkpoint history. We return +/// the new checkpoint tip alongside the height of agreement between the two histories (if any). +#[allow(clippy::result_large_err)] +async fn construct_update_tip( + client: &esplora_client::AsyncClient, + prev_tip: Option, +) -> Result<(CheckPoint, Option), Error> { + let new_tip_height = client.get_height().await?; + + // If esplora returns a tip height that is lower than our previous tip, then checkpoints do not + // need updating. We just return the previous tip and use that as the point of agreement. + if let Some(prev_tip) = prev_tip.as_ref() { + if new_tip_height < prev_tip.height() { + return Ok((prev_tip.clone(), Some(prev_tip.height()))); + } + } + + // Grab latest blocks from esplora atomically first. We assume that deeper blocks cannot be + // reorged. This ensures that our checkpoint history is consistent. + let mut new_blocks = client + .get_blocks(Some(new_tip_height)) + .await? + .into_iter() + .zip((0..new_tip_height).rev()) + .map(|(b, height)| (height, b.id)) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + // We check esplora blocks cached in `new_blocks` first, keeping the checkpoint history + // consistent even during reorgs. + let hash = match new_blocks.get(&cp_block.height) { + Some(&hash) => hash, + None => { + assert!( + new_tip_height >= cp_block.height, + "already checked that esplora's tip cannot be smaller" + ); + let hash = client.get_block_hash(cp_block.height).await?; + new_blocks.insert(cp_block.height, hash); + hash + } + }; + + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + } + + let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); + + let new_tip = new_blocks + .into_iter() + // Prune `new_blocks` to only include blocks that are actually new. + .filter(|(height, _)| Some(*height) > agreement_height) + .map(|(height, hash)| BlockId { height, hash }) + .fold(agreement_cp, |prev_cp, block| { + Some(match prev_cp { + Some(cp) => cp.extend(block).expect("must extend cp"), + None => CheckPoint::new(block), + }) + }) + .expect("must have at least one checkpoint"); + + Ok((new_tip, agreement_height)) +} diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 3649d7b21..456c2c6ca 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,12 +1,10 @@ use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid}; use bdk_chain::collections::BTreeMap; use bdk_chain::local_chain::CheckPoint; -use bdk_chain::BlockId; use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor}; +use bdk_chain::{BlockId, TxGraph}; use esplora_client::{Error, OutputStatus, TxStatus}; -use crate::map_confirmation_time_anchor; - /// Trait to extend [`esplora_client::BlockingClient`] functionality. /// /// Refer to [crate-level documentation] for more. @@ -78,60 +76,9 @@ impl EsploraExt for esplora_client::BlockingClient { ) -> Result, Error> { let parallel_requests = Ord::max(parallel_requests, 1); - let (new_blocks, mut last_cp) = 'retry: loop { - let new_tip = loop { - let hash = self.get_tip_hash()?; - let status = self.get_block_status(&hash)?; - if status.in_best_chain && status.next_best.is_none() { - break BlockId { - height: status.height.expect("must have height"), - hash, - }; - } - }; - - let mut new_blocks = core::iter::once((new_tip.height, new_tip.hash)) - .collect::>(); - - let mut agreement_cp = Option::::None; - - for cp in prev_tip.iter().flat_map(CheckPoint::iter) { - let cp_block = cp.block_id(); - let hash = self.get_block_hash(cp_block.height)?; - if hash == cp_block.hash { - agreement_cp = Some(cp); - break; - } - new_blocks.insert(cp_block.height, hash); - } - - // check for tip changes - // retry if there are changes to the tip - let status = self.get_block_status(&new_tip.hash)?; - if !status.in_best_chain || status.next_best.is_some() { - continue 'retry; - } - - // `new_blocks` should only include blocks that are actually new - let new_blocks = match &agreement_cp { - Some(agreement_cp) => new_blocks.split_off(&(agreement_cp.height() + 1)), - None => new_blocks, - }; - break 'retry (new_blocks, agreement_cp); - }; - - // construct checkpoints - for (&height, &hash) in new_blocks.iter() { - last_cp = Some(match last_cp { - Some(last_cp) => last_cp - .extend(BlockId { height, hash }) - .expect("must extend checkpoint"), - None => CheckPoint::new(BlockId { height, hash }), - }); - } - - let tip = last_cp.expect("must have atleast one checkpoint"); - let mut update = LocalUpdate::::new(tip.clone()); + let (tip, _) = construct_update_tip(self, prev_tip)?; + let mut make_anchor = crate::confirmation_time_anchor_maker(&tip); + let mut update = LocalUpdate::::new(tip); for (keychain, spks) in keychain_spks { let mut spks = spks.into_iter(); @@ -184,8 +131,7 @@ impl EsploraExt for esplora_client::BlockingClient { empty_scripts = 0; } for tx in related_txs { - let anchor = map_confirmation_time_anchor(&tx.status, &tip); - + let anchor = make_anchor(&tx.status); let _ = update.graph.insert_tx(tx.to_tx()); if let Some(anchor) = anchor { let _ = update.graph.insert_anchor(tx.txid, anchor); @@ -213,10 +159,8 @@ impl EsploraExt for esplora_client::BlockingClient { } } match self.get_tx_status(&txid)? { - tx_status @ TxStatus { - confirmed: true, .. - } => { - if let Some(anchor) = map_confirmation_time_anchor(&tx_status, &tip) { + tx_status if tx_status.confirmed => { + if let Some(anchor) = make_anchor(&tx_status) { let _ = update.graph.insert_anchor(txid, anchor); } } @@ -248,7 +192,7 @@ impl EsploraExt for esplora_client::BlockingClient { for (tx, status) in op_txs { let txid = tx.txid(); - let anchor = map_confirmation_time_anchor(&status, &tip); + let anchor = make_anchor(&status); let _ = update.graph.insert_tx(tx); if let Some(anchor) = anchor { @@ -257,25 +201,101 @@ impl EsploraExt for esplora_client::BlockingClient { } } - if tip.hash() != self.get_block_hash(tip.height())? { - // A reorg occurred, so let's find out where all the txids we found are now in the chain - let txids_found = update + // If a reorg occured during the update, anchors may be wrong. We handle this by scrapping + // all anchors, reconstructing checkpoints and reconstructing anchors. + while self.get_block_hash(update.tip.height())? != update.tip.hash() { + let (new_tip, _) = construct_update_tip(self, Some(update.tip.clone()))?; + make_anchor = crate::confirmation_time_anchor_maker(&new_tip); + + // Reconstruct graph with only transactions (no anchors). + update.graph = TxGraph::new(update.graph.full_txs().map(|n| n.tx.clone())); + update.tip = new_tip; + + // Re-fetch anchors. + let anchors = update .graph .full_txs() - .map(|tx_node| tx_node.txid) - .collect::>(); - let new_update = EsploraExt::scan_without_keychain( - self, - Some(tip), - [], - txids_found, - [], - parallel_requests, - )?; - update.tip = new_update.tip; - update.graph = new_update.graph; + .filter_map(|n| match self.get_tx_status(&n.txid) { + Err(err) => Some(Err(err)), + Ok(status) if status.confirmed => make_anchor(&status).map(|a| Ok((n.txid, a))), + _ => None, + }) + .collect::, _>>()?; + for (txid, anchor) in anchors { + let _ = update.graph.insert_anchor(txid, anchor); + } } Ok(update) } } + +/// Constructs a new checkpoint tip that can "connect" to our previous checkpoint history. We return +/// the new checkpoint tip alongside the height of agreement between the two histories (if any). +#[allow(clippy::result_large_err)] +fn construct_update_tip( + client: &esplora_client::BlockingClient, + prev_tip: Option, +) -> Result<(CheckPoint, Option), Error> { + let new_tip_height = client.get_height()?; + + // If esplora returns a tip height that is lower than our previous tip, then checkpoints do not + // need updating. We just return the previous tip and use that as the point of agreement. + if let Some(prev_tip) = prev_tip.as_ref() { + if new_tip_height < prev_tip.height() { + return Ok((prev_tip.clone(), Some(prev_tip.height()))); + } + } + + // Grab latest blocks from esplora atomically first. We assume that deeper blocks cannot be + // reorged. This ensures that our checkpoint history is consistent. + let mut new_blocks = client + .get_blocks(Some(new_tip_height))? + .into_iter() + .zip((0..new_tip_height).rev()) + .map(|(b, height)| (height, b.id)) + .collect::>(); + + let mut agreement_cp = Option::::None; + + for cp in prev_tip.iter().flat_map(CheckPoint::iter) { + let cp_block = cp.block_id(); + + // We check esplora blocks cached in `new_blocks` first, keeping the checkpoint history + // consistent even during reorgs. + let hash = match new_blocks.get(&cp_block.height) { + Some(&hash) => hash, + None => { + assert!( + new_tip_height >= cp_block.height, + "already checked that esplora's tip cannot be smaller" + ); + let hash = client.get_block_hash(cp_block.height)?; + new_blocks.insert(cp_block.height, hash); + hash + } + }; + + if hash == cp_block.hash { + agreement_cp = Some(cp); + break; + } + } + + let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); + + let new_tip = new_blocks + .into_iter() + // Prune `new_blocks` to only include blocks that are actually new. + .filter(|(height, _)| Some(*height) > agreement_height) + .map(|(height, hash)| BlockId { height, hash }) + .fold(agreement_cp, |prev_cp, block| { + Some(match prev_cp { + Some(cp) => cp.extend(block).expect("must extend cp"), + None => CheckPoint::new(block), + }) + }) + .expect("must have at least one checkpoint"); + + Ok((new_tip, agreement_height)) +} diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index d1c68e81f..07ccdab8f 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,4 +1,6 @@ #![doc = include_str!("../README.md")] +use std::collections::BTreeMap; + use bdk_chain::{local_chain::CheckPoint, ConfirmationTimeAnchor}; use esplora_client::TxStatus; @@ -14,16 +16,25 @@ mod async_ext; #[cfg(feature = "async")] pub use async_ext::*; -pub(crate) fn map_confirmation_time_anchor( - tx_status: &TxStatus, +pub(crate) fn confirmation_time_anchor_maker( tip: &CheckPoint, -) -> Option { - match (tx_status.block_time, tx_status.block_height) { - (Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor { - anchor_block: tip.block_id(), - confirmation_height, - confirmation_time, - }), +) -> impl FnMut(&TxStatus) -> Option { + let cache = tip + .iter() + .take(10) + .map(|cp| (cp.height(), cp)) + .collect::>(); + + move |status| match (status.block_time, status.block_height) { + (Some(confirmation_time), Some(confirmation_height)) => { + let (_, anchor_cp) = cache.range(confirmation_height..).next()?; + + Some(ConfirmationTimeAnchor { + anchor_block: anchor_cp.block_id(), + confirmation_height, + confirmation_time, + }) + } _ => None, } } diff --git a/example-crates/wallet_esplora/src/main.rs b/example-crates/wallet_esplora/src/main.rs index 4e0476398..187091ff4 100644 --- a/example-crates/wallet_esplora/src/main.rs +++ b/example-crates/wallet_esplora/src/main.rs @@ -1,7 +1,7 @@ const DB_MAGIC: &str = "bdk_wallet_esplora_example"; const SEND_AMOUNT: u64 = 5000; const STOP_GAP: usize = 50; -const PARALLEL_REQUESTS: usize = 5; +const PARALLEL_REQUESTS: usize = 2; use std::{io::Write, str::FromStr}; From 4d365919d5c797e43040fc20f83f50133a4691c2 Mon Sep 17 00:00:00 2001 From: Steve Myers Date: Mon, 26 Jun 2023 20:21:15 -0500 Subject: [PATCH 9/9] [example] add example_rpc README --- example-crates/example_rpc/README.md | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 example-crates/example_rpc/README.md diff --git a/example-crates/example_rpc/README.md b/example-crates/example_rpc/README.md new file mode 100644 index 000000000..0eaa71dae --- /dev/null +++ b/example-crates/example_rpc/README.md @@ -0,0 +1,67 @@ +# Example RPC CLI + +### Simple Regtest Test + +1. Start local regtest bitcoind. + ``` + mkdir -p /tmp/regtest/bitcoind + bitcoind -datadir=/tmp/regtest/bitcoind -regtest -server -fallbackfee=0.0002 -rpcallowip=0.0.0.0/0 -rpcbind=0.0.0.0 -blockfilterindex=1 -peerblockfilters=1 -daemon + ``` +2. Create a test bitcoind wallet and set bitcoind env. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest -named createwallet wallet_name="test" + export RPC_URL=127.0.0.1:18443 + export RPC_COOKIE=/tmp/regtest/bitcoind/regtest/.cookie + ``` +3. Get test bitcoind wallet info. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getwalletinfo + ``` +4. Get new test bitcoind wallet address. + ``` + BITCOIND_ADDRESS=$(bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getnewaddress) + echo $BITCOIND_ADDRESS + ``` +5. Generate 101 blocks with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest generatetoaddress 101 $BITCOIND_ADDRESS + ``` +6. Verify test bitcoind wallet balance. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest getbalances + ``` +7. Set descriptor env and get address from RPC CLI wallet. + ``` + export DESCRIPTOR="wpkh(tprv8ZgxMBicQKsPfK9BTf82oQkHhawtZv19CorqQKPFeaHDMA4dXYX6eWsJGNJ7VTQXWmoHdrfjCYuDijcRmNFwSKcVhswzqs4fugE8turndGc/1/*)" + cargo run -- --network regtest address next + ``` +8. Send 5 test bitcoin to RPC CLI wallet. + ``` + bitcoin-cli -rpcwallet="test" -datadir=/tmp/regtest/bitcoind -regtest send '[{"
":5}]' + ``` +9. Scan blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest scan + + ``` +10. Get RPC CLI wallet unconfirmed balances. + ``` + cargo run -- --network regtest balance + ``` +11. Generate 1 block with reward to test bitcoind wallet address. + ``` + bitcoin-cli -datadir=/tmp/regtest/bitcoind -regtest generatetoaddress 10 $BITCOIND_ADDRESS + ``` +12. Scan blockchain with RPC CLI wallet. + ``` + cargo run -- --network regtest scan + + ``` +13. Get RPC CLI wallet confirmed balances. + ``` + cargo run -- --network regtest balance + ``` +14. Get RPC CLI wallet transactions. + ``` + cargo run -- --network regtest txout list + ``` \ No newline at end of file