diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 85f0865f5..2a961b29a 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -108,6 +108,7 @@ enum NodeError { "ChannelClosingFailed", "ChannelConfigUpdateFailed", "PersistenceFailed", + "FeerateEstimationUpdateFailed", "WalletOperationFailed", "OnchainTxSigningFailed", "MessageSigningFailed", diff --git a/src/builder.rs b/src/builder.rs index 22f2c5aa0..e48af2cf3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,10 +1,12 @@ use crate::event::EventQueue; +use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io; use crate::io::sqlite_store::SqliteStore; use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; +use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph, OnionMessenger, PeerManager, @@ -464,13 +466,19 @@ fn build_with_store_internal( BuildError::WalletSetupFailed })?; - let (blockchain, tx_sync) = match chain_data_source_config { + let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger))); let blockchain = EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) .with_concurrency(BDK_CLIENT_CONCURRENCY); - (blockchain, tx_sync) + let tx_broadcaster = Arc::new(TransactionBroadcaster::new( + tx_sync.client().clone(), + Arc::clone(&logger), + )); + let fee_estimator = + Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger))); + (blockchain, tx_sync, tx_broadcaster, fee_estimator) } None => { // Default to Esplora client. @@ -479,20 +487,31 @@ fn build_with_store_internal( let blockchain = EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) .with_concurrency(BDK_CLIENT_CONCURRENCY); - (blockchain, tx_sync) + let tx_broadcaster = Arc::new(TransactionBroadcaster::new( + tx_sync.client().clone(), + Arc::clone(&logger), + )); + let fee_estimator = + Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger))); + (blockchain, tx_sync, tx_broadcaster, fee_estimator) } }; let runtime = Arc::new(RwLock::new(None)); - let wallet = - Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&runtime), Arc::clone(&logger))); + let wallet = Arc::new(Wallet::new( + blockchain, + bdk_wallet, + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&logger), + )); // Initialize the ChainMonitor let chain_monitor: Arc> = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&tx_sync)), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&logger), - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&kv_store), )); @@ -592,9 +611,9 @@ fn build_with_store_internal( Arc::clone(&keys_manager), Arc::clone(&keys_manager), Arc::clone(&keys_manager), - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&router), Arc::clone(&logger), user_config, @@ -616,9 +635,9 @@ fn build_with_store_internal( best_block: BestBlock::new(genesis_block_hash, 0), }; channelmanager::ChannelManager::new( - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&router), Arc::clone(&logger), Arc::clone(&keys_manager), @@ -767,6 +786,8 @@ fn build_with_store_internal( config, wallet, tx_sync, + tx_broadcaster, + fee_estimator, event_queue, channel_manager, chain_monitor, diff --git a/src/error.rs b/src/error.rs index b924fcda3..267239e04 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,8 @@ pub enum Error { ChannelConfigUpdateFailed, /// Persistence failed. PersistenceFailed, + /// A fee rate estimation update failed. + FeerateEstimationUpdateFailed, /// A wallet operation failed. WalletOperationFailed, /// A signing operation for transaction failed. @@ -79,6 +81,9 @@ impl fmt::Display for Error { Self::ChannelClosingFailed => write!(f, "Failed to close channel."), Self::ChannelConfigUpdateFailed => write!(f, "Failed to update channel config."), Self::PersistenceFailed => write!(f, "Failed to persist data."), + Self::FeerateEstimationUpdateFailed => { + write!(f, "Failed to update fee rate estimates.") + } Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), Self::MessageSigningFailed => write!(f, "Failed to sign given message."), diff --git a/src/event.rs b/src/event.rs index ca92346d8..9cb0f7070 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,6 +1,7 @@ -use crate::peer_store::{PeerInfo, PeerStore}; +use crate::types::{Broadcaster, FeeEstimator, Wallet}; use crate::{ - hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId, Wallet, + hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore, + UserChannelId, }; use crate::payment_store::{ @@ -13,7 +14,9 @@ use crate::io::{ }; use crate::logger::{log_debug, log_error, log_info, Logger}; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::chaininterface::{ + BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator, +}; use lightning::events::Event as LdkEvent; use lightning::events::PaymentPurpose; use lightning::impl_writeable_tlv_based_enum; @@ -243,16 +246,18 @@ pub(crate) struct EventHandler where L::Target: Logger, { - wallet: Arc>, event_queue: Arc>, + wallet: Arc, channel_manager: Arc>, + tx_broadcaster: Arc, + fee_estimator: Arc, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, + peer_store: Arc>, runtime: Arc>>, logger: L, config: Arc, - peer_store: Arc>, } impl EventHandler @@ -260,23 +265,26 @@ where L::Target: Logger, { pub fn new( - wallet: Arc>, event_queue: Arc>, - channel_manager: Arc>, network_graph: Arc, + event_queue: Arc>, wallet: Arc, + channel_manager: Arc>, tx_broadcaster: Arc, + fee_estimator: Arc, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, - runtime: Arc>>, logger: L, config: Arc, - peer_store: Arc>, + peer_store: Arc>, runtime: Arc>>, + logger: L, config: Arc, ) -> Self { Self { event_queue, wallet, channel_manager, + tx_broadcaster, + fee_estimator, network_graph, keys_manager, payment_store, + peer_store, logger, runtime, config, - peer_store, } } @@ -586,7 +594,7 @@ where let output_descriptors = &outputs.iter().collect::>(); let tx_feerate = self - .wallet + .fee_estimator .get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee); // We set nLockTime to the current height to discourage fee sniping. @@ -603,7 +611,9 @@ where ); match res { - Ok(Some(spending_tx)) => self.wallet.broadcast_transactions(&[&spending_tx]), + Ok(Some(spending_tx)) => { + self.tx_broadcaster.broadcast_transactions(&[&spending_tx]) + } Ok(None) => { log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs); } diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs new file mode 100644 index 000000000..4116c992d --- /dev/null +++ b/src/fee_estimator.rs @@ -0,0 +1,131 @@ +use crate::logger::{log_error, log_trace, Logger}; +use crate::Error; + +use lightning::chain::chaininterface::{ + ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW, +}; + +use bdk::FeeRate; +use esplora_client::AsyncClient as EsploraClient; + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::RwLock; + +pub(crate) struct OnchainFeeEstimator +where + L::Target: Logger, +{ + fee_rate_cache: RwLock>, + esplora_client: EsploraClient, + logger: L, +} + +impl OnchainFeeEstimator +where + L::Target: Logger, +{ + pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + let fee_rate_cache = RwLock::new(HashMap::new()); + Self { fee_rate_cache, esplora_client, logger } + } + + pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { + let confirmation_targets = vec![ + ConfirmationTarget::OnChainSweep, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + ConfirmationTarget::AnchorChannelFee, + ConfirmationTarget::NonAnchorChannelFee, + ConfirmationTarget::ChannelCloseMinimum, + ]; + for target in confirmation_targets { + let num_blocks = match target { + ConfirmationTarget::OnChainSweep => 6, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 1, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, + ConfirmationTarget::AnchorChannelFee => 1008, + ConfirmationTarget::NonAnchorChannelFee => 12, + ConfirmationTarget::ChannelCloseMinimum => 144, + }; + + let estimates = self.esplora_client.get_fee_estimates().await.map_err(|e| { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates for {:?}: {}", + target, + e + ); + Error::FeerateEstimationUpdateFailed + })?; + + let converted_estimates = esplora_client::convert_fee_rate(num_blocks, estimates) + .map_err(|e| { + log_error!( + self.logger, + "Failed to convert fee rate estimates for {:?}: {}", + target, + e + ); + Error::FeerateEstimationUpdateFailed + })?; + + let fee_rate = FeeRate::from_sat_per_vb(converted_estimates); + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = match target { + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => { + let really_high_prio = fee_rate.as_sat_per_vb() * 10.0; + FeeRate::from_sat_per_vb(really_high_prio) + } + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => { + let slightly_less_than_background = fee_rate.fee_wu(1000) - 250; + FeeRate::from_sat_per_kwu(slightly_less_than_background as f32) + } + _ => fee_rate, + }; + + let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); + locked_fee_rate_cache.insert(target, adjusted_fee_rate); + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.fee_wu(1000) + ); + } + Ok(()) + } + + pub(crate) fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { + let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); + + let fallback_sats_kwu = match confirmation_target { + ConfirmationTarget::OnChainSweep => 5000, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 25 * 250, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + ConfirmationTarget::AnchorChannelFee => 500, + ConfirmationTarget::NonAnchorChannelFee => 1000, + ConfirmationTarget::ChannelCloseMinimum => 500, + }; + + // We'll fall back on this, if we really don't have any other information. + let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as f32); + + *locked_fee_rate_cache.get(&confirmation_target).unwrap_or(&fallback_rate) + } +} + +impl FeeEstimator for OnchainFeeEstimator +where + L::Target: Logger, +{ + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + (self.estimate_fee_rate(confirmation_target).fee_wu(1000) as u32) + .max(FEERATE_FLOOR_SATS_PER_KW) + } +} diff --git a/src/lib.rs b/src/lib.rs index 33574972b..ae0fda1bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ mod builder; mod error; mod event; +mod fee_estimator; mod gossip; mod hex_utils; pub mod io; @@ -86,6 +87,7 @@ mod payment_store; mod peer_store; #[cfg(test)] mod test; +mod tx_broadcaster; mod types; #[cfg(feature = "uniffi")] mod uniffi_types; @@ -118,9 +120,11 @@ use gossip::GossipSource; use payment_store::PaymentStore; pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; -use types::{ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer}; +use types::{ + Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph, + PeerManager, Router, Scorer, Wallet, +}; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; -use wallet::Wallet; use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger}; @@ -284,8 +288,10 @@ pub struct Node { stop_sender: tokio::sync::watch::Sender<()>, stop_receiver: tokio::sync::watch::Receiver<()>, config: Arc, - wallet: Arc>>, + wallet: Arc, tx_sync: Arc>>, + tx_broadcaster: Arc, + fee_estimator: Arc, event_queue: Arc>>, channel_manager: Arc>, chain_monitor: Arc>, @@ -320,13 +326,13 @@ impl Node { let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); // Block to ensure we update our fee rate cache once on startup - let wallet = Arc::clone(&self.wallet); + let fee_estimator = Arc::clone(&self.fee_estimator); let sync_logger = Arc::clone(&self.logger); let runtime_ref = &runtime; tokio::task::block_in_place(move || { runtime_ref.block_on(async move { let now = Instant::now(); - match wallet.update_fee_estimates().await { + match fee_estimator.update_fee_estimates().await { Ok(()) => { log_info!( sync_logger, @@ -349,8 +355,6 @@ impl Node { let mut stop_sync = self.stop_receiver.clone(); let onchain_wallet_sync_interval_secs = self.config.onchain_wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); - let fee_rate_cache_update_interval_secs = - self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( async move { @@ -359,11 +363,6 @@ impl Node { ); onchain_wallet_sync_interval .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut fee_rate_update_interval = tokio::time::interval(Duration::from_secs( - fee_rate_cache_update_interval_secs, - )); - // We just blocked on updating, so skip the first tick. - fee_rate_update_interval.reset(); loop { tokio::select! { _ = stop_sync.changed() => { @@ -386,29 +385,50 @@ impl Node { } } } - _ = fee_rate_update_interval.tick() => { - let now = Instant::now(); - match wallet.update_fee_estimates().await { - Ok(()) => log_trace!( - sync_logger, - "Background update of fee rate cache finished in {}ms.", - now.elapsed().as_millis() - ), - Err(err) => { - log_error!( - sync_logger, - "Background update of fee rate cache failed: {}", - err - ) - } - } - } } } }, ); }); + let mut stop_fee_updates = self.stop_receiver.clone(); + let fee_update_logger = Arc::clone(&self.logger); + let fee_estimator = Arc::clone(&self.fee_estimator); + let fee_rate_cache_update_interval_secs = + self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + runtime.spawn(async move { + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); + // We just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_fee_updates.changed() => { + return; + } + _ = fee_rate_update_interval.tick() => { + let now = Instant::now(); + match fee_estimator.update_fee_estimates().await { + Ok(()) => log_trace!( + fee_update_logger, + "Background update of fee rate cache finished in {}ms.", + now.elapsed().as_millis() + ), + Err(err) => { + log_error!( + fee_update_logger, + "Background update of fee rate cache failed: {}", + err + ) + } + } + } + } + } + }); + let tx_sync = Arc::clone(&self.tx_sync); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); @@ -653,17 +673,37 @@ impl Node { } }); + let mut stop_tx_bcast = self.stop_receiver.clone(); + let tx_bcaster = Arc::clone(&self.tx_broadcaster); + runtime.spawn(async move { + // Every second we try to clear our broadcasting queue. + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_tx_bcast.changed() => { + return; + } + _ = interval.tick() => { + tx_bcaster.process_queue().await; + } + } + } + }); + let event_handler = Arc::new(EventHandler::new( - Arc::clone(&self.wallet), Arc::clone(&self.event_queue), + Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), + Arc::clone(&self.tx_broadcaster), + Arc::clone(&self.fee_estimator), Arc::clone(&self.network_graph), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), + Arc::clone(&self.peer_store), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), - Arc::clone(&self.peer_store), )); // Setup background processing diff --git a/src/logger.rs b/src/logger.rs index cab62c61d..b0504a354 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,5 +1,5 @@ pub(crate) use lightning::util::logger::Logger; -pub(crate) use lightning::{log_debug, log_error, log_info, log_trace}; +pub(crate) use lightning::{log_bytes, log_debug, log_error, log_info, log_trace}; use lightning::util::logger::{Level, Record}; use lightning::util::ser::Writer; diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs new file mode 100644 index 000000000..4e56cbafe --- /dev/null +++ b/src/tx_broadcaster.rs @@ -0,0 +1,108 @@ +use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; + +use lightning::chain::chaininterface::BroadcasterInterface; +use lightning::util::ser::Writeable; + +use esplora_client::AsyncClient as EsploraClient; + +use bitcoin::Transaction; + +use tokio::sync::mpsc; +use tokio::sync::Mutex; + +use std::ops::Deref; +use std::time::Duration; + +const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; + +pub(crate) struct TransactionBroadcaster +where + L::Target: Logger, +{ + queue_sender: mpsc::Sender>, + queue_receiver: Mutex>>, + esplora_client: EsploraClient, + logger: L, +} + +impl TransactionBroadcaster +where + L::Target: Logger, +{ + pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); + Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger } + } + + pub(crate) async fn process_queue(&self) { + let mut receiver = self.queue_receiver.lock().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); + } + Err(e) => match e { + esplora_client::Error::Reqwest(_) => { + // Wait 500 ms and retry in case we get a `Reqwest` error (typically + // 429) + tokio::time::sleep(Duration::from_millis(500)).await; + log_error!( + self.logger, + "Sync failed due to HTTP connection error, retrying: {}", + e + ); + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_debug!( + self.logger, + "Successfully broadcast transaction {}", + tx.txid() + ); + } + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + } + } + } + _ => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + } + }, + } + } + } + } +} + +impl BroadcasterInterface for TransactionBroadcaster +where + L::Target: Logger, +{ + fn broadcast_transactions(&self, txs: &[&Transaction]) { + let package = txs.iter().map(|&t| t.clone()).collect::>(); + self.queue_sender.try_send(package).unwrap_or_else(|e| { + log_error!(self.logger, "Failed to broadcast transactions: {}", e); + }); + } +} diff --git a/src/types.rs b/src/types.rs index 1235ce625..ead397a75 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,4 @@ use crate::logger::FilesystemLogger; -use crate::wallet::{Wallet, WalletKeysManager}; use lightning::chain::chainmonitor; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; @@ -25,8 +24,8 @@ use std::sync::{Arc, Mutex, RwLock}; pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc>>, - Arc>>, - Arc>>, + Arc, + Arc, Arc, Arc, >; @@ -38,22 +37,37 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< Arc, Arc, IgnoringMessageHandler, - Arc>>, + Arc, >; pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< Arc>, - Arc>>, - Arc>>, - Arc>>, - Arc>>, - Arc>>, + Arc, + Arc, + Arc, + Arc, + Arc, Arc, Arc, >; -pub(crate) type KeysManager = - WalletKeysManager>; +pub(crate) type Broadcaster = crate::tx_broadcaster::TransactionBroadcaster>; + +pub(crate) type FeeEstimator = crate::fee_estimator::OnchainFeeEstimator>; + +pub(crate) type Wallet = crate::wallet::Wallet< + bdk::database::SqliteDatabase, + Arc, + Arc, + Arc, +>; + +pub(crate) type KeysManager = crate::wallet::WalletKeysManager< + bdk::database::SqliteDatabase, + Arc, + Arc, + Arc, +>; pub(crate) type Router = DefaultRouter< Arc, @@ -85,8 +99,8 @@ pub(crate) type GossipSync = lightning_background_processor::GossipSync< >; pub(crate) type OnionMessenger = lightning::onion_message::OnionMessenger< - Arc>>, - Arc>>, + Arc, + Arc, Arc, Arc, IgnoringMessageHandler, diff --git a/src/wallet.rs b/src/wallet.rs index 4221dc084..0279225cd 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -2,9 +2,7 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; use crate::Error; -use lightning::chain::chaininterface::{ - BroadcasterInterface, ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW, -}; +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; use lightning::ln::script::ShutdownScript; @@ -15,10 +13,11 @@ use lightning::sign::{ use lightning::util::message_signing; -use bdk::blockchain::{Blockchain, EsploraBlockchain}; +use bdk::blockchain::EsploraBlockchain; use bdk::database::BatchDatabase; use bdk::wallet::AddressIndex; -use bdk::{FeeRate, SignOptions, SyncOptions}; +use bdk::FeeRate; +use bdk::{SignOptions, SyncOptions}; use bitcoin::bech32::u5; use bitcoin::secp256k1::ecdh::SharedSecret; @@ -26,14 +25,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, Signing}; use bitcoin::{LockTime, PackedLockTime, Script, Transaction, TxOut, Txid}; -use std::collections::HashMap; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -pub struct Wallet +pub struct Wallet where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { // A BDK blockchain used for wallet sync. @@ -41,25 +41,26 @@ where // A BDK on-chain wallet. inner: Mutex>, // A cache storing the most recently retrieved fee rate estimations. - fee_rate_cache: RwLock>, - runtime: Arc>>, + broadcaster: B, + fee_estimator: E, sync_lock: (Mutex<()>, Condvar), logger: L, } -impl Wallet +impl Wallet where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, - runtime: Arc>>, logger: L, + blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, fee_estimator: E, + logger: L, ) -> Self { let inner = Mutex::new(wallet); - let fee_rate_cache = RwLock::new(HashMap::new()); let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, fee_rate_cache, runtime, sync_lock, logger } + Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -112,72 +113,13 @@ where res } - pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { - let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); - - let confirmation_targets = vec![ - ConfirmationTarget::OnChainSweep, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - ConfirmationTarget::AnchorChannelFee, - ConfirmationTarget::NonAnchorChannelFee, - ConfirmationTarget::ChannelCloseMinimum, - ]; - for target in confirmation_targets { - let num_blocks = match target { - ConfirmationTarget::OnChainSweep => 6, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 1, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, - ConfirmationTarget::AnchorChannelFee => 1008, - ConfirmationTarget::NonAnchorChannelFee => 12, - ConfirmationTarget::ChannelCloseMinimum => 144, - }; - - let est_fee_rate = self.blockchain.estimate_fee(num_blocks).await; - - match est_fee_rate { - Ok(rate) => { - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = match target { - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => { - let really_high_prio = rate.as_sat_per_vb() * 10.0; - FeeRate::from_sat_per_vb(really_high_prio) - } - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => { - let slightly_less_than_background = rate.fee_wu(1000) - 250; - FeeRate::from_sat_per_kwu(slightly_less_than_background as f32) - } - _ => rate, - }; - locked_fee_rate_cache.insert(target, adjusted_fee_rate); - log_trace!( - self.logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.fee_wu(1000) - ); - } - Err(e) => { - log_error!( - self.logger, - "Failed to update fee rate estimation for {:?}: {}", - target, - e - ); - } - } - } - Ok(()) - } - pub(crate) fn create_funding_transaction( &self, output_script: Script, value_sats: u64, confirmation_target: ConfirmationTarget, locktime: LockTime, ) -> Result { - let fee_rate = self.estimate_fee_rate(confirmation_target); + let fee_rate = FeeRate::from_sat_per_kwu( + self.fee_estimator.get_est_sat_per_1000_weight(confirmation_target) as f32, + ); let locked_wallet = self.inner.lock().unwrap(); let mut tx_builder = locked_wallet.build_tx(); @@ -231,7 +173,9 @@ where &self, address: &bitcoin::Address, amount_msat_or_drain: Option, ) -> Result { let confirmation_target = ConfirmationTarget::NonAnchorChannelFee; - let fee_rate = self.estimate_fee_rate(confirmation_target); + let fee_rate = FeeRate::from_sat_per_kwu( + self.fee_estimator.get_est_sat_per_1000_weight(confirmation_target) as f32, + ); let tx = { let locked_wallet = self.inner.lock().unwrap(); @@ -275,7 +219,7 @@ where psbt.extract_tx() }; - self.broadcast_transactions(&[&tx]); + self.broadcaster.broadcast_transactions(&[&tx]); let txid = tx.txid(); @@ -298,92 +242,27 @@ where Ok(txid) } - - fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { - let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); - - let fallback_sats_kwu = match confirmation_target { - ConfirmationTarget::OnChainSweep => 5000, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 25 * 250, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, - ConfirmationTarget::AnchorChannelFee => 500, - ConfirmationTarget::NonAnchorChannelFee => 1000, - ConfirmationTarget::ChannelCloseMinimum => 500, - }; - - // We'll fall back on this, if we really don't have any other information. - let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as f32); - - *locked_fee_rate_cache.get(&confirmation_target).unwrap_or(&fallback_rate) - } -} - -impl FeeEstimator for Wallet -where - D: BatchDatabase, - L::Target: Logger, -{ - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - (self.estimate_fee_rate(confirmation_target).fee_wu(1000) as u32) - .max(FEERATE_FLOOR_SATS_PER_KW) - } -} - -impl BroadcasterInterface for Wallet -where - D: BatchDatabase, - L::Target: Logger, -{ - fn broadcast_transactions(&self, txs: &[&Transaction]) { - let locked_runtime = self.runtime.read().unwrap(); - if locked_runtime.as_ref().is_none() { - log_error!(self.logger, "Failed to broadcast transaction: No runtime."); - return; - } - - let errors = tokio::task::block_in_place(move || { - locked_runtime.as_ref().unwrap().block_on(async move { - let mut handles = Vec::new(); - let mut errors = Vec::new(); - - for tx in txs { - handles.push((tx.txid(), self.blockchain.broadcast(tx))); - } - - for handle in handles { - match handle.1.await { - Ok(_) => {} - Err(e) => { - errors.push((e, handle.0)); - } - } - } - errors - }) - }); - - for (e, txid) in errors { - log_error!(self.logger, "Failed to broadcast transaction {}: {}", txid, e); - } - } } /// Similar to [`KeysManager`], but overrides the destination and shutdown scripts so they are /// directly spendable by the BDK wallet. -pub struct WalletKeysManager +pub struct WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { inner: KeysManager, - wallet: Arc>, + wallet: Arc>, logger: L, } -impl WalletKeysManager +impl WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { /// Constructs a `WalletKeysManager` that overrides the destination and shutdown scripts. @@ -392,7 +271,7 @@ where /// `starting_time_nanos`. pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, - wallet: Arc>, logger: L, + wallet: Arc>, logger: L, ) -> Self { let inner = KeysManager::new(seed, starting_time_secs, starting_time_nanos); Self { inner, wallet, logger } @@ -434,9 +313,11 @@ where } } -impl NodeSigner for WalletKeysManager +impl NodeSigner for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { fn get_node_id(&self, recipient: Recipient) -> Result { @@ -476,9 +357,11 @@ where } } -impl EntropySource for WalletKeysManager +impl EntropySource for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { fn get_secure_random_bytes(&self) -> [u8; 32] { @@ -486,9 +369,11 @@ where } } -impl SignerProvider for WalletKeysManager +impl SignerProvider for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { type Signer = InMemorySigner;