diff --git a/Cargo.toml b/Cargo.toml index 9829b7cb7..6e404bd92 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,17 +52,17 @@ default = [] #lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } #lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["esplora-async-https", "electrum-rustls-ring", "time"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["esplora-async-https", "electrum-rustls-ring", "time"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204" } #lightning = { path = "../rust-lightning/lightning", features = ["std"] } #lightning-types = { path = "../rust-lightning/lightning-types" } @@ -109,7 +109,7 @@ winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] #lightning = { version = "0.1.0", features = ["std", "_test_utils"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "3e21ba37a133977d4247e86f25df983b39326994", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "d0765847c85f1c3dc753c17c3e05dbcb1d300204", features = ["std", "_test_utils"] } #lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" diff --git a/src/builder.rs b/src/builder.rs index 3396a52a0..0f627a2fe 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1557,6 +1557,7 @@ fn build_with_store_internal( Arc::clone(&channel_manager), Arc::clone(&keys_manager), Arc::clone(&chain_source), + Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), ); @@ -1590,7 +1591,8 @@ fn build_with_store_internal( liquidity_source_builder.lsps2_service(promise_secret, config.clone()) }); - let liquidity_source = Arc::new(liquidity_source_builder.build()); + let liquidity_source = runtime + .block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?; let custom_message_handler = Arc::new(NodeCustomMessageHandler::new_liquidity(Arc::clone(&liquidity_source))); (Some(liquidity_source), custom_message_handler) diff --git a/src/event.rs b/src/event.rs index 824cba694..df6649e05 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1046,7 +1046,7 @@ where LdkEvent::ProbeFailed { .. } => {}, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { - liquidity_source.handle_htlc_handling_failed(failure_type); + liquidity_source.handle_htlc_handling_failed(failure_type).await; } }, LdkEvent::SpendableOutputs { outputs, channel_id } => { @@ -1229,40 +1229,46 @@ where claim_from_onchain_tx, outbound_amount_forwarded_msat, } => { - let read_only_network_graph = self.network_graph.read_only(); - let nodes = read_only_network_graph.nodes(); - let channels = self.channel_manager.list_channels(); - - let node_str = |channel_id: &Option| { - channel_id - .and_then(|channel_id| channels.iter().find(|c| c.channel_id == channel_id)) - .and_then(|channel| { - nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) - }) - .map_or("private_node".to_string(), |node| { - node.announcement_info - .as_ref() - .map_or("unnamed node".to_string(), |ann| { - format!("node {}", ann.alias()) - }) - }) - }; - let channel_str = |channel_id: &Option| { - channel_id - .map(|channel_id| format!(" with channel {}", channel_id)) - .unwrap_or_default() - }; - let from_prev_str = format!( - " from {}{}", - node_str(&prev_channel_id), - channel_str(&prev_channel_id) - ); - let to_next_str = - format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id)); + { + let read_only_network_graph = self.network_graph.read_only(); + let nodes = read_only_network_graph.nodes(); + let channels = self.channel_manager.list_channels(); + + let node_str = |channel_id: &Option| { + channel_id + .and_then(|channel_id| { + channels.iter().find(|c| c.channel_id == channel_id) + }) + .and_then(|channel| { + nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) + }) + .map_or("private_node".to_string(), |node| { + node.announcement_info + .as_ref() + .map_or("unnamed node".to_string(), |ann| { + format!("node {}", ann.alias()) + }) + }) + }; + let channel_str = |channel_id: &Option| { + channel_id + .map(|channel_id| format!(" with channel {}", channel_id)) + .unwrap_or_default() + }; + let from_prev_str = format!( + " from {}{}", + node_str(&prev_channel_id), + channel_str(&prev_channel_id) + ); + let to_next_str = format!( + " to {}{}", + node_str(&next_channel_id), + channel_str(&next_channel_id) + ); - let fee_earned = total_fee_earned_msat.unwrap_or(0); - if claim_from_onchain_tx { - log_info!( + let fee_earned = total_fee_earned_msat.unwrap_or(0); + if claim_from_onchain_tx { + log_info!( self.logger, "Forwarded payment{}{} of {}msat, earning {}msat in fees from claiming onchain.", from_prev_str, @@ -1270,19 +1276,20 @@ where outbound_amount_forwarded_msat.unwrap_or(0), fee_earned, ); - } else { - log_info!( - self.logger, - "Forwarded payment{}{} of {}msat, earning {}msat in fees.", - from_prev_str, - to_next_str, - outbound_amount_forwarded_msat.unwrap_or(0), - fee_earned, - ); + } else { + log_info!( + self.logger, + "Forwarded payment{}{} of {}msat, earning {}msat in fees.", + from_prev_str, + to_next_str, + outbound_amount_forwarded_msat.unwrap_or(0), + fee_earned, + ); + } } if let Some(liquidity_source) = self.liquidity_source.as_ref() { - liquidity_source.handle_payment_forwarded(next_channel_id); + liquidity_source.handle_payment_forwarded(next_channel_id).await; } let event = Event::PaymentForwarded { @@ -1375,11 +1382,9 @@ where ); if let Some(liquidity_source) = self.liquidity_source.as_ref() { - liquidity_source.handle_channel_ready( - user_channel_id, - &channel_id, - &counterparty_node_id, - ); + liquidity_source + .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) + .await; } let event = Event::ChannelReady { @@ -1428,12 +1433,14 @@ where .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { - liquidity_source.handle_htlc_intercepted( - requested_next_hop_scid, - intercept_id, - expected_outbound_amount_msat, - payment_hash, - ); + liquidity_source + .handle_htlc_intercepted( + requested_next_hop_scid, + intercept_id, + expected_outbound_amount_msat, + payment_hash, + ) + .await; } }, LdkEvent::InvoiceReceived { .. } => { diff --git a/src/liquidity.rs b/src/liquidity.rs index ae31f9ace..a09848b38 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -38,11 +38,12 @@ use lightning_types::payment::PaymentHash; use rand::Rng; use tokio::sync::oneshot; +use crate::builder::BuildError; use crate::chain::ChainSource; use crate::connection::ConnectionManager; use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager, Wallet}; +use crate::types::{ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet}; use crate::{total_anchor_channels_reserve_sats, Config, Error}; const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; @@ -140,6 +141,7 @@ where channel_manager: Arc, keys_manager: Arc, chain_source: Arc, + kv_store: Arc, config: Arc, logger: L, } @@ -150,7 +152,7 @@ where { pub(crate) fn new( wallet: Arc, channel_manager: Arc, keys_manager: Arc, - chain_source: Arc, config: Arc, logger: L, + chain_source: Arc, kv_store: Arc, config: Arc, logger: L, ) -> Self { let lsps1_client = None; let lsps2_client = None; @@ -163,6 +165,7 @@ where channel_manager, keys_manager, chain_source, + kv_store, config, logger, } @@ -213,7 +216,7 @@ where self } - pub(crate) fn build(self) -> LiquiditySource { + pub(crate) async fn build(self) -> Result, BuildError> { let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { let lsps2_service_config = Some(s.ldk_service_config.clone()); let lsps5_service_config = None; @@ -230,17 +233,22 @@ where lsps5_client_config, }); - let liquidity_manager = Arc::new(LiquidityManager::new( - Arc::clone(&self.keys_manager), - Arc::clone(&self.keys_manager), - Arc::clone(&self.channel_manager), - Some(Arc::clone(&self.chain_source)), - None, - liquidity_service_config, - liquidity_client_config, - )); + let liquidity_manager = Arc::new( + LiquidityManager::new( + Arc::clone(&self.keys_manager), + Arc::clone(&self.keys_manager), + Arc::clone(&self.channel_manager), + Some(Arc::clone(&self.chain_source)), + None, + Arc::clone(&self.kv_store), + liquidity_service_config, + liquidity_client_config, + ) + .await + .map_err(|_| BuildError::ReadFailed)?, + ); - LiquiditySource { + Ok(LiquiditySource { lsps1_client: self.lsps1_client, lsps2_client: self.lsps2_client, lsps2_service: self.lsps2_service, @@ -251,7 +259,7 @@ where liquidity_manager, config: self.config, logger: self.logger, - } + }) } } @@ -574,14 +582,17 @@ where } } - match lsps2_service_handler.invoice_parameters_generated( - &counterparty_node_id, - request_id, - intercept_scid, - LSPS2_CHANNEL_CLTV_EXPIRY_DELTA, - LSPS2_CLIENT_TRUSTS_LSP_MODE, - user_channel_id, - ) { + match lsps2_service_handler + .invoice_parameters_generated( + &counterparty_node_id, + request_id, + intercept_scid, + LSPS2_CHANNEL_CLTV_EXPIRY_DELTA, + LSPS2_CLIENT_TRUSTS_LSP_MODE, + user_channel_id, + ) + .await + { Ok(()) => {}, Err(e) => { log_error!( @@ -1239,15 +1250,14 @@ where }) } - pub(crate) fn handle_channel_ready( + pub(crate) async fn handle_channel_ready( &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) { if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler.channel_ready( - user_channel_id, - channel_id, - counterparty_node_id, - ) { + if let Err(e) = lsps2_service_handler + .channel_ready(user_channel_id, channel_id, counterparty_node_id) + .await + { log_error!( self.logger, "LSPS2 service failed to handle ChannelReady event: {:?}", @@ -1257,17 +1267,20 @@ where } } - pub(crate) fn handle_htlc_intercepted( + pub(crate) async fn handle_htlc_intercepted( &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64, payment_hash: PaymentHash, ) { if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler.htlc_intercepted( - intercept_scid, - intercept_id, - expected_outbound_amount_msat, - payment_hash, - ) { + if let Err(e) = lsps2_service_handler + .htlc_intercepted( + intercept_scid, + intercept_id, + expected_outbound_amount_msat, + payment_hash, + ) + .await + { log_error!( self.logger, "LSPS2 service failed to handle HTLCIntercepted event: {:?}", @@ -1277,9 +1290,9 @@ where } } - pub(crate) fn handle_htlc_handling_failed(&self, failure_type: HTLCHandlingFailureType) { + pub(crate) async fn handle_htlc_handling_failed(&self, failure_type: HTLCHandlingFailureType) { if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler.htlc_handling_failed(failure_type) { + if let Err(e) = lsps2_service_handler.htlc_handling_failed(failure_type).await { log_error!( self.logger, "LSPS2 service failed to handle HTLCHandlingFailed event: {:?}", @@ -1289,10 +1302,10 @@ where } } - pub(crate) fn handle_payment_forwarded(&self, next_channel_id: Option) { + pub(crate) async fn handle_payment_forwarded(&self, next_channel_id: Option) { if let Some(next_channel_id) = next_channel_id { if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler.payment_forwarded(next_channel_id) { + if let Err(e) = lsps2_service_handler.payment_forwarded(next_channel_id).await { log_error!( self.logger, "LSPS2 service failed to handle PaymentForwarded: {:?}", diff --git a/src/types.rs b/src/types.rs index 4f5229dd2..ccfde2766 100644 --- a/src/types.rs +++ b/src/types.rs @@ -75,6 +75,7 @@ pub(crate) type LiquidityManager = lightning_liquidity::LiquidityManager< Arc, Arc, Arc, + Arc, Arc, >;