Skip to content
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ members = [
"example-crates/wallet_electrum",
"example-crates/wallet_esplora_blocking",
"example-crates/wallet_esplora_async",
"example-crates/wallet_rpc",
"nursery/tmp_plan",
"nursery/coin_select"
]
Expand Down
139 changes: 135 additions & 4 deletions crates/bdk/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ pub use bdk_chain::keychain::Balance;
use bdk_chain::{
indexed_tx_graph,
keychain::{self, KeychainTxOutIndex},
local_chain::{self, CannotConnectError, CheckPoint, CheckPointIter, LocalChain},
local_chain::{
self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain,
},
tx_graph::{CanonicalTx, TxGraph},
Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut,
IndexedTxGraph, Persist, PersistBackend,
};
use bitcoin::secp256k1::{All, Secp256k1};
use bitcoin::sighash::{EcdsaSighashType, TapSighashType};
use bitcoin::{
absolute, Address, Network, OutPoint, Script, ScriptBuf, Sequence, Transaction, TxOut, Txid,
Weight, Witness,
absolute, Address, Block, Network, OutPoint, Script, ScriptBuf, Sequence, Transaction, TxOut,
Txid, Weight, Witness,
};
use bitcoin::{consensus::encode::serialize, BlockHash};
use bitcoin::{constants::genesis_block, psbt};
Expand Down Expand Up @@ -428,6 +430,55 @@ pub enum InsertTxError {
},
}

impl fmt::Display for InsertTxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InsertTxError::ConfirmationHeightCannotBeGreaterThanTip {
tip_height,
tx_height,
} => {
write!(f, "cannot insert tx with confirmation height ({}) higher than internal tip height ({})", tx_height, tip_height)
}
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for InsertTxError {}

/// An error that may occur when applying a block to [`Wallet`].
#[derive(Debug)]
pub enum ApplyBlockError {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend to use this for either apply_block or apply_block_connected_to?

/// Occurs when the update chain cannot connect with original chain.
CannotConnect(CannotConnectError),
/// Occurs when the `connected_to` hash does not match the hash derived from `block`.
UnexpectedConnectedToHash {
/// Block hash of `connected_to`.
connected_to_hash: BlockHash,
/// Expected block hash of `connected_to`, as derived from `block`.
expected_hash: BlockHash,
},
}

impl fmt::Display for ApplyBlockError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ApplyBlockError::CannotConnect(err) => err.fmt(f),
ApplyBlockError::UnexpectedConnectedToHash {
expected_hash: block_hash,
connected_to_hash: checkpoint_hash,
} => write!(
f,
"`connected_to` hash {} differs from the expected hash {} (which is derived from `block`)",
checkpoint_hash, block_hash
),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for ApplyBlockError {}

impl<D> Wallet<D> {
/// Initialize an empty [`Wallet`].
pub fn new<E: IntoWalletDescriptor>(
Expand Down Expand Up @@ -2302,7 +2353,7 @@ impl<D> Wallet<D> {
self.persist.commit().map(|c| c.is_some())
}

/// Returns the changes that will be staged with the next call to [`commit`].
/// Returns the changes that will be committed with the next call to [`commit`].
///
/// [`commit`]: Self::commit
pub fn staged(&self) -> &ChangeSet
Expand All @@ -2326,6 +2377,86 @@ impl<D> Wallet<D> {
pub fn local_chain(&self) -> &LocalChain {
&self.chain
}

/// Introduces a `block` of `height` to the wallet, and tries to connect it to the
/// `prev_blockhash` of the block's header.
///
/// This is a convenience method that is equivalent to calling [`apply_block_connected_to`]
/// with `prev_blockhash` and `height-1` as the `connected_to` parameter.
///
/// [`apply_block_connected_to`]: Self::apply_block_connected_to
pub fn apply_block(&mut self, block: &Block, height: u32) -> Result<(), CannotConnectError>
where
D: PersistBackend<ChangeSet>,
{
let connected_to = match height.checked_sub(1) {
Some(prev_height) => BlockId {
height: prev_height,
hash: block.header.prev_blockhash,
},
None => BlockId {
height,
hash: block.block_hash(),
},
};
self.apply_block_connected_to(block, height, connected_to)
.map_err(|err| match err {
ApplyHeaderError::InconsistentBlocks => {
unreachable!("connected_to is derived from the block so must be consistent")
}
ApplyHeaderError::CannotConnect(err) => err,
})
}

/// Applies relevant transactions from `block` of `height` to the wallet, and connects the
/// block to the internal chain.
///
/// The `connected_to` parameter informs the wallet how this block connects to the internal
/// [`LocalChain`]. Relevant transactions are filtered from the `block` and inserted into the
/// internal [`TxGraph`].
pub fn apply_block_connected_to(
&mut self,
block: &Block,
height: u32,
connected_to: BlockId,
) -> Result<(), ApplyHeaderError>
where
D: PersistBackend<ChangeSet>,
{
let mut changeset = ChangeSet::default();
changeset.append(
self.chain
.apply_header_connected_to(&block.header, height, connected_to)?
.into(),
);
changeset.append(
self.indexed_graph
.apply_block_relevant(block, height)
.into(),
);
self.persist.stage(changeset);
Ok(())
}

/// Apply relevant unconfirmed transactions to the wallet.
///
/// Transactions that are not relevant are filtered out.
///
/// This method takes in an iterator of `(tx, last_seen)` where `last_seen` is the timestamp of
/// when the transaction was last seen in the mempool. This is used for conflict resolution
/// when there is conflicting unconfirmed transactions. The transaction with the later
/// `last_seen` is prioritied.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: prioritied

pub fn apply_unconfirmed_txs<'t>(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, u64)>,
) where
D: PersistBackend<ChangeSet>,
{
let indexed_graph_changeset = self
.indexed_graph
.batch_insert_relevant_unconfirmed(unconfirmed_txs);
self.persist.stage(ChangeSet::from(indexed_graph_changeset));
}
}

impl<D> AsRef<bdk_chain::tx_graph::TxGraph<ConfirmationTimeHeightAnchor>> for Wallet<D> {
Expand Down
70 changes: 59 additions & 11 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ pub struct Emitter<'c, C> {
}

impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
/// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`.
/// Construct a new [`Emitter`].
///
/// * `last_cp` is the check point used to find the latest block which is still part of the best
/// chain.
/// * `start_height` is the block height to start emitting blocks from.
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
/// can start emission from a block that connects to the original chain.
///
/// `start_height` starts emission from a given height (if there are no conflicts with the
/// original chain).
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
Self {
client,
Expand Down Expand Up @@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}

/// Emit the next block height and header (if any).
pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
poll(self, |hash| self.client.get_block_header(hash))
pub fn next_header(&mut self) -> Result<Option<BlockEvent<Header>>, bitcoincore_rpc::Error> {
Ok(poll(self, |hash| self.client.get_block_header(hash))?
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
}

/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
poll(self, |hash| self.client.get_block(hash))
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
Ok(poll(self, |hash| self.client.get_block(hash))?
.map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
}
}

/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
/// Either a full [`Block`] or [`Header`] of the new block.
pub block: B,

/// The checkpoint of the new block.
///
/// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to
/// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then.
/// These blocks are guaranteed to be of the same chain.
///
/// This is important as BDK structures require block-to-apply to be connected with another
/// block in the original chain.
pub checkpoint: CheckPoint,
}

impl<B> BlockEvent<B> {
/// The block height of this new block.
pub fn block_height(&self) -> u32 {
self.checkpoint.height()
}

/// The block hash of this new block.
pub fn block_hash(&self) -> BlockHash {
self.checkpoint.hash()
}

/// The [`BlockId`] of a previous block that this block connects to.
///
/// This either returns a [`BlockId`] of a previously emitted block or from the chain we started
/// with (passed in as `last_cp` in [`Emitter::new`]).
///
/// This value is derived from [`BlockEvent::checkpoint`].
pub fn connected_to(&self) -> BlockId {
match self.checkpoint.prev() {
Some(prev_cp) => prev_cp.block_id(),
// there is no previous checkpoint, so just connect with itself
None => self.checkpoint.block_id(),
}
}
}

Expand Down Expand Up @@ -203,7 +250,7 @@ where
fn poll<C, V, F>(
emitter: &mut Emitter<C>,
get_item: F,
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
Expand All @@ -215,13 +262,14 @@ where
let hash = res.hash;
let item = get_item(&hash)?;

emitter.last_cp = emitter
let new_cp = emitter
.last_cp
.clone()
.push(BlockId { height, hash })
.expect("must push");
emitter.last_cp = new_cp.clone();
emitter.last_block = Some(res);
return Ok(Some((height, item)));
return Ok(Some((new_cp, item)));
}
PollResponse::NoMoreBlocks => {
emitter.last_block = None;
Expand Down
Loading