diff --git a/Cargo.toml b/Cargo.toml index 59ad2b767..cd28e706f 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ bdk_esplora = { version = "0.22.0", default-features = false, features = ["async bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} bdk_wallet = { version = "2.2.0", default-features = false, features = ["std", "keys-bip39"]} -reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking"] } rustls = { version = "0.23", default-features = false } rusqlite = { version = "0.31.0", features = ["bundled"] } bitcoin = "0.32.7" diff --git a/src/builder.rs b/src/builder.rs index ff84505b4..983d5e3aa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -61,7 +61,8 @@ use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, + LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LSPS5ClientConfig, + LSPS5ServiceConfig, LiquiditySourceBuilder, }; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; @@ -119,6 +120,10 @@ struct LiquiditySourceConfig { lsps2_client: Option, // Act as an LSPS2 service. lsps2_service: Option, + // Act as an LSPS5 client connecting to the given service. + lsps5_client: Option, + // Act as an LSPS5 service. + lsps5_service: Option, } #[derive(Clone)] @@ -444,6 +449,30 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source inbound liquidity from the given + /// [bLIP-137 / LSPS5] service. + /// + /// [bLIP-137 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5( + &mut self, node_id: PublicKey, address: SocketAddress, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + let lsps5_client_config = LSPS5ClientConfig { node_id, address }; + liquidity_source_config.lsps5_client = Some(lsps5_client_config); + self + } + + /// Configures the [`Node`] instance to provide an LSPS5 service + pub fn set_liquidity_provider_lsps5( + &mut self, service_config: LSPS5ServiceConfig, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.lsps5_service = Some(service_config); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -1547,6 +1576,14 @@ fn build_with_store_internal( liquidity_source_builder.lsps2_service(promise_secret, config.clone()) }); + lsc.lsps5_client.as_ref().map(|config| { + liquidity_source_builder.lsps5_client(config.node_id, config.address.clone()) + }); + + lsc.lsps5_service + .as_ref() + .map(|config| liquidity_source_builder.lsps5_service(config.clone())); + let liquidity_source = runtime .block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?; let custom_message_handler = diff --git a/src/error.rs b/src/error.rs index 20b1cceab..109c6393d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -127,6 +127,10 @@ pub enum Error { InvalidBlindedPaths, /// Asynchronous payment services are disabled. AsyncPaymentServicesDisabled, + /// Failed to set a webhook with the LSP. + LiquiditySetWebhookFailed, + /// Failed to remove a webhook with the LSP. + LiquidityRemoveWebhookFailed, } impl fmt::Display for Error { @@ -205,6 +209,12 @@ impl fmt::Display for Error { Self::AsyncPaymentServicesDisabled => { write!(f, "Asynchronous payment services are disabled.") }, + Self::LiquiditySetWebhookFailed => { + write!(f, "Failed to set a webhook with the LSP.") + }, + Self::LiquidityRemoveWebhookFailed => { + write!(f, "Failed to remove a webhook with the LSP.") + }, } } } diff --git a/src/lib.rs b/src/lib.rs index fdaa0f4f1..4ab44abc8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,7 +146,7 @@ use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::KVStoreSync; use lightning_background_processor::process_events_async; -use liquidity::{LSPS1Liquidity, LiquiditySource}; +use liquidity::{LSPS1Liquidity, LSPS5Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; @@ -1004,6 +1004,32 @@ impl Node { )) } + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(not(feature = "uniffi"))] + pub fn lsps5_liquidity(&self) -> LSPS5Liquidity { + LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + ) + } + + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(feature = "uniffi")] + pub fn lsps5_liquidity(&self) -> Arc { + Arc::new(LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + )) + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/liquidity.rs b/src/liquidity.rs index 74e6098dd..1da3ac148 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -34,6 +34,12 @@ use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams}; use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; use lightning_liquidity::lsps2::utils::compute_opening_fee; +use lightning_liquidity::lsps5::client::LSPS5ClientConfig as LdkLSPS5ClientConfig; +use lightning_liquidity::lsps5::event::{LSPS5ClientEvent, LSPS5ServiceEvent}; +use lightning_liquidity::lsps5::msgs::{ + LSPS5Error, ListWebhooksResponse, RemoveWebhookResponse, SetWebhookResponse, +}; +use lightning_liquidity::lsps5::service::LSPS5ServiceConfig as LdkLSPS5ServiceConfig; use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; use lightning_types::payment::PaymentHash; use rand::Rng; @@ -54,6 +60,77 @@ const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; +/// Error type for HTTP client operations. +#[derive(Debug)] +pub enum HttpClientError { + /// Network or connection error. + Network(String), + /// HTTP status error. + Status(u16), + /// Other error. + Other(String), +} + +impl std::fmt::Display for HttpClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HttpClientError::Network(msg) => write!(f, "Network error: {}", msg), + HttpClientError::Status(code) => write!(f, "HTTP error: {}", code), + HttpClientError::Other(msg) => write!(f, "HTTP client error: {}", msg), + } + } +} + +impl std::error::Error for HttpClientError {} + +/// Trait for making HTTP requests. +pub trait HttpClient: Send + Sync + std::fmt::Debug { + /// Make a POST request to the specified URL with headers and body. + fn post( + &self, url: &str, headers: &HashMap, body: &str, + ) -> Result<(), HttpClientError>; +} + +/// Default HTTP client implementation using reqwest. +#[derive(Debug, Clone)] +pub struct DefaultHttpClient; + +impl HttpClient for DefaultHttpClient { + fn post( + &self, url: &str, headers: &HashMap, body: &str, + ) -> Result<(), HttpClientError> { + let url = url.to_string(); + let headers = headers.clone(); + let body = body.to_string(); + + let handle = std::thread::spawn(move || -> Result<(), HttpClientError> { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .map_err(|e| HttpClientError::Network(e.to_string()))?; + + let mut request = client.post(&url); + + for (key, value) in &headers { + request = request.header(key, value); + } + + let response = + request.body(body).send().map_err(|e| HttpClientError::Network(e.to_string()))?; + + if response.status().is_success() { + Ok(()) + } else { + Err(HttpClientError::Status(response.status().as_u16())) + } + }); + + handle + .join() + .map_err(|_| HttpClientError::Other("HTTP client thread panicked".to_string()))? + } +} + struct LSPS1Client { lsp_node_id: PublicKey, lsp_address: SocketAddress, @@ -145,6 +222,62 @@ pub struct LSPS2ServiceConfig { pub client_trusts_lsp: bool, } +struct LSPS5Client { + lsp_node_id: PublicKey, + lsp_address: SocketAddress, + ldk_client_config: LdkLSPS5ClientConfig, + pending_set_webhook_requests: + Mutex>>>, + pending_list_webhooks_requests: + Mutex>>>, + pending_remove_webhook_requests: + Mutex>>>, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS5ClientConfig { + pub node_id: PublicKey, + pub address: SocketAddress, +} + +struct LSPS5Service { + service_config: LSPS5ServiceConfig, + ldk_service_config: LdkLSPS5ServiceConfig, +} + +/// Represents the configuration of the LSPS5 service. +/// +/// See [bLIP-55 / LSPS5] for more information. +/// +/// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md +#[derive(Debug, Clone)] +pub struct LSPS5ServiceConfig { + /// Maximum number of webhooks allowed per client. + pub max_webhooks_per_client: u32, + /// HTTP client for sending webhook notifications. + pub http_client: Arc, +} + +impl Default for LSPS5ServiceConfig { + fn default() -> Self { + Self { max_webhooks_per_client: 5, http_client: Arc::new(DefaultHttpClient) } + } +} + +impl LSPS5ServiceConfig { + /// Creates a new LSPS5ServiceConfig with the default HTTP client. + pub fn new(max_webhooks_per_client: u32) -> Self { + Self { max_webhooks_per_client, http_client: Arc::new(DefaultHttpClient) } + } + + /// Creates a new LSPS5ServiceConfig with a custom HTTP client. + pub fn with_http_client( + max_webhooks_per_client: u32, http_client: Arc, + ) -> Self { + Self { max_webhooks_per_client, http_client } + } +} + pub(crate) struct LiquiditySourceBuilder where L::Target: LdkLogger, @@ -152,6 +285,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, keys_manager: Arc, @@ -174,10 +309,14 @@ where let lsps1_client = None; let lsps2_client = None; let lsps2_service = None; + let lsps5_client = None; + let lsps5_service = None; Self { lsps1_client, lsps2_client, lsps2_service, + lsps5_client, + lsps5_service, wallet, channel_manager, keys_manager, @@ -234,17 +373,59 @@ where self } - pub(crate) async fn build(self) -> Result, BuildError> { - let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { - let lsps2_service_config = Some(s.ldk_service_config.clone()); - let lsps5_service_config = None; - let advertise_service = s.service_config.advertise_service; - LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service } + pub(crate) fn lsps5_client( + &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, + ) -> &mut Self { + let ldk_client_config = LdkLSPS5ClientConfig {}; + + let pending_set_webhook_requests = Mutex::new(HashMap::new()); + let pending_list_webhooks_requests = Mutex::new(HashMap::new()); + let pending_remove_webhook_requests = Mutex::new(HashMap::new()); + + self.lsps5_client = Some(LSPS5Client { + ldk_client_config, + lsp_node_id, + lsp_address, + pending_set_webhook_requests, + pending_list_webhooks_requests, + pending_remove_webhook_requests, }); + self + } + + pub(crate) fn lsps5_service(&mut self, service_config: LSPS5ServiceConfig) -> &mut Self { + let ldk_service_config = LdkLSPS5ServiceConfig { + max_webhooks_per_client: service_config.max_webhooks_per_client, + }; + self.lsps5_service = Some(LSPS5Service { service_config, ldk_service_config }); + self + } + + pub(crate) async fn build(self) -> Result, BuildError> { + let lsps2_service_config = + self.lsps2_service.as_ref().map(|s| s.ldk_service_config.clone()); + let lsps5_service_config = + self.lsps5_service.as_ref().map(|s| s.ldk_service_config.clone()); + let advertise_service = self + .lsps2_service + .as_ref() + .map(|s| s.service_config.advertise_service) + .unwrap_or(false); + + let liquidity_service_config = + if lsps2_service_config.is_some() || lsps5_service_config.is_some() { + Some(LiquidityServiceConfig { + lsps2_service_config, + lsps5_service_config, + advertise_service, + }) + } else { + None + }; let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps5_client_config = None; + let lsps5_client_config = self.lsps5_client.as_ref().map(|s| s.ldk_client_config.clone()); let liquidity_client_config = Some(LiquidityClientConfig { lsps1_client_config, lsps2_client_config, @@ -271,6 +452,8 @@ where lsps1_client: self.lsps1_client, lsps2_client: self.lsps2_client, lsps2_service: self.lsps2_service, + lsps5_client: self.lsps5_client, + lsps5_service: self.lsps5_service, wallet: self.wallet, channel_manager: self.channel_manager, peer_manager: RwLock::new(None), @@ -289,6 +472,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, peer_manager: RwLock>>, @@ -318,6 +503,10 @@ where self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) } + pub(crate) fn get_lsps5_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { + self.lsps5_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) + } + pub(crate) fn lsps2_channel_needs_manual_broadcast( &self, counterparty_node_id: PublicKey, user_channel_id: u128, ) -> bool { @@ -910,6 +1099,332 @@ where ); } }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistered { + request_id, + counterparty_node_id, + num_webhooks, + max_webhooks, + no_change, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = Ok(SetWebhookResponse { num_webhooks, max_webhooks, no_change }); + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistered event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistrationFailed { + request_id, + counterparty_node_id, + error, + app_name, + url, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook registration failed for app '{}' with url '{}': {:?}", + app_name, + url, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistrationFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhooksListed { + request_id, + counterparty_node_id, + app_names, + max_webhooks, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = + Ok(ListWebhooksResponse { app_names: app_names.clone(), max_webhooks }); + + match lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhooksListed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemoved { + request_id, + counterparty_node_id, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(Ok(RemoveWebhookResponse {})).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemoved event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemovalFailed { + request_id, + counterparty_node_id, + error, + app_name, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook removal failed for app '{}': {:?}", + app_name, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemovalFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + counterparty_node_id: _, + app_name, + url, + notification, + headers, + }) => { + if let Some(ref lsps5_service) = self.lsps5_service { + log_info!( + self.logger, + "Sending webhook notification for {} to {}: {:?}", + app_name, + url, + notification + ); + + let headers_std: HashMap = headers.into_iter().collect(); + let notification_str = serde_json::to_string(¬ification) + .unwrap_or_else(|_| format!("{:?}", notification)); + + if let Err(e) = lsps5_service.service_config.http_client.post( + &url, + &headers_std, + ¬ification_str, + ) { + println!(""); + println!( + "LSPS5 SendWebhookNotification event: {} to {}, {}", + app_name, url, e + ); + println!(""); + log_error!( + self.logger, + "Failed to send webhook notification for {} to {}: {}", + app_name, + url, + e + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5ServiceEvent::SendWebhookNotification event!" + ); + } + }, e => { log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); }, @@ -1336,6 +1851,165 @@ where }) } + pub(crate) async fn lsps5_set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = match client_handler.set_webhook( + lsps5_client.lsp_node_id, + app_name.clone(), + webhook_url.clone(), + ) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquiditySetWebhookFailed), + }; + + lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_set_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_list_webhooks(&self) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = client_handler.list_webhooks(lsps5_client.lsp_node_id); + lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_list_webhooks_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_remove_webhook( + &self, app_name: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = + match client_handler.remove_webhook(lsps5_client.lsp_node_id, app_name.clone()) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquidityRemoveWebhookFailed), + }; + + lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_remove_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) fn lsps5_notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_payment_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_expiry_soon( + &self, client_id: PublicKey, timeout: u32, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_expiry_soon(client_id, timeout).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_liquidity_management_request( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler + .notify_liquidity_management_request(client_id) + .map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_onion_message_incoming( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_onion_message_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + pub(crate) async fn handle_channel_ready( &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) { @@ -1540,3 +2214,191 @@ impl LSPS1Liquidity { Ok(response) } } + +/// A liquidity handler for handling LSPS5 webhooks and notifications. +#[derive(Clone)] +pub struct LSPS5Liquidity { + runtime: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, + logger: Arc, +} + +impl LSPS5Liquidity { + pub(crate) fn new( + runtime: Arc, connection_manager: Arc>>, + liquidity_source: Option>>>, logger: Arc, + ) -> Self { + Self { runtime, connection_manager, liquidity_source, logger } + } + + fn set_webhook_impl( + &self, app_name: String, webhook_url: String, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source.lsps5_set_webhook(app_name, webhook_url).await + })?; + + Ok(response) + } + + /// Sets a webhook URL at the configured LSP for receiving LSPS5 notifications. + #[cfg(not(feature = "uniffi"))] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + /// Sets a webhook URL at the configured LSP for receiving LSPS5 notifications. + #[cfg(feature = "uniffi")] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + fn list_webhooks_impl(&self) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = + self.runtime.block_on(async move { liquidity_source.lsps5_list_webhooks().await })?; + Ok(response) + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl() + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl().map(|response| response.into()) + } + + fn remove_webhook_impl(&self, app_name: String) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self + .runtime + .block_on(async move { liquidity_source.lsps5_remove_webhook(app_name).await })?; + + Ok(response) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Notifies the configured LSP about an incoming payment. + pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_payment_incoming(client_id) + } + + /// Notifies the configured LSP about an invoice expiring soon. + pub fn notify_expiry_soon(&self, client_id: PublicKey, timeout: u32) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_expiry_soon(client_id, timeout) + } + + /// Notifies the configured LSP about a liquidity management request. + pub fn notify_liquidity_management_request(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_liquidity_management_request(client_id) + } + + /// Notifies the configured LSP about an incoming onion message. + pub fn notify_onion_message_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_onion_message_incoming(client_id) + } +} + +#[cfg(feature = "uniffi")] +// Re-export LSPS5 response types for uniffi +pub use lightning_liquidity::lsps5::msgs::{ + RemoveWebhookResponse as LSPS5RemoveWebhookResponse, + SetWebhookResponse as LSPS5SetWebhookResponse, +}; + +#[cfg(feature = "uniffi")] +/// Wrapper for ListWebhooksResponse that converts LSPS5AppName to String for uniffi +#[derive(Clone, Debug)] +pub struct LSPS5ListWebhooksResponse { + pub app_names: Vec, + pub max_webhooks: u32, +} + +#[cfg(feature = "uniffi")] +impl From for LSPS5ListWebhooksResponse { + fn from(response: ListWebhooksResponse) -> Self { + Self { + app_names: response.app_names.into_iter().map(|name| name.to_string()).collect(), + max_webhooks: response.max_webhooks, + } + } +}