From d9157eb93fe3404eac1abd870947996178cbaf04 Mon Sep 17 00:00:00 2001 From: Martin Saposnic Date: Thu, 7 Aug 2025 11:08:06 -0300 Subject: [PATCH 1/3] Add LSPS5 DOS protections. When handling an incoming LSPS5 request, the manager will check if the counterparty is 'engaged' in some way before responding. `Engaged` meaning = active channel | LSPS2 active operation | LSPS1 active operation. Logic: `If not engaged then reject request;` A single test is added only checking for the active channel condition, because it's not super easy to get LSPS1-2 on the correct state to check this (yet). Other tangential work is happening that will make this easier and more tests will come in the near future --- lightning-liquidity/src/lsps1/service.rs | 11 + lightning-liquidity/src/lsps2/service.rs | 91 ++++++- lightning-liquidity/src/lsps5/service.rs | 25 +- lightning-liquidity/src/manager.rs | 26 ++ .../tests/lsps5_integration_tests.rs | 240 +++++++++++++++++- 5 files changed, 387 insertions(+), 6 deletions(-) diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs index aa10e735565..40707d3c20c 100644 --- a/lightning-liquidity/src/lsps1/service.rs +++ b/lightning-liquidity/src/lsps1/service.rs @@ -174,6 +174,17 @@ where &self.config } + pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool { + let outer_state_lock = self.per_peer_state.read().unwrap(); + if let Some(inner_state_lock) = outer_state_lock.get(counterparty_node_id) { + let peer_state = inner_state_lock.lock().unwrap(); + !(peer_state.pending_requests.is_empty() + && peer_state.outbound_channels_by_order_id.is_empty()) + } else { + false + } + } + fn handle_get_info_request( &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, ) -> Result<(), LightningError> { diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 309d7ae1755..a1645e502d7 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -108,8 +108,8 @@ struct ForwardPaymentAction(ChannelId, FeePayment); struct ForwardHTLCsAction(ChannelId, Vec); /// The different states a requested JIT channel can be in. -#[derive(Debug)] -enum OutboundJITChannelState { +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum OutboundJITChannelState { /// The JIT channel SCID was created after a buy request, and we are awaiting an initial payment /// of sufficient size to open the channel. PendingInitialPayment { payment_queue: PaymentQueue }, @@ -134,6 +134,30 @@ enum OutboundJITChannelState { PaymentForwarded { channel_id: ChannelId }, } +impl OutboundJITChannelState { + fn ord_index(&self) -> u8 { + match self { + OutboundJITChannelState::PendingInitialPayment { .. } => 0, + OutboundJITChannelState::PendingChannelOpen { .. } => 1, + OutboundJITChannelState::PendingPaymentForward { .. } => 2, + OutboundJITChannelState::PendingPayment { .. } => 3, + OutboundJITChannelState::PaymentForwarded { .. } => 4, + } + } +} + +impl PartialOrd for OutboundJITChannelState { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OutboundJITChannelState { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.ord_index().cmp(&other.ord_index()) + } +} + impl OutboundJITChannelState { fn new() -> Self { OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() } @@ -572,6 +596,18 @@ where &self.config } + pub(crate) fn highest_state_for_peer( + &self, counterparty_node_id: &PublicKey, + ) -> Option { + let outer_state_lock = self.per_peer_state.read().unwrap(); + if let Some(inner_state_lock) = outer_state_lock.get(counterparty_node_id) { + let peer_state = inner_state_lock.lock().unwrap(); + peer_state.outbound_channels_by_intercept_scid.values().map(|c| c.state.clone()).max() + } else { + None + } + } + /// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid. /// /// Should be called in response to receiving a [`LSPS2ServiceEvent::GetInfo`] event. @@ -1905,4 +1941,55 @@ mod tests { ); } } + + #[test] + fn highest_state_for_peer_orders() { + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 0, + proportional: 0, + valid_until: LSPSDateTime::from_str("1970-01-01T00:00:00Z").unwrap(), + min_lifetime: 0, + max_client_to_self_delay: 0, + min_payment_size_msat: 0, + max_payment_size_msat: 0, + promise: String::new(), + }; + + let mut map = new_hash_map(); + map.insert( + 0, + OutboundJITChannel { + state: OutboundJITChannelState::PendingInitialPayment { + payment_queue: PaymentQueue::new(), + }, + user_channel_id: 0, + opening_fee_params: opening_fee_params.clone(), + payment_size_msat: None, + }, + ); + map.insert( + 1, + OutboundJITChannel { + state: OutboundJITChannelState::PendingChannelOpen { + payment_queue: PaymentQueue::new(), + opening_fee_msat: 0, + }, + user_channel_id: 1, + opening_fee_params: opening_fee_params.clone(), + payment_size_msat: None, + }, + ); + map.insert( + 2, + OutboundJITChannel { + state: OutboundJITChannelState::PaymentForwarded { channel_id: ChannelId([0; 32]) }, + user_channel_id: 2, + opening_fee_params, + payment_size_msat: None, + }, + ); + + let max_state = map.values().map(|c| c.state.clone()).max().unwrap(); + assert!(matches!(max_state, OutboundJITChannelState::PaymentForwarded { .. })); + } } diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index e0fb3ab442a..04635dd3b9c 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -22,6 +22,7 @@ use crate::prelude::*; use crate::sync::{Arc, Mutex}; use crate::utils::time::TimeProvider; +use crate::lsps2::service::OutboundJITChannelState; use bitcoin::secp256k1::PublicKey; use lightning::ln::channelmanager::AChannelManager; @@ -150,6 +151,28 @@ where } } + /// Returns whether a request from the given client should be accepted. + /// + /// Prior activity includes an existing open channel, an active LSPS1 flow, + /// or an LSPS2 flow that has progressed to at least + /// [`OutboundJITChannelState::PendingChannelOpen`]. + pub(crate) fn can_accept_request( + &self, client_id: &PublicKey, lsps2_max_state: Option, + lsps1_has_activity: bool, + ) -> bool { + self.client_has_open_channel(client_id) + || lsps1_has_activity + || lsps2_max_state.map_or(false, |s| { + matches!( + s, + OutboundJITChannelState::PendingChannelOpen { .. } + | OutboundJITChannelState::PendingPaymentForward { .. } + | OutboundJITChannelState::PendingPayment { .. } + | OutboundJITChannelState::PaymentForwarded { .. } + ) + }) + } + fn check_prune_stale_webhooks(&self) { let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); @@ -515,7 +538,7 @@ where *last_pruning = Some(now); } - fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { + pub(crate) fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { self.channel_manager .get_cm() .list_channels() diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 6452bd32df3..8a09be4d83b 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -559,6 +559,32 @@ where LSPSMessage::LSPS5(msg @ LSPS5Message::Request(..)) => { match &self.lsps5_service_handler { Some(lsps5_service_handler) => { + let lsps2_max_state = self + .lsps2_service_handler + .as_ref() + .and_then(|h| h.highest_state_for_peer(sender_node_id)); + #[cfg(lsps1_service)] + let lsps1_has_active_requests = self + .lsps1_service_handler + .as_ref() + .map_or(false, |h| h.has_active_requests(sender_node_id)); + #[cfg(not(lsps1_service))] + let lsps1_has_active_requests = false; + + if !lsps5_service_handler.can_accept_request( + sender_node_id, + lsps2_max_state, + lsps1_has_active_requests, + ) { + return Err(LightningError { + err: format!( + "Rejecting LSPS5 request from {:?} without prior activity (requires open channel or active LSPS1 or LSPS2 flow)", + sender_node_id + ), + action: ErrorAction::IgnoreAndLog(Level::Debug), + }); + } + lsps5_service_handler.handle_message(msg, sender_node_id)?; }, None => { diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 61e7c12f95b..f4bb4139c1f 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -4,14 +4,22 @@ mod common; use common::{create_service_and_client_nodes, get_lsps_message, LSPSNodes}; +use lightning::check_closed_event; +use lightning::events::ClosureReason; +use lightning::ln::channelmanager::InterceptId; use lightning::ln::functional_test_utils::{ - create_chanmon_cfgs, create_network, create_node_cfgs, create_node_chanmgrs, Node, + close_channel, create_chan_between_nodes, create_chanmon_cfgs, create_network, + create_node_cfgs, create_node_chanmgrs, Node, }; use lightning::ln::msgs::Init; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::util::hash_tables::{HashMap, HashSet}; use lightning_liquidity::events::LiquidityEvent; use lightning_liquidity::lsps0::ser::LSPSDateTime; +use lightning_liquidity::lsps2::client::LSPS2ClientConfig; +use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; +use lightning_liquidity::lsps2::msgs::LSPS2RawOpeningFeeParams; +use lightning_liquidity::lsps2::service::LSPS2ServiceConfig; use lightning_liquidity::lsps5::client::LSPS5ClientConfig; use lightning_liquidity::lsps5::event::{LSPS5ClientEvent, LSPS5ServiceEvent}; use lightning_liquidity::lsps5::msgs::{ @@ -27,6 +35,10 @@ use lightning_liquidity::lsps5::service::{ use lightning_liquidity::lsps5::validator::{LSPS5Validator, MAX_RECENT_SIGNATURES}; use lightning_liquidity::utils::time::{DefaultTimeProvider, TimeProvider}; use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; + +use lightning_types::payment::PaymentHash; + +use std::str::FromStr; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -62,6 +74,39 @@ pub(crate) fn lsps5_test_setup<'a, 'b, 'c>( (lsps_nodes, validator) } +pub(crate) fn lsps5_lsps2_test_setup<'a, 'b, 'c>( + nodes: Vec>, time_provider: Arc, +) -> (LSPSNodes<'a, 'b, 'c>, LSPS5Validator) { + let lsps5_service_config = LSPS5ServiceConfig::default(); + let lsps2_service_config = LSPS2ServiceConfig { promise_secret: [42; 32] }; + let service_config = LiquidityServiceConfig { + #[cfg(lsps1_service)] + lsps1_service_config: None, + lsps2_service_config: Some(lsps2_service_config), + lsps5_service_config: Some(lsps5_service_config), + advertise_service: true, + }; + + let lsps5_client_config = LSPS5ClientConfig::default(); + let lsps2_client_config = LSPS2ClientConfig::default(); + let client_config = LiquidityClientConfig { + lsps1_client_config: None, + lsps2_client_config: Some(lsps2_client_config), + lsps5_client_config: Some(lsps5_client_config), + }; + + let lsps_nodes = create_service_and_client_nodes( + nodes, + service_config, + client_config, + Arc::clone(&time_provider), + ); + + let validator = LSPS5Validator::new(); + + (lsps_nodes, validator) +} + struct MockTimeProvider { current_time: RwLock, } @@ -102,7 +147,8 @@ fn webhook_registration_flow() { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); - let LSPSNodes { service_node, client_node } = lsps_nodes; + let LSPSNodes { service_node, client_node, .. } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); @@ -293,6 +339,7 @@ fn webhook_error_handling_test() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -417,6 +464,7 @@ fn webhook_notification_delivery_test() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -525,6 +573,7 @@ fn multiple_webhooks_notification_test() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -625,6 +674,7 @@ fn idempotency_set_webhook_test() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -725,6 +775,7 @@ fn replay_prevention_test() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -809,7 +860,8 @@ fn stale_webhooks() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, _) = lsps5_test_setup(nodes, time_provider); + let (lsps_nodes, _) = lsps5_lsps2_test_setup(nodes, time_provider); + establish_lsps2_prior_interaction(&lsps_nodes); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -877,6 +929,7 @@ fn test_all_notifications() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -936,6 +989,7 @@ fn test_tampered_notification() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -989,6 +1043,7 @@ fn test_bad_signature_notification() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -1037,6 +1092,7 @@ fn test_notify_without_webhooks_does_nothing() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let client_node_id = client_node.inner.node.get_our_node_id(); let service_handler = service_node.liquidity_manager.lsps5_service_handler().unwrap(); @@ -1059,6 +1115,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; + create_chan_between_nodes(&service_node.inner, &client_node.inner); let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -1163,3 +1220,180 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { _ => panic!("Expected SendWebhookNotification event after peer_connected"), } } + +#[test] +fn dos_protection() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let LSPSNodes { service_node, client_node } = lsps_nodes; + let client_node_id = client_node.inner.node.get_our_node_id(); + let service_node_id = service_node.inner.node.get_our_node_id(); + + let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); + + let assert_reject = || -> () { + let _ = client_handler + .set_webhook( + service_node_id, + "App".to_string(), + "https://example.org/webhook".to_string(), + ) + .expect("Request should send"); + let request = get_lsps_message!(client_node, service_node_id); + + let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); + assert!(result.is_err(), "Service should reject request without prior interaction"); + + assert!(service_node.liquidity_manager.get_and_clear_pending_msg().is_empty()); + }; + + let assert_accept = || -> () { + let _ = client_handler + .set_webhook( + service_node_id, + "App".to_string(), + "https://example.org/webhook".to_string(), + ) + .expect("Request should send"); + let request = get_lsps_message!(client_node, service_node_id); + + let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); + assert!(result.is_ok(), "Service should accept request after prior interaction"); + let _ = service_node.liquidity_manager.next_event().unwrap(); + let response = get_lsps_message!(service_node, client_node_id); + client_node + .liquidity_manager + .handle_custom_message(response, service_node_id) + .expect("Client should handle response"); + let _ = client_node.liquidity_manager.next_event().unwrap(); + }; + + // no channel is open so far -> should reject + assert_reject(); + + let (_, _, _, channel_id, funding_tx) = + create_chan_between_nodes(&service_node.inner, &client_node.inner); + + // now that a channel is open, should accept + assert_accept(); + + close_channel(&service_node.inner, &client_node.inner, &channel_id, funding_tx, true); + let node_a_reason = ClosureReason::CounterpartyInitiatedCooperativeClosure; + check_closed_event!(service_node.inner, 1, node_a_reason, [client_node_id], 100000); + let node_b_reason = ClosureReason::LocallyInitiatedCooperativeClosure; + check_closed_event!(client_node.inner, 1, node_b_reason, [service_node_id], 100000); + + // channel is now closed again -> should reject + assert_reject(); +} + +#[test] +fn lsps2_state_allows_lsps5_request() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let (lsps_nodes, _) = lsps5_lsps2_test_setup(nodes, Arc::new(DefaultTimeProvider)); + establish_lsps2_prior_interaction(&lsps_nodes); + + let LSPSNodes { service_node, client_node } = lsps_nodes; + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + + let lsps5_client = client_node.liquidity_manager.lsps5_client_handler().unwrap(); + + let _ = lsps5_client + .set_webhook(service_node_id, "App".to_string(), "https://example.org/webhook".to_string()) + .expect("Request should send"); + let request = get_lsps_message!(client_node, service_node_id); + let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); + assert!(result.is_ok(), "Service should accept request based on LSPS2 state"); +} + +fn establish_lsps2_prior_interaction(lsps_nodes: &LSPSNodes) { + let service_node = &lsps_nodes.service_node; + let client_node = &lsps_nodes.client_node; + + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + + let lsps2_client = client_node.liquidity_manager.lsps2_client_handler().unwrap(); + let lsps2_service = service_node.liquidity_manager.lsps2_service_handler().unwrap(); + + let get_info_request_id = lsps2_client.request_opening_params(service_node_id, None); + let get_info_req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(get_info_req, client_node_id).unwrap(); + + let get_info_event = service_node.liquidity_manager.next_event().unwrap(); + let opening_fee_params = match get_info_event { + LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::GetInfo { + request_id, + counterparty_node_id, + .. + }) => { + assert_eq!(request_id, get_info_request_id); + assert_eq!(counterparty_node_id, client_node_id); + let raw_opening_params = LSPS2RawOpeningFeeParams { + min_fee_msat: 1000, + proportional: 0, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 144, + max_client_to_self_delay: 144, + min_payment_size_msat: 1, + max_payment_size_msat: 1_000_000_000, + }; + lsps2_service + .opening_fee_params_generated( + &client_node_id, + request_id.clone(), + vec![raw_opening_params], + ) + .unwrap(); + let response = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(response, service_node_id).unwrap(); + match client_node.liquidity_manager.next_event().unwrap() { + LiquidityEvent::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { + opening_fee_params_menu, + .. + }) => opening_fee_params_menu.first().unwrap().clone(), + _ => panic!("Unexpected event"), + } + }, + _ => panic!("Unexpected event"), + }; + + let payment_size_msat = Some(1_000_000); + let buy_request_id = lsps2_client + .select_opening_params(service_node_id, payment_size_msat, opening_fee_params.clone()) + .unwrap(); + let buy_req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(buy_req, client_node_id).unwrap(); + let _ = service_node.liquidity_manager.next_event().unwrap(); + + let intercept_scid = service_node.inner.node.get_intercept_scid(); + let user_channel_id = 7; + let cltv_expiry_delta = 144; + lsps2_service + .invoice_parameters_generated( + &client_node_id, + buy_request_id.clone(), + intercept_scid, + cltv_expiry_delta, + true, + user_channel_id, + ) + .unwrap(); + let buy_resp = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(buy_resp, service_node_id).unwrap(); + let _ = client_node.liquidity_manager.next_event().unwrap(); + + let intercept_id = InterceptId([0; 32]); + let payment_hash = PaymentHash([1; 32]); + lsps2_service.htlc_intercepted(intercept_scid, intercept_id, 1_000_000, payment_hash).unwrap(); + + let _ = service_node.liquidity_manager.next_event().unwrap(); +} From 9a5975587872ca6f1390a24c9491832e92678e09 Mon Sep 17 00:00:00 2001 From: Martin Saposnic Date: Tue, 12 Aug 2025 12:17:15 -0300 Subject: [PATCH 2/3] fixup: address comments. changes: - refactor highest_state_for_peer and has_active_requests for better readabiliy - refactor ordering logic OutboundJITChannelState --- lightning-liquidity/src/lsps1/service.rs | 8 ++--- lightning-liquidity/src/lsps2/service.rs | 39 ++++++++++++++++-------- lightning-liquidity/src/lsps5/service.rs | 12 ++------ 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs index 40707d3c20c..4acfb81eb32 100644 --- a/lightning-liquidity/src/lsps1/service.rs +++ b/lightning-liquidity/src/lsps1/service.rs @@ -176,13 +176,11 @@ where pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool { let outer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(inner_state_lock) = outer_state_lock.get(counterparty_node_id) { - let peer_state = inner_state_lock.lock().unwrap(); + outer_state_lock.get(counterparty_node_id).map_or(false, |inner| { + let peer_state = inner.lock().unwrap(); !(peer_state.pending_requests.is_empty() && peer_state.outbound_channels_by_order_id.is_empty()) - } else { - false - } + }) } fn handle_get_info_request( diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index a1645e502d7..8bcfe1118c1 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -107,8 +107,17 @@ struct ForwardPaymentAction(ChannelId, FeePayment); #[derive(Debug, PartialEq)] struct ForwardHTLCsAction(ChannelId, Vec); +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) enum OutboundJITStage { + PendingInitialPayment, + PendingChannelOpen, + PendingPaymentForward, + PendingPayment, + PaymentForwarded, +} + /// The different states a requested JIT channel can be in. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub(crate) enum OutboundJITChannelState { /// The JIT channel SCID was created after a buy request, and we are awaiting an initial payment /// of sufficient size to open the channel. @@ -135,13 +144,19 @@ pub(crate) enum OutboundJITChannelState { } impl OutboundJITChannelState { - fn ord_index(&self) -> u8 { + pub(crate) fn stage(&self) -> OutboundJITStage { match self { - OutboundJITChannelState::PendingInitialPayment { .. } => 0, - OutboundJITChannelState::PendingChannelOpen { .. } => 1, - OutboundJITChannelState::PendingPaymentForward { .. } => 2, - OutboundJITChannelState::PendingPayment { .. } => 3, - OutboundJITChannelState::PaymentForwarded { .. } => 4, + OutboundJITChannelState::PendingInitialPayment { .. } => { + OutboundJITStage::PendingInitialPayment + }, + OutboundJITChannelState::PendingChannelOpen { .. } => { + OutboundJITStage::PendingChannelOpen + }, + OutboundJITChannelState::PendingPaymentForward { .. } => { + OutboundJITStage::PendingPaymentForward + }, + OutboundJITChannelState::PendingPayment { .. } => OutboundJITStage::PendingPayment, + OutboundJITChannelState::PaymentForwarded { .. } => OutboundJITStage::PaymentForwarded, } } } @@ -154,7 +169,7 @@ impl PartialOrd for OutboundJITChannelState { impl Ord for OutboundJITChannelState { fn cmp(&self, other: &Self) -> core::cmp::Ordering { - self.ord_index().cmp(&other.ord_index()) + self.stage().cmp(&other.stage()) } } @@ -600,12 +615,10 @@ where &self, counterparty_node_id: &PublicKey, ) -> Option { let outer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(inner_state_lock) = outer_state_lock.get(counterparty_node_id) { - let peer_state = inner_state_lock.lock().unwrap(); + outer_state_lock.get(counterparty_node_id).and_then(|inner| { + let peer_state = inner.lock().unwrap(); peer_state.outbound_channels_by_intercept_scid.values().map(|c| c.state.clone()).max() - } else { - None - } + }) } /// Used by LSP to inform a client requesting a JIT Channel the token they used is invalid. diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 04635dd3b9c..2870d9f1aac 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -22,7 +22,7 @@ use crate::prelude::*; use crate::sync::{Arc, Mutex}; use crate::utils::time::TimeProvider; -use crate::lsps2::service::OutboundJITChannelState; +use crate::lsps2::service::{OutboundJITChannelState, OutboundJITStage}; use bitcoin::secp256k1::PublicKey; use lightning::ln::channelmanager::AChannelManager; @@ -162,15 +162,7 @@ where ) -> bool { self.client_has_open_channel(client_id) || lsps1_has_activity - || lsps2_max_state.map_or(false, |s| { - matches!( - s, - OutboundJITChannelState::PendingChannelOpen { .. } - | OutboundJITChannelState::PendingPaymentForward { .. } - | OutboundJITChannelState::PendingPayment { .. } - | OutboundJITChannelState::PaymentForwarded { .. } - ) - }) + || lsps2_max_state.map_or(false, |s| s.stage() >= OutboundJITStage::PendingChannelOpen) } fn check_prune_stale_webhooks(&self) { From 109de16ced288f092901a088cffd5805dc80cb42 Mon Sep 17 00:00:00 2001 From: Martin Saposnic Date: Wed, 13 Aug 2025 14:52:30 -0300 Subject: [PATCH 3/3] fixup: make OutboundJITChannelState private, and do bool helper for checking if lsps2 has any pendingChannelOpen or above --- lightning-liquidity/src/lsps2/service.rs | 111 ++-------------- lightning-liquidity/src/lsps5/service.rs | 10 +- lightning-liquidity/src/manager.rs | 6 +- .../tests/lsps5_integration_tests.rs | 122 +++++++++++------- 4 files changed, 98 insertions(+), 151 deletions(-) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 8bcfe1118c1..d3440099633 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -107,18 +107,9 @@ struct ForwardPaymentAction(ChannelId, FeePayment); #[derive(Debug, PartialEq)] struct ForwardHTLCsAction(ChannelId, Vec); -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] -pub(crate) enum OutboundJITStage { - PendingInitialPayment, - PendingChannelOpen, - PendingPaymentForward, - PendingPayment, - PaymentForwarded, -} - /// The different states a requested JIT channel can be in. -#[derive(Debug, PartialEq, Eq, Clone)] -pub(crate) enum OutboundJITChannelState { +#[derive(Debug)] +enum OutboundJITChannelState { /// The JIT channel SCID was created after a buy request, and we are awaiting an initial payment /// of sufficient size to open the channel. PendingInitialPayment { payment_queue: PaymentQueue }, @@ -143,36 +134,6 @@ pub(crate) enum OutboundJITChannelState { PaymentForwarded { channel_id: ChannelId }, } -impl OutboundJITChannelState { - pub(crate) fn stage(&self) -> OutboundJITStage { - match self { - OutboundJITChannelState::PendingInitialPayment { .. } => { - OutboundJITStage::PendingInitialPayment - }, - OutboundJITChannelState::PendingChannelOpen { .. } => { - OutboundJITStage::PendingChannelOpen - }, - OutboundJITChannelState::PendingPaymentForward { .. } => { - OutboundJITStage::PendingPaymentForward - }, - OutboundJITChannelState::PendingPayment { .. } => OutboundJITStage::PendingPayment, - OutboundJITChannelState::PaymentForwarded { .. } => OutboundJITStage::PaymentForwarded, - } - } -} - -impl PartialOrd for OutboundJITChannelState { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for OutboundJITChannelState { - fn cmp(&self, other: &Self) -> core::cmp::Ordering { - self.stage().cmp(&other.stage()) - } -} - impl OutboundJITChannelState { fn new() -> Self { OutboundJITChannelState::PendingInitialPayment { payment_queue: PaymentQueue::new() } @@ -611,13 +572,20 @@ where &self.config } - pub(crate) fn highest_state_for_peer( - &self, counterparty_node_id: &PublicKey, - ) -> Option { + /// Returns whether the peer has any opening or open JIT channels. + pub(crate) fn has_opening_or_open_jit_channel(&self, counterparty_node_id: &PublicKey) -> bool { let outer_state_lock = self.per_peer_state.read().unwrap(); - outer_state_lock.get(counterparty_node_id).and_then(|inner| { + outer_state_lock.get(counterparty_node_id).map_or(false, |inner| { let peer_state = inner.lock().unwrap(); - peer_state.outbound_channels_by_intercept_scid.values().map(|c| c.state.clone()).max() + peer_state.outbound_channels_by_intercept_scid.values().any(|chan| { + matches!( + chan.state, + OutboundJITChannelState::PendingChannelOpen { .. } + | OutboundJITChannelState::PendingPaymentForward { .. } + | OutboundJITChannelState::PendingPayment { .. } + | OutboundJITChannelState::PaymentForwarded { .. } + ) + }) }) } @@ -1954,55 +1922,4 @@ mod tests { ); } } - - #[test] - fn highest_state_for_peer_orders() { - let opening_fee_params = LSPS2OpeningFeeParams { - min_fee_msat: 0, - proportional: 0, - valid_until: LSPSDateTime::from_str("1970-01-01T00:00:00Z").unwrap(), - min_lifetime: 0, - max_client_to_self_delay: 0, - min_payment_size_msat: 0, - max_payment_size_msat: 0, - promise: String::new(), - }; - - let mut map = new_hash_map(); - map.insert( - 0, - OutboundJITChannel { - state: OutboundJITChannelState::PendingInitialPayment { - payment_queue: PaymentQueue::new(), - }, - user_channel_id: 0, - opening_fee_params: opening_fee_params.clone(), - payment_size_msat: None, - }, - ); - map.insert( - 1, - OutboundJITChannel { - state: OutboundJITChannelState::PendingChannelOpen { - payment_queue: PaymentQueue::new(), - opening_fee_msat: 0, - }, - user_channel_id: 1, - opening_fee_params: opening_fee_params.clone(), - payment_size_msat: None, - }, - ); - map.insert( - 2, - OutboundJITChannel { - state: OutboundJITChannelState::PaymentForwarded { channel_id: ChannelId([0; 32]) }, - user_channel_id: 2, - opening_fee_params, - payment_size_msat: None, - }, - ); - - let max_state = map.values().map(|c| c.state.clone()).max().unwrap(); - assert!(matches!(max_state, OutboundJITChannelState::PaymentForwarded { .. })); - } } diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 2870d9f1aac..a7b0e196ba1 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -22,7 +22,6 @@ use crate::prelude::*; use crate::sync::{Arc, Mutex}; use crate::utils::time::TimeProvider; -use crate::lsps2::service::{OutboundJITChannelState, OutboundJITStage}; use bitcoin::secp256k1::PublicKey; use lightning::ln::channelmanager::AChannelManager; @@ -154,15 +153,14 @@ where /// Returns whether a request from the given client should be accepted. /// /// Prior activity includes an existing open channel, an active LSPS1 flow, - /// or an LSPS2 flow that has progressed to at least - /// [`OutboundJITChannelState::PendingChannelOpen`]. + /// or an LSPS2 flow that has an opening or open JIT channel. pub(crate) fn can_accept_request( - &self, client_id: &PublicKey, lsps2_max_state: Option, + &self, client_id: &PublicKey, lsps2_has_opening_or_open_jit_channel: bool, lsps1_has_activity: bool, ) -> bool { self.client_has_open_channel(client_id) + || lsps2_has_opening_or_open_jit_channel || lsps1_has_activity - || lsps2_max_state.map_or(false, |s| s.stage() >= OutboundJITStage::PendingChannelOpen) } fn check_prune_stale_webhooks(&self) { @@ -530,7 +528,7 @@ where *last_pruning = Some(now); } - pub(crate) fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { + fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { self.channel_manager .get_cm() .list_channels() diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 8a09be4d83b..3cf64b62c9c 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -559,10 +559,10 @@ where LSPSMessage::LSPS5(msg @ LSPS5Message::Request(..)) => { match &self.lsps5_service_handler { Some(lsps5_service_handler) => { - let lsps2_max_state = self + let lsps2_has_opening_or_open_jit_channel = self .lsps2_service_handler .as_ref() - .and_then(|h| h.highest_state_for_peer(sender_node_id)); + .map_or(false, |h| h.has_opening_or_open_jit_channel(sender_node_id)); #[cfg(lsps1_service)] let lsps1_has_active_requests = self .lsps1_service_handler @@ -573,7 +573,7 @@ where if !lsps5_service_handler.can_accept_request( sender_node_id, - lsps2_max_state, + lsps2_has_opening_or_open_jit_channel, lsps1_has_active_requests, ) { return Err(LightningError { diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index f4bb4139c1f..f54db723af7 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -1221,64 +1221,82 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { } } -#[test] -fn dos_protection() { - let chanmon_cfgs = create_chanmon_cfgs(2); - let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); - let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); - let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); - let LSPSNodes { service_node, client_node } = lsps_nodes; - let client_node_id = client_node.inner.node.get_our_node_id(); - let service_node_id = service_node.inner.node.get_our_node_id(); - - let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); - - let assert_reject = || -> () { - let _ = client_handler +macro_rules! assert_lsps5_reject { + ($client_handler:expr, $service_node:expr, $client_node:expr, $service_node_id:expr, $client_node_id:expr) => {{ + let _ = $client_handler .set_webhook( - service_node_id, + $service_node_id, "App".to_string(), "https://example.org/webhook".to_string(), ) .expect("Request should send"); - let request = get_lsps_message!(client_node, service_node_id); + let request = get_lsps_message!($client_node, $service_node_id); - let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); + let result = + $service_node.liquidity_manager.handle_custom_message(request, $client_node_id); assert!(result.is_err(), "Service should reject request without prior interaction"); - assert!(service_node.liquidity_manager.get_and_clear_pending_msg().is_empty()); - }; + assert!($service_node.liquidity_manager.get_and_clear_pending_msg().is_empty()); + }}; +} - let assert_accept = || -> () { - let _ = client_handler +macro_rules! assert_lsps5_accept { + ($client_handler:expr, $service_node:expr, $client_node:expr, $service_node_id:expr, $client_node_id:expr) => {{ + let _ = $client_handler .set_webhook( - service_node_id, + $service_node_id, "App".to_string(), "https://example.org/webhook".to_string(), ) .expect("Request should send"); - let request = get_lsps_message!(client_node, service_node_id); + let request = get_lsps_message!($client_node, $service_node_id); - let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); + let result = + $service_node.liquidity_manager.handle_custom_message(request, $client_node_id); assert!(result.is_ok(), "Service should accept request after prior interaction"); - let _ = service_node.liquidity_manager.next_event().unwrap(); - let response = get_lsps_message!(service_node, client_node_id); - client_node + let _ = $service_node.liquidity_manager.next_event().unwrap(); + let response = get_lsps_message!($service_node, $client_node_id); + $client_node .liquidity_manager - .handle_custom_message(response, service_node_id) + .handle_custom_message(response, $service_node_id) .expect("Client should handle response"); - let _ = client_node.liquidity_manager.next_event().unwrap(); - }; + let _ = $client_node.liquidity_manager.next_event().unwrap(); + }}; +} + +#[test] +fn dos_protection() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, _) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let LSPSNodes { service_node, client_node } = lsps_nodes; + let client_node_id = client_node.inner.node.get_our_node_id(); + let service_node_id = service_node.inner.node.get_our_node_id(); + + let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); // no channel is open so far -> should reject - assert_reject(); + assert_lsps5_reject!( + client_handler, + service_node, + client_node, + service_node_id, + client_node_id + ); let (_, _, _, channel_id, funding_tx) = create_chan_between_nodes(&service_node.inner, &client_node.inner); // now that a channel is open, should accept - assert_accept(); + assert_lsps5_accept!( + client_handler, + service_node, + client_node, + service_node_id, + client_node_id + ); close_channel(&service_node.inner, &client_node.inner, &channel_id, funding_tx, true); let node_a_reason = ClosureReason::CounterpartyInitiatedCooperativeClosure; @@ -1287,7 +1305,13 @@ fn dos_protection() { check_closed_event!(client_node.inner, 1, node_b_reason, [service_node_id], 100000); // channel is now closed again -> should reject - assert_reject(); + assert_lsps5_reject!( + client_handler, + service_node, + client_node, + service_node_id, + client_node_id + ); } #[test] @@ -1298,20 +1322,28 @@ fn lsps2_state_allows_lsps5_request() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let (lsps_nodes, _) = lsps5_lsps2_test_setup(nodes, Arc::new(DefaultTimeProvider)); - establish_lsps2_prior_interaction(&lsps_nodes); - let LSPSNodes { service_node, client_node } = lsps_nodes; - let service_node_id = service_node.inner.node.get_our_node_id(); - let client_node_id = client_node.inner.node.get_our_node_id(); + let client_node_id = lsps_nodes.client_node.inner.node.get_our_node_id(); + let service_node_id = lsps_nodes.service_node.inner.node.get_our_node_id(); + let client_handler = lsps_nodes.client_node.liquidity_manager.lsps5_client_handler().unwrap(); - let lsps5_client = client_node.liquidity_manager.lsps5_client_handler().unwrap(); + assert_lsps5_reject!( + client_handler, + lsps_nodes.service_node, + lsps_nodes.client_node, + service_node_id, + client_node_id + ); - let _ = lsps5_client - .set_webhook(service_node_id, "App".to_string(), "https://example.org/webhook".to_string()) - .expect("Request should send"); - let request = get_lsps_message!(client_node, service_node_id); - let result = service_node.liquidity_manager.handle_custom_message(request, client_node_id); - assert!(result.is_ok(), "Service should accept request based on LSPS2 state"); + establish_lsps2_prior_interaction(&lsps_nodes); + + assert_lsps5_accept!( + client_handler, + lsps_nodes.service_node, + lsps_nodes.client_node, + service_node_id, + client_node_id + ); } fn establish_lsps2_prior_interaction(lsps_nodes: &LSPSNodes) {