diff --git a/node-manager/src/chain.rs b/node-manager/src/chain.rs new file mode 100644 index 000000000..1845bbc77 --- /dev/null +++ b/node-manager/src/chain.rs @@ -0,0 +1,378 @@ +use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid}; + +use crate::error::MutinyError; +use crate::wallet::MutinyWallet; +use bdk::blockchain::Blockchain; +use bdk_macros::maybe_await; +use lightning::chain::chaininterface::{ + BroadcasterInterface, ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW, +}; +use lightning::chain::{Confirm, Filter, WatchedOutput}; +use log::error; +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; +use wasm_bindgen_futures::spawn_local; + +pub struct MutinyChain { + wallet: Arc, + // Transactions that were registered via the `Filter` interface and have to be processed. + queued_transactions: Mutex>, + // Transactions that were previously processed, but must not be forgotten yet. + watched_transactions: Mutex>, + // Outputs that were registered via the `Filter` interface and have to be processed. + queued_outputs: Mutex>, + // Outputs that were previously processed, but must not be forgotten yet. + watched_outputs: Mutex>, + // The tip hash observed during our last sync. + last_sync_hash: futures::lock::Mutex>, +} + +impl MutinyChain { + pub(crate) fn new(wallet: Arc) -> Self { + let watched_transactions = Mutex::new(HashSet::new()); + let queued_transactions = Mutex::new(HashSet::new()); + let watched_outputs = Mutex::new(HashSet::new()); + let queued_outputs = Mutex::new(HashSet::new()); + let last_sync_hash = futures::lock::Mutex::new(None); + Self { + wallet, + queued_transactions, + watched_transactions, + queued_outputs, + watched_outputs, + last_sync_hash, + } + } + + /// Syncs the LDK wallet via the `Confirm` interface. We run in a loop until we completed a + /// full iteration without + pub(crate) async fn sync( + &self, + confirmables: Vec<&(dyn Confirm + Sync)>, + ) -> Result<(), MutinyError> { + // This lock makes sure we're syncing once at a time. + let mut locked_last_sync_hash = self.last_sync_hash.lock().await; + + let client = &*self.wallet.blockchain; + + let mut tip_hash = client.get_tip_hash().await?; + + loop { + let registrations_are_pending = self.process_queues(); + let tip_is_new = Some(tip_hash) != *locked_last_sync_hash; + + // We loop until any registered transactions have been processed at least once, or the + // tip hasn't been updated during the last iteration. + if !registrations_are_pending && !tip_is_new { + // Nothing to do. + break; + } else { + // Update the known tip to the newest one. + if tip_is_new { + // First check for any unconfirmed transactions and act on it immediately. + self.sync_unconfirmed_transactions(&confirmables).await?; + + match self.sync_best_block_updated(&confirmables, &tip_hash).await { + Ok(()) => {} + Err(MutinyError::ChainAccessFailed) => { + // Immediately restart syncing when we encounter any inconsistencies. + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + return Err(err); + } + } + } + + match self.get_confirmed_transactions().await { + Ok((confirmed_txs, unconfirmed_registered_txs, unspent_registered_outputs)) => { + // Double-check tip hash. If something changed, restart last-minute. + tip_hash = client.get_tip_hash().await?; + if Some(tip_hash) != *locked_last_sync_hash { + continue; + } + + self.sync_confirmed_transactions( + &confirmables, + confirmed_txs, + unconfirmed_registered_txs, + unspent_registered_outputs, + ); + } + Err(MutinyError::ChainAccessFailed) => { + // Immediately restart syncing when we encounter any inconsistencies. + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + return Err(err); + } + } + *locked_last_sync_hash = Some(tip_hash); + } + } + Ok(()) + } + + // Processes the transaction and output queues, returns `true` if new items had been + // registered. + fn process_queues(&self) -> bool { + let mut pending_registrations = false; + { + let mut locked_queued_transactions = self.queued_transactions.lock().unwrap(); + if !locked_queued_transactions.is_empty() { + let mut locked_watched_transactions = self.watched_transactions.lock().unwrap(); + pending_registrations = true; + + locked_watched_transactions.extend(locked_queued_transactions.iter()); + *locked_queued_transactions = HashSet::new(); + } + } + { + let mut locked_queued_outputs = self.queued_outputs.lock().unwrap(); + if !locked_queued_outputs.is_empty() { + let mut locked_watched_outputs = self.watched_outputs.lock().unwrap(); + pending_registrations = true; + + locked_watched_outputs.extend(locked_queued_outputs.iter().cloned()); + *locked_queued_outputs = HashSet::new(); + } + } + pending_registrations + } + + async fn sync_best_block_updated( + &self, + confirmables: &Vec<&(dyn Confirm + Sync)>, + tip_hash: &BlockHash, + ) -> Result<(), MutinyError> { + let client = &*self.wallet.blockchain; + + // Inform the interface of the new block. + let tip_header = client.get_header_by_hash(tip_hash).await?; + let tip_status = client.get_block_status(tip_hash).await?; + if tip_status.in_best_chain { + if let Some(tip_height) = tip_status.height { + for c in confirmables { + c.best_block_updated(&tip_header, tip_height); + } + } + } else { + return Err(MutinyError::ChainAccessFailed); + } + Ok(()) + } + + fn sync_confirmed_transactions( + &self, + confirmables: &Vec<&(dyn Confirm + Sync)>, + confirmed_txs: Vec, + unconfirmed_registered_txs: HashSet, + unspent_registered_outputs: HashSet, + ) { + for ctx in confirmed_txs { + for c in confirmables { + c.transactions_confirmed( + &ctx.block_header, + &[(ctx.pos, &ctx.tx)], + ctx.block_height, + ); + } + } + + *self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs; + *self.watched_outputs.lock().unwrap() = unspent_registered_outputs; + } + + async fn get_confirmed_transactions( + &self, + ) -> Result<(Vec, HashSet, HashSet), MutinyError> { + let client = &*self.wallet.blockchain; + + // First, check the confirmation status of registered transactions as well as the + // status of dependent transactions of registered outputs. + + let mut confirmed_txs = Vec::new(); + + // Check in the current queue, as well as in registered transactions leftover from + // previous iterations. + let registered_txs = self.watched_transactions.lock().unwrap().clone(); + + // Remember all registered but unconfirmed transactions for future processing. + let mut unconfirmed_registered_txs = HashSet::new(); + + for txid in registered_txs { + if let Some(confirmed_tx) = self.get_confirmed_tx(&txid, None, None).await? { + confirmed_txs.push(confirmed_tx); + } else { + unconfirmed_registered_txs.insert(txid); + } + } + + // Check all registered outputs for dependent spending transactions. + let registered_outputs = self.watched_outputs.lock().unwrap().clone(); + + // Remember all registered outputs that haven't been spent for future processing. + let mut unspent_registered_outputs = HashSet::new(); + + for output in registered_outputs { + if let Some(output_status) = client + .get_output_status(&output.outpoint.txid, output.outpoint.index as u64) + .await? + { + if let Some(spending_txid) = output_status.txid { + if let Some(spending_tx_status) = output_status.status { + if let Some(confirmed_tx) = self + .get_confirmed_tx( + &spending_txid, + spending_tx_status.block_hash, + spending_tx_status.block_height, + ) + .await? + { + confirmed_txs.push(confirmed_tx); + continue; + } + } + } + } + unspent_registered_outputs.insert(output); + } + + // Sort all confirmed transactions first by block height, then by in-block + // position, and finally feed them to the interface in order. + confirmed_txs.sort_unstable_by(|tx1, tx2| { + tx1.block_height + .cmp(&tx2.block_height) + .then_with(|| tx1.pos.cmp(&tx2.pos)) + }); + + Ok(( + confirmed_txs, + unconfirmed_registered_txs, + unspent_registered_outputs, + )) + } + + async fn get_confirmed_tx( + &self, + txid: &Txid, + expected_block_hash: Option, + known_block_height: Option, + ) -> Result, MutinyError> { + let client = &*self.wallet.blockchain; + + if let Some(merkle_proof) = client.get_merkle_proof(txid).await? { + let block_hash = client.get_block_hash(merkle_proof.block_height).await?; + if let Some(expected_block_hash) = expected_block_hash { + if expected_block_hash != block_hash { + return Err(MutinyError::ChainAccessFailed); + } + } + + let block_header = client.get_header_by_hash(&block_hash).await?; + + if let Some(tx) = client.get_tx(txid).await? { + // We can take a shortcut here if a previous call already gave us the height. + if let Some(block_height) = known_block_height { + // if we have mismatched heights something probably went wrong + if merkle_proof.block_height != block_height { + return Err(MutinyError::ChainAccessFailed); + } + return Ok(Some(ConfirmedTx { + tx, + block_header, + pos: merkle_proof.pos, + block_height, + })); + } + + return Ok(Some(ConfirmedTx { + tx, + block_header, + pos: merkle_proof.pos, + block_height: merkle_proof.block_height, + })); + } + } + Ok(None) + } + + async fn sync_unconfirmed_transactions( + &self, + confirmables: &Vec<&(dyn Confirm + Sync)>, + ) -> Result<(), MutinyError> { + let client = &*self.wallet.blockchain; + // Query the interface for relevant txids and check whether they are still + // in the best chain, mark them unconfirmed otherwise. + + let relevant_txids = confirmables + .iter() + .flat_map(|c| c.get_relevant_txids()) + .collect::>(); + for txid in relevant_txids { + match client.get_tx_status(&txid).await { + Ok(Some(status)) => { + // Skip if the tx in question is still confirmed. + if status.confirmed { + continue; + } + } + // if the tx no longer exists or errors, we should + // consider it unconfirmed + Ok(None) => (), + Err(_) => (), + } + + for c in confirmables { + c.transaction_unconfirmed(&txid); + } + } + + Ok(()) + } +} + +struct ConfirmedTx { + tx: Transaction, + block_header: BlockHeader, + block_height: u32, + pos: usize, +} + +impl Filter for MutinyChain { + fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) { + self.queued_transactions.lock().unwrap().insert(*txid); + } + + fn register_output(&self, output: WatchedOutput) { + self.queued_outputs.lock().unwrap().insert(output); + } +} + +impl BroadcasterInterface for MutinyChain { + fn broadcast_transaction(&self, tx: &Transaction) { + let blockchain = self.wallet.blockchain.clone(); + let tx_clone = tx.clone(); + spawn_local(async move { + maybe_await!(blockchain.broadcast(&tx_clone)) + .unwrap_or_else(|_| error!("failed to broadcast tx! {}", tx_clone.txid())) + }); + } +} + +impl FeeEstimator for MutinyChain { + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + // todo get from esplora + fallback_fee_from_conf_target(confirmation_target) + } +} + +fn fallback_fee_from_conf_target(confirmation_target: ConfirmationTarget) -> u32 { + match confirmation_target { + ConfirmationTarget::Background => FEERATE_FLOOR_SATS_PER_KW, + ConfirmationTarget::Normal => 2000, + ConfirmationTarget::HighPriority => 5000, + } +} diff --git a/node-manager/src/error.rs b/node-manager/src/error.rs index 052d638fe..7e82675d1 100644 --- a/node-manager/src/error.rs +++ b/node-manager/src/error.rs @@ -1,3 +1,4 @@ +use bdk::esplora_client; use lightning::ln::peer_handler::PeerHandleError; use thiserror::Error; use wasm_bindgen::JsValue; @@ -92,6 +93,14 @@ impl From for MutinyError { } } +// TODO add more granular errors for esplora failures +impl From for MutinyError { + fn from(_e: esplora_client::Error) -> Self { + // This is most likely a chain access failure + Self::ChainAccessFailed + } +} + impl From for MutinyError { fn from(_e: PeerHandleError) -> Self { // TODO handle the case where `no_connection_possible` diff --git a/node-manager/src/lib.rs b/node-manager/src/lib.rs index 8ccaa7e77..2e4deb8e4 100644 --- a/node-manager/src/lib.rs +++ b/node-manager/src/lib.rs @@ -4,6 +4,7 @@ // https://github.com/rustwasm/wasm-bindgen/issues/2882 mod bdkstorage; +mod chain; mod encrypt; mod error; mod keymanager; diff --git a/node-manager/src/node.rs b/node-manager/src/node.rs index 916bd8e38..1a56d4944 100644 --- a/node-manager/src/node.rs +++ b/node-manager/src/node.rs @@ -1,6 +1,6 @@ use crate::ldkstorage::MutinyNodePersister; use crate::localstorage::MutinyBrowserStorage; -use crate::wallet::{esplora_from_network, MutinyWallet}; +use crate::wallet::esplora_from_network; use bdk::blockchain::EsploraBlockchain; use bitcoin::Network; use futures::StreamExt; @@ -12,6 +12,7 @@ use std::str::FromStr; use std::sync::Arc; use wasm_bindgen_futures::spawn_local; +use crate::chain::MutinyChain; use crate::tcpproxy::{SocketDescriptor, TcpProxy}; use crate::{ error::MutinyError, @@ -50,8 +51,8 @@ pub(crate) type PeerManager = LdkPeerManager< type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, - Arc, - Arc, + Arc, + Arc, Arc, Arc, >; @@ -61,6 +62,7 @@ pub struct Node { pub pubkey: PublicKey, pub peer_manager: Arc, pub keys_manager: Arc, + pub chain: Arc, blockchain: Arc, } @@ -69,41 +71,35 @@ impl Node { node_index: NodeIndex, mnemonic: Mnemonic, storage: MutinyBrowserStorage, + chain: Arc, network: Network, ) -> Result { info!("initialized a new node: {}", node_index.uuid); let logger = Arc::new(MutinyLogger::default()); - let keys_manager = Arc::new(create_keys_manager( - mnemonic.clone(), - node_index.child_index, - )); + let keys_manager = Arc::new(create_keys_manager(mnemonic, node_index.child_index)); let pubkey = pubkey_from_keys_manager(&keys_manager); // init the persister - let persister = Arc::new(MutinyNodePersister::new( - node_index.uuid.clone(), - storage.clone(), - )); + let persister = Arc::new(MutinyNodePersister::new(node_index.uuid.clone(), storage)); // init esplora let blockchain = Arc::new(esplora_from_network(network)); - // init wallet - let wallet = Arc::new(MutinyWallet::new(mnemonic, storage, network)); - // init chain monitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + // todo use chain monitor + let _chain_monitor: Arc = Arc::new(ChainMonitor::new( None, - wallet.clone(), + chain.clone(), logger.clone(), - wallet, + chain.clone(), persister.clone(), )); + // todo use channelmonitors to init node // read channelmonitor state from disk - let channelmonitors = persister + let _channelmonitors = persister .read_channel_monitors(keys_manager.clone()) .unwrap(); @@ -121,6 +117,7 @@ impl Node { pubkey, peer_manager: Arc::new(peer_man), keys_manager, + chain, blockchain, }) } diff --git a/node-manager/src/nodemanager.rs b/node-manager/src/nodemanager.rs index 47665c747..3217297d5 100644 --- a/node-manager/src/nodemanager.rs +++ b/node-manager/src/nodemanager.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; use std::{str::FromStr, sync::Arc}; +use crate::chain::MutinyChain; use crate::error::{MutinyError, MutinyJsError, MutinyStorageError}; use crate::keymanager; +use crate::logging::MutinyLogger; use crate::node::Node; use crate::{localstorage::MutinyBrowserStorage, utils::set_panic_hook, wallet::MutinyWallet}; use bdk::wallet::AddressIndex; @@ -21,7 +23,8 @@ use wasm_bindgen::prelude::*; pub struct NodeManager { mnemonic: Mnemonic, network: Network, - wallet: MutinyWallet, + wallet: Arc, + chain: Arc, storage: MutinyBrowserStorage, node_storage: Mutex, nodes: Arc>>>, @@ -91,7 +94,13 @@ impl NodeManager { }), }; - let wallet = MutinyWallet::new(mnemonic.clone(), storage.clone(), network); + let wallet = Arc::new(MutinyWallet::new( + mnemonic.clone(), + storage.clone(), + network, + )); + + let chain = Arc::new(MutinyChain::new(wallet.clone())); let node_storage = match MutinyBrowserStorage::get_nodes() { Ok(node_storage) => node_storage, @@ -99,7 +108,7 @@ impl NodeManager { return Err(MutinyError::ReadError { source: MutinyStorageError::Other(e.into()), } - .into()) + .into()); } }; @@ -107,6 +116,7 @@ impl NodeManager { mnemonic, network, wallet, + chain, storage, node_storage: Mutex::new(node_storage), nodes: Arc::new(Mutex::new(HashMap::new())), // TODO init the nodes @@ -124,7 +134,7 @@ impl NodeManager { Err(_) => return Err(MutinyError::WalletOperationFailed.into()), }; - self.wallet.broadcast_transaction(&tx); + self.chain.broadcast_transaction(&tx); Ok(()) } @@ -174,8 +184,13 @@ impl NodeManager { #[wasm_bindgen] pub async fn sync(&self) -> Result<(), MutinyJsError> { + // sync bdk wallet match self.wallet.sync().await { - Ok(()) => Ok(()), + Ok(()) => { + // sync ldk wallet + // todo get new chain txs and pass them in the vec + self.chain.sync(Vec::new()).await.map_err(|e| e.into()) + } Err(e) => Err(e.into()), } } @@ -261,6 +276,7 @@ pub(crate) async fn create_new_node_from_node_manager( next_node.clone(), node_manager.mnemonic.clone(), node_manager.storage.clone(), + node_manager.chain.clone(), node_manager.network, ) { Ok(new_node) => new_node, diff --git a/node-manager/src/wallet.rs b/node-manager/src/wallet.rs index ee0159917..cc2d37d9b 100644 --- a/node-manager/src/wallet.rs +++ b/node-manager/src/wallet.rs @@ -1,6 +1,6 @@ use anyhow::Context; use futures::lock::Mutex; -use log::{debug, error}; +use log::debug; use std::str::FromStr; use std::sync::Arc; @@ -11,9 +11,7 @@ use bdk::{FeeRate, SignOptions, SyncOptions, Wallet}; use bdk_macros::maybe_await; use bip39::Mnemonic; use bitcoin::util::bip32::{ChildNumber, DerivationPath, ExtendedPrivKey}; -use bitcoin::{Address, Network, Transaction}; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use wasm_bindgen_futures::spawn_local; +use bitcoin::{Address, Network}; use crate::error::MutinyError; use crate::localstorage::MutinyBrowserStorage; @@ -21,7 +19,7 @@ use crate::localstorage::MutinyBrowserStorage; #[derive(Debug)] pub struct MutinyWallet { pub wallet: Mutex>, - blockchain: Arc, + pub blockchain: Arc, } impl MutinyWallet { @@ -122,26 +120,6 @@ impl MutinyWallet { } } -impl BroadcasterInterface for MutinyWallet { - fn broadcast_transaction(&self, tx: &Transaction) { - let blockchain = self.blockchain.clone(); - let tx_clone = tx.clone(); - spawn_local(async move { - maybe_await!(blockchain.broadcast(&tx_clone)) - .unwrap_or_else(|_| error!("failed to broadcast tx! {}", tx_clone.txid())) - }); - } -} - -const MIN_FEERATE: u32 = 253 * 4; - -impl FeeEstimator for MutinyWallet { - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - // TODO - MIN_FEERATE - } -} - // mostly copied from sensei fn get_tr_descriptors_for_extended_key( xkey: ExtendedKey,