diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index ff5189f9346..dcf5c33367c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -181,7 +181,7 @@ struct LatestMonitorState { } struct TestChainMonitor { - pub logger: Arc, + pub logger: Arc>, pub keys: Arc, pub persister: Arc, pub chain_monitor: Arc< @@ -190,7 +190,7 @@ struct TestChainMonitor { Arc, Arc, Arc, - Arc, + Arc>, Arc, Arc, >, @@ -199,8 +199,8 @@ struct TestChainMonitor { } impl TestChainMonitor { pub fn new( - broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, + broadcaster: Arc, logger: Arc>, + feeest: Arc, persister: Arc, keys: Arc, ) -> Self { Self { chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( @@ -464,7 +464,7 @@ type ChanMan<'a> = ChannelManager< Arc, &'a FuzzRouter, &'a FuzzRouter, - Arc, + Arc>, >; #[inline] @@ -640,7 +640,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { macro_rules! make_node { ($node_id: expr, $fee_estimator: expr) => {{ - let logger: Arc = + let logger: Arc> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone())); let node_secret = SecretKey::from_slice(&[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -703,7 +703,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { keys, fee_estimator| { let keys_manager = Arc::clone(keys); - let logger: Arc = + let logger: Arc> = Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())); let chain_monitor = Arc::new(TestChainMonitor::new( broadcast.clone(), diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index eb9d51d487d..9e324097aca 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -224,7 +224,7 @@ type ChannelMan<'a> = ChannelManager< Arc, Arc, Arc, - Arc, + Arc>, Arc, Arc, >, @@ -236,14 +236,20 @@ type ChannelMan<'a> = ChannelManager< Arc, &'a FuzzRouter, &'a FuzzRouter, - Arc, + Arc>, >; type PeerMan<'a> = PeerManager< Peer<'a>, Arc>, - Arc>>, Arc, Arc>>, + Arc< + P2PGossipSync< + Arc>>>, + Arc, + Arc>, + >, + >, IgnoringMessageHandler, - Arc, + Arc>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, @@ -257,7 +263,7 @@ struct MoneyLossDetector<'a> { Arc, Arc, Arc, - Arc, + Arc>, Arc, Arc, >, @@ -282,7 +288,7 @@ impl<'a> MoneyLossDetector<'a> { Arc, Arc, Arc, - Arc, + Arc>, Arc, Arc, >, @@ -519,7 +525,7 @@ impl SignerProvider for KeyProvider { } #[inline] -pub fn do_test(mut data: &[u8], logger: &Arc) { +pub fn do_test(mut data: &[u8], logger: &Arc>) { if data.len() < 32 { return; } @@ -1021,13 +1027,14 @@ pub fn do_test(mut data: &[u8], logger: &Arc) { } pub fn full_stack_test(data: &[u8], out: Out) { - let logger: Arc = Arc::new(test_logger::TestLogger::new("".to_owned(), out)); + let logger: Arc> = + Arc::new(test_logger::TestLogger::new("".to_owned(), out)); do_test(data, &logger); } #[no_mangle] pub extern "C" fn full_stack_run(data: *const u8, datalen: usize) { - let logger: Arc = + let logger: Arc> = Arc::new(test_logger::TestLogger::new("".to_owned(), test_logger::DevNull {})); do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, &logger); } @@ -1656,7 +1663,7 @@ pub fn write_fst_seeds(path: &str) { #[cfg(test)] mod tests { - use lightning::util::logger::{Logger, Record}; + use lightning::util::logger::{Logger, Record, Span}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -1665,6 +1672,8 @@ mod tests { pub lines: Mutex>, } impl Logger for TrackingLogger { + type UserSpan = (); + fn log(&self, record: Record) { *self .lines @@ -1681,6 +1690,8 @@ mod tests { record.args ); } + + fn start(&self, _span: Span, _parent: Option<&()>) -> () {} } #[test] @@ -1694,7 +1705,7 @@ mod tests { let test = super::two_peer_forwarding_seed(); let logger = Arc::new(TrackingLogger { lines: Mutex::new(HashMap::new()) }); - super::do_test(&test, &(Arc::clone(&logger) as Arc)); + super::do_test(&test, &(Arc::clone(&logger) as Arc>)); let log_entries = logger.lines.lock().unwrap(); // 1 @@ -1730,7 +1741,7 @@ mod tests { let test = super::gossip_exchange_seed(); let logger = Arc::new(TrackingLogger { lines: Mutex::new(HashMap::new()) }); - super::do_test(&test, &(Arc::clone(&logger) as Arc)); + super::do_test(&test, &(Arc::clone(&logger) as Arc>)); let log_entries = logger.lines.lock().unwrap(); assert_eq!(log_entries.get(&("lightning::ln::peer_handler".to_string(), "Sending message to all peers except Some(PublicKey(0000000000000000000000000000000000000000000000000000000000000002ff00000000000000000000000000000000000000000000000000000000000002)) or the announced channel's counterparties: ChannelAnnouncement { node_signature_1: 3026020200b202200303030303030303030303030303030303030303030303030303030303030303, node_signature_2: 3026020200b202200202020202020202020202020202020202020202020202020202020202020202, bitcoin_signature_1: 3026020200b202200303030303030303030303030303030303030303030303030303030303030303, bitcoin_signature_2: 3026020200b202200202020202020202020202020202020202020202020202020202020202020202, contents: UnsignedChannelAnnouncement { features: [], chain_hash: 6fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000, short_channel_id: 42, node_id_1: NodeId(030303030303030303030303030303030303030303030303030303030303030303), node_id_2: NodeId(020202020202020202020202020202020202020202020202020202020202020202), bitcoin_key_1: NodeId(030303030303030303030303030303030303030303030303030303030303030303), bitcoin_key_2: NodeId(020202020202020202020202020202020202020202020202020202020202020202), excess_data: [] } }".to_string())), Some(&1)); diff --git a/fuzz/src/onion_message.rs b/fuzz/src/onion_message.rs index 85ba6263b2a..b40e43de019 100644 --- a/fuzz/src/onion_message.rs +++ b/fuzz/src/onion_message.rs @@ -315,7 +315,7 @@ impl SignerProvider for KeyProvider { #[cfg(test)] mod tests { use bitcoin::hex::FromHex; - use lightning::util::logger::{Logger, Record}; + use lightning::util::logger::{Logger, Record, Span}; use std::collections::HashMap; use std::sync::Mutex; @@ -324,6 +324,8 @@ mod tests { pub lines: Mutex>, } impl Logger for TrackingLogger { + type UserSpan = (); + fn log(&self, record: Record) { let mut lines_lock = self.lines.lock().unwrap(); let key = (record.module_path.to_string(), format!("{}", record.args)); @@ -337,6 +339,8 @@ mod tests { record.args ); } + + fn start(&self, _span: Span, _parent: Option<&()>) -> () {} } #[test] diff --git a/fuzz/src/process_onion_failure.rs b/fuzz/src/process_onion_failure.rs index 1bc9900718a..01f7080225a 100644 --- a/fuzz/src/process_onion_failure.rs +++ b/fuzz/src/process_onion_failure.rs @@ -63,7 +63,8 @@ fn do_test(data: &[u8], out: Out) { } let secp_ctx = Secp256k1::new(); - let logger: Arc = Arc::new(test_logger::TestLogger::new("".to_owned(), out)); + let logger: Arc> = + Arc::new(test_logger::TestLogger::new("".to_owned(), out)); let session_priv = SecretKey::from_slice(&usize_to_32_bytes(213127)).unwrap(); let payment_id = PaymentId(usize_to_32_bytes(232299)); diff --git a/fuzz/src/utils/test_logger.rs b/fuzz/src/utils/test_logger.rs index 6d9de02e387..4602a5154af 100644 --- a/fuzz/src/utils/test_logger.rs +++ b/fuzz/src/utils/test_logger.rs @@ -7,7 +7,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -use lightning::util::logger::{Logger, Record}; +use lightning::util::logger::{Logger, Record, Span}; use std::io::Write; use std::sync::{Arc, Mutex}; @@ -58,6 +58,8 @@ impl<'a, Out: Output> Write for LockedWriteAdapter<'a, Out> { } impl Logger for TestLogger { + type UserSpan = (); + fn log(&self, record: Record) { write!( LockedWriteAdapter(&self.out), @@ -70,4 +72,6 @@ impl Logger for TestLogger { ) .unwrap(); } + + fn start(&self, _span: Span, _parent: Option<&()>) -> () {} } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index ad1592b7722..06da83e6834 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -738,7 +738,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # use lightning_liquidity::lsps5::service::TimeProvider; /// # struct Logger {} /// # impl lightning::util::logger::Logger for Logger { +/// # type UserSpan = (); /// # fn log(&self, _record: lightning::util::logger::Record) {} +/// # fn start(&self, _span: lightning::util::logger::Span, _parent: Option<&()>) -> () {} /// # } /// # struct StoreSync {} /// # impl lightning::util::persist::KVStoreSync for StoreSync { diff --git a/lightning-dns-resolver/src/lib.rs b/lightning-dns-resolver/src/lib.rs index fc591c8c153..e5d79e89ba5 100644 --- a/lightning-dns-resolver/src/lib.rs +++ b/lightning-dns-resolver/src/lib.rs @@ -179,7 +179,7 @@ mod test { use lightning::sign::{KeysManager, NodeSigner, ReceiveAuthKey, Recipient}; use lightning::types::features::InitFeatures; use lightning::types::payment::PaymentHash; - use lightning::util::logger::Logger; + use lightning::util::logger::{Logger, Span}; use lightning::{commitment_signed_dance, expect_payment_claimed, get_htlc_update_msgs}; use lightning_types::string::UntrustedString; @@ -192,9 +192,13 @@ mod test { node: &'static str, } impl Logger for TestLogger { + type UserSpan = (); + fn log(&self, record: lightning::util::logger::Record) { eprintln!("{}: {}", self.node, record.args); } + + fn start(&self, _span: Span, _parent: Option<&()>) -> () {} } impl Deref for TestLogger { type Target = TestLogger; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 1d1d3c4654b..431900bf157 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -639,6 +639,8 @@ mod tests { pub struct TestLogger(); impl lightning::util::logger::Logger for TestLogger { + type UserSpan = (); + fn log(&self, record: lightning::util::logger::Record) { println!( "{:<5} [{} : {}, {}] {}", @@ -649,6 +651,8 @@ mod tests { record.args ); } + + fn start(&self, _span: lightning::util::logger::Span, _parent: Option<&()>) -> () {} } struct MsgHandler { diff --git a/lightning-rapid-gossip-sync/src/lib.rs b/lightning-rapid-gossip-sync/src/lib.rs index 429a3560be0..ce0ed571399 100644 --- a/lightning-rapid-gossip-sync/src/lib.rs +++ b/lightning-rapid-gossip-sync/src/lib.rs @@ -51,10 +51,12 @@ //! use lightning::routing::gossip::NetworkGraph; //! use lightning_rapid_gossip_sync::RapidGossipSync; //! -//! # use lightning::util::logger::{Logger, Record}; +//! # use lightning::util::logger::{Logger, Record, Span}; //! # struct FakeLogger {} //! # impl Logger for FakeLogger { +//! # type UserSpan = (); //! # fn log(&self, record: Record) { } +//! # fn start(&self, _span: Span, parent: Option<&()>) -> () {} //! # } //! # let logger = FakeLogger {}; //! diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index ddb1e31f645..3baf367cc8c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -66,7 +66,7 @@ use crate::sign::{ use crate::types::features::ChannelTypeFeatures; use crate::types::payment::{PaymentHash, PaymentPreimage}; use crate::util::byte_utils; -use crate::util::logger::{Logger, Record}; +use crate::util::logger::{Logger, Record, Span}; use crate::util::persist::MonitorName; use crate::util::ser::{ MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, Writeable, Writer, @@ -1634,12 +1634,18 @@ impl<'a, L: Deref> Logger for WithChannelMonitor<'a, L> where L::Target: Logger, { + type UserSpan = <::Target as Logger>::UserSpan; + fn log(&self, mut record: Record) { record.peer_id = self.peer_id; record.channel_id = self.channel_id; record.payment_hash = self.payment_hash; self.logger.log(record) } + + fn start(&self, span: Span, parent: Option<&Self::UserSpan>) -> Self::UserSpan { + self.logger.start(span, parent) + } } impl<'a, L: Deref> WithChannelMonitor<'a, L> diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index b5b77972a6c..07316ad0c95 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -79,7 +79,7 @@ use crate::util::config::{ MaxDustHTLCExposure, UserConfig, }; use crate::util::errors::APIError; -use crate::util::logger::{Logger, Record, WithContext}; +use crate::util::logger::{BoxedSpan, Logger, Record, Span, WithContext}; use crate::util::scid_utils::{block_from_scid, scid_from_parts}; use crate::util::ser::{ Readable, ReadableArgs, RequiredWrapper, TransactionU16LenLimited, Writeable, Writer, @@ -277,6 +277,15 @@ impl InboundHTLCState { } struct InboundHTLCOutput { + htlc_id: u64, + amount_msat: u64, + cltv_expiry: u32, + payment_hash: PaymentHash, + state_wrapper: InboundHTLCStateWrapper, + span: BoxedSpan, +} + +struct InboundHTLCOutputParams { htlc_id: u64, amount_msat: u64, cltv_expiry: u32, @@ -285,6 +294,24 @@ struct InboundHTLCOutput { } impl InboundHTLCOutput { + fn new( + channel_id: ChannelId, params: InboundHTLCOutputParams, logger: &L, + ) -> InboundHTLCOutput + where + L::Target: Logger, + { + let htlc_span = + logger.start(Span::InboundHTLC { channel_id, htlc_id: params.htlc_id }, None); + InboundHTLCOutput { + htlc_id: params.htlc_id, + amount_msat: params.amount_msat, + cltv_expiry: params.cltv_expiry, + payment_hash: params.payment_hash, + state_wrapper: InboundHTLCStateWrapper::new(params.state, Some(&htlc_span), logger), + span: BoxedSpan::new(htlc_span), + } + } + fn is_dust( &self, local: bool, feerate_per_kw: u32, broadcaster_dust_limit_sat: u64, features: &ChannelTypeFeatures, @@ -300,6 +327,100 @@ impl InboundHTLCOutput { }; self.amount_msat / 1000 < broadcaster_dust_limit_sat + htlc_tx_fee_sat } + + fn state(&self) -> &InboundHTLCState { + &self.state_wrapper.state + } + + fn set_state(&mut self, state: InboundHTLCState, logger: &L) + where + L::Target: Logger, + { + mem::drop(self.state_wrapper.waiting_on_peer_span.take()); + mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take()); + mem::drop(self.state_wrapper.span.take()); + self.state_wrapper = + InboundHTLCStateWrapper::new(state, self.span.as_user_span_ref::(), logger); + } + + fn waiting_on_peer(&mut self, logger: &L) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_peer_span = Some(BoxedSpan::new(logger.start( + Span::WaitingOnPeer, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ))); + } + + fn waiting_on_monitor_persist(&mut self, logger: &L) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_monitor_persist_span = Some(BoxedSpan::new(logger.start( + Span::WaitingOnMonitorPersist, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ))); + } + + fn monitor_persisted(&mut self) { + if self.state_wrapper.waiting_on_monitor_persist_span.is_some() { + mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take()); + } + } + + fn is_waiting_on_monitor_persist(&self) -> bool { + self.state_wrapper.waiting_on_monitor_persist_span.is_some() + } + + fn waiting_on_async_signing(&mut self, logger: &L) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_async_signing_span = Some(BoxedSpan::new(logger.start( + Span::WaitingOnAsyncSigning, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ))); + } + + fn async_signing_completed(&mut self) { + if self.state_wrapper.waiting_on_async_signing_span.is_some() { + mem::drop(self.state_wrapper.waiting_on_async_signing_span.take()); + } + } + + fn is_waiting_on_async_signing(&self) -> bool { + self.state_wrapper.waiting_on_async_signing_span.is_some() + } +} + +struct InboundHTLCStateWrapper { + state: InboundHTLCState, + waiting_on_peer_span: Option, + waiting_on_monitor_persist_span: Option, + waiting_on_async_signing_span: Option, + // Drop full span last. + span: Option, +} + +impl InboundHTLCStateWrapper { + fn new( + state: InboundHTLCState, parent_span: Option<&<::Target as Logger>::UserSpan>, + logger: &L, + ) -> InboundHTLCStateWrapper + where + L::Target: Logger, + { + let state_span = + logger.start(Span::InboundHTLCState { state: (&state).into() }, parent_span); + InboundHTLCStateWrapper { + state, + span: Some(BoxedSpan::new(state_span)), + waiting_on_peer_span: None, + waiting_on_monitor_persist_span: None, + waiting_on_async_signing_span: None, + } + } } #[cfg_attr(test, derive(Clone, Debug, PartialEq))] @@ -416,6 +537,20 @@ impl<'a> Into> for &'a OutboundHTLCOutcome { #[cfg_attr(test, derive(Clone, Debug, PartialEq))] struct OutboundHTLCOutput { + htlc_id: u64, + amount_msat: u64, + cltv_expiry: u32, + payment_hash: PaymentHash, + state_wrapper: OutboundHTLCStateWrapper, + source: HTLCSource, + blinding_point: Option, + skimmed_fee_msat: Option, + send_timestamp: Option, + span: BoxedSpan, + _forward_span: Option, +} + +struct OutboundHTLCOutputParams { htlc_id: u64, amount_msat: u64, cltv_expiry: u32, @@ -428,6 +563,32 @@ struct OutboundHTLCOutput { } impl OutboundHTLCOutput { + fn new( + channel_id: ChannelId, params: OutboundHTLCOutputParams, forward_span: Option, + logger: &L, + ) -> OutboundHTLCOutput + where + L::Target: Logger, + { + let htlc_span = logger.start( + Span::OutboundHTLC { channel_id, htlc_id: params.htlc_id }, + forward_span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ); + OutboundHTLCOutput { + htlc_id: params.htlc_id, + amount_msat: params.amount_msat, + cltv_expiry: params.cltv_expiry, + payment_hash: params.payment_hash, + state_wrapper: OutboundHTLCStateWrapper::new(params.state, Some(&htlc_span), logger), + source: params.source, + blinding_point: params.blinding_point, + skimmed_fee_msat: params.skimmed_fee_msat, + send_timestamp: params.send_timestamp, + span: BoxedSpan::new(htlc_span), + _forward_span: forward_span, + } + } + fn is_dust( &self, local: bool, feerate_per_kw: u32, broadcaster_dust_limit_sat: u64, features: &ChannelTypeFeatures, @@ -443,6 +604,127 @@ impl OutboundHTLCOutput { }; self.amount_msat / 1000 < broadcaster_dust_limit_sat + htlc_tx_fee_sat } + + fn state(&self) -> &OutboundHTLCState { + &self.state_wrapper.state + } + + fn set_state(&mut self, state: OutboundHTLCState, logger: &L) + where + L::Target: Logger, + { + mem::drop(self.state_wrapper.waiting_on_peer_span.take()); + mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take()); + mem::drop(self.state_wrapper.span.take()); + self.state_wrapper = + OutboundHTLCStateWrapper::new(state, self.span.as_user_span_ref::(), logger); + } + + fn is_waiting_on_peer(&self, reason: Option) -> bool { + match (&self.state_wrapper.waiting_on_peer_span, reason) { + (Some((_, _)), None) => true, + (Some((_, span_reason)), Some(given_reason)) => *span_reason == given_reason, + _ => false, + } + } + + fn waiting_on_peer(&mut self, logger: &L, reason: WaitingOnPeerReason) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_peer_span = Some(( + BoxedSpan::new(logger.start( + Span::WaitingOnPeer, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + )), + reason, + )); + } + + fn peer_responded(&mut self) { + if self.state_wrapper.waiting_on_peer_span.is_some() { + mem::drop(self.state_wrapper.waiting_on_peer_span.take()); + } + } + + fn is_waiting_on_monitor_persist(&self) -> bool { + self.state_wrapper.waiting_on_monitor_persist_span.is_some() + } + + fn waiting_on_monitor_persist(&mut self, logger: &L) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_monitor_persist_span = Some(BoxedSpan::new(logger.start( + Span::WaitingOnMonitorPersist, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ))); + } + + fn monitor_persisted(&mut self) { + if self.state_wrapper.waiting_on_monitor_persist_span.is_some() { + mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take()); + } + } + + fn waiting_on_async_signing(&mut self, logger: &L) + where + L::Target: Logger, + { + self.state_wrapper.waiting_on_async_signing_span = Some(BoxedSpan::new(logger.start( + Span::WaitingOnAsyncSigning, + self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::()).flatten(), + ))); + } + + fn async_signing_completed(&mut self) { + if self.state_wrapper.waiting_on_async_signing_span.is_some() { + mem::drop(self.state_wrapper.waiting_on_async_signing_span.take()); + } + } + + fn is_waiting_on_async_signing(&self) -> bool { + self.state_wrapper.waiting_on_async_signing_span.is_some() + } +} + +// This additional reason allows us to recognize the different stages in the +// OutboundHTLCState::Committed state. For other states, this can easily be derived. +#[derive(Debug, Clone, PartialEq, Eq)] +enum WaitingOnPeerReason { + Commitment, + Revocation, + HTLCResolution, +} + +#[cfg_attr(test, derive(Clone, Debug, PartialEq))] +struct OutboundHTLCStateWrapper { + state: OutboundHTLCState, + waiting_on_peer_span: Option<(BoxedSpan, WaitingOnPeerReason)>, + waiting_on_monitor_persist_span: Option, + waiting_on_async_signing_span: Option, + // Drop full span last. + span: Option, +} + +impl OutboundHTLCStateWrapper { + fn new( + state: OutboundHTLCState, parent_span: Option<&<::Target as Logger>::UserSpan>, + logger: &L, + ) -> OutboundHTLCStateWrapper + where + L::Target: Logger, + { + let state_span = + logger.start(Span::OutboundHTLCState { state: (&state).into() }, parent_span); + OutboundHTLCStateWrapper { + state, + span: Some(BoxedSpan::new(state_span)), + waiting_on_peer_span: None, + waiting_on_monitor_persist_span: None, + waiting_on_async_signing_span: None, + } + } } /// See AwaitingRemoteRevoke ChannelState for more info @@ -459,6 +741,7 @@ enum HTLCUpdateAwaitingACK { // The extra fee we're skimming off the top of this HTLC. skimmed_fee_msat: Option, blinding_point: Option, + forward_span: Option, }, ClaimHTLC { payment_preimage: PaymentPreimage, @@ -1021,12 +1304,18 @@ impl<'a, L: Deref> Logger for WithChannelContext<'a, L> where L::Target: Logger, { + type UserSpan = <::Target as Logger>::UserSpan; + fn log(&self, mut record: Record) { record.peer_id = self.peer_id; record.channel_id = self.channel_id; record.payment_hash = self.payment_hash; self.logger.log(record) } + + fn start(&self, span: Span, parent: Option<&Self::UserSpan>) -> Self::UserSpan { + self.logger.start(span, parent) + } } impl<'a, 'b, L: Deref> WithChannelContext<'a, L> @@ -1179,7 +1468,7 @@ pub(super) struct MonitorRestoreUpdates { pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>, pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, pub finalized_claimed_htlcs: Vec<(HTLCSource, Option)>, - pub pending_update_adds: Vec, + pub pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>, pub funding_broadcastable: Option, pub channel_ready: Option, pub announcement_sigs: Option, @@ -2309,7 +2598,7 @@ where monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>, monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, monitor_pending_finalized_fulfills: Vec<(HTLCSource, Option)>, - monitor_pending_update_adds: Vec, + monitor_pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>, monitor_pending_tx_signatures: Option, /// If we went to send a revoke_and_ack but our signer was unable to give us a signature, @@ -3725,7 +4014,7 @@ where } if self.pending_inbound_htlcs.iter() - .any(|htlc| match htlc.state { + .any(|htlc| match htlc.state() { InboundHTLCState::Committed => false, // An HTLC removal from the local node is pending on the remote commitment. InboundHTLCState::LocalRemoved(_) => true, @@ -3739,7 +4028,7 @@ where } self.pending_outbound_htlcs.iter() - .any(|htlc| match htlc.state { + .any(|htlc| match htlc.state() { OutboundHTLCState::Committed => false, // An HTLC add from the local node is pending on the remote commitment. OutboundHTLCState::LocalAnnounced(_) => true, @@ -4144,7 +4433,7 @@ where .iter() .filter_map(|htlc| { matches!( - htlc.state, + htlc.state(), OutboundHTLCState::AwaitingRemoteRevokeToRemove(OutboundHTLCOutcome::Success(_, _)) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(OutboundHTLCOutcome::Success(_, _)) ) @@ -4381,7 +4670,7 @@ where .iter() .filter_map(|htlc| { matches!( - htlc.state, + htlc.state(), OutboundHTLCState::AwaitingRemoteRevokeToRemove(OutboundHTLCOutcome::Success(_, _)) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(OutboundHTLCOutcome::Success(_, _)) ) @@ -4462,26 +4751,26 @@ where let feerate_per_kw = feerate_per_kw.unwrap_or_else(|| self.get_commitment_feerate(funding, generated_by_local)); for htlc in self.pending_inbound_htlcs.iter() { - if htlc.state.included_in_commitment(generated_by_local) { + if htlc.state().included_in_commitment(generated_by_local) { if !htlc.is_dust(local, feerate_per_kw, broadcaster_dust_limit_sat, funding.get_channel_type()) { nondust_htlc_count += 1; } remote_htlc_total_msat += htlc.amount_msat; } else { - if htlc.state.preimage().is_some() { + if htlc.state().preimage().is_some() { value_to_self_claimed_msat += htlc.amount_msat; } } }; for htlc in self.pending_outbound_htlcs.iter() { - if htlc.state.included_in_commitment(generated_by_local) { + if htlc.state().included_in_commitment(generated_by_local) { if !htlc.is_dust(local, feerate_per_kw, broadcaster_dust_limit_sat, funding.get_channel_type()) { nondust_htlc_count += 1; } local_htlc_total_msat += htlc.amount_msat; } else { - if htlc.state.preimage().is_some() { + if htlc.state().preimage().is_some() { value_to_remote_claimed_msat += htlc.amount_msat; } } @@ -4579,12 +4868,12 @@ where let mut outbound_htlc_preimages: Vec = Vec::new(); for htlc in self.pending_inbound_htlcs.iter() { - if htlc.state.included_in_commitment(generated_by_local) { - log_trace!(logger, " ...including inbound {} HTLC {} (hash {}) with value {}", htlc.state, htlc.htlc_id, htlc.payment_hash, htlc.amount_msat); + if htlc.state().included_in_commitment(generated_by_local) { + log_trace!(logger, " ...including inbound {} HTLC {} (hash {}) with value {}", htlc.state(), htlc.htlc_id, htlc.payment_hash, htlc.amount_msat); add_htlc_output!(htlc, false, None); } else { - log_trace!(logger, " ...not including inbound HTLC {} (hash {}) with value {} due to state ({})", htlc.htlc_id, htlc.payment_hash, htlc.amount_msat, htlc.state); - if let Some(preimage) = htlc.state.preimage() { + log_trace!(logger, " ...not including inbound HTLC {} (hash {}) with value {} due to state ({})", htlc.htlc_id, htlc.payment_hash, htlc.amount_msat, htlc.state()); + if let Some(preimage) = htlc.state().preimage() { inbound_htlc_preimages.push(preimage); value_to_self_claimed_msat += htlc.amount_msat; } @@ -4592,15 +4881,15 @@ where }; for htlc in self.pending_outbound_htlcs.iter() { - if let Some(preimage) = htlc.state.preimage() { + if let Some(preimage) = htlc.state().preimage() { outbound_htlc_preimages.push(preimage); } - if htlc.state.included_in_commitment(generated_by_local) { - log_trace!(logger, " ...including outbound {} HTLC {} (hash {}) with value {}", htlc.state, htlc.htlc_id, htlc.payment_hash, htlc.amount_msat); + if htlc.state().included_in_commitment(generated_by_local) { + log_trace!(logger, " ...including outbound {} HTLC {} (hash {}) with value {}", htlc.state(), htlc.htlc_id, htlc.payment_hash, htlc.amount_msat); add_htlc_output!(htlc, true, Some(&htlc.source)); } else { - log_trace!(logger, " ...not including outbound HTLC {} (hash {}) with value {} due to state ({})", htlc.htlc_id, htlc.payment_hash, htlc.amount_msat, htlc.state); - if htlc.state.preimage().is_some() { + log_trace!(logger, " ...not including outbound HTLC {} (hash {}) with value {} due to state ({})", htlc.htlc_id, htlc.payment_hash, htlc.amount_msat, htlc.state()); + if htlc.state().preimage().is_some() { value_to_remote_claimed_msat += htlc.amount_msat; } } @@ -4843,7 +5132,7 @@ where ); let holder_dust_limit_success_sat = htlc_success_tx_fee_sat + self.holder_dust_limit_satoshis; for htlc in self.pending_inbound_htlcs.iter() { - if let Some(state_details) = (&htlc.state).into() { + if let Some(state_details) = htlc.state().into() { inbound_details.push(InboundHTLCDetails{ htlc_id: htlc.htlc_id, amount_msat: htlc.amount_msat, @@ -4874,7 +5163,7 @@ where cltv_expiry: htlc.cltv_expiry, payment_hash: htlc.payment_hash, skimmed_fee_msat: htlc.skimmed_fee_msat, - state: Some((&htlc.state).into()), + state: Some(htlc.state().into()), is_dust: htlc.amount_msat / 1000 < holder_dust_limit_timeout_sat, }); } @@ -5109,7 +5398,7 @@ where if htlc.amount_msat / 1000 < real_dust_limit_timeout_sat { continue } - match htlc.state { + match htlc.state() { OutboundHTLCState::LocalAnnounced {..} => included_htlcs += 1, OutboundHTLCState::Committed => included_htlcs += 1, OutboundHTLCState::RemoteRemoved {..} => included_htlcs += 1, @@ -5226,7 +5515,7 @@ where } // We only include outbound HTLCs if it will not be included in their next commitment_signed, // i.e. if they've responded to us with an RAA after announcement. - match htlc.state { + match htlc.state() { OutboundHTLCState::Committed => included_htlcs += 1, OutboundHTLCState::RemoteRemoved {..} => included_htlcs += 1, OutboundHTLCState::LocalAnnounced { .. } => included_htlcs += 1, @@ -6215,7 +6504,7 @@ where htlc.payment_hash, payment_preimage_arg ); - match htlc.state { + match htlc.state() { InboundHTLCState::Committed => {}, InboundHTLCState::LocalRemoved(ref reason) => { if let &InboundHTLCRemovalReason::Fulfill(_, _) = reason { @@ -6310,7 +6599,7 @@ where { let htlc = &mut self.context.pending_inbound_htlcs[pending_idx]; - if let InboundHTLCState::Committed = htlc.state { + if let InboundHTLCState::Committed = htlc.state() { } else { debug_assert!( false, @@ -6328,10 +6617,14 @@ where &htlc.payment_hash, &self.context.channel_id ); - htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::Fulfill( - payment_preimage_arg.clone(), - attribution_data, - )); + htlc.set_state( + InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::Fulfill( + payment_preimage_arg.clone(), + attribution_data, + )), + logger, + ); + htlc.waiting_on_monitor_persist(logger); } UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, update_blocked: false } @@ -6445,7 +6738,7 @@ where let mut pending_idx = core::usize::MAX; for (idx, htlc) in self.context.pending_inbound_htlcs.iter().enumerate() { if htlc.htlc_id == htlc_id_arg { - match htlc.state { + match htlc.state() { InboundHTLCState::Committed => {}, InboundHTLCState::LocalRemoved(_) => { return Err(ChannelError::Ignore(format!("HTLC {} was already resolved", htlc.htlc_id))); @@ -6495,7 +6788,7 @@ where E::Message::name(), &self.context.channel_id()); { let htlc = &mut self.context.pending_inbound_htlcs[pending_idx]; - htlc.state = err_contents.clone().to_inbound_htlc_state(); + htlc.set_state(err_contents.clone().to_inbound_htlc_state(), logger); } Ok(Some(err_contents.to_message(htlc_id_arg, self.context.channel_id()))) @@ -6608,9 +6901,9 @@ where } #[rustfmt::skip] - pub fn update_add_htlc( - &mut self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator, - ) -> Result<(), ChannelError> where F::Target: FeeEstimator { + pub fn update_add_htlc( + &mut self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, + ) -> Result<(), ChannelError> where F::Target: FeeEstimator, L::Target: Logger { if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() { return Err(ChannelError::WarnAndDisconnect("Got add HTLC message while quiescent".to_owned())); } @@ -6643,22 +6936,31 @@ where // Now update local state: self.context.next_counterparty_htlc_id += 1; - self.context.pending_inbound_htlcs.push(InboundHTLCOutput { - htlc_id: msg.htlc_id, - amount_msat: msg.amount_msat, - payment_hash: msg.payment_hash, - cltv_expiry: msg.cltv_expiry, - state: InboundHTLCState::RemoteAnnounced(InboundHTLCResolution::Pending { - update_add_htlc: msg.clone(), - }), - }); + let mut output = InboundHTLCOutput::new( + self.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: msg.htlc_id, + amount_msat: msg.amount_msat, + cltv_expiry: msg.cltv_expiry, + payment_hash: msg.payment_hash, + state: InboundHTLCState::RemoteAnnounced(InboundHTLCResolution::Pending { + update_add_htlc: msg.clone(), + }), + }, + logger, + ); + output.waiting_on_peer(logger); + self.context.pending_inbound_htlcs.push(output); + Ok(()) } /// Marks an outbound HTLC which we have received update_fail/fulfill/malformed #[inline] #[rustfmt::skip] - fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, outcome: OutboundHTLCOutcome) -> Result<&OutboundHTLCOutput, ChannelError> { + fn mark_outbound_htlc_removed( + &mut self, htlc_id: u64, outcome: OutboundHTLCOutcome, logger: &L + ) -> Result<&OutboundHTLCOutput, ChannelError> where L::Target: Logger { for htlc in self.context.pending_outbound_htlcs.iter_mut() { if htlc.htlc_id == htlc_id { if let OutboundHTLCOutcome::Success(ref payment_preimage, ..) = outcome { @@ -6667,11 +6969,12 @@ where return Err(ChannelError::close(format!("Remote tried to fulfill HTLC ({}) with an incorrect preimage", htlc_id))); } } - match htlc.state { + match htlc.state() { OutboundHTLCState::LocalAnnounced(_) => return Err(ChannelError::close(format!("Remote tried to fulfill/fail HTLC ({}) before it had been committed", htlc_id))), OutboundHTLCState::Committed => { - htlc.state = OutboundHTLCState::RemoteRemoved(outcome); + htlc.set_state(OutboundHTLCState::RemoteRemoved(outcome), logger); + htlc.waiting_on_peer(logger, WaitingOnPeerReason::Commitment); }, OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) | OutboundHTLCState::RemoteRemoved(_) => return Err(ChannelError::close(format!("Remote tried to fulfill/fail HTLC ({}) that they'd already fulfilled/failed", htlc_id))), @@ -6682,9 +6985,12 @@ where Err(ChannelError::close("Remote tried to fulfill/fail an HTLC we couldn't find".to_owned())) } - pub fn update_fulfill_htlc( - &mut self, msg: &msgs::UpdateFulfillHTLC, - ) -> Result<(HTLCSource, u64, Option, Option), ChannelError> { + pub fn update_fulfill_htlc( + &mut self, msg: &msgs::UpdateFulfillHTLC, logger: &L, + ) -> Result<(HTLCSource, u64, Option, Option), ChannelError> + where + L::Target: Logger, + { if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() { @@ -6705,13 +7011,15 @@ where let outcome = OutboundHTLCOutcome::Success(msg.payment_preimage, msg.attribution_data.clone()); - self.mark_outbound_htlc_removed(msg.htlc_id, outcome).map(|htlc| { + self.mark_outbound_htlc_removed(msg.htlc_id, outcome, logger).map(|htlc| { (htlc.source.clone(), htlc.amount_msat, htlc.skimmed_fee_msat, htlc.send_timestamp) }) } #[rustfmt::skip] - pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { + pub fn update_fail_htlc( + &mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason, logger: &L + ) -> Result<(), ChannelError> where L::Target: Logger { if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() { return Err(ChannelError::WarnAndDisconnect("Got fail HTLC message while quiescent".to_owned())); } @@ -6722,12 +7030,14 @@ where return Err(ChannelError::close("Peer sent update_fail_htlc when we needed a channel_reestablish".to_owned())); } - self.mark_outbound_htlc_removed(msg.htlc_id, OutboundHTLCOutcome::Failure(fail_reason))?; + self.mark_outbound_htlc_removed(msg.htlc_id, OutboundHTLCOutcome::Failure(fail_reason), logger)?; Ok(()) } #[rustfmt::skip] - pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { + pub fn update_fail_malformed_htlc( + &mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason, logger: &L + ) -> Result<(), ChannelError> where L::Target: Logger { if self.context.channel_state.is_remote_stfu_sent() || self.context.channel_state.is_quiescent() { return Err(ChannelError::WarnAndDisconnect("Got fail malformed HTLC message while quiescent".to_owned())); } @@ -6738,7 +7048,7 @@ where return Err(ChannelError::close("Peer sent update_fail_malformed_htlc when we needed a channel_reestablish".to_owned())); } - self.mark_outbound_htlc_removed(msg.htlc_id, OutboundHTLCOutcome::Failure(fail_reason))?; + self.mark_outbound_htlc_removed(msg.htlc_id, OutboundHTLCOutcome::Failure(fail_reason), logger)?; Ok(()) } @@ -7050,33 +7360,50 @@ where } for htlc in self.context.pending_inbound_htlcs.iter_mut() { - if let &InboundHTLCState::RemoteAnnounced(ref htlc_resolution) = &htlc.state { + if let InboundHTLCState::RemoteAnnounced(ref htlc_resolution) = htlc.state() { log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToAnnounce due to commitment_signed in channel {}.", &htlc.payment_hash, &self.context.channel_id); - htlc.state = - InboundHTLCState::AwaitingRemoteRevokeToAnnounce(htlc_resolution.clone()); + htlc.set_state( + InboundHTLCState::AwaitingRemoteRevokeToAnnounce(htlc_resolution.clone()), + logger, + ); + if self.context.channel_state.is_awaiting_remote_revoke() { + htlc.waiting_on_peer(logger); + } need_commitment = true; } } let mut claimed_htlcs = Vec::new(); for htlc in self.context.pending_outbound_htlcs.iter_mut() { - if let &mut OutboundHTLCState::RemoteRemoved(ref mut outcome) = &mut htlc.state { - log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToRemove due to commitment_signed in channel {}.", - &htlc.payment_hash, &self.context.channel_id); - // Swap against a dummy variant to avoid a potentially expensive clone of `OutboundHTLCOutcome::Failure(HTLCFailReason)` - let mut reason = OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None); - mem::swap(outcome, &mut reason); - if let OutboundHTLCOutcome::Success(preimage, _) = reason { - // If a user (a) receives an HTLC claim using LDK 0.0.104 or before, then (b) - // upgrades to LDK 0.0.114 or later before the HTLC is fully resolved, we could - // have a `Success(None)` reason. In this case we could forget some HTLC - // claims, but such an upgrade is unlikely and including claimed HTLCs here - // fixes a bug which the user was exposed to on 0.0.104 when they started the - // claim anyway. - claimed_htlcs.push((SentHTLCId::from_source(&htlc.source), preimage)); - } - htlc.state = OutboundHTLCState::AwaitingRemoteRevokeToRemove(reason); - need_commitment = true; + match &mut htlc.state_wrapper.state { + &mut OutboundHTLCState::RemoteRemoved(ref mut outcome) => { + log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToRemove due to commitment_signed in channel {}.", + &htlc.payment_hash, &self.context.channel_id); + // Swap against a dummy variant to avoid a potentially expensive clone of `OutboundHTLCOutcome::Failure(HTLCFailReason)` + let mut reason = OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None); + mem::swap(outcome, &mut reason); + if let OutboundHTLCOutcome::Success(preimage, _) = reason { + // If a user (a) receives an HTLC claim using LDK 0.0.104 or before, then (b) + // upgrades to LDK 0.0.114 or later before the HTLC is fully resolved, we could + // have a `Success(None)` reason. In this case we could forget some HTLC + // claims, but such an upgrade is unlikely and including claimed HTLCs here + // fixes a bug which the user was exposed to on 0.0.104 when they started the + // claim anyway. + claimed_htlcs.push((SentHTLCId::from_source(&htlc.source), preimage)); + } + htlc.set_state(OutboundHTLCState::AwaitingRemoteRevokeToRemove(reason), logger); + if self.context.channel_state.is_awaiting_remote_revoke() { + htlc.waiting_on_peer(logger, WaitingOnPeerReason::Revocation); + } + need_commitment = true; + }, + OutboundHTLCState::Committed => { + if htlc.is_waiting_on_peer(Some(WaitingOnPeerReason::Commitment)) { + htlc.peer_responded(); + htlc.waiting_on_monitor_persist(logger); + } + }, + _ => {}, } } @@ -7222,8 +7549,8 @@ where // the limit. In case it's less rare than I anticipate, we may want to revisit // handling this case better and maybe fulfilling some of the HTLCs while attempting // to rebalance channels. - let fail_htlc_res = match &htlc_update { - &HTLCUpdateAwaitingACK::AddHTLC { + let fail_htlc_res = match htlc_update { + HTLCUpdateAwaitingACK::AddHTLC { amount_msat, cltv_expiry, ref payment_hash, @@ -7231,6 +7558,7 @@ where ref onion_routing_packet, skimmed_fee_msat, blinding_point, + forward_span, .. } => { match self.send_htlc( @@ -7243,6 +7571,7 @@ where skimmed_fee_msat, blinding_point, fee_estimator, + forward_span, logger, ) { Ok(can_add_htlc) => { @@ -7267,7 +7596,7 @@ where } None }, - &HTLCUpdateAwaitingACK::ClaimHTLC { + HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, htlc_id, ref attribution_data, @@ -7299,11 +7628,11 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => Some( + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => Some( self.fail_htlc(htlc_id, err_packet.clone(), false, logger) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), - &HTLCUpdateAwaitingACK::FailMalformedHTLC { + HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, @@ -7488,7 +7817,7 @@ where &self.context.channel_id() ); let mut to_forward_infos = Vec::new(); - let mut pending_update_adds = Vec::new(); + let mut pending_update_adds = Vec::<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>::new(); let mut revoked_htlcs = Vec::new(); let mut finalized_claimed_htlcs = Vec::new(); let mut update_fail_htlcs = Vec::new(); @@ -7505,7 +7834,7 @@ where // We really shouldnt have two passes here, but retain gives a non-mutable ref (Rust bug) pending_inbound_htlcs.retain(|htlc| { - if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state { + if let &InboundHTLCState::LocalRemoved(ref reason) = htlc.state() { log_trace!(logger, " ...removing inbound LocalRemoved {}", &htlc.payment_hash); if let &InboundHTLCRemovalReason::Fulfill(_, _) = reason { value_to_self_msat_diff += htlc.amount_msat as i64; @@ -7517,7 +7846,7 @@ where } }); pending_outbound_htlcs.retain(|htlc| { - if let &OutboundHTLCState::AwaitingRemovedRemoteRevoke(ref outcome) = &htlc.state { + if let &OutboundHTLCState::AwaitingRemovedRemoteRevoke(ref outcome) = htlc.state() { log_trace!( logger, " ...removing outbound AwaitingRemovedRemoteRevoke {}", @@ -7546,21 +7875,26 @@ where } }); for htlc in pending_inbound_htlcs.iter_mut() { - let swap = if let &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(_) = &htlc.state + let swap = if let &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(_) = + htlc.state() { true - } else if let &InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) = &htlc.state { + } else if let &InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) = htlc.state() { true } else { false }; if swap { let mut state = InboundHTLCState::Committed; - mem::swap(&mut state, &mut htlc.state); + mem::swap(&mut state, &mut htlc.state_wrapper.state); if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) = state { log_trace!(logger, " ...promoting inbound AwaitingRemoteRevokeToAnnounce {} to AwaitingAnnouncedRemoteRevoke", &htlc.payment_hash); - htlc.state = InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution); + htlc.set_state( + InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution), + logger, + ); + htlc.waiting_on_monitor_persist(logger); require_commitment = true; } else if let InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) = state @@ -7573,58 +7907,78 @@ where require_commitment = true; match fail_msg { HTLCFailureMsg::Relay(msg) => { - htlc.state = InboundHTLCState::LocalRemoved( - InboundHTLCRemovalReason::FailRelay( - msg.clone().into(), + htlc.set_state( + InboundHTLCState::LocalRemoved( + InboundHTLCRemovalReason::FailRelay( + msg.clone().into(), + ), ), + logger, ); update_fail_htlcs.push(msg) }, HTLCFailureMsg::Malformed(msg) => { - htlc.state = InboundHTLCState::LocalRemoved( - InboundHTLCRemovalReason::FailMalformed(( - msg.sha256_of_onion, - msg.failure_code, - )), + htlc.set_state( + InboundHTLCState::LocalRemoved( + InboundHTLCRemovalReason::FailMalformed(( + msg.sha256_of_onion, + msg.failure_code, + )), + ), + logger, ); update_fail_malformed_htlcs.push(msg) }, } + htlc.waiting_on_monitor_persist(logger); }, PendingHTLCStatus::Forward(forward_info) => { log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed, attempting to forward", &htlc.payment_hash); to_forward_infos.push((forward_info, htlc.htlc_id)); - htlc.state = InboundHTLCState::Committed; + htlc.set_state(InboundHTLCState::Committed, logger); }, } }, InboundHTLCResolution::Pending { update_add_htlc } => { log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed", &htlc.payment_hash); - pending_update_adds.push(update_add_htlc); - htlc.state = InboundHTLCState::Committed; + htlc.set_state(InboundHTLCState::Committed, logger); + let forward_span = BoxedSpan::new( + logger.start(Span::Forward, htlc.span.as_user_span_ref::()), + ); + let waiting_on_persist_span = BoxedSpan::new(logger.start( + Span::WaitingOnMonitorPersist, + forward_span.as_user_span_ref::(), + )); + pending_update_adds.push(( + update_add_htlc, + waiting_on_persist_span, + forward_span, + )); }, } } } } for htlc in pending_outbound_htlcs.iter_mut() { - if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { + if let OutboundHTLCState::LocalAnnounced(_) = htlc.state() { log_trace!( logger, " ...promoting outbound LocalAnnounced {} to Committed", &htlc.payment_hash ); - htlc.state = OutboundHTLCState::Committed; + htlc.set_state(OutboundHTLCState::Committed, logger); + htlc.waiting_on_peer(logger, WaitingOnPeerReason::Commitment); *expecting_peer_commitment_signed = true; } if let &mut OutboundHTLCState::AwaitingRemoteRevokeToRemove(ref mut outcome) = - &mut htlc.state + &mut htlc.state_wrapper.state { log_trace!(logger, " ...promoting outbound AwaitingRemoteRevokeToRemove {} to AwaitingRemovedRemoteRevoke", &htlc.payment_hash); // Swap against a dummy variant to avoid a potentially expensive clone of `OutboundHTLCOutcome::Failure(HTLCFailReason)` let mut reason = OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None); mem::swap(outcome, &mut reason); - htlc.state = OutboundHTLCState::AwaitingRemovedRemoteRevoke(reason); + htlc.set_state(OutboundHTLCState::AwaitingRemovedRemoteRevoke(reason), logger); + htlc.waiting_on_monitor_persist(logger); require_commitment = true; } } @@ -7954,7 +8308,7 @@ where let mut inbound_drop_count = 0; self.context.pending_inbound_htlcs.retain(|htlc| { - match htlc.state { + match htlc.state() { InboundHTLCState::RemoteAnnounced(_) => { // They sent us an update_add_htlc but we never got the commitment_signed. // We'll tell them what commitment_signed we're expecting next and they'll drop @@ -7988,11 +8342,12 @@ where } for htlc in self.context.pending_outbound_htlcs.iter_mut() { - if let OutboundHTLCState::RemoteRemoved(_) = htlc.state { + if let OutboundHTLCState::RemoteRemoved(_) = htlc.state() { // They sent us an update to remove this but haven't yet sent the corresponding // commitment_signed, we need to move it back to Committed and they can re-send // the update upon reconnection. - htlc.state = OutboundHTLCState::Committed; + htlc.set_state(OutboundHTLCState::Committed, logger); + htlc.waiting_on_peer(logger, WaitingOnPeerReason::HTLCResolution); } } @@ -8092,7 +8447,11 @@ where let mut finalized_claimed_htlcs = Vec::new(); mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills); let mut pending_update_adds = Vec::new(); - mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds); + for (msg, waiting_on_persist_span, forward_span) in self.context.monitor_pending_update_adds.drain(..) { + mem::drop(waiting_on_persist_span); + let waiting_on_forward_span = BoxedSpan::new(logger.start(Span::WaitingOnForward, forward_span.as_user_span_ref::())); + pending_update_adds.push((msg, waiting_on_forward_span, forward_span)); + } // For channels established with V2 establishment we won't send a `tx_signatures` when we're in // MonitorUpdateInProgress (and we assume the user will never directly broadcast the funding // transaction and waits for us to do it). @@ -8132,6 +8491,49 @@ where commitment_update = None; } + for htlc in self.context.pending_inbound_htlcs.iter_mut() { + match htlc.state() { + InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) | + InboundHTLCState::LocalRemoved(_) => { + if htlc.is_waiting_on_monitor_persist() { + htlc.monitor_persisted(); + if commitment_update.is_some() { + htlc.waiting_on_peer(logger); + } else if self.context.signer_pending_commitment_update { + htlc.waiting_on_async_signing(logger); + } + } + }, + _ => {}, + } + } + for htlc in self.context.pending_outbound_htlcs.iter_mut() { + match htlc.state() { + OutboundHTLCState::LocalAnnounced(_) | + OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) => { + if htlc.is_waiting_on_monitor_persist() { + htlc.monitor_persisted(); + if commitment_update.is_some() { + htlc.waiting_on_peer(logger, WaitingOnPeerReason::Revocation); + } else if self.context.signer_pending_commitment_update { + htlc.waiting_on_async_signing(logger); + } + } + }, + OutboundHTLCState::Committed => { + if htlc.is_waiting_on_monitor_persist() { + htlc.monitor_persisted(); + if raa.is_some() { + htlc.waiting_on_peer(logger, WaitingOnPeerReason::HTLCResolution); + } else if self.context.signer_pending_revoke_and_ack { + htlc.waiting_on_async_signing(logger); + } + } + }, + _ => {}, + } + } + self.context.monitor_pending_revoke_and_ack = false; self.context.monitor_pending_commitment_signed = false; let order = self.context.resend_order.clone(); @@ -8275,6 +8677,37 @@ where } else { (None, None, None) } } else { (None, None, None) }; + for htlc in self.context.pending_inbound_htlcs.iter_mut() { + match htlc.state() { + InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) | + InboundHTLCState::LocalRemoved(_) => { + if htlc.is_waiting_on_async_signing() && commitment_update.is_some() { + htlc.async_signing_completed(); + htlc.waiting_on_peer(logger); + } + } + _ => {}, + } + } + for htlc in self.context.pending_outbound_htlcs.iter_mut() { + match htlc.state() { + OutboundHTLCState::LocalAnnounced(_) | + OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) => { + if htlc.is_waiting_on_async_signing() && commitment_update.is_some() { + htlc.async_signing_completed(); + htlc.waiting_on_peer(logger, WaitingOnPeerReason::Revocation); + } + }, + OutboundHTLCState::Committed => { + if htlc.is_waiting_on_async_signing() && revoke_and_ack.is_some() { + htlc.async_signing_completed(); + htlc.waiting_on_peer(logger, WaitingOnPeerReason::HTLCResolution); + } + }, + _ => {}, + } + } + log_trace!(logger, "Signer unblocked with {} commitment_update, {} revoke_and_ack, with resend order {:?}, {} funding_signed, {} channel_ready, {} closing_signed, {} signed_closing_tx, and {} shutdown result", if commitment_update.is_some() { "a" } else { "no" }, @@ -8352,7 +8785,7 @@ where let mut update_fail_malformed_htlcs = Vec::new(); for htlc in self.context.pending_outbound_htlcs.iter() { - if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = &htlc.state { + if let &OutboundHTLCState::LocalAnnounced(ref onion_packet) = htlc.state() { update_add_htlcs.push(msgs::UpdateAddHTLC { channel_id: self.context.channel_id(), htlc_id: htlc.htlc_id, @@ -8367,7 +8800,7 @@ where } for htlc in self.context.pending_inbound_htlcs.iter() { - if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state { + if let &InboundHTLCState::LocalRemoved(ref reason) = htlc.state() { match reason { &InboundHTLCRemovalReason::FailRelay(ref err_packet) => { update_fail_htlcs.push(msgs::UpdateFailHTLC { @@ -8416,23 +8849,25 @@ where log_trace!(logger, "Regenerating latest commitment update in channel {} with{} {} update_adds, {} update_fulfills, {} update_fails, and {} update_fail_malformeds", &self.context.channel_id(), if update_fee.is_some() { " update_fee," } else { "" }, update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len(), update_fail_malformed_htlcs.len()); - let commitment_signed = - if let Ok(update) = self.send_commitment_no_state_update(logger) { - if self.context.signer_pending_commitment_update { - log_trace!( - logger, - "Commitment update generated: clearing signer_pending_commitment_update" - ); - self.context.signer_pending_commitment_update = false; - } - update - } else { - if !self.context.signer_pending_commitment_update { - log_trace!(logger, "Commitment update awaiting signer: setting signer_pending_commitment_update"); - self.context.signer_pending_commitment_update = true; - } - return Err(()); - }; + let commitment_signed = if let Ok(update) = self.send_commitment_no_state_update(logger) { + if self.context.signer_pending_commitment_update { + log_trace!( + logger, + "Commitment update generated: clearing signer_pending_commitment_update" + ); + self.context.signer_pending_commitment_update = false; + } + update + } else { + if !self.context.signer_pending_commitment_update { + log_trace!( + logger, + "Commitment update awaiting signer: setting signer_pending_commitment_update" + ); + self.context.signer_pending_commitment_update = true; + } + return Err(()); + }; Ok(msgs::CommitmentUpdate { update_add_htlcs, update_fulfill_htlcs, @@ -8937,7 +9372,7 @@ where )); } for htlc in self.context.pending_inbound_htlcs.iter() { - if let InboundHTLCState::RemoteAnnounced(_) = htlc.state { + if let InboundHTLCState::RemoteAnnounced(_) = htlc.state() { return Err(ChannelError::close( "Got shutdown with remote pending HTLCs".to_owned(), )); @@ -10586,7 +11021,8 @@ where pub fn queue_add_htlc( &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option, - blinding_point: Option, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, + blinding_point: Option, fee_estimator: &LowerBoundedFeeEstimator, + forward_span: Option, logger: &L, ) -> Result<(), (LocalHTLCFailureReason, String)> where F::Target: FeeEstimator, @@ -10602,6 +11038,7 @@ where skimmed_fee_msat, blinding_point, fee_estimator, + forward_span, logger, ) .map(|can_add_htlc| assert!(!can_add_htlc, "We forced holding cell?")) @@ -10632,7 +11069,7 @@ where &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, skimmed_fee_msat: Option, blinding_point: Option, - fee_estimator: &LowerBoundedFeeEstimator, logger: &L, + fee_estimator: &LowerBoundedFeeEstimator, forward_span: Option, logger: &L, ) -> Result where F::Target: FeeEstimator, @@ -10713,6 +11150,7 @@ where onion_routing_packet, skimmed_fee_msat, blinding_point, + forward_span, }); return Ok(false); } @@ -10724,17 +11162,24 @@ where // that are simple to implement, and we do it on the outgoing side because then the failure message that encodes // the hold time still needs to be built in channel manager. let send_timestamp = duration_since_epoch(); - self.context.pending_outbound_htlcs.push(OutboundHTLCOutput { - htlc_id: self.context.next_holder_htlc_id, - amount_msat, - payment_hash: payment_hash.clone(), - cltv_expiry, - state: OutboundHTLCState::LocalAnnounced(Box::new(onion_routing_packet.clone())), - source, - blinding_point, - skimmed_fee_msat, - send_timestamp, - }); + let mut htlc = OutboundHTLCOutput::new( + self.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: self.context.next_holder_htlc_id, + amount_msat, + cltv_expiry, + payment_hash: payment_hash.clone(), + state: OutboundHTLCState::LocalAnnounced(Box::new(onion_routing_packet.clone())), + source, + blinding_point, + skimmed_fee_msat, + send_timestamp, + }, + forward_span, + logger, + ); + htlc.waiting_on_monitor_persist(logger); + self.context.pending_outbound_htlcs.push(htlc); self.context.next_holder_htlc_id += 1; Ok(true) @@ -10768,21 +11213,23 @@ where // fail to generate this, we still are at least at a position where upgrading their status // is acceptable. for htlc in self.context.pending_inbound_htlcs.iter_mut() { - let new_state = if let &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref forward_info) = &htlc.state { + let new_state = if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref forward_info) = htlc.state() { Some(InboundHTLCState::AwaitingAnnouncedRemoteRevoke(forward_info.clone())) } else { None }; if let Some(state) = new_state { log_trace!(logger, " ...promoting inbound AwaitingRemoteRevokeToAnnounce {} to AwaitingAnnouncedRemoteRevoke", &htlc.payment_hash); - htlc.state = state; + htlc.set_state(state, logger); + htlc.waiting_on_monitor_persist(logger); } } for htlc in self.context.pending_outbound_htlcs.iter_mut() { - if let &mut OutboundHTLCState::AwaitingRemoteRevokeToRemove(ref mut outcome) = &mut htlc.state { + if let &mut OutboundHTLCState::AwaitingRemoteRevokeToRemove(ref mut outcome) = &mut htlc.state_wrapper.state { log_trace!(logger, " ...promoting outbound AwaitingRemoteRevokeToRemove {} to AwaitingRemovedRemoteRevoke", &htlc.payment_hash); // Swap against a dummy variant to avoid a potentially expensive clone of `OutboundHTLCOutcome::Failure(HTLCFailReason)` let mut reason = OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None); mem::swap(outcome, &mut reason); - htlc.state = OutboundHTLCState::AwaitingRemovedRemoteRevoke(reason); + htlc.set_state(OutboundHTLCState::AwaitingRemovedRemoteRevoke(reason), logger); + htlc.waiting_on_monitor_persist(logger); } } if let Some((feerate, update_state)) = self.context.pending_update_fee { @@ -10977,7 +11424,7 @@ where pub fn send_htlc_and_commit( &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option, - fee_estimator: &LowerBoundedFeeEstimator, logger: &L, + fee_estimator: &LowerBoundedFeeEstimator, forward_span: Option, logger: &L, ) -> Result, ChannelError> where F::Target: FeeEstimator, @@ -10993,6 +11440,7 @@ where skimmed_fee_msat, None, fee_estimator, + forward_span, logger, ); // All [`LocalHTLCFailureReason`] errors are temporary, so they are [`ChannelError::Ignore`]. @@ -11041,7 +11489,7 @@ where }); } for htlc in self.context.pending_outbound_htlcs.iter() { - if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { + if let OutboundHTLCState::LocalAnnounced(_) = htlc.state() { return Err(APIError::APIMisuseError { err: "Cannot begin shutdown with pending HTLCs. Process pending events first" .to_owned(), @@ -12507,21 +12955,21 @@ where let mut dropped_inbound_htlcs = 0; for htlc in self.context.pending_inbound_htlcs.iter() { - if let InboundHTLCState::RemoteAnnounced(_) = htlc.state { + if let InboundHTLCState::RemoteAnnounced(_) = htlc.state() { dropped_inbound_htlcs += 1; } } let mut removed_htlc_attribution_data: Vec<&Option> = Vec::new(); (self.context.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?; for htlc in self.context.pending_inbound_htlcs.iter() { - if let &InboundHTLCState::RemoteAnnounced(_) = &htlc.state { + if let &InboundHTLCState::RemoteAnnounced(_) = htlc.state() { continue; // Drop } htlc.htlc_id.write(writer)?; htlc.amount_msat.write(writer)?; htlc.cltv_expiry.write(writer)?; htlc.payment_hash.write(writer)?; - match &htlc.state { + match htlc.state() { &InboundHTLCState::RemoteAnnounced(_) => unreachable!(), &InboundHTLCState::AwaitingRemoteRevokeToAnnounce(ref htlc_resolution) => { 1u8.write(writer)?; @@ -12573,7 +13021,7 @@ where htlc.cltv_expiry.write(writer)?; htlc.payment_hash.write(writer)?; htlc.source.write(writer)?; - match &htlc.state { + match htlc.state() { &OutboundHTLCState::LocalAnnounced(ref onion_packet) => { 0u8.write(writer)?; onion_packet.write(writer)?; @@ -12629,6 +13077,7 @@ where ref onion_routing_packet, blinding_point, skimmed_fee_msat, + forward_span: _, } => { 0u8.write(writer)?; amount_msat.write(writer)?; @@ -12828,7 +13277,9 @@ where let mut monitor_pending_update_adds = None; if !self.context.monitor_pending_update_adds.is_empty() { - monitor_pending_update_adds = Some(&self.context.monitor_pending_update_adds); + monitor_pending_update_adds = Some( + self.context.monitor_pending_update_adds.iter().map(|a| &a.0).collect::>(), + ); } let is_manual_broadcast = Some(self.context.is_manual_broadcast); @@ -12890,16 +13341,17 @@ where } } -impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures)> - for FundedChannel +impl<'a, 'b, 'c, 'd, ES: Deref, SP: Deref, L: Deref> + ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, &'d L)> for FundedChannel where ES::Target: EntropySource, SP::Target: SignerProvider, + L::Target: Logger, { fn read( - reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures), + reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, &'d L), ) -> Result { - let (entropy_source, signer_provider, our_supported_features) = args; + let (entropy_source, signer_provider, our_supported_features, logger) = args; let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); if ver <= 2 { return Err(DecodeError::UnknownVersion); @@ -12941,48 +13393,51 @@ where DEFAULT_MAX_HTLCS as usize, )); for _ in 0..pending_inbound_htlc_count { - pending_inbound_htlcs.push(InboundHTLCOutput { - htlc_id: Readable::read(reader)?, - amount_msat: Readable::read(reader)?, - cltv_expiry: Readable::read(reader)?, - payment_hash: Readable::read(reader)?, - state: match ::read(reader)? { - 1 => { - let resolution = if ver <= 3 { - InboundHTLCResolution::Resolved { - pending_htlc_status: Readable::read(reader)?, - } - } else { - Readable::read(reader)? - }; - InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) - }, - 2 => { - let resolution = if ver <= 3 { - InboundHTLCResolution::Resolved { - pending_htlc_status: Readable::read(reader)?, - } - } else { - Readable::read(reader)? - }; - InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) - }, - 3 => InboundHTLCState::Committed, - 4 => { - let reason = match ::read(reader)? { - 0 => InboundHTLCRemovalReason::FailRelay(msgs::OnionErrorPacket { - data: Readable::read(reader)?, - attribution_data: None, - }), - 1 => InboundHTLCRemovalReason::FailMalformed(Readable::read(reader)?), - 2 => InboundHTLCRemovalReason::Fulfill(Readable::read(reader)?, None), - _ => return Err(DecodeError::InvalidValue), - }; - InboundHTLCState::LocalRemoved(reason) - }, - _ => return Err(DecodeError::InvalidValue), + let htlc_id = Readable::read(reader)?; + let amount_msat = Readable::read(reader)?; + let cltv_expiry = Readable::read(reader)?; + let payment_hash = Readable::read(reader)?; + let state = match ::read(reader)? { + 1 => { + let resolution = if ver <= 3 { + InboundHTLCResolution::Resolved { + pending_htlc_status: Readable::read(reader)?, + } + } else { + Readable::read(reader)? + }; + InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) }, - }); + 2 => { + let resolution = if ver <= 3 { + InboundHTLCResolution::Resolved { + pending_htlc_status: Readable::read(reader)?, + } + } else { + Readable::read(reader)? + }; + InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) + }, + 3 => InboundHTLCState::Committed, + 4 => { + let reason = match ::read(reader)? { + 0 => InboundHTLCRemovalReason::FailRelay(msgs::OnionErrorPacket { + data: Readable::read(reader)?, + attribution_data: None, + }), + 1 => InboundHTLCRemovalReason::FailMalformed(Readable::read(reader)?), + 2 => InboundHTLCRemovalReason::Fulfill(Readable::read(reader)?, None), + _ => return Err(DecodeError::InvalidValue), + }; + InboundHTLCState::LocalRemoved(reason) + }, + _ => return Err(DecodeError::InvalidValue), + }; + pending_inbound_htlcs.push(InboundHTLCOutput::new( + channel_id, + InboundHTLCOutputParams { htlc_id, amount_msat, cltv_expiry, payment_hash, state }, + logger, + )); } let pending_outbound_htlc_count: u64 = Readable::read(reader)?; @@ -12991,48 +13446,59 @@ where DEFAULT_MAX_HTLCS as usize, )); for _ in 0..pending_outbound_htlc_count { - pending_outbound_htlcs.push(OutboundHTLCOutput { - htlc_id: Readable::read(reader)?, - amount_msat: Readable::read(reader)?, - cltv_expiry: Readable::read(reader)?, - payment_hash: Readable::read(reader)?, - source: Readable::read(reader)?, - state: match ::read(reader)? { - 0 => OutboundHTLCState::LocalAnnounced(Box::new(Readable::read(reader)?)), - 1 => OutboundHTLCState::Committed, - 2 => { - let option: Option = Readable::read(reader)?; - let outcome = match option { - Some(r) => OutboundHTLCOutcome::Failure(r), - // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down - None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), - }; - OutboundHTLCState::RemoteRemoved(outcome) - }, - 3 => { - let option: Option = Readable::read(reader)?; - let outcome = match option { - Some(r) => OutboundHTLCOutcome::Failure(r), - // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down - None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), - }; - OutboundHTLCState::AwaitingRemoteRevokeToRemove(outcome) - }, - 4 => { - let option: Option = Readable::read(reader)?; - let outcome = match option { - Some(r) => OutboundHTLCOutcome::Failure(r), - // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down - None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), - }; - OutboundHTLCState::AwaitingRemovedRemoteRevoke(outcome) - }, - _ => return Err(DecodeError::InvalidValue), + let htlc_id = Readable::read(reader)?; + let amount_msat = Readable::read(reader)?; + let cltv_expiry = Readable::read(reader)?; + let payment_hash = Readable::read(reader)?; + let source = Readable::read(reader)?; + let state = match ::read(reader)? { + 0 => OutboundHTLCState::LocalAnnounced(Box::new(Readable::read(reader)?)), + 1 => OutboundHTLCState::Committed, + 2 => { + let option: Option = Readable::read(reader)?; + let outcome = match option { + Some(r) => OutboundHTLCOutcome::Failure(r), + // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down + None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), + }; + OutboundHTLCState::RemoteRemoved(outcome) }, - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }); + 3 => { + let option: Option = Readable::read(reader)?; + let outcome = match option { + Some(r) => OutboundHTLCOutcome::Failure(r), + // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down + None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), + }; + OutboundHTLCState::AwaitingRemoteRevokeToRemove(outcome) + }, + 4 => { + let option: Option = Readable::read(reader)?; + let outcome = match option { + Some(r) => OutboundHTLCOutcome::Failure(r), + // Initialize this variant with a dummy preimage, the actual preimage will be filled in further down + None => OutboundHTLCOutcome::Success(PaymentPreimage([0u8; 32]), None), + }; + OutboundHTLCState::AwaitingRemovedRemoteRevoke(outcome) + }, + _ => return Err(DecodeError::InvalidValue), + }; + pending_outbound_htlcs.push(OutboundHTLCOutput::new( + channel_id, + OutboundHTLCOutputParams { + htlc_id, + amount_msat, + cltv_expiry, + payment_hash, + state, + source, + blinding_point: None, + skimmed_fee_msat: None, + send_timestamp: None, + }, + None, + logger, + )); } let holding_cell_htlc_update_count: u64 = Readable::read(reader)?; @@ -13050,6 +13516,7 @@ where onion_routing_packet: Readable::read(reader)?, skimmed_fee_msat: None, blinding_point: None, + forward_span: None, }, 1 => HTLCUpdateAwaitingACK::ClaimHTLC { payment_preimage: Readable::read(reader)?, @@ -13293,7 +13760,7 @@ where let mut iter = preimages.into_iter(); let mut fulfill_attribution_data_iter = fulfill_attribution_data.map(Vec::into_iter); for htlc in pending_outbound_htlcs.iter_mut() { - match &mut htlc.state { + match &mut htlc.state_wrapper.state { OutboundHTLCState::AwaitingRemoteRevokeToRemove(OutboundHTLCOutcome::Success( ref mut preimage, ref mut attribution_data, @@ -13390,7 +13857,7 @@ where if let Some(attribution_data_list) = removed_htlc_attribution_data { let mut removed_htlcs = pending_inbound_htlcs.iter_mut().filter_map(|status| { - if let InboundHTLCState::LocalRemoved(reason) = &mut status.state { + if let InboundHTLCState::LocalRemoved(reason) = &mut status.state_wrapper.state { match reason { InboundHTLCRemovalReason::FailRelay(ref mut packet) => { Some(&mut packet.attribution_data) @@ -13495,6 +13962,18 @@ where }, }; + let monitor_pending_update_adds = monitor_pending_update_adds + .unwrap_or_default() + .into_iter() + .map(|msg| { + let forward_span = BoxedSpan::new(logger.start(Span::Forward, None)); + let waiting_span = BoxedSpan::new( + logger.start(Span::WaitingOnForward, forward_span.as_user_span_ref::()), + ); + (msg, waiting_span, forward_span) + }) + .collect::>(); + Ok(FundedChannel { funding: FundingScope { value_to_self_msat, @@ -13558,7 +14037,7 @@ where monitor_pending_forwards, monitor_pending_failures, monitor_pending_finalized_fulfills: monitor_pending_finalized_fulfills.unwrap(), - monitor_pending_update_adds: monitor_pending_update_adds.unwrap_or_default(), + monitor_pending_update_adds, monitor_pending_tx_signatures: None, signer_pending_revoke_and_ack: false, @@ -13673,8 +14152,9 @@ mod tests { use crate::ln::chan_utils::{self, commit_tx_fee_sat}; use crate::ln::channel::{ AwaitingChannelReadyFlags, ChannelState, FundedChannel, HTLCCandidate, HTLCInitiator, - HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCState, InboundV1Channel, - OutboundHTLCOutput, OutboundHTLCState, OutboundV1Channel, + HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCOutputParams, InboundHTLCState, + InboundV1Channel, OutboundHTLCOutput, OutboundHTLCOutputParams, OutboundHTLCState, + OutboundV1Channel, }; use crate::ln::channel::{ MAX_FUNDING_SATOSHIS_NO_WUMBO, MIN_THEIR_CHAN_RESERVE_SATOSHIS, @@ -13920,31 +14400,40 @@ mod tests { // Put some inbound and outbound HTLCs in A's channel. let htlc_amount_msat = 11_092_000; // put an amount below A's effective dust limit but above B's. - node_a_chan.context.pending_inbound_htlcs.push(InboundHTLCOutput { - htlc_id: 0, - amount_msat: htlc_amount_msat, - payment_hash: PaymentHash(Sha256::hash(&[42; 32]).to_byte_array()), - cltv_expiry: 300000000, - state: InboundHTLCState::Committed, - }); + node_a_chan.context.pending_inbound_htlcs.push(InboundHTLCOutput::new( + node_a_chan.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: 0, + amount_msat: htlc_amount_msat, + cltv_expiry: 300000000, + payment_hash: PaymentHash(Sha256::hash(&[42; 32]).to_byte_array()), + state: InboundHTLCState::Committed, + }, + &&logger, + )); - node_a_chan.context.pending_outbound_htlcs.push(OutboundHTLCOutput { - htlc_id: 1, - amount_msat: htlc_amount_msat, // put an amount below A's dust amount but above B's. - payment_hash: PaymentHash(Sha256::hash(&[43; 32]).to_byte_array()), - cltv_expiry: 200000000, - state: OutboundHTLCState::Committed, - source: HTLCSource::OutboundRoute { - path: Path { hops: Vec::new(), blinded_tail: None }, - session_priv: SecretKey::from_slice(&>::from_hex("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(), - first_hop_htlc_msat: 548, - payment_id: PaymentId([42; 32]), - bolt12_invoice: None, + node_a_chan.context.pending_outbound_htlcs.push(OutboundHTLCOutput::new( + node_a_chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 1, + amount_msat: htlc_amount_msat, // put an amount below A's dust amount but above B's. + payment_hash: PaymentHash(Sha256::hash(&[43; 32]).to_byte_array()), + cltv_expiry: 200000000, + state: OutboundHTLCState::Committed, + source: HTLCSource::OutboundRoute { + path: Path { hops: Vec::new(), blinded_tail: None }, + session_priv: SecretKey::from_slice(&>::from_hex("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(), + first_hop_htlc_msat: 548, + payment_id: PaymentId([42; 32]), + bolt12_invoice: None, + }, + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, }, - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }); + None, + &&logger, + )); // Make sure when Node A calculates their local commitment transaction, none of the HTLCs pass // the dust limit check. @@ -14388,17 +14877,22 @@ mod tests { payment_id: PaymentId([42; 32]), bolt12_invoice: None, }; - let dummy_outbound_output = OutboundHTLCOutput { - htlc_id: 0, - amount_msat: 0, - payment_hash: PaymentHash([43; 32]), - cltv_expiry: 0, - state: OutboundHTLCState::Committed, - source: dummy_htlc_source.clone(), - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }; + let dummy_outbound_output = OutboundHTLCOutput::new( + chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 0, + amount_msat: 0, + payment_hash: PaymentHash([43; 32]), + cltv_expiry: 0, + state: OutboundHTLCState::Committed, + source: dummy_htlc_source.clone(), + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, + }, + None, + &&logger, + ); let mut pending_outbound_htlcs = vec![dummy_outbound_output.clone(); 10]; for (idx, htlc) in pending_outbound_htlcs.iter_mut().enumerate() { if idx % 2 == 0 { @@ -14423,6 +14917,7 @@ mod tests { }, skimmed_fee_msat: None, blinding_point: None, + forward_span: None, }; let dummy_holding_cell_claim_htlc = |attribution_data| HTLCUpdateAwaitingACK::ClaimHTLC { payment_preimage: PaymentPreimage([42; 32]), @@ -14490,9 +14985,11 @@ mod tests { let mut reader = crate::util::ser::FixedLengthReader::new(&mut s, encoded_chan.len() as u64); let features = channelmanager::provided_channel_type_features(&config); - let decoded_chan = - FundedChannel::read(&mut reader, (&&keys_provider, &&keys_provider, &features)) - .unwrap(); + let decoded_chan = FundedChannel::read( + &mut reader, + (&&keys_provider, &&keys_provider, &features, &&logger), + ) + .unwrap(); assert_eq!(decoded_chan.context.pending_outbound_htlcs, pending_outbound_htlcs); assert_eq!(decoded_chan.context.holding_cell_htlc_updates, holding_cell_htlc_updates); } @@ -14521,7 +15018,7 @@ mod tests { // Test vectors from BOLT 3 Appendices C and F (anchors): let feeest = TestFeeEstimator::new(15000); - let logger : Arc = Arc::new(TestLogger::new()); + let logger : Arc::UserSpan>> = Arc::new(TestLogger::new()); let secp_ctx = Secp256k1::new(); let signer = InMemorySigner::new( @@ -14714,65 +15211,87 @@ mod tests { "02000000000101bef67e4e2fb9ddeeb3461973cd4c62abb35050b1add772995b820b584a488489000000000038b02b80044a010000000000002200202b1b5854183c12d3316565972c4668929d314d81c5dcdbb21cb45fe8a9a8114f4a01000000000000220020e9e86e4823faa62e222ebc858a226636856158f07e69898da3b0d1af0ddb3994c0c62d0000000000220020f3394e1e619b0eca1f91be2fb5ab4dfc59ba5b84ebe014ad1d43a564d012994a508b6a00000000002200204adb4e2f00643db396dd120d4e7dc17625f5f2c11a40d857accc862d6b7dd80e04004830450221008266ac6db5ea71aac3c95d97b0e172ff596844851a3216eb88382a8dddfd33d2022050e240974cfd5d708708b4365574517c18e7ae535ef732a3484d43d0d82be9f701483045022100f89034eba16b2be0e5581f750a0a6309192b75cce0f202f0ee2b4ec0cc394850022076c65dc507fe42276152b7a3d90e961e678adbe966e916ecfe85e64d430e75f301475221023da092f6980e58d2c037173180e9a465476026ee50f96695963e8efe436f54eb21030e9f7b623d2ccc7c9bd44d66d5ce21ce504c0acf6385a132cec6d3c39fa711c152ae3e195220", {}); chan.context.pending_inbound_htlcs.push({ - let mut out = InboundHTLCOutput{ - htlc_id: 0, - amount_msat: 1000000, - cltv_expiry: 500, - payment_hash: PaymentHash([0; 32]), - state: InboundHTLCState::Committed, - }; + let mut out = InboundHTLCOutput::new( + chan.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: 0, + amount_msat: 1000000, + cltv_expiry: 500, + payment_hash: PaymentHash([0; 32]), + state: InboundHTLCState::Committed, + }, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0000000000000000000000000000000000000000000000000000000000000000").unwrap()).to_byte_array(); out }); chan.context.pending_inbound_htlcs.push({ - let mut out = InboundHTLCOutput{ - htlc_id: 1, - amount_msat: 2000000, - cltv_expiry: 501, - payment_hash: PaymentHash([0; 32]), - state: InboundHTLCState::Committed, - }; + let mut out = InboundHTLCOutput::new( + chan.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: 1, + amount_msat: 2000000, + cltv_expiry: 501, + payment_hash: PaymentHash([0; 32]), + state: InboundHTLCState::Committed, + }, + &logger + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()).to_byte_array(); out }); chan.context.pending_outbound_htlcs.push({ - let mut out = OutboundHTLCOutput{ - htlc_id: 2, - amount_msat: 2000000, - cltv_expiry: 502, - payment_hash: PaymentHash([0; 32]), - state: OutboundHTLCState::Committed, - source: HTLCSource::dummy(), - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }; + let mut out = OutboundHTLCOutput::new( + chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 2, + amount_msat: 2000000, + cltv_expiry: 502, + payment_hash: PaymentHash([0; 32]), + state: OutboundHTLCState::Committed, + source: HTLCSource::dummy(), + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, + }, + None, + &logger + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0202020202020202020202020202020202020202020202020202020202020202").unwrap()).to_byte_array(); out }); chan.context.pending_outbound_htlcs.push({ - let mut out = OutboundHTLCOutput{ - htlc_id: 3, - amount_msat: 3000000, - cltv_expiry: 503, - payment_hash: PaymentHash([0; 32]), - state: OutboundHTLCState::Committed, - source: HTLCSource::dummy(), - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }; + let mut out = OutboundHTLCOutput::new( + chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 3, + amount_msat: 3000000, + cltv_expiry: 503, + payment_hash: PaymentHash([0; 32]), + state: OutboundHTLCState::Committed, + source: HTLCSource::dummy(), + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, + }, + None, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0303030303030303030303030303030303030303030303030303030303030303").unwrap()).to_byte_array(); out }); chan.context.pending_inbound_htlcs.push({ - let mut out = InboundHTLCOutput{ - htlc_id: 4, - amount_msat: 4000000, - cltv_expiry: 504, - payment_hash: PaymentHash([0; 32]), - state: InboundHTLCState::Committed, - }; + let mut out = InboundHTLCOutput::new( + chan.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: 4, + amount_msat: 4000000, + cltv_expiry: 504, + payment_hash: PaymentHash([0; 32]), + state: InboundHTLCState::Committed, + }, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0404040404040404040404040404040404040404040404040404040404040404").unwrap()).to_byte_array(); out }); @@ -15152,44 +15671,58 @@ mod tests { chan.context.feerate_per_kw = 253; chan.context.pending_inbound_htlcs.clear(); chan.context.pending_inbound_htlcs.push({ - let mut out = InboundHTLCOutput{ - htlc_id: 1, - amount_msat: 2000000, - cltv_expiry: 501, - payment_hash: PaymentHash([0; 32]), - state: InboundHTLCState::Committed, - }; + let mut out = InboundHTLCOutput::new( + chan.context.channel_id(), + InboundHTLCOutputParams { + htlc_id: 1, + amount_msat: 2000000, + cltv_expiry: 501, + payment_hash: PaymentHash([0; 32]), + state: InboundHTLCState::Committed, + }, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()).to_byte_array(); out }); chan.context.pending_outbound_htlcs.clear(); chan.context.pending_outbound_htlcs.push({ - let mut out = OutboundHTLCOutput{ - htlc_id: 6, - amount_msat: 5000001, - cltv_expiry: 506, - payment_hash: PaymentHash([0; 32]), - state: OutboundHTLCState::Committed, - source: HTLCSource::dummy(), - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }; + let mut out = OutboundHTLCOutput::new( + chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 6, + amount_msat: 5000001, + cltv_expiry: 506, + payment_hash: PaymentHash([0; 32]), + state: OutboundHTLCState::Committed, + source: HTLCSource::dummy(), + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, + }, + None, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0505050505050505050505050505050505050505050505050505050505050505").unwrap()).to_byte_array(); out }); chan.context.pending_outbound_htlcs.push({ - let mut out = OutboundHTLCOutput{ - htlc_id: 5, - amount_msat: 5000000, - cltv_expiry: 505, - payment_hash: PaymentHash([0; 32]), - state: OutboundHTLCState::Committed, - source: HTLCSource::dummy(), - skimmed_fee_msat: None, - blinding_point: None, - send_timestamp: None, - }; + let mut out = OutboundHTLCOutput::new( + chan.context.channel_id(), + OutboundHTLCOutputParams { + htlc_id: 5, + amount_msat: 5000000, + cltv_expiry: 505, + payment_hash: PaymentHash([0; 32]), + state: OutboundHTLCState::Committed, + source: HTLCSource::dummy(), + skimmed_fee_msat: None, + blinding_point: None, + send_timestamp: None, + }, + None, + &logger, + ); out.payment_hash.0 = Sha256::hash(&>::from_hex("0505050505050505050505050505050505050505050505050505050505050505").unwrap()).to_byte_array(); out }); diff --git a/lightning/src/ln/channel_state.rs b/lightning/src/ln/channel_state.rs index c28b4687631..fbb55794532 100644 --- a/lightning/src/ln/channel_state.rs +++ b/lightning/src/ln/channel_state.rs @@ -37,7 +37,7 @@ use core::ops::Deref; /// through the exchange of commitment_signed and revoke_and_ack messages. /// /// This can be used to inspect what next message an HTLC is waiting for to advance its state. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum InboundHTLCStateDetails { /// We have added this HTLC in our commitment transaction by receiving commitment_signed and /// returning revoke_and_ack. We are awaiting the appropriate revoke_and_ack's from the remote @@ -130,7 +130,7 @@ impl_writeable_tlv_based!(InboundHTLCDetails, { /// through the exchange of commitment_signed and revoke_and_ack messages. /// /// This can be used to inspect what next message an HTLC is waiting for to advance its state. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum OutboundHTLCStateDetails { /// We are awaiting the appropriate revoke_and_ack's from the remote before the HTLC is added /// on the remote's commitment transaction after update_add_htlc. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 8bac6c2fa3a..974aac5bb12 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -125,7 +125,7 @@ use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use crate::types::string::UntrustedString; use crate::util::config::{ChannelConfig, ChannelConfigOverrides, ChannelConfigUpdate, UserConfig}; use crate::util::errors::APIError; -use crate::util::logger::{Level, Logger, WithContext}; +use crate::util::logger::{BoxedSpan, Level, Logger, Span, WithContext}; use crate::util::scid_utils::fake_scid; use crate::util::ser::{ BigSize, FixedLengthReader, LengthReadable, MaybeReadable, Readable, ReadableArgs, VecWriter, @@ -381,7 +381,6 @@ impl PendingHTLCRouting { /// Information about an incoming HTLC, including the [`PendingHTLCRouting`] describing where it /// should go next. -#[derive(Clone)] // See FundedChannel::revoke_and_ack for why, tl;dr: Rust bug #[cfg_attr(test, derive(Debug, PartialEq))] pub struct PendingHTLCInfo { /// Further routing details based on whether the HTLC is being forwarded or received. @@ -422,6 +421,23 @@ pub struct PendingHTLCInfo { /// This is used to allow LSPs to take fees as a part of payments, without the sender having to /// shoulder them. pub skimmed_fee_msat: Option, + pub(crate) forward_span: Option, +} + +// See FundedChannel::revoke_and_ack for why, tl;dr: Rust bug +impl Clone for PendingHTLCInfo { + fn clone(&self) -> Self { + Self { + routing: self.routing.clone(), + incoming_shared_secret: self.incoming_shared_secret.clone(), + payment_hash: self.payment_hash.clone(), + incoming_amt_msat: self.incoming_amt_msat.clone(), + outgoing_amt_msat: self.outgoing_amt_msat, + outgoing_cltv_value: self.outgoing_cltv_value, + skimmed_fee_msat: self.skimmed_fee_msat.clone(), + forward_span: None, + } + } } #[derive(Clone)] // See FundedChannel::revoke_and_ack for why, tl;dr: Rust bug @@ -2567,7 +2583,7 @@ pub struct ChannelManager< /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`! /// /// See `ChannelManager` struct-level documentation for lock order requirements. - decode_update_add_htlcs: Mutex>>, + decode_update_add_htlcs: Mutex>>, /// The sets of payments which are claimable or currently being claimed. See /// [`ClaimablePayments`]' individual field docs for more info. @@ -4686,7 +4702,7 @@ where fn get_pending_htlc_info<'a>( &self, msg: &msgs::UpdateAddHTLC, shared_secret: [u8; 32], decoded_hop: onion_utils::Hop, allow_underpay: bool, - next_packet_pubkey_opt: Option>, + next_packet_pubkey_opt: Option>, forward_span: Option, ) -> Result { match decoded_hop { onion_utils::Hop::Receive { .. } | onion_utils::Hop::BlindedReceive { .. } | @@ -4699,13 +4715,13 @@ where let current_height: u32 = self.best_block.read().unwrap().height; create_recv_pending_htlc_info(decoded_hop, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat, - current_height) + current_height, forward_span) }, onion_utils::Hop::Forward { .. } | onion_utils::Hop::BlindedForward { .. } => { - create_fwd_pending_htlc_info(msg, decoded_hop, shared_secret, next_packet_pubkey_opt) + create_fwd_pending_htlc_info(msg, decoded_hop, shared_secret, next_packet_pubkey_opt, forward_span) }, onion_utils::Hop::TrampolineForward { .. } | onion_utils::Hop::TrampolineBlindedForward { .. } => { - create_fwd_pending_htlc_info(msg, decoded_hop, shared_secret, next_packet_pubkey_opt) + create_fwd_pending_htlc_info(msg, decoded_hop, shared_secret, next_packet_pubkey_opt, forward_span) }, } } @@ -4930,6 +4946,7 @@ where onion_packet, None, &self.fee_estimator, + None, &&logger, ); match break_channel_entry!(self, peer_state, send_res, chan_entry) { @@ -6228,7 +6245,7 @@ where let mut htlc_forwards = Vec::new(); let mut htlc_fails = Vec::new(); - for update_add_htlc in &update_add_htlcs { + for (update_add_htlc, _waiting_on_forward_span, forward_span) in update_add_htlcs { let (next_hop, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion( &update_add_htlc, @@ -6262,7 +6279,11 @@ where &chan.context, Some(update_add_htlc.payment_hash), ); - chan.can_accept_incoming_htlc(update_add_htlc, &self.fee_estimator, &logger) + chan.can_accept_incoming_htlc( + &update_add_htlc, + &self.fee_estimator, + &logger, + ) }, ) { Some(Ok(_)) => {}, @@ -6308,6 +6329,7 @@ where next_hop, incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey), + Some(forward_span), ) { Ok(info) => htlc_forwards.push((info, update_add_htlc.htlc_id)), Err(inbound_err) => { @@ -6499,6 +6521,7 @@ where payment_hash, outgoing_amt_msat, outgoing_cltv_value, + forward_span, .. }, }) => { @@ -6607,6 +6630,7 @@ where false, None, current_height, + forward_span, ); match create_res { Ok(info) => phantom_receives.push(( @@ -6702,7 +6726,7 @@ where let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; let mut draining_pending_forwards = pending_forwards.drain(..); - while let Some(forward_info) = draining_pending_forwards.next() { + while let Some(mut forward_info) = draining_pending_forwards.next() { let queue_fail_htlc_res = match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, @@ -6725,6 +6749,7 @@ where .. }, skimmed_fee_msat, + ref mut forward_span, .. }, }) => { @@ -6816,6 +6841,8 @@ where }; log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}", prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id); + let mut swapped_span = None; + mem::swap(forward_span, &mut swapped_span); if let Err((reason, msg)) = optimal_channel.queue_add_htlc( outgoing_amt_msat, payment_hash, @@ -6825,6 +6852,7 @@ where skimmed_fee_msat, next_blinding_point, &self.fee_estimator, + swapped_span, &&logger, ) { log_trace!( @@ -8759,11 +8787,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ fn handle_channel_resumption(&self, pending_msg_events: &mut Vec, channel: &mut FundedChannel, raa: Option, commitment_update: Option, order: RAACommitmentOrder, - pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec, + pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>, funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option, tx_signatures: Option, tx_abort: Option, - ) -> (Option<(u64, Option, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec)>) { + ) -> (Option<(u64, Option, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>)>) { let logger = WithChannelContext::from(&self.logger, &channel.context, None); log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement, {} tx_signatures, {} tx_abort", &channel.context.channel_id(), @@ -10118,7 +10146,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - try_channel_entry!(self, peer_state, chan.update_add_htlc(&msg, &self.fee_estimator), chan_entry); + try_channel_entry!(self, peer_state, chan.update_add_htlc(&msg, &self.fee_estimator, &self.logger), chan_entry); } else { return try_channel_entry!(self, peer_state, Err(ChannelError::close( "Got an update_add_htlc message for an unfunded channel!".into())), chan_entry); @@ -10151,9 +10179,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - let res = try_channel_entry!(self, peer_state, chan.update_fulfill_htlc(&msg), chan_entry); + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + let res = try_channel_entry!(self, peer_state, chan.update_fulfill_htlc(&msg, &&logger), chan_entry); if let HTLCSource::PreviousHopData(prev_hop) = &res.0 { - let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor", msg.channel_id); @@ -10211,7 +10239,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - try_channel_entry!(self, peer_state, chan.update_fail_htlc(&msg, HTLCFailReason::from_msg(msg)), chan_entry); + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + try_channel_entry!(self, peer_state, chan.update_fail_htlc(&msg, HTLCFailReason::from_msg(msg), &&logger), chan_entry); } else { return try_channel_entry!(self, peer_state, Err(ChannelError::close( "Got an update_fail_htlc message for an unfunded channel!".into())), chan_entry); @@ -10241,7 +10270,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ try_channel_entry!(self, peer_state, Err(chan_err), chan_entry); } if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - try_channel_entry!(self, peer_state, chan.update_fail_malformed_htlc(&msg, HTLCFailReason::reason(msg.failure_code.into(), msg.sha256_of_onion.to_vec())), chan_entry); + try_channel_entry!(self, peer_state, chan.update_fail_malformed_htlc(&msg, HTLCFailReason::reason(msg.failure_code.into(), msg.sha256_of_onion.to_vec()), &self.logger), chan_entry); } else { return try_channel_entry!(self, peer_state, Err(ChannelError::close( "Got an update_fail_malformed_htlc message for an unfunded channel!".into())), chan_entry); @@ -10330,7 +10359,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } } - fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec)) { + fn push_decode_update_add_htlcs( + &self, mut update_add_htlcs: (u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>), + ) { let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); let scid = update_add_htlcs.0; match decode_update_add_htlcs.entry(scid) { @@ -14678,6 +14709,7 @@ impl_writeable_tlv_based!(PendingHTLCInfo, { (8, outgoing_cltv_value, required), (9, incoming_amt_msat, option), (10, skimmed_fee_msat, option), + (_unused, forward_span, (static_value, None)) }); impl Writeable for HTLCFailureMsg { @@ -15106,10 +15138,14 @@ where } } - let mut decode_update_add_htlcs_opt = None; let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + let mut decode_update_add_htlcs_opt = None; if !decode_update_add_htlcs.is_empty() { - decode_update_add_htlcs_opt = Some(decode_update_add_htlcs); + let mut without_spans = new_hash_map(); + for (scid, htlcs) in decode_update_add_htlcs.iter() { + without_spans.insert(scid, htlcs.iter().map(|(msg, _span1, _span2)| msg).collect::>()); + } + decode_update_add_htlcs_opt = Some(without_spans); } let claimable_payments = self.claimable_payments.lock().unwrap(); @@ -15586,6 +15622,7 @@ where &args.entropy_source, &args.signer_provider, &provided_channel_type_features(&args.default_config), + &args.logger, ), )?; let logger = WithChannelContext::from(&args.logger, &channel.context, None); @@ -16790,6 +16827,26 @@ where args.message_router, ) .with_async_payments_offers_cache(async_receive_offer_cache); + let decode_update_add_htlcs = decode_update_add_htlcs + .into_iter() + .map(|(scid, htlcs)| { + ( + scid, + htlcs + .into_iter() + .map(|htlc| { + let forward_span = + BoxedSpan::new(args.logger.start(Span::Forward, None)); + let waiting_on_forward_span = BoxedSpan::new(args.logger.start( + Span::WaitingOnForward, + forward_span.as_user_span_ref::(), + )); + (htlc, waiting_on_forward_span, forward_span) + }) + .collect::>(), + ) + }) + .collect::>(); let channel_manager = ChannelManager { chain_hash, @@ -18110,7 +18167,7 @@ mod tests { if let Err(crate::ln::channelmanager::InboundHTLCErr { reason, .. }) = create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat), - current_height) + current_height, None) { assert_eq!(reason, LocalHTLCFailureReason::FinalIncorrectHTLCAmount); } else { panic!(); } @@ -18133,7 +18190,7 @@ mod tests { let current_height: u32 = node[0].node.best_block.read().unwrap().height; assert!(create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat), - current_height).is_ok()); + current_height, None).is_ok()); } #[test] @@ -18158,7 +18215,7 @@ mod tests { custom_tlvs: Vec::new(), }, shared_secret: SharedSecret::from_bytes([0; 32]), - }, [0; 32], PaymentHash([0; 32]), 100, TEST_FINAL_CLTV + 1, None, true, None, current_height); + }, [0; 32], PaymentHash([0; 32]), 100, TEST_FINAL_CLTV + 1, None, true, None, current_height, None); // Should not return an error as this condition: // https://github.com/lightning/bolts/blob/4dcc377209509b13cf89a4b91fde7d478f5b46d8/04-onion-routing.md?plain=1#L334 diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index f6c92d77fbf..5b05505a570 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -33,6 +33,7 @@ use crate::ln::channel::{ OutboundV1Channel, COINBASE_MATURITY, DISCONNECT_PEER_AWAITING_RESPONSE_TICKS, MIN_CHAN_DUST_LIMIT_SATOSHIS, }; +use crate::ln::channel_state::{InboundHTLCStateDetails, OutboundHTLCStateDetails}; use crate::ln::channelmanager::{ self, PaymentId, RAACommitmentOrder, RecipientOnionFields, BREAKDOWN_TIMEOUT, DISABLE_GOSSIP_TICKS, ENABLE_GOSSIP_TICKS, MIN_CLTV_EXPIRY_DELTA, @@ -58,9 +59,10 @@ use crate::util::config::{ UserConfig, }; use crate::util::errors::APIError; +use crate::util::logger::Span; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::test_channel_signer::TestChannelSigner; -use crate::util::test_utils::{self, TestLogger, WatchtowerPersister}; +use crate::util::test_utils::{self, TestLogger, TestSpanBoundary, WatchtowerPersister}; use bitcoin::constants::ChainHash; use bitcoin::hash_types::BlockHash; @@ -11717,3 +11719,200 @@ pub fn test_funding_signed_event() { nodes[0].node.get_and_clear_pending_msg_events(); nodes[1].node.get_and_clear_pending_msg_events(); } + +#[xtest(feature = "_externalize_tests")] +pub fn test_payment_traces() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let channel_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2; + let channel_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2; + send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000); + + assert_eq!( + node_cfgs[1].logger.span_boundaries.lock().unwrap().as_ref(), + vec![ + TestSpanBoundary::Start( + Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }, + None + ), + TestSpanBoundary::Start( + Span::InboundHTLCState { state: None }, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::InboundHTLCState { state: None }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::InboundHTLCState { state: None }), + TestSpanBoundary::Start( + Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }), + ), + TestSpanBoundary::End(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }), + TestSpanBoundary::Start( + Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnMonitorPersist, + Some(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }), + ), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToAdd) + }), + TestSpanBoundary::Start( + Span::InboundHTLCState { state: Some(InboundHTLCStateDetails::Committed) }, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }) + ), + TestSpanBoundary::Start( + Span::Forward, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }) + ), + TestSpanBoundary::Start(Span::WaitingOnMonitorPersist, Some(Span::Forward),), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist,), + TestSpanBoundary::Start(Span::WaitingOnForward, Some(Span::Forward),), + TestSpanBoundary::End(Span::WaitingOnForward,), + TestSpanBoundary::Start( + Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }, + Some(Span::Forward) + ), + TestSpanBoundary::Start( + Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToAdd + }, + Some(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnMonitorPersist, + Some(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToAdd + }), + ), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist,), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToAdd + }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer,), + TestSpanBoundary::End(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToAdd + }), + TestSpanBoundary::Start( + Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }, + Some(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::Start( + Span::WaitingOnMonitorPersist, + Some(Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }), + ), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::Committed + }), + TestSpanBoundary::Start( + Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }, + Some(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::OutboundHTLCState { state: OutboundHTLCStateDetails::Committed }), + ), + TestSpanBoundary::End(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::Committed) + }), + TestSpanBoundary::Start( + Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveFulfill) + }, + Some(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnMonitorPersist, + Some(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveFulfill) + }), + ), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveFulfill) + }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::Committed + }), + TestSpanBoundary::Start( + Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }, + Some(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + ), + TestSpanBoundary::End(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }), + TestSpanBoundary::Start( + Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }, + Some(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + ), + TestSpanBoundary::Start( + Span::WaitingOnMonitorPersist, + Some(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }), + ), + TestSpanBoundary::End(Span::WaitingOnMonitorPersist), + TestSpanBoundary::Start( + Span::WaitingOnPeer, + Some(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }), + ), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::OutboundHTLCState { + state: OutboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveSuccess + }), + TestSpanBoundary::End(Span::OutboundHTLC { channel_id: channel_id_2, htlc_id: 0 }), + TestSpanBoundary::End(Span::Forward), + TestSpanBoundary::End(Span::WaitingOnPeer), + TestSpanBoundary::End(Span::InboundHTLCState { + state: Some(InboundHTLCStateDetails::AwaitingRemoteRevokeToRemoveFulfill) + }), + TestSpanBoundary::End(Span::InboundHTLC { channel_id: channel_id_1, htlc_id: 0 }) + ] + ); +} diff --git a/lightning/src/ln/invoice_utils.rs b/lightning/src/ln/invoice_utils.rs index 509cb2e3b7b..fdb0a732b15 100644 --- a/lightning/src/ln/invoice_utils.rs +++ b/lightning/src/ln/invoice_utils.rs @@ -16,7 +16,7 @@ use crate::routing::gossip::RoutingFees; use crate::routing::router::{RouteHint, RouteHintHop}; use crate::sign::{EntropySource, NodeSigner, Recipient}; use crate::types::payment::PaymentHash; -use crate::util::logger::{Logger, Record}; +use crate::util::logger::{Logger, Record, Span}; use alloc::collections::{btree_map, BTreeMap}; use bitcoin::hashes::Hash; use bitcoin::secp256k1::PublicKey; @@ -604,11 +604,17 @@ impl<'a, 'b, L: Deref> Logger for WithChannelDetails<'a, 'b, L> where L::Target: Logger, { + type UserSpan = <::Target as Logger>::UserSpan; + fn log(&self, mut record: Record) { record.peer_id = Some(self.details.counterparty.node_id); record.channel_id = Some(self.details.channel_id); self.logger.log(record) } + + fn start(&self, span: Span, parent: Option<&Self::UserSpan>) -> Self::UserSpan { + self.logger.start(span, parent) + } } impl<'a, 'b, L: Deref> WithChannelDetails<'a, 'b, L> diff --git a/lightning/src/ln/onion_payment.rs b/lightning/src/ln/onion_payment.rs index 79952faca9a..419617f6ebd 100644 --- a/lightning/src/ln/onion_payment.rs +++ b/lightning/src/ln/onion_payment.rs @@ -21,7 +21,7 @@ use crate::ln::onion_utils::{HTLCFailReason, LocalHTLCFailureReason, ONION_DATA_ use crate::sign::{NodeSigner, Recipient}; use crate::types::features::BlindedHopFeatures; use crate::types::payment::PaymentHash; -use crate::util::logger::Logger; +use crate::util::logger::{BoxedSpan, Logger}; #[allow(unused_imports)] use crate::prelude::*; @@ -93,7 +93,7 @@ enum RoutingInfo { #[rustfmt::skip] pub(super) fn create_fwd_pending_htlc_info( msg: &msgs::UpdateAddHTLC, hop_data: onion_utils::Hop, shared_secret: [u8; 32], - next_packet_pubkey_opt: Option> + next_packet_pubkey_opt: Option>, forward_span: Option, ) -> Result { debug_assert!(next_packet_pubkey_opt.is_some()); @@ -240,6 +240,7 @@ pub(super) fn create_fwd_pending_htlc_info( outgoing_amt_msat: amt_to_forward, outgoing_cltv_value, skimmed_fee_msat: None, + forward_span, }) } @@ -247,7 +248,7 @@ pub(super) fn create_fwd_pending_htlc_info( pub(super) fn create_recv_pending_htlc_info( hop_data: onion_utils::Hop, shared_secret: [u8; 32], payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool, - counterparty_skimmed_fee_msat: Option, current_height: u32 + counterparty_skimmed_fee_msat: Option, current_height: u32, forward_span: Option, ) -> Result { let ( payment_data, keysend_preimage, custom_tlvs, onion_amt_msat, onion_cltv_expiry, @@ -415,6 +416,7 @@ pub(super) fn create_recv_pending_htlc_info( outgoing_amt_msat: onion_amt_msat, outgoing_cltv_value: onion_cltv_expiry, skimmed_fee_msat: counterparty_skimmed_fee_msat, + forward_span, }) } @@ -473,13 +475,13 @@ where // TODO: If this is potentially a phantom payment we should decode the phantom payment // onion here and check it. - create_fwd_pending_htlc_info(msg, hop, shared_secret.secret_bytes(), Some(next_packet_pubkey))? + create_fwd_pending_htlc_info(msg, hop, shared_secret.secret_bytes(), Some(next_packet_pubkey), None)? }, _ => { let shared_secret = hop.shared_secret().secret_bytes(); create_recv_pending_htlc_info( hop, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, - None, allow_skimmed_fees, msg.skimmed_fee_msat, cur_height, + None, allow_skimmed_fees, msg.skimmed_fee_msat, cur_height, None )? } }) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 98e54eec925..9b3b1f01695 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -47,7 +47,7 @@ use crate::sign::{NodeSigner, Recipient}; use crate::types::features::{InitFeatures, NodeFeatures}; use crate::types::string::PrintableString; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::{Level, Logger, WithContext}; +use crate::util::logger::{BoxedSpan, Level, Logger, Span, WithContext}; use crate::util::ser::{VecWriter, Writeable, Writer}; #[allow(unused_imports)] @@ -792,6 +792,7 @@ struct Peer { msgs_sent_since_pong: usize, awaiting_pong_timer_tick_intervals: i64, + ping_pong_span: Option, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, @@ -1452,6 +1453,7 @@ where msgs_sent_since_pong: 0, awaiting_pong_timer_tick_intervals: 0, + ping_pong_span: None, received_message_since_timer_tick: false, sent_gossip_timestamp_filter: false, @@ -1512,6 +1514,7 @@ where msgs_sent_since_pong: 0, awaiting_pong_timer_tick_intervals: 0, + ping_pong_span: None, received_message_since_timer_tick: false, sent_gossip_timestamp_filter: false, @@ -2441,6 +2444,7 @@ where let mut peer_lock = peer_mutex.lock().unwrap(); peer_lock.awaiting_pong_timer_tick_intervals = 0; peer_lock.msgs_sent_since_pong = 0; + peer_lock.ping_pong_span = None; }, // Channel messages: @@ -3501,6 +3505,9 @@ where if peer.awaiting_pong_timer_tick_intervals == 0 { peer.awaiting_pong_timer_tick_intervals = -1; let ping = msgs::Ping { ponglen: 0, byteslen: 64 }; + let ping_pong_span = + self.logger.start(Span::PingPong { node_id: peer.their_node_id.unwrap().0 }, None); + peer.ping_pong_span = Some(BoxedSpan::new(ping_pong_span)); self.enqueue_message(peer, &ping); } } @@ -3572,6 +3579,10 @@ where peer.awaiting_pong_timer_tick_intervals = 1; let ping = msgs::Ping { ponglen: 0, byteslen: 64 }; + let ping_pong_span = self + .logger + .start(Span::PingPong { node_id: peer.their_node_id.unwrap().0 }, None); + peer.ping_pong_span = Some(BoxedSpan::new(ping_pong_span)); self.enqueue_message(&mut *peer, &ping); break; } diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 38c8cd304d9..f57523788e5 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -196,13 +196,15 @@ where /// # use lightning::onion_message::messenger::{Destination, MessageRouter, MessageSendInstructions, OnionMessagePath, OnionMessenger}; /// # use lightning::onion_message::packet::OnionMessageContents; /// # use lightning::sign::{NodeSigner, ReceiveAuthKey}; -/// # use lightning::util::logger::{Logger, Record}; +/// # use lightning::util::logger::{Logger, Record, Span}; /// # use lightning::util::ser::{Writeable, Writer}; /// # use lightning::io; /// # use std::sync::Arc; /// # struct FakeLogger; /// # impl Logger for FakeLogger { +/// # type UserSpan = (); /// # fn log(&self, record: Record) { println!("{:?}" , record); } +/// # fn start(&self, _span: Span, parent: Option<&()>) -> () {} /// # } /// # struct FakeMessageRouter {} /// # impl MessageRouter for FakeMessageRouter { diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index d56d5747e09..b4110b4d609 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -9424,14 +9424,16 @@ pub mod benches { use crate::routing::scoring::{ScoreLookUp, ScoreUpdate}; use crate::types::features::Bolt11InvoiceFeatures; use crate::util::config::UserConfig; - use crate::util::logger::{Logger, Record}; + use crate::util::logger::{Logger, Record, Span}; use crate::util::test_utils::TestLogger; use criterion::Criterion; struct DummyLogger {} impl Logger for DummyLogger { + type UserSpan = (); fn log(&self, _record: Record) {} + fn start(&self, _span: Span, _parent: Option<&()>) -> () {} } #[rustfmt::skip] diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 02efb88dbb9..13c9e216f28 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -21,12 +21,14 @@ //! # use lightning::routing::router::{RouteParameters, find_route}; //! # use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters}; //! # use lightning::sign::KeysManager; -//! # use lightning::util::logger::{Logger, Record}; +//! # use lightning::util::logger::{Logger, Record, Span}; //! # use bitcoin::secp256k1::PublicKey; //! # //! # struct FakeLogger {}; //! # impl Logger for FakeLogger { +//! # type UserSpan = (); //! # fn log(&self, record: Record) { unimplemented!() } +//! # fn start(&self, _span: Span, parent: Option<&()>) -> () {} //! # } //! # fn find_scored_route(payer: PublicKey, route_params: RouteParameters, network_graph: NetworkGraph<&FakeLogger>) { //! # let logger = FakeLogger {}; diff --git a/lightning/src/util/logger.rs b/lightning/src/util/logger.rs index 283d3158144..8fb8f0aefb5 100644 --- a/lightning/src/util/logger.rs +++ b/lightning/src/util/logger.rs @@ -15,13 +15,14 @@ use bitcoin::secp256k1::PublicKey; +use core::any::Any; use core::cmp; use core::fmt; use core::ops::Deref; +use crate::ln::channel_state::{InboundHTLCStateDetails, OutboundHTLCStateDetails}; use crate::ln::types::ChannelId; -#[cfg(c_bindings)] -use crate::prelude::*; // Needed for String +use crate::prelude::*; use crate::types::payment::PaymentHash; static LOG_LEVEL_NAMES: [&'static str; 6] = ["GOSSIP", "TRACE", "DEBUG", "INFO", "WARN", "ERROR"]; @@ -160,8 +161,109 @@ impl_record!(, 'a); /// A trait encapsulating the operations required of a logger. pub trait Logger { + /// The user-defined type returned to capture a span with its lifetime. + #[cfg(feature = "std")] + type UserSpan: 'static + Send; + /// The user-defined type returned to capture a span with its lifetime. + #[cfg(not(feature = "std"))] + type UserSpan: 'static; + /// Logs the [`Record`]. fn log(&self, record: Record); + /// Indicates the start of a span of computation. + /// The returned object will be dropped when the span ends. + fn start(&self, span: Span, parent: Option<&Self::UserSpan>) -> Self::UserSpan; +} + +/// A span of computation in time. +#[derive(Debug, Hash, PartialEq, Eq, Clone)] +pub enum Span { + /// Span representing the lifetime of an inbound HTLC. + InboundHTLC { + /// Channel ID. + channel_id: ChannelId, + /// HTLC ID. + htlc_id: u64, + }, + /// Span representing the lifetime of an outbound HTLC. + OutboundHTLC { + /// Channel ID. + channel_id: ChannelId, + /// HTLC ID. + htlc_id: u64, + }, + /// Span representing the downstream forward of an incoming HTLC. + Forward, + /// Span representing an inbound HTLC state in the commitment state machine. + InboundHTLCState { + /// The state. + state: Option, + }, + /// Span representing an outbound HTLC state in the commitment state machine. + OutboundHTLCState { + /// The state. + state: OutboundHTLCStateDetails, + }, + /// Span representing the wait time until a peer responds. + WaitingOnPeer, + /// Span representing the wait time until a channel monitor persists. + WaitingOnMonitorPersist, + /// Span representing the wait time until async signing completes. + WaitingOnAsyncSigning, + /// Span representing the wait time until an HTLC gets forwarded. + WaitingOnForward, + /// Span representing sending an outbound Ping and receiving an inbound Pong. + PingPong { + /// The node id of the counterparty. + node_id: PublicKey, + }, +} + +#[cfg(feature = "std")] +pub(crate) struct BoxedSpan(Box); +#[cfg(not(feature = "std"))] +pub(crate) struct BoxedSpan(Box); + +impl BoxedSpan { + #[cfg(feature = "std")] + pub fn new(s: S) -> Self { + BoxedSpan(Box::new(s)) + } + #[cfg(not(feature = "std"))] + pub fn new(s: S) -> Self { + BoxedSpan(Box::new(s)) + } + + pub fn as_user_span_ref(&self) -> Option<&<::Target as Logger>::UserSpan> + where + L::Target: Logger, + { + self.0.downcast_ref() + } +} + +// A set of implementations for tests ignoring spans. +// Note that cloning just creates a dummy span. + +#[cfg(test)] +impl core::fmt::Debug for BoxedSpan { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "boxed span") + } +} + +#[cfg(test)] +impl PartialEq for BoxedSpan { + fn eq(&self, _other: &Self) -> bool { + true + } +} + +#[cfg(test)] +impl Clone for BoxedSpan { + fn clone(&self) -> Self { + BoxedSpan::new(()) + } } /// Adds relevant context to a [`Record`] before passing it to the wrapped [`Logger`]. @@ -186,6 +288,8 @@ impl<'a, L: Deref> Logger for WithContext<'a, L> where L::Target: Logger, { + type UserSpan = <::Target as Logger>::UserSpan; + fn log(&self, mut record: Record) { if self.peer_id.is_some() { record.peer_id = self.peer_id @@ -198,6 +302,10 @@ where } self.logger.log(record) } + + fn start(&self, span: Span, parent: Option<&Self::UserSpan>) -> Self::UserSpan { + self.logger.start(span, parent) + } } impl<'a, L: Deref> WithContext<'a, L> @@ -278,11 +386,11 @@ mod tests { } struct WrapperLog { - logger: Arc, + logger: Arc::UserSpan>>, } impl WrapperLog { - fn new(logger: Arc) -> WrapperLog { + fn new(logger: Arc::UserSpan>>) -> WrapperLog { WrapperLog { logger } } @@ -299,7 +407,7 @@ mod tests { #[test] fn test_logging_macros() { let logger = TestLogger::new(); - let logger: Arc = Arc::new(logger); + let logger: Arc::UserSpan>> = Arc::new(logger); let wrapper = WrapperLog::new(Arc::clone(&logger)); wrapper.call_macros(); } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 776d876bbc8..3f8fd2bc0b8 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -54,7 +54,7 @@ use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, }; -use crate::util::logger::{Logger, Record}; +use crate::util::logger::{Logger, Record, Span}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; use crate::util::persist::{KVStoreSync, MonitorName}; @@ -1445,10 +1445,48 @@ impl BaseMessageHandler for TestRoutingMessageHandler { } } +pub struct TestUserSpan { + span: Span, + span_boundaries: Arc>>, +} + +impl TestUserSpan { + fn new( + span: Span, parent_span: Option<&TestUserSpan>, + span_boundaries: Arc>>, + ) -> Self { + { + let mut span_boundaries_guard = span_boundaries.lock().unwrap(); + span_boundaries_guard + .push(TestSpanBoundary::Start(span.clone(), parent_span.map(|s| s.span.clone()))); + core::mem::drop(span_boundaries_guard); + } + TestUserSpan { span, span_boundaries } + } +} + +impl Drop for TestUserSpan { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + let mut span_boundaries = self.span_boundaries.lock().unwrap(); + span_boundaries.push(TestSpanBoundary::End(self.span.clone())); + core::mem::drop(span_boundaries); + } +} + +#[derive(Debug, Eq, PartialEq)] +pub enum TestSpanBoundary { + Start(Span, Option), + End(Span), +} + pub struct TestLogger { pub(crate) id: String, pub lines: Mutex>, pub context: Mutex, Option), usize>>, + pub span_boundaries: Arc>>, } impl TestLogger { @@ -1458,7 +1496,8 @@ impl TestLogger { pub fn with_id(id: String) -> TestLogger { let lines = Mutex::new(new_hash_map()); let context = Mutex::new(new_hash_map()); - TestLogger { id, lines, context } + let span_boundaries = Arc::new(Mutex::new(Vec::new())); + TestLogger { id, lines, context, span_boundaries } } pub fn assert_log(&self, module: &str, line: String, count: usize) { let log_entries = self.lines.lock().unwrap(); @@ -1505,6 +1544,8 @@ impl TestLogger { } impl Logger for TestLogger { + type UserSpan = TestUserSpan; + fn log(&self, record: Record) { let context = format!("{} {} [{}:{}]", self.id, record.level, record.module_path, record.line); @@ -1537,6 +1578,10 @@ impl Logger for TestLogger { println!("{}", s); } } + + fn start(&self, span: Span, parent: Option<&TestUserSpan>) -> TestUserSpan { + TestUserSpan::new(span, parent, Arc::clone(&self.span_boundaries)) + } } pub struct TestNodeSigner {