diff --git a/Cargo.toml b/Cargo.toml index c5f2692da..048cacc6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,11 +5,13 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bdk_cbf", "example-crates/example_cli", "example-crates/example_electrum", "example-crates/wallet_electrum", "example-crates/wallet_esplora", "example-crates/wallet_esplora_async", + "example-crates/example_cbf", "nursery/tmp_plan", "nursery/coin_select" ] diff --git a/crates/bdk_cbf/Cargo.toml b/crates/bdk_cbf/Cargo.toml new file mode 100644 index 000000000..66c9e23bd --- /dev/null +++ b/crates/bdk_cbf/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "bdk_cbf" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +nakamoto = "0.4.0" +bdk_chain = { path = "../chain", version = "0.5.0"} diff --git a/crates/bdk_cbf/src/lib.rs b/crates/bdk_cbf/src/lib.rs new file mode 100644 index 000000000..64841bce3 --- /dev/null +++ b/crates/bdk_cbf/src/lib.rs @@ -0,0 +1,313 @@ +use std::{net, thread}; +use core::fmt::Debug; + +use bdk_chain::keychain::DerivationAdditions; +use nakamoto::client::network::Services; +use nakamoto::client::Handle; +use nakamoto::client::traits::Handle as HandleTrait; +use nakamoto::client::{chan, Client, Config, Error, Event}; +use nakamoto::net::poll; + +pub use nakamoto::client::network::Network; +pub use nakamoto::common::block::Height; + +use bdk_chain::{ + bitcoin::{Script, Transaction}, + collections::BTreeMap, + indexed_tx_graph::{IndexedAdditions, IndexedTxGraph, Indexer}, + keychain::KeychainTxOutIndex, + BlockId, ChainOracle, ConfirmationHeightAnchor, TxGraph, +}; + +type Reactor = poll::Reactor; + +#[derive(Clone)] +pub struct CBFClient { + handle: Handle, +} + +impl ChainOracle for CBFClient { + type Error = nakamoto::client::Error; + + fn is_block_in_chain( + &self, + block: BlockId, + chain_tip: BlockId, + ) -> Result, Self::Error> { + if block.height > chain_tip.height { + return Ok(None); + } + + Ok( + match ( + self.handle.get_block_by_height(block.height as _)?, + self.handle.get_block_by_height(chain_tip.height as _)?, + ) { + (Some(b), Some(c)) => { + Some(b.block_hash() == block.hash && c.block_hash() == chain_tip.hash) + } + _ => None, + }, + ) + } + + fn get_chain_tip(&self) -> Result, Self::Error> { + let (height, header) = self.handle.get_tip()?; + Ok(Some(BlockId { + height: height as u32, + hash: header.block_hash(), + })) + } +} + +#[derive(Debug, Clone)] +pub enum CBFUpdate { + Synced { + height: Height, + tip: Height, + }, + BlockMatched { + transactions: Vec, + block: BlockId, + }, + BlockDisconnected { + block: BlockId, + }, +} + +pub struct CBFUpdateIterator { + client: CBFClient, +} + +impl Iterator for CBFUpdateIterator { + type Item = Result; + + fn next(&mut self) -> Option { + match self.client.watch_events() { + Ok(update) => { + match update { + CBFUpdate::Synced { height, tip } if height == tip => None, + _ => Some(Ok(update)) + } + } + Err(e) => Some(Err(e)), + } + } +} + +impl CBFClient { + pub fn start_client(network: Network, peer_count: usize) -> Result { + let cfg = Config { + network, + ..Default::default() + }; + let client = Client::::new()?; + let handle = client.handle(); + + // Run the client on a different thread, to not block the main thread. + thread::spawn(|| client.run(cfg).unwrap()); + + // Wait for the client to be connected to a peer. + handle.wait_for_peers(peer_count, Services::default())?; + + println!("Connected to {} peers", peer_count); + + Ok(Self { handle }) + } + + /// Given a list of scripts, start scanning the chain from the given height. + pub fn start_scanning( + &self, + start_height: Height, + watch: impl Iterator, + ) -> Result<(), Error> { + self.handle.rescan(start_height.., watch)?; + println!("About to start scanning from height {}", start_height); + Ok(()) + } + + /// Listen for nakamoto events that are relevant to scripts we are watching. + pub fn watch_events(&self) -> Result { + let events_chan = self.handle.events(); + loop { + print!("looping..."); + chan::select! { + recv(events_chan) -> event => { + let event = event?; + match event { + Event::BlockDisconnected { hash, height, .. } => { + return Ok(CBFUpdate::BlockDisconnected { block: BlockId { height: height as u32, hash } }); + } + Event::BlockMatched { + hash, + height, + transactions, + .. + } => { + println!("Block matched: {} {}", height, hash); + return Ok(CBFUpdate::BlockMatched { + transactions, + block: BlockId { height: height as u32, hash } + }); + } + Event::Synced { height, tip } => { + println!("Synced: {} {}", height, tip); + return Ok(CBFUpdate::Synced { height, tip }); + } + _ => {} + } + } + } + } + } + + /// Given a list of tuples of block and their transactions, create a TxGraph update. + pub fn into_tx_graph_update( + &self, + block_txs: Vec<(BlockId, Vec)>, + ) -> TxGraph { + let mut tx_graph = TxGraph::default(); + + for (blockid, txs) in block_txs.into_iter() { + for tx in txs { + let txid = tx.txid(); + let _ = tx_graph.insert_anchor(txid, to_confirmation_height_anchor(blockid)); + let _ = tx_graph.insert_tx(tx); + } + } + tx_graph + } + + pub fn iter(&self) -> CBFUpdateIterator { + CBFUpdateIterator { + client: self.clone(), + } + } + + pub fn scan( + &self, + mut watch_per_keychain: u32, + start_height: Height, + indexed_tx_graph: &mut IndexedTxGraph>, + stop_gap: u32, + ) -> Result>, Error> + where + K: Ord + Clone + Debug, + { + let mut keychain_spks = indexed_tx_graph.index.spks_of_all_keychains(); + let mut empty_scripts_counter = BTreeMap::::new(); + keychain_spks.keys().for_each(|k| { + empty_scripts_counter.insert(k.clone(), 0); + }); + + let mut updates = Vec::new(); + + while let Some(keychains) = Self::check_stop_gap(stop_gap, &empty_scripts_counter) { + keychains.iter().for_each(|k| { + indexed_tx_graph.index.set_lookahead(k, watch_per_keychain); + }); + + let mut spk_watchlist = BTreeMap::>::new(); + for (k, script_iter) in keychain_spks.iter_mut() { + (0..watch_per_keychain).for_each(|_| { + if let Some((_, script)) = script_iter.next() { + let spks = spk_watchlist.entry(k.clone()).or_insert(vec![]); + spks.push(script); + } + }); + } + + let scripts = spk_watchlist.values().flatten().cloned().collect::>(); + self.start_scanning(start_height, scripts.into_iter())?; + + for update in self.iter() { + match update { + Ok(CBFUpdate::BlockMatched { + transactions, + block, + }) => { + let relevant_txs = transactions + .into_iter() + .filter(|tx| indexed_tx_graph.index.is_tx_relevant(tx)) + .collect::>(); + updates.push((block, relevant_txs)); + } + Ok(CBFUpdate::BlockDisconnected { .. }) => { + //TODO: Don't know how to handle re-orgs yet + //I will love to get your comments on this. + } + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + } + + // Determine which scripts are part of the update. + for (k, scripts) in spk_watchlist.iter() { + for script in scripts { + let counter = empty_scripts_counter.get_mut(k).unwrap(); + if Self::is_script_in_udpates(script.clone(), &updates) { + *counter = 0; + } else { + *counter += 1; + } + } + } + + watch_per_keychain += watch_per_keychain; + } + + //apply the updates to IndexedGraph + let graph_update = self.into_tx_graph_update(updates); + let additions = indexed_tx_graph.apply_update(graph_update); + + Ok(additions) + } + + fn is_script_in_udpates(script: Script, updates: &Vec<(BlockId, Vec)>) -> bool { + for update in updates { + for tx in update.1.iter() { + for output in tx.output.iter() { + if output.script_pubkey == script { + return true; + } + } + } + } + false + } + + fn check_stop_gap(stop_gap: u32, empty_scripts_counter: &BTreeMap) -> Option> + where + K: Ord + Clone + Debug, + { + let keychains = empty_scripts_counter + .iter() + .filter(|(_, counter)| **counter < stop_gap) + .map(|(k, _)| k.clone()) + .collect::>(); + if keychains.is_empty() { + None + } else { + Some(keychains) + } + } + + pub fn submit_transaction(&self, tx: Transaction) -> Result<(), Error> { + self.handle.submit_transaction(tx)?; + Ok(()) + } + + pub fn shutdown(self) -> Result<(), Error>{ + self.handle.shutdown()?; + Ok(()) + } +} + +fn to_confirmation_height_anchor(blockid: BlockId) -> ConfirmationHeightAnchor { + ConfirmationHeightAnchor { + anchor_block: blockid, + confirmation_height: blockid.height, + } +} diff --git a/example-crates/example_cbf/Cargo.toml b/example-crates/example_cbf/Cargo.toml new file mode 100644 index 000000000..fda610c6d --- /dev/null +++ b/example-crates/example_cbf/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "example_cbf" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_cbf = { path = "../../crates/bdk_cbf"} +bdk_chain = { path = "../../crates/chain"} +example_cli = { path = "../example_cli" } diff --git a/example-crates/example_cbf/src/main.rs b/example-crates/example_cbf/src/main.rs new file mode 100644 index 000000000..4d1db9d2c --- /dev/null +++ b/example-crates/example_cbf/src/main.rs @@ -0,0 +1,99 @@ +use std::sync::Mutex; + +use bdk_cbf::{CBFClient, Network}; +use bdk_chain::{keychain::LocalChangeSet, ConfirmationHeightAnchor, IndexedTxGraph}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_cbf"; +const DB_PATH: &str = ".bdk_example_cbf.db"; + +type ChangeSet = LocalChangeSet; + +#[derive(Debug, Clone, Args)] +struct CBFArgs {} + +#[derive(Subcommand, Debug, Clone)] +enum CBFCommands { + Scan { + /// The block height to start scanning from + #[clap(long, default_value = "0")] + start_height: u64, + /// The block height to stop scanning at + #[clap(long, default_value = "5")] + stop_gap: u32, + /// Number of scripts to watch for every sync + #[clap(long, default_value = "1000")] + watchlist_size: u32, + }, +} + +fn main() -> anyhow::Result<()> { + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_additions(init_changeset.indexed_additions); + graph + }); + + let client = Mutex::new({ + let client = CBFClient::start_client(Network::Testnet, 1)?; + client + }); + + let cbf_cmd = match args.command { + example_cli::Commands::ChainSpecific(cbf_cmd) => cbf_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &client, + &keymap, + args.network, + |tx| { + client + .lock() + .unwrap() + .submit_transaction(tx.clone()) + .map_err(anyhow::Error::from) + }, + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + match cbf_cmd { + CBFCommands::Scan { + start_height, + stop_gap, + watchlist_size, + } => { + println!("Scanning from height {} to {}", start_height, stop_gap); + let indexed_additions = { + let mut graph = graph.lock().unwrap(); + client + .lock() + .unwrap() + .scan(watchlist_size, start_height, &mut graph, stop_gap)? + }; + + let curr_changeset = LocalChangeSet::from(indexed_additions); + + // stage changes to the database + let mut db = db.lock().unwrap(); + db.stage(curr_changeset); + db.commit()?; + + println!("commited to database!"); + } + } + + Ok(()) +}