diff --git a/lightning-liquidity/src/lsps1/msgs.rs b/lightning-liquidity/src/lsps1/msgs.rs index 9b9b94d7cd2..8402827a4a6 100644 --- a/lightning-liquidity/src/lsps1/msgs.rs +++ b/lightning-liquidity/src/lsps1/msgs.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Message, request, and other primitive types used to implement bLIP-51 / LSPS1. use alloc::string::String; diff --git a/lightning-liquidity/src/lsps2/client.rs b/lightning-liquidity/src/lsps2/client.rs index fa08093108b..7008d42e345 100644 --- a/lightning-liquidity/src/lsps2/client.rs +++ b/lightning-liquidity/src/lsps2/client.rs @@ -3,7 +3,8 @@ // // This file is licensed under the Apache License, Version 2.0 or the MIT license -// , at your option. You may not use this file except in accordance with one or both of these +// , at your option. +// You may not use this file except in accordance with one or both of these // licenses. //! Contains the main bLIP-52 / LSPS2 client object, [`LSPS2ClientHandler`]. diff --git a/lightning-liquidity/src/lsps2/msgs.rs b/lightning-liquidity/src/lsps2/msgs.rs index 8fb9536b6d4..2a01d6ee32f 100644 --- a/lightning-liquidity/src/lsps2/msgs.rs +++ b/lightning-liquidity/src/lsps2/msgs.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Message, request, and other primitive types used to implement bLIP-52 / LSPS2. use alloc::string::String; diff --git a/lightning-liquidity/src/lsps2/payment_queue.rs b/lightning-liquidity/src/lsps2/payment_queue.rs index 30413537a9c..d6474dc97a0 100644 --- a/lightning-liquidity/src/lsps2/payment_queue.rs +++ b/lightning-liquidity/src/lsps2/payment_queue.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + use alloc::vec::Vec; use lightning::ln::channelmanager::InterceptId; @@ -8,7 +17,7 @@ use lightning_types::payment::PaymentHash; /// remaining payments forwarded. #[derive(Clone, Default, PartialEq, Eq, Debug)] pub(crate) struct PaymentQueue { - payments: Vec<(PaymentHash, Vec)>, + payments: Vec, } impl PaymentQueue { @@ -17,37 +26,48 @@ impl PaymentQueue { } pub(crate) fn add_htlc(&mut self, new_htlc: InterceptedHTLC) -> (u64, usize) { - let payment = self.payments.iter_mut().find(|(p, _)| p == &new_htlc.payment_hash); - if let Some((payment_hash, htlcs)) = payment { + let payment = + self.payments.iter_mut().find(|entry| entry.payment_hash == new_htlc.payment_hash); + if let Some(entry) = payment { // HTLCs within a payment should have the same payment hash. - debug_assert!(htlcs.iter().all(|htlc| htlc.payment_hash == *payment_hash)); + debug_assert!(entry.htlcs.iter().all(|htlc| htlc.payment_hash == entry.payment_hash)); // The given HTLC should not already be present. - debug_assert!(htlcs.iter().all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); - htlcs.push(new_htlc); + debug_assert!(entry + .htlcs + .iter() + .all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); + entry.htlcs.push(new_htlc); let total_expected_outbound_amount_msat = - htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); - (total_expected_outbound_amount_msat, htlcs.len()) + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); + (total_expected_outbound_amount_msat, entry.htlcs.len()) } else { let expected_outbound_amount_msat = new_htlc.expected_outbound_amount_msat; - self.payments.push((new_htlc.payment_hash, vec![new_htlc])); + let entry = + PaymentQueueEntry { payment_hash: new_htlc.payment_hash, htlcs: vec![new_htlc] }; + self.payments.push(entry); (expected_outbound_amount_msat, 1) } } - pub(crate) fn pop_greater_than_msat( - &mut self, amount_msat: u64, - ) -> Option<(PaymentHash, Vec)> { - let position = self.payments.iter().position(|(_payment_hash, htlcs)| { - htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum::() >= amount_msat + pub(crate) fn pop_greater_than_msat(&mut self, amount_msat: u64) -> Option { + let position = self.payments.iter().position(|entry| { + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum::() + >= amount_msat }); position.map(|position| self.payments.remove(position)) } pub(crate) fn clear(&mut self) -> Vec { - self.payments.drain(..).map(|(_k, v)| v).flatten().collect() + self.payments.drain(..).map(|entry| entry.htlcs).flatten().collect() } } +#[derive(Clone, PartialEq, Eq, Debug)] +pub(crate) struct PaymentQueueEntry { + pub(crate) payment_hash: PaymentHash, + pub(crate) htlcs: Vec, +} + #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub(crate) struct InterceptedHTLC { pub(crate) intercept_id: InterceptId, @@ -90,24 +110,23 @@ mod tests { }), (500_000_000, 2), ); - assert_eq!( - payment_queue.pop_greater_than_msat(500_000_000), - Some(( - PaymentHash([100; 32]), - vec![ - InterceptedHTLC { - intercept_id: InterceptId([0; 32]), - expected_outbound_amount_msat: 200_000_000, - payment_hash: PaymentHash([100; 32]), - }, - InterceptedHTLC { - intercept_id: InterceptId([2; 32]), - expected_outbound_amount_msat: 300_000_000, - payment_hash: PaymentHash([100; 32]), - }, - ] - )) - ); + + let expected_entry = PaymentQueueEntry { + payment_hash: PaymentHash([100; 32]), + htlcs: vec![ + InterceptedHTLC { + intercept_id: InterceptId([0; 32]), + expected_outbound_amount_msat: 200_000_000, + payment_hash: PaymentHash([100; 32]), + }, + InterceptedHTLC { + intercept_id: InterceptId([2; 32]), + expected_outbound_amount_msat: 300_000_000, + payment_hash: PaymentHash([100; 32]), + }, + ], + }; + assert_eq!(payment_queue.pop_greater_than_msat(500_000_000), Some(expected_entry),); assert_eq!( payment_queue.clear(), vec![InterceptedHTLC { diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 309d7ae1755..114ed8b250d 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -242,12 +242,10 @@ impl OutboundJITChannelState { } => { let mut payment_queue = core::mem::take(payment_queue); payment_queue.add_htlc(htlc); - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = HTLCInterceptedAction::ForwardPayment( *channel_id, - FeePayment { htlcs, opening_fee_msat: *opening_fee_msat }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue, @@ -277,12 +275,10 @@ impl OutboundJITChannelState { ) -> Result { match self { OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => { - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = ForwardPaymentAction( channel_id, - FeePayment { opening_fee_msat: *opening_fee_msat, htlcs }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue: core::mem::take(payment_queue), @@ -311,12 +307,10 @@ impl OutboundJITChannelState { opening_fee_msat, channel_id, } => { - if let Some((_payment_hash, htlcs)) = - payment_queue.pop_greater_than_msat(*opening_fee_msat) - { + if let Some(entry) = payment_queue.pop_greater_than_msat(*opening_fee_msat) { let forward_payment = ForwardPaymentAction( *channel_id, - FeePayment { htlcs, opening_fee_msat: *opening_fee_msat }, + FeePayment { htlcs: entry.htlcs, opening_fee_msat: *opening_fee_msat }, ); *self = OutboundJITChannelState::PendingPaymentForward { payment_queue: core::mem::take(payment_queue), diff --git a/lightning-liquidity/src/lsps2/utils.rs b/lightning-liquidity/src/lsps2/utils.rs index a2c4d65936d..e4620043424 100644 --- a/lightning-liquidity/src/lsps2/utils.rs +++ b/lightning-liquidity/src/lsps2/utils.rs @@ -1,3 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. +// You may not use this file except in accordance with one or both of these licenses. + //! Utilities for implementing the bLIP-52 / LSPS2 standard. use crate::lsps2::msgs::LSPS2OpeningFeeParams; diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 2b86ad3ac08..72c3d83b3fe 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -17,9 +17,8 @@ use crate::lsps5::msgs::{ SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod, }; use crate::message_queue::MessageQueue; -use crate::prelude::hash_map::Entry; use crate::prelude::*; -use crate::sync::{Arc, Mutex}; +use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; use crate::utils::time::TimeProvider; use bitcoin::secp256k1::PublicKey; @@ -47,16 +46,16 @@ pub const PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS: Duration = Duration::from_secs(24 /// A stored webhook. #[derive(Debug, Clone)] -struct StoredWebhook { +struct Webhook { _app_name: LSPS5AppName, url: LSPS5WebhookUrl, _counterparty_node_id: PublicKey, // Timestamp used for tracking when the webhook was created / updated, or when the last notification was sent. // This is used to determine if the webhook is stale and should be pruned. last_used: LSPSDateTime, - // Map of last notification sent timestamps for each notification method. - // This is used to enforce notification cooldowns. - last_notification_sent: HashMap, + // Timestamp when we last sent a notification to the client. This is used to enforce + // notification cooldowns. + last_notification_sent: Option, } /// Server-side configuration options for LSPS5 Webhook Registration. @@ -117,7 +116,7 @@ where TP::Target: TimeProvider, { config: LSPS5ServiceConfig, - webhooks: Mutex>>, + per_peer_state: RwLock>, event_queue: Arc, pending_messages: Arc, time_provider: TP, @@ -140,7 +139,7 @@ where assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0"); Self { config, - webhooks: Mutex::new(new_hash_map()), + per_peer_state: RwLock::new(new_hash_map()), event_queue, pending_messages, time_provider, @@ -150,18 +149,26 @@ where } } - fn check_prune_stale_webhooks(&self) { + fn check_prune_stale_webhooks<'a>( + &self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap>, + ) { + let mut last_pruning = self.last_pruning.lock().unwrap(); let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let should_prune = { - let last_pruning = self.last_pruning.lock().unwrap(); - last_pruning.as_ref().map_or(true, |last_time| { - now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS - }) - }; + + let should_prune = last_pruning.as_ref().map_or(true, |last_time| { + now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS + }); if should_prune { - self.prune_stale_webhooks(); + outer_state_lock.retain(|client_id, peer_state| { + if self.client_has_open_channel(client_id) { + // Don't prune clients with open channels + return true; + } + !peer_state.prune_stale_webhooks(now) + }); + *last_pruning = Some(now); } } @@ -171,61 +178,55 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let mut webhooks = self.webhooks.lock().unwrap(); + let peer_state = + outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default); - let client_webhooks = webhooks.entry(counterparty_node_id).or_insert_with(new_hash_map); let now = LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let num_webhooks = client_webhooks.len(); + let num_webhooks = peer_state.webhooks_len(); let mut no_change = false; - match client_webhooks.entry(params.app_name.clone()) { - Entry::Occupied(mut entry) => { - no_change = entry.get().url == params.webhook; - let (last_used, last_notification_sent) = if no_change { - (entry.get().last_used, entry.get().last_notification_sent.clone()) - } else { - (now, new_hash_map()) - }; - entry.insert(StoredWebhook { - _app_name: params.app_name.clone(), - url: params.webhook.clone(), - _counterparty_node_id: counterparty_node_id, - last_used, - last_notification_sent, - }); - }, - Entry::Vacant(entry) => { - if num_webhooks >= self.config.max_webhooks_per_client as usize { - let error = LSPS5ProtocolError::TooManyWebhooks; - let msg = LSPS5Message::Response( - request_id, - LSPS5Response::SetWebhookError(error.clone().into()), - ) - .into(); - message_queue_notifier.enqueue(&counterparty_node_id, msg); - return Err(LightningError { - err: error.message().into(), - action: ErrorAction::IgnoreAndLog(Level::Info), - }); - } - entry.insert(StoredWebhook { - _app_name: params.app_name.clone(), - url: params.webhook.clone(), - _counterparty_node_id: counterparty_node_id, - last_used: now, - last_notification_sent: new_hash_map(), + if let Some(webhook) = peer_state.webhook_mut(¶ms.app_name) { + no_change = webhook.url == params.webhook; + if !no_change { + // The URL was updated. + webhook.url = params.webhook.clone(); + webhook.last_used = now; + webhook.last_notification_sent = None; + } + } else { + if num_webhooks >= self.config.max_webhooks_per_client as usize { + let error = LSPS5ProtocolError::TooManyWebhooks; + let msg = LSPS5Message::Response( + request_id, + LSPS5Response::SetWebhookError(error.clone().into()), + ) + .into(); + message_queue_notifier.enqueue(&counterparty_node_id, msg); + return Err(LightningError { + err: error.message().into(), + action: ErrorAction::IgnoreAndLog(Level::Info), }); - }, + } + + let webhook = Webhook { + _app_name: params.app_name.clone(), + url: params.webhook.clone(), + _counterparty_node_id: counterparty_node_id, + last_used: now, + last_notification_sent: None, + }; + + peer_state.insert_webhook(params.app_name.clone(), webhook); } if !no_change { self.send_webhook_registered_notification( counterparty_node_id, - params.app_name, + params.app_name.clone(), params.webhook, ) .map_err(|e| { @@ -245,7 +246,7 @@ where let msg = LSPS5Message::Response( request_id, LSPS5Response::SetWebhook(SetWebhookResponse { - num_webhooks: client_webhooks.len() as u32, + num_webhooks: peer_state.webhooks_len() as u32, max_webhooks: self.config.max_webhooks_per_client, no_change, }), @@ -261,14 +262,9 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); - - let webhooks = self.webhooks.lock().unwrap(); - - let app_names = webhooks - .get(&counterparty_node_id) - .map(|client_webhooks| client_webhooks.keys().cloned().collect::>()) - .unwrap_or_else(Vec::new); + let outer_state_lock = self.per_peer_state.read().unwrap(); + let app_names = + outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default(); let max_webhooks = self.config.max_webhooks_per_client; @@ -285,12 +281,10 @@ where ) -> Result<(), LightningError> { let mut message_queue_notifier = self.pending_messages.notifier(); - self.check_prune_stale_webhooks(); + let mut outer_state_lock = self.per_peer_state.write().unwrap(); - let mut webhooks = self.webhooks.lock().unwrap(); - - if let Some(client_webhooks) = webhooks.get_mut(&counterparty_node_id) { - if client_webhooks.remove(¶ms.app_name).is_some() { + if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) { + if peer_state.remove_webhook(¶ms.app_name) { let response = RemoveWebhookResponse {}; let msg = LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response)) @@ -411,11 +405,11 @@ where fn send_notifications_to_client_webhooks( &self, client_id: PublicKey, notification: WebhookNotification, ) -> Result<(), LSPS5ProtocolError> { - let mut webhooks = self.webhooks.lock().unwrap(); - - let client_webhooks = match webhooks.get_mut(&client_id) { - Some(webhooks) if !webhooks.is_empty() => webhooks, - _ => return Ok(()), + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) { + peer_state + } else { + return Ok(()); }; let now = @@ -424,12 +418,10 @@ where // We must avoid sending multiple notifications of the same method // (other than lsps5.webhook_registered) close in time. if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered { - let rate_limit_applies = client_webhooks.iter().any(|(_, webhook)| { - webhook - .last_notification_sent - .get(¬ification.method) - .map(|last_sent| now.duration_since(&last_sent)) - .map_or(false, |duration| duration < NOTIFICATION_COOLDOWN_TIME) + let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| { + webhook.last_notification_sent.as_ref().map_or(false, |last_sent| { + now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME + }) }); if rate_limit_applies { @@ -437,15 +429,15 @@ where } } - for (app_name, webhook) in client_webhooks.iter_mut() { - webhook.last_notification_sent.insert(notification.method.clone(), now); - webhook.last_used = now; + for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() { self.send_notification( client_id, app_name.clone(), webhook.url.clone(), notification.clone(), )?; + webhook.last_used = now; + webhook.last_notification_sent = Some(now); } Ok(()) } @@ -495,26 +487,6 @@ where .map_err(|_| LSPS5ProtocolError::UnknownError) } - fn prune_stale_webhooks(&self) { - let now = - LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); - let mut webhooks = self.webhooks.lock().unwrap(); - - webhooks.retain(|client_id, client_webhooks| { - if !self.client_has_open_channel(client_id) { - client_webhooks.retain(|_, webhook| { - now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS - }); - !client_webhooks.is_empty() - } else { - true - } - }); - - let mut last_pruning = self.last_pruning.lock().unwrap(); - *last_pruning = Some(now); - } - fn client_has_open_channel(&self, client_id: &PublicKey) -> bool { self.channel_manager .get_cm() @@ -524,12 +496,19 @@ where } pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) { - let mut webhooks = self.webhooks.lock().unwrap(); - if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) { - for webhook in client_webhooks.values_mut() { - webhook.last_notification_sent.clear(); - } + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { + peer_state.reset_notification_cooldown(); + } + self.check_prune_stale_webhooks(&mut outer_state_lock); + } + + pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { + let mut outer_state_lock = self.per_peer_state.write().unwrap(); + if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { + peer_state.reset_notification_cooldown(); } + self.check_prune_stale_webhooks(&mut outer_state_lock); } } @@ -574,3 +553,69 @@ where } } } + +#[derive(Debug, Default)] +struct PeerState { + webhooks: Vec<(LSPS5AppName, Webhook)>, +} + +impl PeerState { + fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> { + self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None }) + } + + fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> { + &self.webhooks + } + + fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> { + &mut self.webhooks + } + + fn webhooks_len(&self) -> usize { + self.webhooks.len() + } + + fn app_names(&self) -> Vec { + self.webhooks.iter().map(|(n, _)| n).cloned().collect() + } + + fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) { + for (n, h) in self.webhooks.iter_mut() { + if *n == name { + *h = hook; + return; + } + } + + self.webhooks.push((name, hook)); + } + + fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool { + let mut removed = false; + self.webhooks.retain(|(n, _)| { + if n != name { + true + } else { + removed = true; + false + } + }); + removed + } + + fn reset_notification_cooldown(&mut self) { + for (_, h) in self.webhooks.iter_mut() { + h.last_notification_sent = None; + } + } + + // Returns whether the entire state is empty and can be pruned. + fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool { + self.webhooks.retain(|(_, webhook)| { + now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS + }); + + self.webhooks.is_empty() + } +} diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 6452bd32df3..4cf97786d02 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + use alloc::string::ToString; use alloc::vec::Vec; @@ -712,6 +721,10 @@ where if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() { lsps2_service_handler.peer_disconnected(counterparty_node_id); } + + if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() { + lsps5_service_handler.peer_disconnected(&counterparty_node_id); + } } fn peer_connected( &self, counterparty_node_id: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init, diff --git a/lightning-liquidity/src/message_queue.rs b/lightning-liquidity/src/message_queue.rs index 2e99d545438..d097573cf04 100644 --- a/lightning-liquidity/src/message_queue.rs +++ b/lightning-liquidity/src/message_queue.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Holds types and traits used to implement message queues for [`LSPSMessage`]s. use alloc::collections::VecDeque; diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 9035755e89a..e526d3eda5e 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -411,11 +411,13 @@ fn webhook_error_handling_test() { #[test] fn webhook_notification_delivery_test() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -499,6 +501,8 @@ fn webhook_notification_delivery_test() { "No event should be emitted due to cooldown" ); + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + let timeout_block = 700000; // Some future block height let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); @@ -719,11 +723,13 @@ fn idempotency_set_webhook_test() { #[test] fn replay_prevention_test() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -774,6 +780,9 @@ fn replay_prevention_test() { // Fill up the validator's signature cache to push out the original signature. for i in 0..MAX_RECENT_SIGNATURES { + // Advance time, allowing for another notification + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + let timeout_block = 700000 + i as u32; let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); let event = service_node.liquidity_manager.next_event().unwrap(); @@ -852,7 +861,15 @@ fn stale_webhooks() { MIN_WEBHOOK_RETENTION_DAYS.as_secs() + PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS.as_secs(), ); - // LIST calls prune before executing -> should be empty after advancing time + // LIST should be empty after advancing time and reconnection + service_node.liquidity_manager.peer_disconnected(client_node_id); + let init_msg = Init { + features: lightning_types::features::InitFeatures::empty(), + remote_network_address: None, + networks: None, + }; + service_node.liquidity_manager.peer_connected(client_node_id, &init_msg, false).unwrap(); + let _ = client_handler.list_webhooks(service_node_id); let list_req2 = get_lsps_message!(client_node, service_node_id); service_node.liquidity_manager.handle_custom_message(list_req2, client_node_id).unwrap(); @@ -871,11 +888,13 @@ fn stale_webhooks() { #[test] fn test_all_notifications() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let (lsps_nodes, validator) = lsps5_test_setup(nodes, Arc::new(DefaultTimeProvider)); + let (lsps_nodes, validator) = lsps5_test_setup(nodes, time_provider); let LSPSNodes { service_node, client_node } = lsps_nodes; let service_node_id = service_node.inner.node.get_our_node_id(); let client_node_id = client_node.inner.node.get_our_node_id(); @@ -894,9 +913,16 @@ fn test_all_notifications() { // consume initial SendWebhookNotification let _ = service_node.liquidity_manager.next_event().unwrap(); + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_onion_message_incoming(client_node_id); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_payment_incoming(client_node_id); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_expiry_soon(client_node_id, 1000); + + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_liquidity_management_request(client_node_id); let expected_notifications = vec![ @@ -1050,7 +1076,7 @@ fn test_notify_without_webhooks_does_nothing() { } #[test] -fn test_send_notifications_and_peer_connected_resets_cooldown() { +fn test_notifications_and_peer_connected_resets_cooldown() { let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); @@ -1101,24 +1127,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { "Should not emit event due to cooldown" ); - // 3. Notification of a different method CAN be sent - let timeout_block = 424242; - let _ = service_handler.notify_expiry_soon(client_node_id, timeout_block); - let event = service_node.liquidity_manager.next_event().unwrap(); - match event { - LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { - notification, - .. - }) => { - assert!(matches!( - notification.method, - WebhookNotificationMethod::LSPS5ExpirySoon { timeout } if timeout == timeout_block - )); - }, - _ => panic!("Expected SendWebhookNotification event for expiry_soon"), - } - - // 4. Advance time past cooldown and ensure payment_incoming can be sent again + // 3. Advance time past cooldown and ensure payment_incoming can be sent again mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); let _ = service_handler.notify_payment_incoming(client_node_id); @@ -1133,7 +1142,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { _ => panic!("Expected SendWebhookNotification event after cooldown"), } - // 5. Can't send payment_incoming notification again immediately after cooldown + // 4. Can't send payment_incoming notification again immediately after cooldown let result = service_handler.notify_payment_incoming(client_node_id); let error = result.unwrap_err(); @@ -1144,7 +1153,7 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { "Should not emit event due to cooldown" ); - // 6. After peer_connected, notification should be sent again immediately + // 5. After peer_connected, notification should be sent again immediately let init_msg = Init { features: lightning_types::features::InitFeatures::empty(), remote_network_address: None, @@ -1163,3 +1172,65 @@ fn test_send_notifications_and_peer_connected_resets_cooldown() { _ => panic!("Expected SendWebhookNotification event after peer_connected"), } } + +#[test] +fn webhook_update_affects_future_notifications() { + let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); + let time_provider = Arc::::clone(&mock_time_provider); + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, _) = lsps5_test_setup(nodes, time_provider); + let LSPSNodes { service_node, client_node } = lsps_nodes; + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + let client_handler = client_node.liquidity_manager.lsps5_client_handler().unwrap(); + let service_handler = service_node.liquidity_manager.lsps5_service_handler().unwrap(); + + let app = "UpdateTestApp".to_string(); + let url_v1 = "https://example.org/v1".to_string(); + let url_v2 = "https://example.org/v2".to_string(); + + // register v1 + client_handler.set_webhook(service_node_id, app.clone(), url_v1).unwrap(); + let req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(req, client_node_id).unwrap(); + let _ = service_node.liquidity_manager.next_event().unwrap(); // initial webhook_registered + let resp = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(resp, service_node_id).unwrap(); + let _ = client_node.liquidity_manager.next_event().unwrap(); + + // update to v2 + client_handler.set_webhook(service_node_id, app, url_v2.clone()).unwrap(); + let upd_req = get_lsps_message!(client_node, service_node_id); + service_node.liquidity_manager.handle_custom_message(upd_req, client_node_id).unwrap(); + let update_event = service_node.liquidity_manager.next_event().unwrap(); + match update_event { + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + url, .. + }) => { + assert_eq!(url.as_str(), url_v2); + }, + _ => panic!("Expected webhook_registered for update"), + } + let upd_resp = get_lsps_message!(service_node, client_node_id); + client_node.liquidity_manager.handle_custom_message(upd_resp, service_node_id).unwrap(); + let _ = client_node.liquidity_manager.next_event().unwrap(); + + // Advance past cooldown and send a notification again + mock_time_provider.advance_time(NOTIFICATION_COOLDOWN_TIME.as_secs() + 1); + service_handler.notify_payment_incoming(client_node_id).unwrap(); + let ev = service_node.liquidity_manager.next_event().unwrap(); + match ev { + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + url, + notification, + .. + }) => { + assert_eq!(notification.method, WebhookNotificationMethod::LSPS5PaymentIncoming); + assert_eq!(url.as_str(), url_v2, "Should target updated URL"); + }, + _ => panic!("Expected SendWebhookNotification after update"), + } +}