diff --git a/Cargo.toml b/Cargo.toml index 1d3f45bfa..c6606a2f3 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ bdk_esplora = { version = "0.22.0", default-features = false, features = ["async bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} bdk_wallet = { version = "2.0.0", default-features = false, features = ["std", "keys-bip39"]} -reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking"] } rustls = { version = "0.23", default-features = false } rusqlite = { version = "0.31.0", features = ["bundled"] } bitcoin = "0.32.4" diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 9f0ef697e..ded7903b3 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -130,6 +130,7 @@ interface Node { OnchainPayment onchain_payment(); UnifiedQrPayment unified_qr_payment(); LSPS1Liquidity lsps1_liquidity(); + LSPS5Liquidity lsps5_liquidity(); [Throws=NodeError] void connect(PublicKey node_id, SocketAddress address, boolean persist); [Throws=NodeError] @@ -264,6 +265,15 @@ interface LSPS1Liquidity { LSPS1OrderStatus check_order_status(LSPS1OrderId order_id); }; +interface LSPS5Liquidity { + [Throws=NodeError] + LSPS5SetWebhookResponse set_webhook(string app_name, string webhook_url); + [Throws=NodeError] + LSPS5ListWebhooksResponse list_webhooks(); + [Throws=NodeError] + LSPS5RemoveWebhookResponse remove_webhook(string app_name); +}; + [Error] enum NodeError { "AlreadyRunning", @@ -320,6 +330,8 @@ enum NodeError { "LiquidityFeeTooHigh", "InvalidBlindedPaths", "AsyncPaymentServicesDisabled", + "LiquiditySetWebhookFailed", + "LiquidityRemoveWebhookFailed", }; dictionary NodeStatus { @@ -531,6 +543,20 @@ enum LSPS1PaymentState { "Refunded", }; +dictionary LSPS5SetWebhookResponse { + u32 num_webhooks; + u32 max_webhooks; + boolean no_change; +}; + +dictionary LSPS5ListWebhooksResponse { + sequence app_names; + u32 max_webhooks; +}; + +dictionary LSPS5RemoveWebhookResponse { +}; + [NonExhaustive] enum Network { "Bitcoin", diff --git a/src/builder.rs b/src/builder.rs index b99c44cec..85e69d7ee 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -23,7 +23,8 @@ use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, + LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LSPS5ClientConfig, + LSPS5ServiceConfig, LiquiditySourceBuilder, }; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; @@ -125,6 +126,10 @@ struct LiquiditySourceConfig { lsps2_client: Option, // Act as an LSPS2 service. lsps2_service: Option, + /// Act as an LSPS5 client connecting to the given service. + lsps5_client: Option, + // Act as an LSPS5 service. + lsps5_service: Option, } #[derive(Clone)] @@ -738,6 +743,27 @@ impl NodeBuilder { kv_store, ) } + + /// Configures the [`Node`] instance to source inbound liquidity from the given + pub fn set_liquidity_source_lsps5( + &mut self, node_id: PublicKey, address: SocketAddress, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + let lsps5_client_config = LSPS5ClientConfig { node_id, address }; + liquidity_source_config.lsps5_client = Some(lsps5_client_config); + self + } + + /// Configures the [`Node`] instance to provide an LSPS5 service + pub fn set_liquidity_provider_lsps5( + &mut self, service_config: LSPS5ServiceConfig, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.lsps5_service = Some(service_config); + self + } } /// A builder for an [`Node`] instance, allowing to set some configuration and module choices from @@ -928,6 +954,20 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); } + /// Configures the [`Node`] instance to source inbound liquidity from the given [LSPS5] LSP. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5(&self, node_id: PublicKey, address: SocketAddress) { + self.inner.write().unwrap().set_liquidity_source_lsps5(node_id, address); + } + + /// Configures the [`Node`] instance to provide an LSPS5 service. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_provider_lsps5(&self, service_config: LSPS5ServiceConfig) { + self.inner.write().unwrap().set_liquidity_provider_lsps5(service_config); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -1542,6 +1582,13 @@ fn build_with_store_internal( liquidity_source_builder.lsps2_service(promise_secret, config.clone()) }); + lsc.lsps5_client.as_ref().map(|config| { + liquidity_source_builder.lsps5_client(config.node_id, config.address.clone()) + }); + lsc.lsps5_service + .as_ref() + .map(|config| liquidity_source_builder.lsps5_service(config.clone())); + let liquidity_source = Arc::new(liquidity_source_builder.build()); let custom_message_handler = Arc::new(NodeCustomMessageHandler::new_liquidity(Arc::clone(&liquidity_source))); diff --git a/src/error.rs b/src/error.rs index eaa022e56..0ee5ca9f6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -124,6 +124,10 @@ pub enum Error { InvalidBlindedPaths, /// Asynchronous payment services are disabled. AsyncPaymentServicesDisabled, + /// Failed to set a webhook with the LSP. + LiquiditySetWebhookFailed, + /// Failed to remove a webhook with the LSP. + LiquidityRemoveWebhookFailed, } impl fmt::Display for Error { @@ -201,6 +205,12 @@ impl fmt::Display for Error { Self::AsyncPaymentServicesDisabled => { write!(f, "Asynchronous payment services are disabled.") }, + Self::LiquiditySetWebhookFailed => { + write!(f, "Failed to set a webhook with the LSP.") + }, + Self::LiquidityRemoveWebhookFailed => { + write!(f, "Failed to remove a webhook with the LSP.") + }, } } } diff --git a/src/lib.rs b/src/lib.rs index e7e27273b..97bad6ca8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -171,6 +171,14 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +#[cfg(feature = "uniffi")] +use crate::liquidity::{ + LSPS5Liquidity, LSPS5ListWebhooksResponse, LSPS5RemoveWebhookResponse, LSPS5SetWebhookResponse, +}; + +#[cfg(not(feature = "uniffi"))] +use crate::liquidity::LSPS5Liquidity; + #[cfg(feature = "uniffi")] uniffi::include_scaffolding!("ldk_node"); @@ -958,6 +966,32 @@ impl Node { )) } + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(not(feature = "uniffi"))] + pub fn lsps5_liquidity(&self) -> LSPS5Liquidity { + LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + ) + } + + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(feature = "uniffi")] + pub fn lsps5_liquidity(&self) -> Arc { + Arc::new(LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + )) + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/liquidity.rs b/src/liquidity.rs index 5d0bf5afe..67d31b0b7 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -34,6 +34,12 @@ use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams}; use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; use lightning_liquidity::lsps2::utils::compute_opening_fee; +use lightning_liquidity::lsps5::client::LSPS5ClientConfig as LdkLSPS5ClientConfig; +use lightning_liquidity::lsps5::event::{LSPS5ClientEvent, LSPS5ServiceEvent}; +use lightning_liquidity::lsps5::msgs::{ + LSPS5Error, ListWebhooksResponse, RemoveWebhookResponse, SetWebhookResponse, +}; +use lightning_liquidity::lsps5::service::LSPS5ServiceConfig as LdkLSPS5ServiceConfig; use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; use lightning_types::payment::PaymentHash; @@ -52,6 +58,77 @@ use std::ops::Deref; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; +/// Error type for HTTP client operations. +#[derive(Debug)] +pub enum HttpClientError { + /// Network or connection error. + Network(String), + /// HTTP status error. + Status(u16), + /// Other error. + Other(String), +} + +impl std::fmt::Display for HttpClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HttpClientError::Network(msg) => write!(f, "Network error: {}", msg), + HttpClientError::Status(code) => write!(f, "HTTP error: {}", code), + HttpClientError::Other(msg) => write!(f, "HTTP client error: {}", msg), + } + } +} + +impl std::error::Error for HttpClientError {} + +/// Trait for making HTTP requests. +pub trait HttpClient: Send + Sync + std::fmt::Debug { + /// Make a POST request to the specified URL with headers and body. + fn post( + &self, url: &str, headers: &HashMap, body: &str, + ) -> Result<(), HttpClientError>; +} + +/// Default HTTP client implementation using reqwest. +#[derive(Debug, Clone)] +pub struct DefaultHttpClient; + +impl HttpClient for DefaultHttpClient { + fn post( + &self, url: &str, headers: &HashMap, body: &str, + ) -> Result<(), HttpClientError> { + let url = url.to_string(); + let headers = headers.clone(); + let body = body.to_string(); + + let handle = std::thread::spawn(move || -> Result<(), HttpClientError> { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .map_err(|e| HttpClientError::Network(e.to_string()))?; + + let mut request = client.post(&url); + + for (key, value) in &headers { + request = request.header(key, value); + } + + let response = + request.body(body).send().map_err(|e| HttpClientError::Network(e.to_string()))?; + + if response.status().is_success() { + Ok(()) + } else { + Err(HttpClientError::Status(response.status().as_u16())) + } + }); + + handle + .join() + .map_err(|_| HttpClientError::Other("HTTP client thread panicked".to_string()))? + } +} + const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); @@ -98,6 +175,29 @@ struct LSPS2Service { ldk_service_config: LdkLSPS2ServiceConfig, } +struct LSPS5Client { + lsp_node_id: PublicKey, + lsp_address: SocketAddress, + ldk_client_config: LdkLSPS5ClientConfig, + pending_set_webhook_requests: + Mutex>>>, + pending_list_webhooks_requests: + Mutex>>>, + pending_remove_webhook_requests: + Mutex>>>, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS5ClientConfig { + pub node_id: PublicKey, + pub address: SocketAddress, +} + +struct LSPS5Service { + service_config: LSPS5ServiceConfig, + ldk_service_config: LdkLSPS5ServiceConfig, +} + /// Represents the configuration of the LSPS2 service. /// /// See [bLIP-52 / LSPS2] for more information. @@ -136,6 +236,39 @@ pub struct LSPS2ServiceConfig { pub max_payment_size_msat: u64, } +/// Represents the configuration of the LSPS5 service. +/// +/// See [bLIP-55 / LSPS5] for more information. +/// +/// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md +#[derive(Debug, Clone)] +pub struct LSPS5ServiceConfig { + /// Maximum number of webhooks allowed per client. + pub max_webhooks_per_client: u32, + /// HTTP client for sending webhook notifications. + pub http_client: Arc, +} + +impl Default for LSPS5ServiceConfig { + fn default() -> Self { + Self { max_webhooks_per_client: 5, http_client: Arc::new(DefaultHttpClient) } + } +} + +impl LSPS5ServiceConfig { + /// Creates a new LSPS5ServiceConfig with the default HTTP client. + pub fn new(max_webhooks_per_client: u32) -> Self { + Self { max_webhooks_per_client, http_client: Arc::new(DefaultHttpClient) } + } + + /// Creates a new LSPS5ServiceConfig with a custom HTTP client. + pub fn with_http_client( + max_webhooks_per_client: u32, http_client: Arc, + ) -> Self { + Self { max_webhooks_per_client, http_client } + } +} + pub(crate) struct LiquiditySourceBuilder where L::Target: LdkLogger, @@ -143,6 +276,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, keys_manager: Arc, @@ -162,10 +297,14 @@ where let lsps1_client = None; let lsps2_client = None; let lsps2_service = None; + let lsps5_service = None; + let lsps5_client = None; Self { lsps1_client, lsps2_client, lsps2_service, + lsps5_client, + lsps5_service, wallet, channel_manager, keys_manager, @@ -220,17 +359,58 @@ where self } - pub(crate) fn build(self) -> LiquiditySource { - let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { - let lsps2_service_config = Some(s.ldk_service_config.clone()); - let lsps5_service_config = None; - let advertise_service = s.service_config.advertise_service; - LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service } + pub(crate) fn lsps5_client( + &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, + ) -> &mut Self { + let ldk_client_config = LdkLSPS5ClientConfig {}; + + let pending_set_webhook_requests = Mutex::new(HashMap::new()); + let pending_list_webhooks_requests = Mutex::new(HashMap::new()); + let pending_remove_webhook_requests = Mutex::new(HashMap::new()); + + self.lsps5_client = Some(LSPS5Client { + ldk_client_config, + lsp_node_id, + lsp_address, + pending_set_webhook_requests, + pending_list_webhooks_requests, + pending_remove_webhook_requests, }); + self + } + + pub(crate) fn lsps5_service(&mut self, service_config: LSPS5ServiceConfig) -> &mut Self { + let ldk_service_config = LdkLSPS5ServiceConfig { + max_webhooks_per_client: service_config.max_webhooks_per_client, + }; + self.lsps5_service = Some(LSPS5Service { service_config, ldk_service_config }); + self + } + + pub(crate) fn build(self) -> LiquiditySource { + let lsps2_service_config = + self.lsps2_service.as_ref().map(|s| s.ldk_service_config.clone()); + let lsps5_service_config = + self.lsps5_service.as_ref().map(|s| s.ldk_service_config.clone()); + let advertise_service = self + .lsps2_service + .as_ref() + .map(|s| s.service_config.advertise_service) + .unwrap_or(false); + let liquidity_service_config = + if lsps2_service_config.is_some() || lsps5_service_config.is_some() { + Some(LiquidityServiceConfig { + lsps2_service_config, + lsps5_service_config, + advertise_service, + }) + } else { + None + }; let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps5_client_config = None; + let lsps5_client_config = self.lsps5_client.as_ref().map(|s| s.ldk_client_config.clone()); let liquidity_client_config = Some(LiquidityClientConfig { lsps1_client_config, lsps2_client_config, @@ -251,6 +431,8 @@ where lsps1_client: self.lsps1_client, lsps2_client: self.lsps2_client, lsps2_service: self.lsps2_service, + lsps5_client: self.lsps5_client, + lsps5_service: self.lsps5_service, wallet: self.wallet, channel_manager: self.channel_manager, peer_manager: RwLock::new(None), @@ -269,6 +451,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, peer_manager: RwLock>>, @@ -298,6 +482,10 @@ where self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) } + pub(crate) fn get_lsps5_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { + self.lsps5_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) + } + pub(crate) async fn handle_next_event(&self) { match self.liquidity_manager.next_event_async().await { LiquidityEvent::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady { @@ -820,6 +1008,296 @@ where ); } }, + LiquidityEvent::LSPS5Client(event) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + match event { + LSPS5ClientEvent::WebhookRegistered { + request_id, + counterparty_node_id, + num_webhooks, + max_webhooks, + no_change, + .. + } => { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = + Ok(SetWebhookResponse { num_webhooks, max_webhooks, no_change }); + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!(false, "Received response from liquidity service for unknown request."); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + }, + LSPS5ClientEvent::WebhookRegistrationFailed { + request_id, + counterparty_node_id, + error, + app_name, + url, + } => { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook registration failed for app '{}' with url '{}': {:?}", + app_name, + url, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!(false, "Received response from liquidity service for unknown request."); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + }, + LSPS5ClientEvent::WebhooksListed { + request_id, + counterparty_node_id, + app_names, + max_webhooks, + } => { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = Ok(ListWebhooksResponse { + app_names: app_names.clone(), + max_webhooks, + }); + + match lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!(false, "Received response from liquidity service for unknown request."); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + }, + LSPS5ClientEvent::WebhookRemoved { + request_id, + counterparty_node_id, + .. + } => { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(Ok(RemoveWebhookResponse {})).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!(false, "Received response from liquidity service for unknown request."); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + }, + LSPS5ClientEvent::WebhookRemovalFailed { + request_id, + counterparty_node_id, + error, + app_name, + } => { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook removal failed for app '{}': {:?}", + app_name, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!(false, "Received response from liquidity service for unknown request."); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + }, + } + } else { + log_error!( + self.logger, + "Received LSPS5 client event but LSPS5 client was not configured." + ); + } + }, + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + counterparty_node_id: _, + app_name, + url, + notification, + headers, + }) => { + if let Some(ref lsps5_service) = self.lsps5_service { + log_info!( + self.logger, + "Sending webhook notification for {} to {}: {:?}", + app_name, + url, + notification + ); + + let headers_std: HashMap = headers.into_iter().collect(); + let notification_str = serde_json::to_string(¬ification) + .unwrap_or_else(|_| format!("{:?}", notification)); + + if let Err(e) = lsps5_service.service_config.http_client.post( + &url, + &headers_std, + ¬ification_str, + ) { + println!(""); + println!( + "LSPS5 SendWebhookNotification event: {} to {}, {}", + app_name, url, e + ); + println!(""); + log_error!( + self.logger, + "Failed to send webhook notification for {} to {}: {}", + app_name, + url, + e + ); + } + } else { + log_error!( + self.logger, + "Received LSPS5 service event but LSPS5 service was not configured." + ); + } + }, e => { log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); }, @@ -1246,6 +1724,133 @@ where }) } + pub(crate) async fn lsps5_set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = match client_handler.set_webhook( + lsps5_client.lsp_node_id, + app_name.clone(), + webhook_url.clone(), + ) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquiditySetWebhookFailed), + }; + + lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_set_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_list_webhooks(&self) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = client_handler.list_webhooks(lsps5_client.lsp_node_id); + lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_list_webhooks_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_remove_webhook( + &self, app_name: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = + match client_handler.remove_webhook(lsps5_client.lsp_node_id, app_name.clone()) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquidityRemoveWebhookFailed), + }; + + lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|_| { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Error::LiquidityRequestFailed + }), + Ok(Err(_)) => { + log_error!(self.logger, "Failed to handle response from liquidity service"); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_remove_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) fn lsps5_notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_payment_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + pub(crate) fn handle_channel_ready( &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) { @@ -1444,3 +2049,170 @@ impl LSPS1Liquidity { Ok(response) } } + +/// A liquidity handler for handling LSPS5 webhooks and notifications. +#[derive(Clone)] +pub struct LSPS5Liquidity { + runtime: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, + logger: Arc, +} + +impl LSPS5Liquidity { + pub(crate) fn new( + runtime: Arc, connection_manager: Arc>>, + liquidity_source: Option>>>, logger: Arc, + ) -> Self { + Self { runtime, connection_manager, liquidity_source, logger } + } + + fn set_webhook_impl( + &self, app_name: String, webhook_url: String, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source.lsps5_set_webhook(app_name, webhook_url).await + })?; + + Ok(response) + } + + /// Sets a webhook URL at the configured LSP for receiving LSPS5 notifications. + #[cfg(not(feature = "uniffi"))] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + /// Sets a webhook URL at the configured LSP for receiving LSPS5 notifications. + #[cfg(feature = "uniffi")] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + fn list_webhooks_impl(&self) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = + self.runtime.block_on(async move { liquidity_source.lsps5_list_webhooks().await })?; + Ok(response) + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl() + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl().map(|response| response.into()) + } + + fn remove_webhook_impl(&self, app_name: String) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self + .runtime + .block_on(async move { liquidity_source.lsps5_remove_webhook(app_name).await })?; + + Ok(response) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Notifies the configured LSP about an incoming payment. + pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_payment_incoming(client_id) + } +} + +#[cfg(feature = "uniffi")] +// Re-export LSPS5 response types for uniffi +pub use lightning_liquidity::lsps5::msgs::{ + RemoveWebhookResponse as LSPS5RemoveWebhookResponse, + SetWebhookResponse as LSPS5SetWebhookResponse, +}; + +#[cfg(feature = "uniffi")] +/// Wrapper for ListWebhooksResponse that converts LSPS5AppName to String for uniffi +#[derive(Clone, Debug)] +pub struct LSPS5ListWebhooksResponse { + pub app_names: Vec, + pub max_webhooks: u32, +} + +#[cfg(feature = "uniffi")] +impl From for LSPS5ListWebhooksResponse { + fn from(response: ListWebhooksResponse) -> Self { + Self { + app_names: response.app_names.into_iter().map(|name| name.to_string()).collect(), + max_webhooks: response.max_webhooks, + } + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index f2e8407cd..5981ad730 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -20,7 +20,7 @@ use common::{ }; use ldk_node::config::EsploraSyncConfig; -use ldk_node::liquidity::LSPS2ServiceConfig; +use ldk_node::liquidity::{HttpClient, HttpClientError, LSPS2ServiceConfig, LSPS5ServiceConfig}; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, @@ -33,6 +33,7 @@ use lightning::routing::router::RouteParametersConfig; use lightning::util::persist::KVStoreSync; use lightning_invoice::{Bolt11InvoiceDescription, Description}; +use lightning_liquidity::lsps5::msgs::SetWebhookResponse; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use bitcoin::address::NetworkUnchecked; @@ -41,9 +42,42 @@ use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf}; use log::LevelFilter; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; + +/// Mock HTTP client for testing that captures all requests. +#[derive(Debug)] +pub struct MockHttpClient { + /// Vector of captured requests as (url, headers, body) tuples. + pub requests: Arc, String)>>>, +} + +impl MockHttpClient { + /// Create a new mock HTTP client. + pub fn new() -> Self { + Self { requests: Arc::new(Mutex::new(Vec::new())) } + } + + /// Get all captured requests. + pub fn get_requests(&self) -> Vec<(String, HashMap, String)> { + self.requests.lock().unwrap().clone() + } + + /// Check if a request was made to a specific URL. + pub fn has_request_to(&self, url: &str) -> bool { + self.requests.lock().unwrap().iter().any(|(req_url, _, _)| req_url == url) + } +} + +impl HttpClient for MockHttpClient { + fn post( + &self, url: &str, headers: &HashMap, body: &str, + ) -> Result<(), HttpClientError> { + self.requests.lock().unwrap().push((url.to_string(), headers.clone(), body.to_string())); + Ok(()) + } +} #[test] fn channel_full_cycle() { @@ -1723,6 +1757,140 @@ fn lsps2_client_service_integration() { assert_eq!(client_node.payment(&payment_id).unwrap().status, PaymentStatus::Failed); } +#[test] +fn lsps5_webhook_flow() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let sync_config = EsploraSyncConfig { background_sync_config: None }; + + let service_config = random_config(true); + let mock_http_client = Arc::new(MockHttpClient::new()); + let mock_http_client_for_test = Arc::clone(&mock_http_client); + + let lsps5_service_config = LSPS5ServiceConfig::with_http_client(5, mock_http_client); + + setup_builder!(service_builder, service_config.node_config); + service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + service_builder.set_liquidity_provider_lsps5(lsps5_service_config); + let service_node = service_builder.build().unwrap(); + service_node.start().unwrap(); + + let service_node_id = service_node.node_id(); + let service_addr = service_node.listening_addresses().unwrap().first().unwrap().clone(); + + let client_config = random_config(true); + setup_builder!(client_builder, client_config.node_config); + client_builder.set_chain_source_esplora(esplora_url, Some(sync_config)); + client_builder.set_liquidity_source_lsps5(service_node_id, service_addr); + let client_node = client_builder.build().unwrap(); + client_node.start().unwrap(); + + let service_onchain_addr = service_node.onchain_payment().new_address().unwrap(); + let client_onchain_addr = client_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![service_onchain_addr, client_onchain_addr], + Amount::from_sat(5_000_000), + ); + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + + open_channel(&client_node, &service_node, 1_000_000, false, &electrsd); + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + expect_channel_ready_event!(client_node, service_node.node_id()); + expect_channel_ready_event!(service_node, client_node.node_id()); + + let app_name = "walletapp".to_string(); + let webhook_url = "https://www.whatever.com/webhook".to_string(); + let client_lsps5_liquidity = client_node.lsps5_liquidity(); + let service_lsps5_liquidity = service_node.lsps5_liquidity(); + let registration_result = + client_lsps5_liquidity.set_webhook(app_name.clone(), webhook_url.clone()); + + let registration: SetWebhookResponse = registration_result.unwrap(); + + assert_eq!(registration.num_webhooks, 1); + assert_eq!(registration.max_webhooks, 5); + assert!(!registration.no_change); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + { + let captured_requests = mock_http_client_for_test.get_requests(); + + assert_eq!( + captured_requests.len(), + 1, + "Expected exactly 1 webhook request after registration" + ); + + let (url, headers, body) = &captured_requests[0]; + + assert_eq!(url, &webhook_url, "Webhook URL should match the registered URL"); + assert!(headers.contains_key("Content-Type"), "Should have Content-Type header"); + assert_eq!( + headers.get("Content-Type").unwrap(), + "application/json", + "Content-Type should be application/json" + ); + assert!(headers.contains_key("x-lsps5-timestamp"), "Should have x-lsps5-timestamp header"); + assert!(headers.contains_key("x-lsps5-signature"), "Should have x-lsps5-signature header"); + + let parsed_body: serde_json::Value = + serde_json::from_str(body).expect("Body should be valid JSON"); + assert_eq!( + parsed_body.get("jsonrpc"), + Some(&serde_json::Value::String("2.0".to_string())), + "Should have jsonrpc 2.0" + ); + assert_eq!( + parsed_body.get("method"), + Some(&serde_json::Value::String("lsps5.webhook_registered".to_string())), + "Method should be lsps5.webhook_registered" + ); + assert!(parsed_body.get("params").is_some(), "Should have params object"); + } + + let list = client_lsps5_liquidity.list_webhooks().unwrap(); + assert_eq!( + vec![app_name.clone()], + list.app_names.into_iter().map(|s| s.to_string()).collect::>() + ); + assert_eq!(list.max_webhooks, 5); + + service_lsps5_liquidity.notify_payment_incoming(client_node.node_id()).unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(3)); + + { + let captured_requests = mock_http_client_for_test.get_requests(); + + assert!(captured_requests.len() == 2, "Should have at least the registration webhook"); + + let (url, _headers, body) = &captured_requests[1]; + assert_eq!(url, &webhook_url, "second webhook URL should match the registered URL"); + + let parsed_body: serde_json::Value = + serde_json::from_str(body).expect("Body should be valid JSON"); + assert_eq!( + parsed_body.get("method"), + Some(&serde_json::Value::String("lsps5.payment_incoming".to_string())), + "second webhook should be payment_incoming" + ); + } + + let _ = client_lsps5_liquidity.remove_webhook(app_name.clone()).unwrap(); + + let list_after = client_lsps5_liquidity.list_webhooks().unwrap(); + assert!(list_after.app_names.is_empty()); +} + #[test] fn facade_logging() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();