From 88b0cdf74748c7dd593d5d4292c90efb762ac3ea Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 22 Nov 2023 15:29:48 +0100 Subject: [PATCH 1/5] Clean up `types.rs` --- src/event.rs | 9 +++++---- src/lib.rs | 7 ++++--- src/types.rs | 26 ++++++++++++++------------ 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/event.rs b/src/event.rs index ca92346d8..9691f1664 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,6 +1,7 @@ -use crate::peer_store::{PeerInfo, PeerStore}; +use crate::types::Wallet; use crate::{ - hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId, Wallet, + hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore, + UserChannelId, }; use crate::payment_store::{ @@ -243,7 +244,7 @@ pub(crate) struct EventHandler where L::Target: Logger, { - wallet: Arc>, + wallet: Arc, event_queue: Arc>, channel_manager: Arc>, network_graph: Arc, @@ -260,7 +261,7 @@ where L::Target: Logger, { pub fn new( - wallet: Arc>, event_queue: Arc>, + wallet: Arc, event_queue: Arc>, channel_manager: Arc>, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, runtime: Arc>>, logger: L, config: Arc, diff --git a/src/lib.rs b/src/lib.rs index 33574972b..e0adac786 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,9 +118,10 @@ use gossip::GossipSource; use payment_store::PaymentStore; pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; -use types::{ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer}; +use types::{ + ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer, Wallet, +}; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; -use wallet::Wallet; use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger}; @@ -284,7 +285,7 @@ pub struct Node { stop_sender: tokio::sync::watch::Sender<()>, stop_receiver: tokio::sync::watch::Receiver<()>, config: Arc, - wallet: Arc>>, + wallet: Arc, tx_sync: Arc>>, event_queue: Arc>>, channel_manager: Arc>, diff --git a/src/types.rs b/src/types.rs index 1235ce625..59748f0e0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,4 @@ use crate::logger::FilesystemLogger; -use crate::wallet::{Wallet, WalletKeysManager}; use lightning::chain::chainmonitor; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; @@ -25,8 +24,8 @@ use std::sync::{Arc, Mutex, RwLock}; pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc>>, - Arc>>, - Arc>>, + Arc, + Arc, Arc, Arc, >; @@ -38,22 +37,25 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< Arc, Arc, IgnoringMessageHandler, - Arc>>, + Arc, >; pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< Arc>, - Arc>>, - Arc>>, - Arc>>, - Arc>>, - Arc>>, + Arc, + Arc, + Arc, + Arc, + Arc, Arc, Arc, >; +pub(crate) type Wallet = + crate::wallet::Wallet>; + pub(crate) type KeysManager = - WalletKeysManager>; + crate::wallet::WalletKeysManager>; pub(crate) type Router = DefaultRouter< Arc, @@ -85,8 +87,8 @@ pub(crate) type GossipSync = lightning_background_processor::GossipSync< >; pub(crate) type OnionMessenger = lightning::onion_message::OnionMessenger< - Arc>>, - Arc>>, + Arc, + Arc, Arc, Arc, IgnoringMessageHandler, From 3c08268090ff81b06a51585c358ced49136f204a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 22 Nov 2023 14:05:11 +0100 Subject: [PATCH 2/5] Introduce `TransactionBroadcaster` We introduce a separate `TransactionBroadcaster` which will regularly process a message queue of transactions to be broadcast. This allows us to a) decouple broadcasting from the BDK wallet and b) add transactions to the queue from a blocking context, without the need to schlep around a separate runtime or mess with it. --- src/builder.rs | 30 ++++++++++++----- src/event.rs | 22 ++++++++----- src/lib.rs | 28 ++++++++++++++-- src/tx_broadcaster.rs | 67 ++++++++++++++++++++++++++++++++++++++ src/types.rs | 15 ++++++--- src/wallet.rs | 76 +++++++++++++------------------------------ 6 files changed, 159 insertions(+), 79 deletions(-) create mode 100644 src/tx_broadcaster.rs diff --git a/src/builder.rs b/src/builder.rs index 22f2c5aa0..63dfa6f32 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,6 +5,7 @@ use crate::io::sqlite_store::SqliteStore; use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; +use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph, OnionMessenger, PeerManager, @@ -464,13 +465,17 @@ fn build_with_store_internal( BuildError::WalletSetupFailed })?; - let (blockchain, tx_sync) = match chain_data_source_config { + let (blockchain, tx_sync, tx_broadcaster) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger))); let blockchain = EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) .with_concurrency(BDK_CLIENT_CONCURRENCY); - (blockchain, tx_sync) + let tx_broadcaster = Arc::new(TransactionBroadcaster::new( + tx_sync.client().clone(), + Arc::clone(&logger), + )); + (blockchain, tx_sync, tx_broadcaster) } None => { // Default to Esplora client. @@ -479,18 +484,26 @@ fn build_with_store_internal( let blockchain = EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) .with_concurrency(BDK_CLIENT_CONCURRENCY); - (blockchain, tx_sync) + let tx_broadcaster = Arc::new(TransactionBroadcaster::new( + tx_sync.client().clone(), + Arc::clone(&logger), + )); + (blockchain, tx_sync, tx_broadcaster) } }; let runtime = Arc::new(RwLock::new(None)); - let wallet = - Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&runtime), Arc::clone(&logger))); + let wallet = Arc::new(Wallet::new( + blockchain, + bdk_wallet, + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + )); // Initialize the ChainMonitor let chain_monitor: Arc> = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&tx_sync)), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&logger), Arc::clone(&wallet), Arc::clone(&kv_store), @@ -594,7 +607,7 @@ fn build_with_store_internal( Arc::clone(&keys_manager), Arc::clone(&wallet), Arc::clone(&chain_monitor), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&router), Arc::clone(&logger), user_config, @@ -618,7 +631,7 @@ fn build_with_store_internal( channelmanager::ChannelManager::new( Arc::clone(&wallet), Arc::clone(&chain_monitor), - Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), Arc::clone(&router), Arc::clone(&logger), Arc::clone(&keys_manager), @@ -767,6 +780,7 @@ fn build_with_store_internal( config, wallet, tx_sync, + tx_broadcaster, event_queue, channel_manager, chain_monitor, diff --git a/src/event.rs b/src/event.rs index 9691f1664..94fb1f4ca 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,4 +1,4 @@ -use crate::types::Wallet; +use crate::types::{Broadcaster, Wallet}; use crate::{ hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore, UserChannelId, @@ -244,16 +244,17 @@ pub(crate) struct EventHandler where L::Target: Logger, { - wallet: Arc, event_queue: Arc>, + wallet: Arc, channel_manager: Arc>, + tx_broadcaster: Arc, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, + peer_store: Arc>, runtime: Arc>>, logger: L, config: Arc, - peer_store: Arc>, } impl EventHandler @@ -261,23 +262,24 @@ where L::Target: Logger, { pub fn new( - wallet: Arc, event_queue: Arc>, - channel_manager: Arc>, network_graph: Arc, - keys_manager: Arc, payment_store: Arc>, + event_queue: Arc>, wallet: Arc, + channel_manager: Arc>, tx_broadcaster: Arc, + network_graph: Arc, keys_manager: Arc, + payment_store: Arc>, peer_store: Arc>, runtime: Arc>>, logger: L, config: Arc, - peer_store: Arc>, ) -> Self { Self { event_queue, wallet, channel_manager, + tx_broadcaster, network_graph, keys_manager, payment_store, + peer_store, logger, runtime, config, - peer_store, } } @@ -604,7 +606,9 @@ where ); match res { - Ok(Some(spending_tx)) => self.wallet.broadcast_transactions(&[&spending_tx]), + Ok(Some(spending_tx)) => { + self.tx_broadcaster.broadcast_transactions(&[&spending_tx]) + } Ok(None) => { log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs); } diff --git a/src/lib.rs b/src/lib.rs index e0adac786..697a93fe3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,6 +86,7 @@ mod payment_store; mod peer_store; #[cfg(test)] mod test; +mod tx_broadcaster; mod types; #[cfg(feature = "uniffi")] mod uniffi_types; @@ -119,7 +120,8 @@ use payment_store::PaymentStore; pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; use types::{ - ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer, Wallet, + Broadcaster, ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, + Scorer, Wallet, }; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; @@ -287,6 +289,7 @@ pub struct Node { config: Arc, wallet: Arc, tx_sync: Arc>>, + tx_broadcaster: Arc, event_queue: Arc>>, channel_manager: Arc>, chain_monitor: Arc>, @@ -654,17 +657,36 @@ impl Node { } }); + let mut stop_tx_bcast = self.stop_receiver.clone(); + let tx_bcaster = Arc::clone(&self.tx_broadcaster); + runtime.spawn(async move { + // Every second we try to clear our broadcasting queue. + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_tx_bcast.changed() => { + return; + } + _ = interval.tick() => { + tx_bcaster.process_queue().await; + } + } + } + }); + let event_handler = Arc::new(EventHandler::new( - Arc::clone(&self.wallet), Arc::clone(&self.event_queue), + Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), + Arc::clone(&self.tx_broadcaster), Arc::clone(&self.network_graph), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), + Arc::clone(&self.peer_store), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), - Arc::clone(&self.peer_store), )); // Setup background processing diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs new file mode 100644 index 000000000..3cba97b23 --- /dev/null +++ b/src/tx_broadcaster.rs @@ -0,0 +1,67 @@ +use crate::logger::{log_error, log_trace, Logger}; + +use lightning::chain::chaininterface::BroadcasterInterface; + +use esplora_client::AsyncClient as EsploraClient; + +use bitcoin::Transaction; + +use tokio::sync::mpsc; +use tokio::sync::Mutex; + +use std::ops::Deref; + +const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; + +pub(crate) struct TransactionBroadcaster +where + L::Target: Logger, +{ + queue_sender: mpsc::Sender>, + queue_receiver: Mutex>>, + esplora_client: EsploraClient, + logger: L, +} + +impl TransactionBroadcaster +where + L::Target: Logger, +{ + pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); + Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger } + } + + pub(crate) async fn process_queue(&self) { + let mut receiver = self.queue_receiver.lock().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); + } + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + } + } + } + } + } +} + +impl BroadcasterInterface for TransactionBroadcaster +where + L::Target: Logger, +{ + fn broadcast_transactions(&self, txs: &[&Transaction]) { + let package = txs.iter().map(|&t| t.clone()).collect::>(); + self.queue_sender.try_send(package).unwrap_or_else(|e| { + log_error!(self.logger, "Failed to broadcast transactions: {}", e); + }); + } +} diff --git a/src/types.rs b/src/types.rs index 59748f0e0..9371f0698 100644 --- a/src/types.rs +++ b/src/types.rs @@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex, RwLock}; pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc>>, - Arc, + Arc, Arc, Arc, Arc, @@ -42,7 +42,7 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< Arc>, - Arc, + Arc, Arc, Arc, Arc, @@ -51,11 +51,16 @@ pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManage Arc, >; +pub(crate) type Broadcaster = crate::tx_broadcaster::TransactionBroadcaster>; + pub(crate) type Wallet = - crate::wallet::Wallet>; + crate::wallet::Wallet, Arc>; -pub(crate) type KeysManager = - crate::wallet::WalletKeysManager>; +pub(crate) type KeysManager = crate::wallet::WalletKeysManager< + bdk::database::SqliteDatabase, + Arc, + Arc, +>; pub(crate) type Router = DefaultRouter< Arc, diff --git a/src/wallet.rs b/src/wallet.rs index 4221dc084..d249581b6 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -31,9 +31,10 @@ use std::ops::Deref; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::Duration; -pub struct Wallet +pub struct Wallet where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { // A BDK blockchain used for wallet sync. @@ -41,25 +42,25 @@ where // A BDK on-chain wallet. inner: Mutex>, // A cache storing the most recently retrieved fee rate estimations. + broadcaster: B, fee_rate_cache: RwLock>, - runtime: Arc>>, sync_lock: (Mutex<()>, Condvar), logger: L, } -impl Wallet +impl Wallet where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, - runtime: Arc>>, logger: L, + blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, logger: L, ) -> Self { let inner = Mutex::new(wallet); let fee_rate_cache = RwLock::new(HashMap::new()); let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, fee_rate_cache, runtime, sync_lock, logger } + Self { blockchain, inner, broadcaster, fee_rate_cache, sync_lock, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -275,7 +276,7 @@ where psbt.extract_tx() }; - self.broadcast_transactions(&[&tx]); + self.broadcaster.broadcast_transactions(&[&tx]); let txid = tx.txid(); @@ -319,9 +320,10 @@ where } } -impl FeeEstimator for Wallet +impl FeeEstimator for Wallet where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { @@ -330,60 +332,23 @@ where } } -impl BroadcasterInterface for Wallet -where - D: BatchDatabase, - L::Target: Logger, -{ - fn broadcast_transactions(&self, txs: &[&Transaction]) { - let locked_runtime = self.runtime.read().unwrap(); - if locked_runtime.as_ref().is_none() { - log_error!(self.logger, "Failed to broadcast transaction: No runtime."); - return; - } - - let errors = tokio::task::block_in_place(move || { - locked_runtime.as_ref().unwrap().block_on(async move { - let mut handles = Vec::new(); - let mut errors = Vec::new(); - - for tx in txs { - handles.push((tx.txid(), self.blockchain.broadcast(tx))); - } - - for handle in handles { - match handle.1.await { - Ok(_) => {} - Err(e) => { - errors.push((e, handle.0)); - } - } - } - errors - }) - }); - - for (e, txid) in errors { - log_error!(self.logger, "Failed to broadcast transaction {}: {}", txid, e); - } - } -} - /// Similar to [`KeysManager`], but overrides the destination and shutdown scripts so they are /// directly spendable by the BDK wallet. -pub struct WalletKeysManager +pub struct WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { inner: KeysManager, - wallet: Arc>, + wallet: Arc>, logger: L, } -impl WalletKeysManager +impl WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { /// Constructs a `WalletKeysManager` that overrides the destination and shutdown scripts. @@ -392,7 +357,7 @@ where /// `starting_time_nanos`. pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, - wallet: Arc>, logger: L, + wallet: Arc>, logger: L, ) -> Self { let inner = KeysManager::new(seed, starting_time_secs, starting_time_nanos); Self { inner, wallet, logger } @@ -434,9 +399,10 @@ where } } -impl NodeSigner for WalletKeysManager +impl NodeSigner for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { fn get_node_id(&self, recipient: Recipient) -> Result { @@ -476,9 +442,10 @@ where } } -impl EntropySource for WalletKeysManager +impl EntropySource for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { fn get_secure_random_bytes(&self) -> [u8; 32] { @@ -486,9 +453,10 @@ where } } -impl SignerProvider for WalletKeysManager +impl SignerProvider for WalletKeysManager where D: BatchDatabase, + B::Target: BroadcasterInterface, L::Target: Logger, { type Signer = InMemorySigner; From 6a70348c8b11f1751096750077ea179a6db5c05e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 27 Nov 2023 13:57:18 +0100 Subject: [PATCH 3/5] Retry broadcasts after 500ms if they failed due to an HTTP error Due to the rate-limiting applied by many Esplora servers we often receive HTTP 429 ('too many requests') errors during syncing. Here, we simply give broadcasting transactions a second chance after a slight delay of 500ms to ease the pain of immediate failures. Generally, rebroadcasting will then be initiated by the `OuputSweeper`. --- src/tx_broadcaster.rs | 48 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 3cba97b23..9ecf2a50b 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,4 +1,4 @@ -use crate::logger::{log_error, log_trace, Logger}; +use crate::logger::{log_debug, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; @@ -10,6 +10,7 @@ use tokio::sync::mpsc; use tokio::sync::Mutex; use std::ops::Deref; +use std::time::Duration; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -40,14 +41,43 @@ where Ok(()) => { log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); } - Err(e) => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - } + Err(e) => match e { + esplora_client::Error::Reqwest(_) => { + // Wait 500 ms and retry in case we get a `Reqwest` error (typically + // 429) + tokio::time::sleep(Duration::from_millis(500)).await; + log_error!( + self.logger, + "Sync failed due to HTTP connection error, retrying: {}", + e + ); + match self.esplora_client.broadcast(tx).await { + Ok(()) => { + log_debug!( + self.logger, + "Successfully broadcast transaction {}", + tx.txid() + ); + } + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + } + } + } + _ => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + } + }, } } } From 372a520fde2890dbd0fa36badeac75116f807f2b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 4 Dec 2023 19:18:59 +0100 Subject: [PATCH 4/5] Log transaction bytes on broadcast failure --- src/logger.rs | 2 +- src/tx_broadcaster.rs | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index cab62c61d..b0504a354 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -1,5 +1,5 @@ pub(crate) use lightning::util::logger::Logger; -pub(crate) use lightning::{log_debug, log_error, log_info, log_trace}; +pub(crate) use lightning::{log_bytes, log_debug, log_error, log_info, log_trace}; use lightning::util::logger::{Level, Record}; use lightning::util::ser::Writer; diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 9ecf2a50b..4e56cbafe 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,6 +1,7 @@ -use crate::logger::{log_debug, log_error, log_trace, Logger}; +use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; +use lightning::util::ser::Writeable; use esplora_client::AsyncClient as EsploraClient; @@ -66,6 +67,11 @@ where tx.txid(), e ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); } } } @@ -76,6 +82,11 @@ where tx.txid(), e ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); } }, } From ea31d2d26488d57141fb37998f146a26b958527b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 23 Nov 2023 10:58:34 +0100 Subject: [PATCH 5/5] Introduce `OnchainFeeEstimator` We also decouple fee estimation from the BDK on-chain wallet and BDK's corresponding `EsploraBlockchain`. Instead we use the esplora client directly in a dedicated `OnchainFeeEstimator` object. For one, this change is nice as it allows us to move more things out of the `Wallet` thread and corresponding locks. Moreover, it makes things more modular in general, which makes future upgrades and testing easier. --- bindings/ldk_node.udl | 1 + src/builder.rs | 19 ++++-- src/error.rs | 5 ++ src/event.rs | 17 +++-- src/fee_estimator.rs | 131 ++++++++++++++++++++++++++++++++++++++ src/lib.rs | 73 +++++++++++++-------- src/types.rs | 15 +++-- src/wallet.rs | 145 +++++++++--------------------------------- 8 files changed, 248 insertions(+), 158 deletions(-) create mode 100644 src/fee_estimator.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 85f0865f5..2a961b29a 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -108,6 +108,7 @@ enum NodeError { "ChannelClosingFailed", "ChannelConfigUpdateFailed", "PersistenceFailed", + "FeerateEstimationUpdateFailed", "WalletOperationFailed", "OnchainTxSigningFailed", "MessageSigningFailed", diff --git a/src/builder.rs b/src/builder.rs index 63dfa6f32..e48af2cf3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,4 +1,5 @@ use crate::event::EventQueue; +use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io; use crate::io::sqlite_store::SqliteStore; @@ -465,7 +466,7 @@ fn build_with_store_internal( BuildError::WalletSetupFailed })?; - let (blockchain, tx_sync, tx_broadcaster) = match chain_data_source_config { + let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger))); let blockchain = @@ -475,7 +476,9 @@ fn build_with_store_internal( tx_sync.client().clone(), Arc::clone(&logger), )); - (blockchain, tx_sync, tx_broadcaster) + let fee_estimator = + Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger))); + (blockchain, tx_sync, tx_broadcaster, fee_estimator) } None => { // Default to Esplora client. @@ -488,7 +491,9 @@ fn build_with_store_internal( tx_sync.client().clone(), Arc::clone(&logger), )); - (blockchain, tx_sync, tx_broadcaster) + let fee_estimator = + Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger))); + (blockchain, tx_sync, tx_broadcaster, fee_estimator) } }; @@ -497,6 +502,7 @@ fn build_with_store_internal( blockchain, bdk_wallet, Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), Arc::clone(&logger), )); @@ -505,7 +511,7 @@ fn build_with_store_internal( Some(Arc::clone(&tx_sync)), Arc::clone(&tx_broadcaster), Arc::clone(&logger), - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&kv_store), )); @@ -605,7 +611,7 @@ fn build_with_store_internal( Arc::clone(&keys_manager), Arc::clone(&keys_manager), Arc::clone(&keys_manager), - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), Arc::clone(&tx_broadcaster), Arc::clone(&router), @@ -629,7 +635,7 @@ fn build_with_store_internal( best_block: BestBlock::new(genesis_block_hash, 0), }; channelmanager::ChannelManager::new( - Arc::clone(&wallet), + Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), Arc::clone(&tx_broadcaster), Arc::clone(&router), @@ -781,6 +787,7 @@ fn build_with_store_internal( wallet, tx_sync, tx_broadcaster, + fee_estimator, event_queue, channel_manager, chain_monitor, diff --git a/src/error.rs b/src/error.rs index b924fcda3..267239e04 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,8 @@ pub enum Error { ChannelConfigUpdateFailed, /// Persistence failed. PersistenceFailed, + /// A fee rate estimation update failed. + FeerateEstimationUpdateFailed, /// A wallet operation failed. WalletOperationFailed, /// A signing operation for transaction failed. @@ -79,6 +81,9 @@ impl fmt::Display for Error { Self::ChannelClosingFailed => write!(f, "Failed to close channel."), Self::ChannelConfigUpdateFailed => write!(f, "Failed to update channel config."), Self::PersistenceFailed => write!(f, "Failed to persist data."), + Self::FeerateEstimationUpdateFailed => { + write!(f, "Failed to update fee rate estimates.") + } Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), Self::MessageSigningFailed => write!(f, "Failed to sign given message."), diff --git a/src/event.rs b/src/event.rs index 94fb1f4ca..9cb0f7070 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,4 +1,4 @@ -use crate::types::{Broadcaster, Wallet}; +use crate::types::{Broadcaster, FeeEstimator, Wallet}; use crate::{ hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore, UserChannelId, @@ -14,7 +14,9 @@ use crate::io::{ }; use crate::logger::{log_debug, log_error, log_info, Logger}; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::chaininterface::{ + BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator, +}; use lightning::events::Event as LdkEvent; use lightning::events::PaymentPurpose; use lightning::impl_writeable_tlv_based_enum; @@ -248,6 +250,7 @@ where wallet: Arc, channel_manager: Arc>, tx_broadcaster: Arc, + fee_estimator: Arc, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, @@ -264,15 +267,17 @@ where pub fn new( event_queue: Arc>, wallet: Arc, channel_manager: Arc>, tx_broadcaster: Arc, - network_graph: Arc, keys_manager: Arc, - payment_store: Arc>, peer_store: Arc>, - runtime: Arc>>, logger: L, config: Arc, + fee_estimator: Arc, network_graph: Arc, + keys_manager: Arc, payment_store: Arc>, + peer_store: Arc>, runtime: Arc>>, + logger: L, config: Arc, ) -> Self { Self { event_queue, wallet, channel_manager, tx_broadcaster, + fee_estimator, network_graph, keys_manager, payment_store, @@ -589,7 +594,7 @@ where let output_descriptors = &outputs.iter().collect::>(); let tx_feerate = self - .wallet + .fee_estimator .get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee); // We set nLockTime to the current height to discourage fee sniping. diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs new file mode 100644 index 000000000..4116c992d --- /dev/null +++ b/src/fee_estimator.rs @@ -0,0 +1,131 @@ +use crate::logger::{log_error, log_trace, Logger}; +use crate::Error; + +use lightning::chain::chaininterface::{ + ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW, +}; + +use bdk::FeeRate; +use esplora_client::AsyncClient as EsploraClient; + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::RwLock; + +pub(crate) struct OnchainFeeEstimator +where + L::Target: Logger, +{ + fee_rate_cache: RwLock>, + esplora_client: EsploraClient, + logger: L, +} + +impl OnchainFeeEstimator +where + L::Target: Logger, +{ + pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + let fee_rate_cache = RwLock::new(HashMap::new()); + Self { fee_rate_cache, esplora_client, logger } + } + + pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { + let confirmation_targets = vec![ + ConfirmationTarget::OnChainSweep, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + ConfirmationTarget::AnchorChannelFee, + ConfirmationTarget::NonAnchorChannelFee, + ConfirmationTarget::ChannelCloseMinimum, + ]; + for target in confirmation_targets { + let num_blocks = match target { + ConfirmationTarget::OnChainSweep => 6, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 1, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, + ConfirmationTarget::AnchorChannelFee => 1008, + ConfirmationTarget::NonAnchorChannelFee => 12, + ConfirmationTarget::ChannelCloseMinimum => 144, + }; + + let estimates = self.esplora_client.get_fee_estimates().await.map_err(|e| { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates for {:?}: {}", + target, + e + ); + Error::FeerateEstimationUpdateFailed + })?; + + let converted_estimates = esplora_client::convert_fee_rate(num_blocks, estimates) + .map_err(|e| { + log_error!( + self.logger, + "Failed to convert fee rate estimates for {:?}: {}", + target, + e + ); + Error::FeerateEstimationUpdateFailed + })?; + + let fee_rate = FeeRate::from_sat_per_vb(converted_estimates); + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = match target { + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => { + let really_high_prio = fee_rate.as_sat_per_vb() * 10.0; + FeeRate::from_sat_per_vb(really_high_prio) + } + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => { + let slightly_less_than_background = fee_rate.fee_wu(1000) - 250; + FeeRate::from_sat_per_kwu(slightly_less_than_background as f32) + } + _ => fee_rate, + }; + + let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); + locked_fee_rate_cache.insert(target, adjusted_fee_rate); + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.fee_wu(1000) + ); + } + Ok(()) + } + + pub(crate) fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { + let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); + + let fallback_sats_kwu = match confirmation_target { + ConfirmationTarget::OnChainSweep => 5000, + ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 25 * 250, + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + ConfirmationTarget::AnchorChannelFee => 500, + ConfirmationTarget::NonAnchorChannelFee => 1000, + ConfirmationTarget::ChannelCloseMinimum => 500, + }; + + // We'll fall back on this, if we really don't have any other information. + let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as f32); + + *locked_fee_rate_cache.get(&confirmation_target).unwrap_or(&fallback_rate) + } +} + +impl FeeEstimator for OnchainFeeEstimator +where + L::Target: Logger, +{ + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + (self.estimate_fee_rate(confirmation_target).fee_wu(1000) as u32) + .max(FEERATE_FLOOR_SATS_PER_KW) + } +} diff --git a/src/lib.rs b/src/lib.rs index 697a93fe3..ae0fda1bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ mod builder; mod error; mod event; +mod fee_estimator; mod gossip; mod hex_utils; pub mod io; @@ -120,8 +121,8 @@ use payment_store::PaymentStore; pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; use types::{ - Broadcaster, ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, - Scorer, Wallet, + Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph, + PeerManager, Router, Scorer, Wallet, }; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; @@ -290,6 +291,7 @@ pub struct Node { wallet: Arc, tx_sync: Arc>>, tx_broadcaster: Arc, + fee_estimator: Arc, event_queue: Arc>>, channel_manager: Arc>, chain_monitor: Arc>, @@ -324,13 +326,13 @@ impl Node { let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); // Block to ensure we update our fee rate cache once on startup - let wallet = Arc::clone(&self.wallet); + let fee_estimator = Arc::clone(&self.fee_estimator); let sync_logger = Arc::clone(&self.logger); let runtime_ref = &runtime; tokio::task::block_in_place(move || { runtime_ref.block_on(async move { let now = Instant::now(); - match wallet.update_fee_estimates().await { + match fee_estimator.update_fee_estimates().await { Ok(()) => { log_info!( sync_logger, @@ -353,8 +355,6 @@ impl Node { let mut stop_sync = self.stop_receiver.clone(); let onchain_wallet_sync_interval_secs = self.config.onchain_wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); - let fee_rate_cache_update_interval_secs = - self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( async move { @@ -363,11 +363,6 @@ impl Node { ); onchain_wallet_sync_interval .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut fee_rate_update_interval = tokio::time::interval(Duration::from_secs( - fee_rate_cache_update_interval_secs, - )); - // We just blocked on updating, so skip the first tick. - fee_rate_update_interval.reset(); loop { tokio::select! { _ = stop_sync.changed() => { @@ -390,29 +385,50 @@ impl Node { } } } - _ = fee_rate_update_interval.tick() => { - let now = Instant::now(); - match wallet.update_fee_estimates().await { - Ok(()) => log_trace!( - sync_logger, - "Background update of fee rate cache finished in {}ms.", - now.elapsed().as_millis() - ), - Err(err) => { - log_error!( - sync_logger, - "Background update of fee rate cache failed: {}", - err - ) - } - } - } } } }, ); }); + let mut stop_fee_updates = self.stop_receiver.clone(); + let fee_update_logger = Arc::clone(&self.logger); + let fee_estimator = Arc::clone(&self.fee_estimator); + let fee_rate_cache_update_interval_secs = + self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + runtime.spawn(async move { + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); + // We just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_fee_updates.changed() => { + return; + } + _ = fee_rate_update_interval.tick() => { + let now = Instant::now(); + match fee_estimator.update_fee_estimates().await { + Ok(()) => log_trace!( + fee_update_logger, + "Background update of fee rate cache finished in {}ms.", + now.elapsed().as_millis() + ), + Err(err) => { + log_error!( + fee_update_logger, + "Background update of fee rate cache failed: {}", + err + ) + } + } + } + } + } + }); + let tx_sync = Arc::clone(&self.tx_sync); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); @@ -680,6 +696,7 @@ impl Node { Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), Arc::clone(&self.tx_broadcaster), + Arc::clone(&self.fee_estimator), Arc::clone(&self.network_graph), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), diff --git a/src/types.rs b/src/types.rs index 9371f0698..ead397a75 100644 --- a/src/types.rs +++ b/src/types.rs @@ -25,7 +25,7 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc>>, Arc, - Arc, + Arc, Arc, Arc, >; @@ -46,19 +46,26 @@ pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManage Arc, Arc, Arc, - Arc, + Arc, Arc, Arc, >; pub(crate) type Broadcaster = crate::tx_broadcaster::TransactionBroadcaster>; -pub(crate) type Wallet = - crate::wallet::Wallet, Arc>; +pub(crate) type FeeEstimator = crate::fee_estimator::OnchainFeeEstimator>; + +pub(crate) type Wallet = crate::wallet::Wallet< + bdk::database::SqliteDatabase, + Arc, + Arc, + Arc, +>; pub(crate) type KeysManager = crate::wallet::WalletKeysManager< bdk::database::SqliteDatabase, Arc, + Arc, Arc, >; diff --git a/src/wallet.rs b/src/wallet.rs index d249581b6..0279225cd 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -2,9 +2,7 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; use crate::Error; -use lightning::chain::chaininterface::{ - BroadcasterInterface, ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW, -}; +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; use lightning::ln::script::ShutdownScript; @@ -15,10 +13,11 @@ use lightning::sign::{ use lightning::util::message_signing; -use bdk::blockchain::{Blockchain, EsploraBlockchain}; +use bdk::blockchain::EsploraBlockchain; use bdk::database::BatchDatabase; use bdk::wallet::AddressIndex; -use bdk::{FeeRate, SignOptions, SyncOptions}; +use bdk::FeeRate; +use bdk::{SignOptions, SyncOptions}; use bitcoin::bech32::u5; use bitcoin::secp256k1::ecdh::SharedSecret; @@ -26,15 +25,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, Signing}; use bitcoin::{LockTime, PackedLockTime, Script, Transaction, TxOut, Txid}; -use std::collections::HashMap; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -pub struct Wallet +pub struct Wallet where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { // A BDK blockchain used for wallet sync. @@ -43,24 +42,25 @@ where inner: Mutex>, // A cache storing the most recently retrieved fee rate estimations. broadcaster: B, - fee_rate_cache: RwLock>, + fee_estimator: E, sync_lock: (Mutex<()>, Condvar), logger: L, } -impl Wallet +impl Wallet where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, logger: L, + blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, fee_estimator: E, + logger: L, ) -> Self { let inner = Mutex::new(wallet); - let fee_rate_cache = RwLock::new(HashMap::new()); let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, broadcaster, fee_rate_cache, sync_lock, logger } + Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -113,72 +113,13 @@ where res } - pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { - let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); - - let confirmation_targets = vec![ - ConfirmationTarget::OnChainSweep, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - ConfirmationTarget::AnchorChannelFee, - ConfirmationTarget::NonAnchorChannelFee, - ConfirmationTarget::ChannelCloseMinimum, - ]; - for target in confirmation_targets { - let num_blocks = match target { - ConfirmationTarget::OnChainSweep => 6, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 1, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, - ConfirmationTarget::AnchorChannelFee => 1008, - ConfirmationTarget::NonAnchorChannelFee => 12, - ConfirmationTarget::ChannelCloseMinimum => 144, - }; - - let est_fee_rate = self.blockchain.estimate_fee(num_blocks).await; - - match est_fee_rate { - Ok(rate) => { - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = match target { - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => { - let really_high_prio = rate.as_sat_per_vb() * 10.0; - FeeRate::from_sat_per_vb(really_high_prio) - } - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => { - let slightly_less_than_background = rate.fee_wu(1000) - 250; - FeeRate::from_sat_per_kwu(slightly_less_than_background as f32) - } - _ => rate, - }; - locked_fee_rate_cache.insert(target, adjusted_fee_rate); - log_trace!( - self.logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.fee_wu(1000) - ); - } - Err(e) => { - log_error!( - self.logger, - "Failed to update fee rate estimation for {:?}: {}", - target, - e - ); - } - } - } - Ok(()) - } - pub(crate) fn create_funding_transaction( &self, output_script: Script, value_sats: u64, confirmation_target: ConfirmationTarget, locktime: LockTime, ) -> Result { - let fee_rate = self.estimate_fee_rate(confirmation_target); + let fee_rate = FeeRate::from_sat_per_kwu( + self.fee_estimator.get_est_sat_per_1000_weight(confirmation_target) as f32, + ); let locked_wallet = self.inner.lock().unwrap(); let mut tx_builder = locked_wallet.build_tx(); @@ -232,7 +173,9 @@ where &self, address: &bitcoin::Address, amount_msat_or_drain: Option, ) -> Result { let confirmation_target = ConfirmationTarget::NonAnchorChannelFee; - let fee_rate = self.estimate_fee_rate(confirmation_target); + let fee_rate = FeeRate::from_sat_per_kwu( + self.fee_estimator.get_est_sat_per_1000_weight(confirmation_target) as f32, + ); let tx = { let locked_wallet = self.inner.lock().unwrap(); @@ -299,56 +242,27 @@ where Ok(txid) } - - fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { - let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); - - let fallback_sats_kwu = match confirmation_target { - ConfirmationTarget::OnChainSweep => 5000, - ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 25 * 250, - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, - ConfirmationTarget::AnchorChannelFee => 500, - ConfirmationTarget::NonAnchorChannelFee => 1000, - ConfirmationTarget::ChannelCloseMinimum => 500, - }; - - // We'll fall back on this, if we really don't have any other information. - let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as f32); - - *locked_fee_rate_cache.get(&confirmation_target).unwrap_or(&fallback_rate) - } -} - -impl FeeEstimator for Wallet -where - D: BatchDatabase, - B::Target: BroadcasterInterface, - L::Target: Logger, -{ - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - (self.estimate_fee_rate(confirmation_target).fee_wu(1000) as u32) - .max(FEERATE_FLOOR_SATS_PER_KW) - } } /// Similar to [`KeysManager`], but overrides the destination and shutdown scripts so they are /// directly spendable by the BDK wallet. -pub struct WalletKeysManager +pub struct WalletKeysManager where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { inner: KeysManager, - wallet: Arc>, + wallet: Arc>, logger: L, } -impl WalletKeysManager +impl WalletKeysManager where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { /// Constructs a `WalletKeysManager` that overrides the destination and shutdown scripts. @@ -357,7 +271,7 @@ where /// `starting_time_nanos`. pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, - wallet: Arc>, logger: L, + wallet: Arc>, logger: L, ) -> Self { let inner = KeysManager::new(seed, starting_time_secs, starting_time_nanos); Self { inner, wallet, logger } @@ -399,10 +313,11 @@ where } } -impl NodeSigner for WalletKeysManager +impl NodeSigner for WalletKeysManager where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { fn get_node_id(&self, recipient: Recipient) -> Result { @@ -442,10 +357,11 @@ where } } -impl EntropySource for WalletKeysManager +impl EntropySource for WalletKeysManager where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { fn get_secure_random_bytes(&self) -> [u8; 32] { @@ -453,10 +369,11 @@ where } } -impl SignerProvider for WalletKeysManager +impl SignerProvider for WalletKeysManager where D: BatchDatabase, B::Target: BroadcasterInterface, + E::Target: FeeEstimator, L::Target: Logger, { type Signer = InMemorySigner;