From 79f212b70a174d52cd6016d3c608e5ed9e68069b Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 6 Nov 2023 16:53:07 -0600 Subject: [PATCH 01/18] Use a message buffer abstraction in OnionMessenger Onion messages are buffered for sending to the next node. Since the network has limited adoption, connecting directly to a peer may be necessary. Add an OnionMessageBuffer abstraction that can differentiate between connected peers and those are pending a connection. This allows for buffering messages before a connection is established and applying different buffer policies for peers yet to be connected. --- fuzz/src/onion_message.rs | 2 +- lightning/src/onion_message/messenger.rs | 141 +++++++++++++++++------ 2 files changed, 105 insertions(+), 38 deletions(-) diff --git a/fuzz/src/onion_message.rs b/fuzz/src/onion_message.rs index de7b8b6b4c6..2882dcfb508 100644 --- a/fuzz/src/onion_message.rs +++ b/fuzz/src/onion_message.rs @@ -269,7 +269,7 @@ mod tests { "Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)" .to_string())), Some(&1)); assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), - "Sending onion message: TestCustomMessage".to_string())), Some(&1)); + "Sending onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1)); } let two_unblinded_hops_om = "\ diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index c7f01ae5978..2c42566e733 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -150,13 +150,70 @@ where entropy_source: ES, node_signer: NS, logger: L, - pending_messages: Mutex>>, + message_buffers: Mutex>, secp_ctx: Secp256k1, message_router: MR, offers_handler: OMH, custom_handler: CMH, } +/// [`OnionMessage`]s buffered to be sent. +enum OnionMessageBuffer { + /// Messages for a node connected as a peer. + ConnectedPeer(VecDeque), + + /// Messages for a node that is not yet connected. + PendingConnection(VecDeque), +} + +impl OnionMessageBuffer { + fn pending_messages(&self) -> &VecDeque { + match self { + OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + } + } + + fn enqueue_message(&mut self, message: OnionMessage) { + let pending_messages = match self { + OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + }; + + pending_messages.push_back(message); + } + + fn dequeue_message(&mut self) -> Option { + let pending_messages = match self { + OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages) => { + debug_assert!(false); + pending_messages + }, + }; + + pending_messages.pop_front() + } + + #[cfg(test)] + fn release_pending_messages(&mut self) -> VecDeque { + let pending_messages = match self { + OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + }; + + core::mem::take(pending_messages) + } + + fn mark_connected(&mut self) { + if let OnionMessageBuffer::PendingConnection(pending_messages) = self { + let mut new_pending_messages = VecDeque::new(); + core::mem::swap(pending_messages, &mut new_pending_messages); + *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); + } + } +} + /// An [`OnionMessage`] for [`OnionMessenger`] to send. /// /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are @@ -502,7 +559,7 @@ where OnionMessenger { entropy_source, node_signer, - pending_messages: Mutex::new(HashMap::new()), + message_buffers: Mutex::new(HashMap::new()), secp_ctx, logger, message_router, @@ -518,21 +575,23 @@ where pub fn send_onion_message( &self, path: OnionMessagePath, contents: T, reply_path: Option ) -> Result<(), SendError> { - log_trace!(self.logger, "Sending onion message: {:?}", contents); - - let (first_node_id, onion_msg) = create_onion_message( + + let (first_node_id, onion_message) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) } - match pending_per_peer_msgs.entry(first_node_id) { + let mut message_buffers = self.message_buffers.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_buffers) { + return Err(SendError::BufferFull); + } + + match message_buffers.entry(first_node_id) { hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_msg); + e.get_mut().enqueue_message(onion_message); Ok(()) - } + }, } } @@ -565,7 +624,7 @@ where } }; - let peers = self.pending_messages.lock().unwrap().keys().copied().collect(); + let peers = self.message_buffers.lock().unwrap().keys().copied().collect(); let path = match self.message_router.find_path(sender, peers, destination) { Ok(path) => path, Err(()) => { @@ -578,30 +637,29 @@ where if let Err(e) = self.send_onion_message(path, contents, reply_path) { log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); - return; } } #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut pending_msgs = self.pending_messages.lock().unwrap(); + let mut message_buffers = self.message_buffers.lock().unwrap(); let mut msgs = HashMap::new(); // We don't want to disconnect the peers by removing them entirely from the original map, so we - // swap the pending message buffers individually. - for (peer_node_id, pending_messages) in &mut *pending_msgs { - msgs.insert(*peer_node_id, core::mem::take(pending_messages)); + // release the pending message buffers individually. + for (peer_node_id, buffer) in &mut *message_buffers { + msgs.insert(*peer_node_id, buffer.release_pending_messages()); } msgs } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap>) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; let mut peer_buffered_bytes = 0; for (pk, peer_buf) in buffer { - for om in peer_buf { + for om in peer_buf.pending_messages() { let om_len = om.serialized_length(); if pk == peer_node_id { peer_buffered_bytes += om_len; @@ -660,24 +718,28 @@ where } }, Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) { + let mut message_buffers = self.message_buffers.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_buffers) { log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); return } #[cfg(fuzzing)] - pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new); - - match pending_per_peer_msgs.entry(next_node_id) { - hash_map::Entry::Vacant(_) => { + message_buffers + .entry(next_node_id) + .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())); + + match message_buffers.entry(next_node_id) { + hash_map::Entry::Occupied(mut e) if matches!( + e.get(), OnionMessageBuffer::ConnectedPeer(..) + ) => { + e.get_mut().enqueue_message(onion_message); + log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); + }, + _ => { log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id); return }, - hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_message); - log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); - } } }, Err(e) => { @@ -688,15 +750,22 @@ where fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - let mut peers = self.pending_messages.lock().unwrap(); - peers.insert(their_node_id.clone(), VecDeque::new()); + self.message_buffers.lock().unwrap() + .entry(*their_node_id) + .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())) + .mark_connected(); + } else { + self.message_buffers.lock().unwrap().remove(their_node_id); } + Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - let mut pending_msgs = self.pending_messages.lock().unwrap(); - pending_msgs.remove(their_node_id); + match self.message_buffers.lock().unwrap().remove(their_node_id) { + Some(OnionMessageBuffer::ConnectedPeer(..)) => {}, + _ => debug_assert!(false), + } } fn provided_node_features(&self) -> NodeFeatures { @@ -737,11 +806,9 @@ where ); } - let mut pending_msgs = self.pending_messages.lock().unwrap(); - if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { - return msgs.pop_front() - } - None + self.message_buffers.lock().unwrap() + .get_mut(&peer_node_id) + .and_then(|buffer| buffer.dequeue_message()) } } From 8412e8368c670176fbcac7e6fa1a98a3916972e6 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 15 Nov 2023 17:26:45 -0600 Subject: [PATCH 02/18] Destination in OnionMessenger::send_onion_message OnionMessenger::send_onion_message takes an OnionMessagePath. This isn't very useful as it requires finding a path manually. Instead, have the method take a Destination and use OnionMessenger's MessageRouter to construct the path. Later, this will allow for buffering messages where the first node in the path isn't a direct connection. --- fuzz/src/onion_message.rs | 4 +- .../src/onion_message/functional_tests.rs | 30 ++-- lightning/src/onion_message/messenger.rs | 139 ++++++++++++------ 3 files changed, 108 insertions(+), 65 deletions(-) diff --git a/fuzz/src/onion_message.rs b/fuzz/src/onion_message.rs index 2882dcfb508..afa416a4044 100644 --- a/fuzz/src/onion_message.rs +++ b/fuzz/src/onion_message.rs @@ -269,7 +269,9 @@ mod tests { "Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)" .to_string())), Some(&1)); assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), - "Sending onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1)); + "Constructing onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1)); + assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), + "Buffered onion message when responding to Custom onion message with path_id None".to_string())), Some(&1)); } let two_unblinded_hops_om = "\ diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index c43b218df7b..482e5ea8cc5 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -206,7 +206,7 @@ fn one_unblinded_hop() { intermediate_nodes: vec![], destination: Destination::Node(nodes[1].get_node_pk()), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -220,7 +220,7 @@ fn two_unblinded_hops() { intermediate_nodes: vec![nodes[1].get_node_pk()], destination: Destination::Node(nodes[2].get_node_pk()), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -236,7 +236,7 @@ fn one_blinded_hop() { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -253,7 +253,7 @@ fn two_unblinded_two_blinded() { destination: Destination::BlindedPath(blinded_path), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[4].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -270,7 +270,7 @@ fn three_blinded_hops() { destination: Destination::BlindedPath(blinded_path), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -287,7 +287,7 @@ fn too_big_packet_error() { intermediate_nodes: hops, destination: Destination::Node(hop_node_id), }; - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::TooBigPacket); } @@ -305,7 +305,7 @@ fn we_are_intro_node() { destination: Destination::BlindedPath(blinded_path), }; - nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap(); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); @@ -315,7 +315,7 @@ fn we_are_intro_node() { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes.remove(2); pass_along_path(&nodes); @@ -335,7 +335,7 @@ fn invalid_blinded_path_error() { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), }; - let err = nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap_err(); assert_eq!(err, SendError::TooFewBlindedHops); } @@ -351,7 +351,7 @@ fn reply_path() { destination: Destination::Node(nodes[3].get_node_pk()), }; let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); - nodes[0].messenger.send_onion_message(path, test_msg.clone(), Some(reply_path)).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); pass_along_path(&nodes); // Make sure the last node successfully decoded the reply path. @@ -367,7 +367,7 @@ fn reply_path() { }; let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); - nodes[0].messenger.send_onion_message(path, test_msg, Some(reply_path)).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); pass_along_path(&nodes); @@ -399,7 +399,7 @@ fn invalid_custom_message_type() { intermediate_nodes: vec![], destination: Destination::Node(nodes[1].get_node_pk()), }; - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::InvalidMessage); } @@ -412,9 +412,9 @@ fn peer_buffer_full() { destination: Destination::Node(nodes[1].get_node_pk()), }; for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger - nodes[0].messenger.send_onion_message(path.clone(), test_msg.clone(), None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path.clone(), test_msg.clone(), None).unwrap(); } - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::BufferFull); } @@ -435,7 +435,7 @@ fn many_hops() { intermediate_nodes, destination: Destination::Node(nodes[num_nodes-1].get_node_pk()), }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 2c42566e733..8d98e284e95 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -76,7 +76,14 @@ use crate::prelude::*; /// # struct FakeMessageRouter {} /// # impl MessageRouter for FakeMessageRouter { /// # fn find_path(&self, sender: PublicKey, peers: Vec, destination: Destination) -> Result { -/// # unimplemented!() +/// # let secp_ctx = Secp256k1::new(); +/// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); +/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); +/// # let hop_node_id2 = hop_node_id1; +/// # Ok(OnionMessagePath { +/// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], +/// # destination, +/// # }) /// # } /// # } /// # let seed = [42u8; 32]; @@ -86,7 +93,7 @@ use crate::prelude::*; /// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); /// # let secp_ctx = Secp256k1::new(); /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); -/// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1); +/// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1); /// # let destination_node_id = hop_node_id1; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; @@ -113,13 +120,10 @@ use crate::prelude::*; /// } /// } /// // Send a custom onion message to a node id. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::Node(destination_node_id), -/// }; +/// let destination = Destination::Node(destination_node_id); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; @@ -127,13 +131,10 @@ use crate::prelude::*; /// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::BlindedPath(blinded_path), -/// }; +/// let destination = Destination::BlindedPath(blinded_path); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// ``` /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest @@ -304,6 +305,16 @@ impl Destination { } } +/// Result of successfully [sending an onion message]. +/// +/// [sending an onion message]: OnionMessenger::send_onion_message +#[derive(Debug, PartialEq, Eq)] +pub enum SendSuccess { + /// The message was buffered and will be sent once it is processed by + /// [`OnionMessageHandler::next_onion_message_for_peer`]. + Buffered, +} + /// Errors that may occur when [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message @@ -319,6 +330,8 @@ pub enum SendError { TooFewBlindedHops, /// Our next-hop peer was offline or does not support onion message forwarding. InvalidFirstHop, + /// A path from the sender to the destination could not be found by the [`MessageRouter`]. + PathNotFound, /// Onion message contents must have a TLV type >= 64. InvalidMessage, /// Our next-hop peer's buffer was full or our total outbound buffer was full. @@ -568,14 +581,63 @@ where } } - /// Sends an [`OnionMessage`] with the given `contents` for sending to the destination of - /// `path`. + /// Sends an [`OnionMessage`] with the given `contents` to `destination`. /// /// See [`OnionMessenger`] for example usage. pub fn send_onion_message( - &self, path: OnionMessagePath, contents: T, reply_path: Option - ) -> Result<(), SendError> { - log_trace!(self.logger, "Sending onion message: {:?}", contents); + &self, contents: T, destination: Destination, reply_path: Option + ) -> Result { + self.find_path_and_enqueue_onion_message( + contents, destination, reply_path, format_args!("") + ) + } + + fn find_path_and_enqueue_onion_message( + &self, contents: T, destination: Destination, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + let result = self.find_path(destination) + .and_then(|path| self.enqueue_onion_message(path, contents, reply_path, log_suffix)); + + match result.as_ref() { + Err(SendError::GetNodeIdFailed) => { + log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); + }, + Err(SendError::PathNotFound) => { + log_trace!(self.logger, "Failed to find path {}", log_suffix); + }, + Err(e) => { + log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); + }, + Ok(SendSuccess::Buffered) => { + log_trace!(self.logger, "Buffered onion message {}", log_suffix); + }, + } + + result + } + + fn find_path(&self, destination: Destination) -> Result { + let sender = self.node_signer + .get_node_id(Recipient::Node) + .map_err(|_| SendError::GetNodeIdFailed)?; + + let peers = self.message_buffers.lock().unwrap() + .iter() + .filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect(); + + self.message_router + .find_path(sender, peers, destination) + .map_err(|_| SendError::PathNotFound) + } + + fn enqueue_onion_message( + &self, path: OnionMessagePath, contents: T, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); let (first_node_id, onion_message) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path @@ -590,18 +652,25 @@ where hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), hash_map::Entry::Occupied(mut e) => { e.get_mut().enqueue_message(onion_message); - Ok(()) + Ok(SendSuccess::Buffered) }, } } + #[cfg(test)] + pub(super) fn send_onion_message_using_path( + &self, path: OnionMessagePath, contents: T, reply_path: Option + ) -> Result { + self.enqueue_onion_message(path, contents, reply_path, format_args!("")) + } + fn handle_onion_message_response( &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments ) { if let Some(response) = response { match reply_path { Some(reply_path) => { - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( response, Destination::BlindedPath(reply_path), None, log_suffix ); }, @@ -612,34 +681,6 @@ where } } - fn find_path_and_enqueue_onion_message( - &self, contents: T, destination: Destination, reply_path: Option, - log_suffix: fmt::Arguments - ) { - let sender = match self.node_signer.get_node_id(Recipient::Node) { - Ok(node_id) => node_id, - Err(_) => { - log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); - return; - } - }; - - let peers = self.message_buffers.lock().unwrap().keys().copied().collect(); - let path = match self.message_router.find_path(sender, peers, destination) { - Ok(path) => path, - Err(()) => { - log_trace!(self.logger, "Failed to find path {}", log_suffix); - return; - }, - }; - - log_trace!(self.logger, "Sending onion message {}: {:?}", log_suffix, contents); - - if let Err(e) = self.send_onion_message(path, contents, reply_path) { - log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); - } - } - #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { let mut message_buffers = self.message_buffers.lock().unwrap(); @@ -790,7 +831,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending OffersMessage") ); } @@ -801,7 +842,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending CustomMessage") ); } From ddee9289dce52065bcc0e8bf8ea3fcedd342627a Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 7 Nov 2023 08:17:46 -0600 Subject: [PATCH 03/18] Buffer onion messages requiring a connection MessageRouter::find_path returns a path to use when sending an onion message. If the first node on the path is not connected or does not support onion messages, sending will fail with InvalidFirstHop. Instead of failing outright, buffer the message for later sending once the first node is a connected peer. --- lightning/src/onion_message/messenger.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 8d98e284e95..c84cfc12079 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -313,6 +313,9 @@ pub enum SendSuccess { /// The message was buffered and will be sent once it is processed by /// [`OnionMessageHandler::next_onion_message_for_peer`]. Buffered, + /// The message was buffered and will be sent once the node is connected as a peer and it is + /// processed by [`OnionMessageHandler::next_onion_message_for_peer`]. + BufferedAwaitingConnection(PublicKey), } /// Errors that may occur when [sending an onion message]. @@ -328,8 +331,6 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, - /// Our next-hop peer was offline or does not support onion message forwarding. - InvalidFirstHop, /// A path from the sender to the destination could not be found by the [`MessageRouter`]. PathNotFound, /// Onion message contents must have a TLV type >= 64. @@ -612,6 +613,12 @@ where Ok(SendSuccess::Buffered) => { log_trace!(self.logger, "Buffered onion message {}", log_suffix); }, + Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => { + log_trace!( + self.logger, "Buffered onion message waiting on peer connection {}: {:?}", + log_suffix, node_id + ); + }, } result @@ -649,7 +656,11 @@ where } match message_buffers.entry(first_node_id) { - hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), + hash_map::Entry::Vacant(e) => { + e.insert(OnionMessageBuffer::PendingConnection(VecDeque::new())) + .enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + }, hash_map::Entry::Occupied(mut e) => { e.get_mut().enqueue_message(onion_message); Ok(SendSuccess::Buffered) From 17af8d5f0954c85163f881ce1c9cc49efbc582b1 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 14 Nov 2023 15:05:05 -0600 Subject: [PATCH 04/18] Add NetworkGraph reference to DefaultMessageRouter When buffering onion messages for a node that is not connected as a peer, it's possible that the node does not exist. Include a NetworkGraph reference in DefaultMessageRouter so that it can be used to check if the node actually exists. Otherwise, an malicious node may send an onion message where the reply path's introduction node doesn't exist. This would result in buffering messages that may never be delivered. --- lightning/src/onion_message/messenger.rs | 27 ++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index c84cfc12079..e507f0a21aa 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -25,6 +25,7 @@ use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; use crate::ln::onion_utils; use crate::ln::peer_handler::IgnoringMessageHandler; +use crate::routing::gossip::NetworkGraph; pub use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; @@ -256,9 +257,27 @@ pub trait MessageRouter { } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. -pub struct DefaultMessageRouter; +pub struct DefaultMessageRouter>, L: Deref> +where + L::Target: Logger, +{ + network_graph: G, +} -impl MessageRouter for DefaultMessageRouter { +impl>, L: Deref> DefaultMessageRouter +where + L::Target: Logger, +{ + /// Creates a [`DefaultMessageRouter`] using the given [`NetworkGraph`]. + pub fn new(network_graph: G) -> Self { + Self { network_graph } + } +} + +impl>, L: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, +{ fn find_path( &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { @@ -878,7 +897,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, - Arc, + Arc>>, Arc>>, Arc>, IgnoringMessageHandler >; @@ -897,7 +916,7 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter, + &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L>, &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >; From 1114c3c5aaca7a0292a6416975ab94c17fe07cfb Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 14 Nov 2023 15:30:17 -0600 Subject: [PATCH 05/18] Add Option> to OnionMessagePath MessageRouter::find_path is given a Destination to reach via a set of peers. If a path cannot be found, it may return a partial path such that OnionMessenger can signal a direct connection to the first node in the path is needed. Include a list of socket addresses in the returned OnionMessagePath to allow OnionMessenger to know how to connect to the node. This allows DefaultMessageRouter to use its NetworkGraph to return socket addresses for gossiped nodes. --- fuzz/src/onion_message.rs | 1 + .../src/onion_message/functional_tests.rs | 15 ++++++++++++ lightning/src/onion_message/messenger.rs | 23 ++++++++++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/fuzz/src/onion_message.rs b/fuzz/src/onion_message.rs index afa416a4044..76cd98cea2d 100644 --- a/fuzz/src/onion_message.rs +++ b/fuzz/src/onion_message.rs @@ -79,6 +79,7 @@ impl MessageRouter for TestMessageRouter { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, + addresses: None, }) } } diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 482e5ea8cc5..fcf209cda65 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -55,6 +55,7 @@ impl MessageRouter for TestMessageRouter { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, + addresses: None, }) } } @@ -205,6 +206,7 @@ fn one_unblinded_hop() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::Node(nodes[1].get_node_pk()), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); @@ -219,6 +221,7 @@ fn two_unblinded_hops() { let path = OnionMessagePath { intermediate_nodes: vec![nodes[1].get_node_pk()], destination: Destination::Node(nodes[2].get_node_pk()), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); @@ -235,6 +238,7 @@ fn one_blinded_hop() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); @@ -251,6 +255,7 @@ fn two_unblinded_two_blinded() { let path = OnionMessagePath { intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); @@ -268,6 +273,7 @@ fn three_blinded_hops() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); @@ -286,6 +292,7 @@ fn too_big_packet_error() { let path = OnionMessagePath { intermediate_nodes: hops, destination: Destination::Node(hop_node_id), + addresses: None, }; let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::TooBigPacket); @@ -303,6 +310,7 @@ fn we_are_intro_node() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap(); @@ -314,6 +322,7 @@ fn we_are_intro_node() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); @@ -334,6 +343,7 @@ fn invalid_blinded_path_error() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap_err(); assert_eq!(err, SendError::TooFewBlindedHops); @@ -349,6 +359,7 @@ fn reply_path() { let path = OnionMessagePath { intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], destination: Destination::Node(nodes[3].get_node_pk()), + addresses: None, }; let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap(); @@ -364,6 +375,7 @@ fn reply_path() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); @@ -398,6 +410,7 @@ fn invalid_custom_message_type() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::Node(nodes[1].get_node_pk()), + addresses: None, }; let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::InvalidMessage); @@ -410,6 +423,7 @@ fn peer_buffer_full() { let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::Node(nodes[1].get_node_pk()), + addresses: None, }; for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger nodes[0].messenger.send_onion_message_using_path(path.clone(), test_msg.clone(), None).unwrap(); @@ -434,6 +448,7 @@ fn many_hops() { let path = OnionMessagePath { intermediate_nodes, destination: Destination::Node(nodes[num_nodes-1].get_node_pk()), + addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response); diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index e507f0a21aa..eb309b622e8 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -22,7 +22,7 @@ use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::ln::features::{InitFeatures, NodeFeatures}; -use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; +use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; use crate::ln::peer_handler::IgnoringMessageHandler; use crate::routing::gossip::NetworkGraph; @@ -84,6 +84,7 @@ use crate::prelude::*; /// # Ok(OnionMessagePath { /// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], /// # destination, +/// # addresses: None, /// # }) /// # } /// # } @@ -282,7 +283,7 @@ where &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { if peers.contains(&destination.first_node()) { - Ok(OnionMessagePath { intermediate_nodes: vec![], destination }) + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses: None }) } else { Err(()) } @@ -297,6 +298,22 @@ pub struct OnionMessagePath { /// The recipient of the message. pub destination: Destination, + + /// Addresses that may be used to connect to [`OnionMessagePath::first_node`]. + /// + /// Only needs to be set if a connection to the node is required. [`OnionMessenger`] may use + /// this to initiate such a connection. + pub addresses: Option>, +} + +impl OnionMessagePath { + /// Returns the first node in the path. + pub fn first_node(&self) -> PublicKey { + self.intermediate_nodes + .first() + .copied() + .unwrap_or_else(|| self.destination.first_node()) + } } /// The destination of an onion message. @@ -427,7 +444,7 @@ where ES::Target: EntropySource, NS::Target: NodeSigner, { - let OnionMessagePath { intermediate_nodes, mut destination } = path; + let OnionMessagePath { intermediate_nodes, mut destination, .. } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); From b86f02afad22f0d2b8fe30e3427bfc63bd0b086d Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 14 Nov 2023 17:08:26 -0600 Subject: [PATCH 06/18] Return socket addresses from DefaultMessageRouter When there isn't a direct connection with the Destination of an OnionMessage, look up socket addresses from the NetworkGraph. This is used to signal to OnionMessenger that a direct connection is needed to send the message. --- lightning/src/onion_message/messenger.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index eb309b622e8..9135157b815 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -25,7 +25,7 @@ use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; use crate::ln::peer_handler::IgnoringMessageHandler; -use crate::routing::gossip::NetworkGraph; +use crate::routing::gossip::{NetworkGraph, NodeId}; pub use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; @@ -282,10 +282,24 @@ where fn find_path( &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { - if peers.contains(&destination.first_node()) { + let first_node = destination.first_node(); + if peers.contains(&first_node) { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses: None }) } else { - Err(()) + let network_graph = self.network_graph.deref().read_only(); + let node_announcement = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) + .map(|node_announcement| &node_announcement.contents); + + match node_announcement { + Some(node_announcement) if node_announcement.features.supports_onion_messages() => { + let addresses = Some(node_announcement.addresses.clone()); + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses }) + }, + _ => Err(()), + } } } } From ba2a8221c414c253531a1fd49623bf1c743bad6a Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 9 Nov 2023 11:10:23 -0600 Subject: [PATCH 07/18] Add Event::ConnectionNeeded for onion messages A MessageRouter may be unable to find a complete path to an onion message's destination. This could because no such path exists or any needs on a potential path don't support onion messages. Add an event that indicates a connection with a node is needed in order to send the message. --- lightning/src/events/mod.rs | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index 4e04a3634e1..76e5f25c0e5 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -530,6 +530,25 @@ pub enum Event { /// serialized prior to LDK version 0.0.117. sender_intended_total_msat: Option, }, + /// Indicates that a peer connection with a node is needed in order to send an [`OnionMessage`]. + /// + /// Typically, this happens when a [`MessageRouter`] is unable to find a complete path to a + /// [`Destination`]. Once a connection is established, any messages buffered by an + /// [`OnionMessageHandler`] may be sent. + /// + /// This event will not be generated for onion message forwards; only for sends including + /// replies. Handlers should connect to the node otherwise any buffered messages may be lost. + /// + /// [`OnionMessage`]: msgs::OnionMessage + /// [`MessageRouter`]: crate::onion_message::MessageRouter + /// [`Destination`]: crate::onion_message::Destination + /// [`OnionMessageHandler`]: crate::ln::msgs::OnionMessageHandler + ConnectionNeeded { + /// The node id for the node needing a connection. + node_id: PublicKey, + /// Sockets for connecting to the node. + addresses: Vec, + }, /// Indicates a request for an invoice failed to yield a response in a reasonable amount of time /// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an /// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed. @@ -1190,6 +1209,10 @@ impl Writeable for Event { (0, payment_id, required), }) }, + &Event::ConnectionNeeded { .. } => { + 35u8.write(writer)?; + // Never write ConnectionNeeded events as buffered onion messages aren't serialized. + }, // Note that, going forward, all new events must only write data inside of // `write_tlv_fields`. Versions 0.0.101+ will ignore odd-numbered events that write // data via `write_tlv_fields`. @@ -1200,8 +1223,7 @@ impl Writeable for Event { impl MaybeReadable for Event { fn read(reader: &mut R) -> Result, msgs::DecodeError> { match Readable::read(reader)? { - // Note that we do not write a length-prefixed TLV for FundingGenerationReady events, - // unlike all other events, thus we return immediately here. + // Note that we do not write a length-prefixed TLV for FundingGenerationReady events. 0u8 => Ok(None), 1u8 => { let f = || { @@ -1588,6 +1610,8 @@ impl MaybeReadable for Event { }; f() }, + // Note that we do not write a length-prefixed TLV for ConnectionNeeded events. + 35u8 => Ok(None), // Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue. // Version 0.0.100 failed to properly ignore odd types, possibly resulting in corrupt // reads. From 06b05df75533bbbe1400bb3efca7e97cff78146f Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 9 Nov 2023 11:13:01 -0600 Subject: [PATCH 08/18] Make OnionMessageHandler extend EventsProvider An OnionMessageHandler may buffer messages that can't be sent because the recipient is not a peer. Have the trait extend EventsProvider so that implementation so that an Event::ConnectionNeeded can be generated for any nodes that fall into this category. Also, implement EventsProvider for OnionMessenger and IgnoringMessageHandler. --- lightning/src/ln/msgs.rs | 4 +- lightning/src/ln/peer_handler.rs | 5 +- lightning/src/onion_message/messenger.rs | 60 +++++++++++++++++------- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 2d871b354a2..41120ce036f 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -52,7 +52,7 @@ use core::fmt::Display; use crate::io::{self, Cursor, Read}; use crate::io_extras::read_to_end; -use crate::events::MessageSendEventsProvider; +use crate::events::{EventsProvider, MessageSendEventsProvider}; use crate::util::chacha20poly1305rfc::ChaChaPolyReadAdapter; use crate::util::logger; use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize}; @@ -1631,7 +1631,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { } /// A handler for received [`OnionMessage`]s and for providing generated ones to send. -pub trait OnionMessageHandler { +pub trait OnionMessageHandler: EventsProvider { /// Handle an incoming `onion_message` message from the given peer. fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f061772890b..1e9752a4fa5 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use crate::sign::{KeysManager, NodeSigner, Recipient}; -use crate::events::{MessageSendEvent, MessageSendEventsProvider}; +use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; @@ -89,6 +89,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} +impl EventsProvider for IgnoringMessageHandler { + fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} +} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 9135157b815..c2e2bc0292a 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -18,6 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::BlindedPath; use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; +use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; @@ -166,21 +167,21 @@ enum OnionMessageBuffer { ConnectedPeer(VecDeque), /// Messages for a node that is not yet connected. - PendingConnection(VecDeque), + PendingConnection(VecDeque, Option>), } impl OnionMessageBuffer { fn pending_messages(&self) -> &VecDeque { match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, }; pending_messages.push_back(message); @@ -189,7 +190,7 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => { + OnionMessageBuffer::PendingConnection(pending_messages, _) => { debug_assert!(false); pending_messages }, @@ -202,14 +203,14 @@ impl OnionMessageBuffer { fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages) = self { + if let OnionMessageBuffer::PendingConnection(pending_messages, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); @@ -381,6 +382,8 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, + /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. + InvalidFirstHop(PublicKey), /// A path from the sender to the destination could not be found by the [`MessageRouter`]. PathNotFound, /// Onion message contents must have a TLV type >= 64. @@ -453,12 +456,12 @@ pub enum PeeledOnion { pub fn create_onion_message( entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, reply_path: Option, -) -> Result<(PublicKey, OnionMessage), SendError> +) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, { - let OnionMessagePath { intermediate_nodes, mut destination, .. } = path; + let OnionMessagePath { intermediate_nodes, mut destination, addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); @@ -499,10 +502,8 @@ where let onion_routing_packet = construct_onion_message_packet( packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; - Ok((first_node_id, OnionMessage { - blinding_point, - onion_routing_packet - })) + let message = OnionMessage { blinding_point, onion_routing_packet }; + Ok((first_node_id, message, addresses)) } /// Decode one layer of an incoming [`OnionMessage`]. @@ -696,7 +697,7 @@ where ) -> Result { log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); - let (first_node_id, onion_message) = create_onion_message( + let (first_node_id, onion_message, addresses) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; @@ -706,10 +707,14 @@ where } match message_buffers.entry(first_node_id) { - hash_map::Entry::Vacant(e) => { - e.insert(OnionMessageBuffer::PendingConnection(VecDeque::new())) - .enqueue_message(onion_message); - Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + hash_map::Entry::Vacant(e) => match addresses { + None => Err(SendError::InvalidFirstHop(first_node_id)), + Some(addresses) => { + e.insert( + OnionMessageBuffer::PendingConnection(VecDeque::new(), Some(addresses)) + ).enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + }, }, hash_map::Entry::Occupied(mut e) => { e.get_mut().enqueue_message(onion_message); @@ -778,6 +783,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider +for OnionMessenger +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + for (node_id, recipient) in self.message_buffers.lock().unwrap().iter_mut() { + if let OnionMessageBuffer::PendingConnection(_, addresses) = recipient { + if let Some(addresses) = addresses.take() { + handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + } + } + } + } +} + impl OnionMessageHandler for OnionMessenger where From cfaa7f3617947d25e74fb4fcaad20c442ffd602e Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 9 Nov 2023 15:58:24 -0600 Subject: [PATCH 09/18] Drop buffered messages for timed out nodes OnionMessenger buffers onion messages for nodes that are pending a connection. To prevent DoS concerns, add a timer_tick_occurred method to OnionMessageHandler so that buffered messages can be dropped. This will be called in lightning-background-processor every 10 seconds. --- lightning/src/ln/msgs.rs | 4 ++ lightning/src/ln/peer_handler.rs | 1 + lightning/src/onion_message/messenger.rs | 47 ++++++++++++++++++------ 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 41120ce036f..b877565e017 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -1650,6 +1650,10 @@ pub trait OnionMessageHandler: EventsProvider { /// drop and refuse to forward onion messages to this peer. fn peer_disconnected(&self, their_node_id: &PublicKey); + /// Performs actions that should happen roughly every ten seconds after startup. Allows handlers + /// to drop any buffered onion messages intended for prospective peers. + fn timer_tick_occurred(&self); + // Handler information: /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 1e9752a4fa5..2fcf1b33047 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -118,6 +118,7 @@ impl OnionMessageHandler for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn timer_tick_occurred(&self) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index c2e2bc0292a..c0f1130a45f 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -166,22 +166,27 @@ enum OnionMessageBuffer { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), - /// Messages for a node that is not yet connected. - PendingConnection(VecDeque, Option>), + /// Messages for a node that is not yet connected, which are dropped after a certain number of + /// timer ticks defined in [`OnionMessenger::timer_tick_occurred`] and tracked here. + PendingConnection(VecDeque, Option>, usize), } impl OnionMessageBuffer { + fn pending_connection(addresses: Vec) -> Self { + Self::PendingConnection(VecDeque::new(), Some(addresses), 0) + } + fn pending_messages(&self) -> &VecDeque { match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, }; pending_messages.push_back(message); @@ -190,7 +195,7 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _) => { + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => { debug_assert!(false); pending_messages }, @@ -203,14 +208,14 @@ impl OnionMessageBuffer { fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages, _) = self { + if let OnionMessageBuffer::PendingConnection(pending_messages, _, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); @@ -710,9 +715,8 @@ where hash_map::Entry::Vacant(e) => match addresses { None => Err(SendError::InvalidFirstHop(first_node_id)), Some(addresses) => { - e.insert( - OnionMessageBuffer::PendingConnection(VecDeque::new(), Some(addresses)) - ).enqueue_message(onion_message); + e.insert(OnionMessageBuffer::pending_connection(addresses)) + .enqueue_message(onion_message); Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) }, }, @@ -795,7 +799,7 @@ where { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { for (node_id, recipient) in self.message_buffers.lock().unwrap().iter_mut() { - if let OnionMessageBuffer::PendingConnection(_, addresses) = recipient { + if let OnionMessageBuffer::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); } @@ -896,6 +900,27 @@ where } } + fn timer_tick_occurred(&self) { + const MAX_TIMER_TICKS: usize = 2; + let mut message_buffers = self.message_buffers.lock().unwrap(); + + // Drop any pending recipients since the last call to avoid retaining buffered messages for + // too long. + message_buffers.retain(|_, recipient| match recipient { + OnionMessageBuffer::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageBuffer::PendingConnection(_, Some(_), _) => true, + _ => true, + }); + + // Increment a timer tick for pending recipients so that their buffered messages are dropped + // at MAX_TIMER_TICKS. + for recipient in message_buffers.values_mut() { + if let OnionMessageBuffer::PendingConnection(_, None, ticks) = recipient { + *ticks += 1; + } + } + } + fn provided_node_features(&self) -> NodeFeatures { let mut features = NodeFeatures::empty(); features.set_onion_messages_optional(); From ae9851794ca8295e48ff233f598ef1f3d3521f11 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 16 Nov 2023 08:49:05 -0600 Subject: [PATCH 10/18] Remove unnecessary BackgroundProcessor type param --- lightning-background-processor/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 95796e86086..c22382490c7 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -603,8 +603,7 @@ pub async fn process_events_async< CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -627,6 +626,7 @@ where L::Target: 'static + Logger, P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let mut should_break = false; let async_event_handler = |event| { @@ -742,8 +742,7 @@ impl BackgroundProcessor { CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -763,6 +762,7 @@ impl BackgroundProcessor { L::Target: 'static + Logger, P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); From 36ecc8e729775be2a12e1392066926bed760524a Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 16 Nov 2023 08:59:40 -0600 Subject: [PATCH 11/18] Re-wrap define_run_body macro parameters Some code hygiene before another parameter is added and rustfmt is eventually used. --- lightning-background-processor/src/lib.rs | 27 ++++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c22382490c7..f953ba1c753 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -270,12 +270,13 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri } macro_rules! define_run_body { - ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr) - => { { + ( + $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, + $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, + $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, + $check_slow_await: expr + ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); @@ -650,8 +651,9 @@ where event_handler(event).await; } }; - define_run_body!(persister, - chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + define_run_body!( + persister, chain_monitor, + chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { @@ -673,7 +675,8 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform) + }, mobile_interruptable_platform + ) } #[cfg(feature = "std")] @@ -782,14 +785,16 @@ impl BackgroundProcessor { } event_handler.handle_event(event); }; - define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + define_run_body!( + persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } From ce68f223e96767d91d40da67acf61f4a964d60d4 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 16 Nov 2023 09:03:18 -0600 Subject: [PATCH 12/18] Re-order define_run_body macro parameters Simply to avoid excessive wrapping when possible. --- lightning-background-processor/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f953ba1c753..fc080eee1f7 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -273,7 +273,7 @@ macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, + $peer_manager: ident, $gossip_sync: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, $check_slow_await: expr ) => { { @@ -655,7 +655,7 @@ where persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - gossip_sync, peer_manager, logger, scorer, should_break, { + peer_manager, gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -788,7 +788,7 @@ impl BackgroundProcessor { define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() From 6ca81ff2bbddaf946251ce9f15cc69526d228ceb Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 16 Nov 2023 10:07:12 -0600 Subject: [PATCH 13/18] Process OnionMessageHandler events in background OnionMessageHandler implementations now also implement EventsProvider. Update lightning-background-processor to also process any events the PeerManager's OnionMessageHandler provides. --- lightning-background-processor/src/lib.rs | 35 +++++++++++++++++++---- lightning/src/ln/peer_handler.rs | 5 ++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index fc080eee1f7..4ced59abde4 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -273,9 +273,9 @@ macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, - $peer_manager: ident, $gossip_sync: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr + $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, + $timer_elapsed: expr, $check_slow_await: expr ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -292,6 +292,7 @@ macro_rules! define_run_body { loop { $process_channel_manager_events; $process_chain_monitor_events; + $process_onion_message_handler_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -655,7 +656,8 @@ where persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - peer_manager, gossip_sync, logger, scorer, should_break, { + peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, + gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -679,6 +681,27 @@ where ) } +#[cfg(feature = "futures")] +async fn process_onion_message_handler_events_async< + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, + PM: 'static + Deref + Send + Sync, +>( + peer_manager: &PM, handler: EventHandler +) +where + PM::Target: APeerManager + Send + Sync, +{ + use lightning::events::EventsProvider; + + let events = core::cell::RefCell::new(Vec::new()); + peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); + + for event in events.into_inner() { + handler(event).await + } +} + #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level @@ -788,7 +811,9 @@ impl BackgroundProcessor { define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, + peer_manager.onion_message_handler().process_pending_events(&event_handler), + gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 2fcf1b33047..c78ca879fdb 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -684,6 +684,8 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; + /// Returns the peer manager's [`OnionMessageHandler`]. + fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -709,6 +711,9 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } + fn onion_message_handler(&self) -> &Self::OMT { + self.message_handler.onion_message_handler.deref() + } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls From e25af3eb01f3bbf4d2c460a45a2aad93e5a948e5 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 16 Nov 2023 10:21:12 -0600 Subject: [PATCH 14/18] Call OnionMessageHandler::timer_tick_occurred lightning-background-processor processes events provided by the PeerManager's OnionMessageHandler for when a connection is needed. If a connection is not established in a reasonable amount of time, drop any buffered onion messages by calling timer_tick_occurred. --- lightning-background-processor/src/lib.rs | 24 +++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 4ced59abde4..5885281a427 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; #[cfg(test)] const PING_TIMER: u64 = 1; +#[cfg(not(test))] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; +#[cfg(test)] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; + /// Prune the network graph of stale entries hourly. const NETWORK_PRUNE_TIMER: u64 = 60 * 60; @@ -283,6 +289,7 @@ macro_rules! define_run_body { $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); @@ -336,6 +343,11 @@ macro_rules! define_run_body { $channel_manager.timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } + if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + $peer_manager.onion_message_handler().timer_tick_occurred(); + last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); + } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. @@ -1392,9 +1404,11 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, - // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and - // `PeerManager::timer_tick_occurred` every `PING_TIMER`. + // Test that: + // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, + // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and + // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1405,9 +1419,11 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); + let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() { + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { break } } From 210407e1bb31020be47f4a0b1eac57bb814f6d69 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 29 Nov 2023 17:36:50 -0600 Subject: [PATCH 15/18] Reuse MessengerNode in spec_test_vector Additional tests will be added needing a similar node struct, so consolidate its usage. --- .../src/onion_message/functional_tests.rs | 142 ++++++++---------- 1 file changed, 60 insertions(+), 82 deletions(-) diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index fcf209cda65..82a08504fe4 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -28,10 +28,11 @@ use crate::sync::{Arc, Mutex}; use crate::prelude::*; struct MessengerNode { - keys_manager: Arc, + node_id: PublicKey, + entropy_source: Arc, messenger: OnionMessenger< Arc, - Arc, + Arc, Arc, Arc, Arc, @@ -40,12 +41,6 @@ struct MessengerNode { custom_message_handler: Arc, } -impl MessengerNode { - fn get_node_pk(&self) -> PublicKey { - self.keys_manager.get_node_id(Recipient::Node).unwrap() - } -} - struct TestMessageRouter {} impl MessageRouter for TestMessageRouter { @@ -156,30 +151,40 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler { } fn create_nodes(num_messengers: u8) -> Vec { + let secrets = (1..=num_messengers) + .into_iter() + .map(|i| SecretKey::from_slice(&[i; 32]).unwrap()) + .collect(); + create_nodes_using_secrets(secrets) +} + +fn create_nodes_using_secrets(secrets: Vec) -> Vec { let mut nodes = Vec::new(); - for i in 0..num_messengers { + for (i, secret_key) in secrets.into_iter().enumerate() { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let seed = [i as u8; 32]; - let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); + let entropy_source = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); + let node_signer = Arc::new(test_utils::TestNodeSigner::new(secret_key)); + let message_router = Arc::new(TestMessageRouter {}); let offers_message_handler = Arc::new(TestOffersMessageHandler {}); let custom_message_handler = Arc::new(TestCustomMessageHandler::new()); nodes.push(MessengerNode { - keys_manager: keys_manager.clone(), + node_id: node_signer.get_node_id(Recipient::Node).unwrap(), + entropy_source: entropy_source.clone(), messenger: OnionMessenger::new( - keys_manager.clone(), keys_manager, logger.clone(), message_router, + entropy_source, node_signer, logger.clone(), message_router, offers_message_handler, custom_message_handler.clone() ), custom_message_handler, }); } - for idx in 0..num_messengers - 1 { - let i = idx as usize; + for i in 0..nodes.len() - 1 { let mut features = InitFeatures::empty(); features.set_onion_messages_optional(); let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; - nodes[i].messenger.peer_connected(&nodes[i + 1].get_node_pk(), &init_msg.clone(), true).unwrap(); - nodes[i + 1].messenger.peer_connected(&nodes[i].get_node_pk(), &init_msg.clone(), false).unwrap(); + nodes[i].messenger.peer_connected(&nodes[i + 1].node_id, &init_msg.clone(), true).unwrap(); + nodes[i + 1].messenger.peer_connected(&nodes[i].node_id, &init_msg.clone(), false).unwrap(); } nodes } @@ -189,11 +194,11 @@ fn pass_along_path(path: &Vec) { for node in path.into_iter().skip(1) { let events = prev_node.messenger.release_pending_msgs(); let onion_msg = { - let msgs = events.get(&node.get_node_pk()).unwrap(); + let msgs = events.get(&node.node_id).unwrap(); assert_eq!(msgs.len(), 1); msgs[0].clone() }; - node.messenger.handle_onion_message(&prev_node.get_node_pk(), &onion_msg); + node.messenger.handle_onion_message(&prev_node.node_id, &onion_msg); prev_node = node; } } @@ -205,7 +210,7 @@ fn one_unblinded_hop() { let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); @@ -219,8 +224,8 @@ fn two_unblinded_hops() { let test_msg = TestCustomMessage::Response; let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk()], - destination: Destination::Node(nodes[2].get_node_pk()), + intermediate_nodes: vec![nodes[1].node_id], + destination: Destination::Node(nodes[2].node_id), addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); @@ -234,7 +239,7 @@ fn one_blinded_hop() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk()], &*nodes[1].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id], &*nodes[1].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), @@ -251,9 +256,9 @@ fn two_unblinded_two_blinded() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[3].get_node_pk(), nodes[4].get_node_pk()], &*nodes[4].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[3].node_id, nodes[4].node_id], &*nodes[4].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], + intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], destination: Destination::BlindedPath(blinded_path), addresses: None, }; @@ -269,7 +274,7 @@ fn three_blinded_hops() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id, nodes[3].node_id], &*nodes[3].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), @@ -287,7 +292,7 @@ fn too_big_packet_error() { let nodes = create_nodes(2); let test_msg = TestCustomMessage::Response; - let hop_node_id = nodes[1].get_node_pk(); + let hop_node_id = nodes[1].node_id; let hops = vec![hop_node_id; 400]; let path = OnionMessagePath { intermediate_nodes: hops, @@ -306,7 +311,7 @@ fn we_are_intro_node() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[0].get_node_pk(), nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[0].node_id, nodes[1].node_id, nodes[2].node_id], &*nodes[2].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), @@ -318,7 +323,7 @@ fn we_are_intro_node() { pass_along_path(&nodes); // Try with a two-hop blinded path where we are the introduction node. - let blinded_path = BlindedPath::new_for_message(&[nodes[0].get_node_pk(), nodes[1].get_node_pk()], &*nodes[1].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[0].node_id, nodes[1].node_id], &*nodes[1].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), @@ -338,7 +343,7 @@ fn invalid_blinded_path_error() { // 0 hops let secp_ctx = Secp256k1::new(); - let mut blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap(); + let mut blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id], &*nodes[2].entropy_source, &secp_ctx).unwrap(); blinded_path.blinded_hops.clear(); let path = OnionMessagePath { intermediate_nodes: vec![], @@ -357,11 +362,11 @@ fn reply_path() { // Destination::Node let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], - destination: Destination::Node(nodes[3].get_node_pk()), + intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], + destination: Destination::Node(nodes[3].node_id), addresses: None, }; - let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); + let reply_path = BlindedPath::new_for_message(&[nodes[2].node_id, nodes[1].node_id, nodes[0].node_id], &*nodes[0].entropy_source, &secp_ctx).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); pass_along_path(&nodes); @@ -371,13 +376,13 @@ fn reply_path() { pass_along_path(&nodes); // Destination::BlindedPath - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id, nodes[3].node_id], &*nodes[3].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), addresses: None, }; - let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); + let reply_path = BlindedPath::new_for_message(&[nodes[2].node_id, nodes[1].node_id, nodes[0].node_id], &*nodes[0].entropy_source, &secp_ctx).unwrap(); nodes[0].messenger.send_onion_message_using_path(path, test_msg, Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); @@ -409,7 +414,7 @@ fn invalid_custom_message_type() { let test_msg = InvalidCustomMessage {}; let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), addresses: None, }; let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); @@ -422,7 +427,7 @@ fn peer_buffer_full() { let test_msg = TestCustomMessage::Request; let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), addresses: None, }; for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger @@ -442,12 +447,12 @@ fn many_hops() { let mut intermediate_nodes = vec![]; for i in 1..(num_nodes-1) { - intermediate_nodes.push(nodes[i].get_node_pk()); + intermediate_nodes.push(nodes[i].node_id); } let path = OnionMessagePath { intermediate_nodes, - destination: Destination::Node(nodes[num_nodes-1].get_node_pk()), + destination: Destination::Node(nodes[num_nodes-1].node_id), addresses: None, }; nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); @@ -457,37 +462,16 @@ fn many_hops() { #[test] fn spec_test_vector() { - let keys_mgrs = vec![ - (Arc::new(test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet)), // Alice - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4141414141414141414141414141414141414141414141414141414141414141").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[1; 32], Network::Testnet)), // Bob - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4242424242424242424242424242424242424242424242424242424242424242").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[2; 32], Network::Testnet)), // Carol - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4343434343434343434343434343434343434343434343434343434343434343").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[3; 32], Network::Testnet)), // Dave - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4444444444444444444444444444444444444444444444444444444444444444").unwrap()).unwrap()))), - ]; - let message_router = Arc::new(TestMessageRouter {}); - let offers_message_handler = Arc::new(TestOffersMessageHandler {}); - let custom_message_handler = Arc::new(TestCustomMessageHandler::new()); - let mut nodes = Vec::new(); - for (idx, (entropy_source, node_signer)) in keys_mgrs.iter().enumerate() { - let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", idx))); - nodes.push(OnionMessenger::new( - entropy_source.clone(), node_signer.clone(), logger.clone(), message_router.clone(), - offers_message_handler.clone(), custom_message_handler.clone() - )); - } - for idx in 0..nodes.len() - 1 { - let i = idx as usize; - let mut features = InitFeatures::empty(); - features.set_onion_messages_optional(); - let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; - nodes[i].peer_connected( - &keys_mgrs[i + 1].1.get_node_id(Recipient::Node).unwrap(), &init_msg.clone(), true).unwrap(); - nodes[i + 1].peer_connected( - &keys_mgrs[i].1.get_node_id(Recipient::Node).unwrap(), &init_msg.clone(), false).unwrap(); - } + let secret_keys = [ + "4141414141414141414141414141414141414141414141414141414141414141", // Alice + "4242424242424242424242424242424242424242424242424242424242424242", // Bob + "4343434343434343434343434343434343434343434343434343434343434343", // Carol + "4444444444444444444444444444444444444444444444444444444444444444", // Dave + ] + .iter() + .map(|secret| SecretKey::from_slice(&>::from_hex(secret).unwrap()).unwrap()) + .collect(); + let nodes = create_nodes_using_secrets(secret_keys); // Hardcode the sender->Alice onion message, because it includes an unknown TLV of type 1, which // LDK doesn't support constructing. @@ -506,24 +490,18 @@ fn spec_test_vector() { // which is why the asserted strings differ slightly from the spec. assert_eq!(sender_to_alice_om.encode(), >::from_hex("031195a8046dcbb8e17034bca630065e7a0982e4e36f6f7e5a8d4554e4846fcd9905560002531fe6068134503d2723133227c867ac8fa6c83c537e9a44c3c5bdbdcb1fe33793b828776d70aabbd8cef1a5b52d5a397ae1a20f20435ff6057cd8be339d5aee226660ef73b64afa45dbf2e6e8e26eb96a259b2db5aeecda1ce2e768bbc35d389d7f320ca3d2bd14e2689bef2f5ac0307eaaabc1924eb972c1563d4646ae131accd39da766257ed35ea36e4222527d1db4fa7b2000aab9eafcceed45e28b5560312d4e2299bd8d1e7fe27d10925966c28d497aec400b4630485e82efbabc00550996bdad5d6a9a8c75952f126d14ad2cff91e16198691a7ef2937de83209285f1fb90944b4e46bca7c856a9ce3da10cdf2a7d00dc2bf4f114bc4d3ed67b91cbde558ce9af86dc81fbdc37f8e301b29e23c1466659c62bdbf8cff5d4c20f0fb0851ec72f5e9385dd40fdd2e3ed67ca4517117825665e50a3e26f73c66998daf18e418e8aef9ce2d20da33c3629db2933640e03e7b44c2edf49e9b482db7b475cfd4c617ae1d46d5c24d697846f9f08561eac2b065f9b382501f6eabf07343ed6c602f61eab99cdb52adf63fd44a8db2d3016387ea708fc1c08591e19b4d9984ebe31edbd684c2ea86526dd8c7732b1d8d9117511dc1b643976d356258fce8313b1cb92682f41ab72dedd766f06de375f9edacbcd0ca8c99b865ea2b7952318ea1fd20775a28028b5cf59dece5de14f615b8df254eee63493a5111ea987224bea006d8f1b60d565eef06ac0da194dba2a6d02e79b2f2f34e9ca6e1984a507319d86e9d4fcaeea41b4b9144e0b1826304d4cc1da61cfc5f8b9850697df8adc5e9d6f3acb3219b02764b4909f2b2b22e799fd66c383414a84a7d791b899d4aa663770009eb122f90282c8cb9cda16aba6897edcf9b32951d0080c0f52be3ca011fbec3fb16423deb47744645c3b05fdbd932edf54ba6efd26e65340a8e9b1d1216582e1b30d64524f8ca2d6c5ba63a38f7120a3ed71bed8960bcac2feee2dd41c90be48e3c11ec518eb3d872779e4765a6cc28c6b0fa71ab57ced73ae963cc630edae4258cba2bf25821a6ae049fec2fca28b5dd1bb004d92924b65701b06dcf37f0ccd147a13a03f9bc0f98b7d78fe9058089756931e2cd0e0ed92ec6759d07b248069526c67e9e6ce095118fd3501ba0f858ef030b76c6f6beb11a09317b5ad25343f4b31aef02bc555951bc7791c2c289ecf94d5544dcd6ad3021ed8e8e3db34b2a73e1eedb57b578b068a5401836d6e382110b73690a94328c404af25e85a8d6b808893d1b71af6a31fadd8a8cc6e31ecc0d9ff7e6b91fd03c274a5c1f1ccd25b61150220a3fddb04c91012f5f7a83a5c90deb2470089d6e38cd5914b9c946eca6e9d31bbf8667d36cf87effc3f3ff283c21dd4137bd569fe7cf758feac94053e4baf7338bb592c8b7c291667fadf4a9bf9a2a154a18f612cbc7f851b3f8f2070e0a9d180622ee4f8e81b0ab250d504cef24116a3ff188cc829fcd8610b56343569e8dc997629410d1967ca9dd1d27eec5e01e4375aad16c46faba268524b154850d0d6fe3a76af2c6aa3e97647c51036049ac565370028d6a439a2672b6face56e1b171496c0722cfa22d9da631be359661617c5d5a2d286c5e19db9452c1e21a0107b6400debda2decb0c838f342dd017cdb2dccdf1fe97e3df3f881856b546997a3fed9e279c720145101567dd56be21688fed66bf9759e432a9aa89cbbd225d13cdea4ca05f7a45cfb6a682a3d5b1e18f7e6cf934fae5098108bae9058d05c3387a01d8d02a656d2bfff67e9f46b2d8a6aac28129e52efddf6e552214c3f8a45bc7a912cca9a7fec1d7d06412c6972cb9e3dc518983f56530b8bffe7f92c4b6eb47d4aef59fb513c4653a42de61bc17ad7728e7fc7590ff05a9e991de03f023d0aaf8688ed6170def5091c66576a424ac1cb").unwrap()); let sender_dummy_node_id = PublicKey::from_slice(&[2; 33]).unwrap(); - nodes[0].handle_onion_message(&sender_dummy_node_id, &sender_to_alice_om); - let alice_to_bob_om = nodes[0].next_onion_message_for_peer( - keys_mgrs[1].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[0].messenger.handle_onion_message(&sender_dummy_node_id, &sender_to_alice_om); + let alice_to_bob_om = nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).unwrap(); assert_eq!(alice_to_bob_om.encode(), >::from_hex("031b84c5567b126440995d3ed5aaba0565d71e1834604819ff9c17f5e9d5dd078f05560002536d53f93796cad550b6c68662dca41f7e8c221c31022c64dd1a627b2df3982b25eac261e88369cfc66e1e3b6d9829cb3dcd707046e68a7796065202a7904811bf2608c5611cf74c9eb5371c7eb1a4428bb39a041493e2a568ddb0b2482a6cc6711bc6116cef144ebf988073cb18d9dd4ce2d3aa9de91a7dc6d7c6f11a852024626e66b41ba1158055505dff9cb15aa51099f315564d9ee3ed6349665dc3e209eedf9b5805ee4f69d315df44c80e63d0e2efbdab60ec96f44a3447c6a6ddb1efb6aa4e072bde1dab974081646bfddf3b02daa2b83847d74dd336465e76e9b8fecc2b0414045eeedfc39939088a76820177dd1103c99939e659beb07197bab9f714b30ba8dc83738e9a6553a57888aaeda156c68933a2f4ff35e3f81135076b944ed9856acbfee9c61299a5d1763eadd14bf5eaf71304c8e165e590d7ecbcd25f1650bf5b6c2ad1823b2dc9145e168974ecf6a2273c94decff76d94bc6708007a17f22262d63033c184d0166c14f41b225a956271947aae6ce65890ed8f0d09c6ffe05ec02ee8b9de69d7077a0c5adeb813aabcc1ba8975b73ab06ddea5f4db3c23a1de831602de2b83f990d4133871a1a81e53f86393e6a7c3a7b73f0c099fa72afe26c3027bb9412338a19303bd6e6591c04fb4cde9b832b5f41ae199301ea8c303b5cef3aca599454273565de40e1148156d1f97c1aa9e58459ab318304075e034f5b7899c12587b86776a18a1da96b7bcdc22864fccc4c41538ebce92a6f054d53bf46770273a70e75fe0155cd6d2f2e937465b0825ce3123b8c206fac4c30478fa0f08a97ade7216dce11626401374993213636e93545a31f500562130f2feb04089661ad8c34d5a4cbd2e4e426f37cb094c786198a220a2646ecadc38c04c29ee67b19d662c209a7b30bfecc7fe8bf7d274de0605ee5df4db490f6d32234f6af639d3fce38a2801bcf8d51e9c090a6c6932355a83848129a378095b34e71cb8f51152dc035a4fe8e802fec8de221a02ba5afd6765ce570bef912f87357936ea0b90cb2990f56035e89539ec66e8dbd6ed50835158614096990e019c3eba3d7dd6a77147641c6145e8b17552cd5cf7cd163dd40b9eaeba8c78e03a2cd8c0b7997d6f56d35f38983a202b4eb8a54e14945c4de1a6dde46167e11708b7a5ff5cb9c0f7fc12fae49a012aa90bb1995c038130b749c48e6f1ffb732e92086def42af10fbc460d94abeb7b2fa744a5e9a491d62a08452be8cf2fdef573deedc1fe97098bce889f98200b26f9bb99da9aceddda6d793d8e0e44a2601ef4590cfbb5c3d0197aac691e3d31c20fd8e38764962ca34dabeb85df28feabaf6255d4d0df3d814455186a84423182caa87f9673df770432ad8fdfe78d4888632d460d36d2719e8fa8e4b4ca10d817c5d6bc44a8b2affab8c2ba53b8bf4994d63286c2fad6be04c28661162fa1a67065ecda8ba8c13aee4a8039f4f0110e0c0da2366f178d8903e19136dad6df9d8693ce71f3a270f9941de2a93d9b67bc516207ac1687bf6e00b29723c42c7d9c90df9d5e599dbeb7b73add0a6a2b7aba82f98ac93cb6e60494040445229f983a81c34f7f686d166dfc98ec23a6318d4a02a311ac28d655ea4e0f9c3014984f31e621ef003e98c373561d9040893feece2e0fa6cd2dd565e6fbb2773a2407cb2c3273c306cf71f427f2e551c4092e067cf9869f31ac7c6c80dd52d4f85be57a891a41e34be0d564e39b4af6f46b85339254a58b205fb7e10e7d0470ee73622493f28c08962118c23a1198467e72c4ae1cd482144b419247a5895975ea90d135e2a46ef7e5794a1551a447ff0a0d299b66a7f565cd86531f5e7af5408d85d877ce95b1df12b88b7d5954903a5296325ba478ba1e1a9d1f30a2d5052b2e2889bbd64f72c72bc71d8817288a2").unwrap()); - nodes[1].handle_onion_message( - &keys_mgrs[0].1.get_node_id(Recipient::Node).unwrap(), &alice_to_bob_om); - let bob_to_carol_om = nodes[1].next_onion_message_for_peer( - keys_mgrs[2].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[1].messenger.handle_onion_message(&nodes[0].node_id, &alice_to_bob_om); + let bob_to_carol_om = nodes[1].messenger.next_onion_message_for_peer(nodes[2].node_id).unwrap(); assert_eq!(bob_to_carol_om.encode(), >::from_hex("02b684babfd400c8dd48b367e9754b8021a3594a34dc94d7101776c7f6a86d0582055600029a77e8523162efa1f4208f4f2050cd5c386ddb6ce6d36235ea569d217ec52209fb85fdf7dbc4786c373eebdba0ddc184cfbe6da624f610e93f62c70f2c56be1090b926359969f040f932c03f53974db5656233bd60af375517d4323002937d784c2c88a564bcefe5c33d3fc21c26d94dfacab85e2e19685fd2ff4c543650958524439b6da68779459aee5ffc9dc543339acec73ff43be4c44ddcbe1c11d50e2411a67056ba9db7939d780f5a86123fdd3abd6f075f7a1d78ab7daf3a82798b7ec1e9f1345bc0d1e935098497067e2ae5a51ece396fcb3bb30871ad73aee51b2418b39f00c8e8e22be4a24f4b624e09cb0414dd46239de31c7be035f71e8da4f5a94d15b44061f46414d3f355069b5c5b874ba56704eb126148a22ec873407fe118972127e63ff80e682e410f297f23841777cec0517e933eaf49d7e34bd203266b42081b3a5193b51ccd34b41342bc67cf73523b741f5c012ba2572e9dda15fbe131a6ac2ff24dc2a7622d58b9f3553092cfae7fae3c8864d95f97aa49ec8edeff5d9f5782471160ee412d82ff6767030fc63eec6a93219a108cd41433834b26676a39846a944998796c79cd1cc460531b8ded659cedfd8aecefd91944f00476f1496daafb4ea6af3feacac1390ea510709783c2aa81a29de27f8959f6284f4684102b17815667cbb0645396ac7d542b878d90c42a1f7f00c4c4eedb2a22a219f38afadb4f1f562b6e000a94e75cc38f535b43a3c0384ccef127fde254a9033a317701c710b2b881065723486e3f4d3eea5e12f374a41565fe43fa137c1a252c2153dde055bb343344c65ad0529010ece29bbd405effbebfe3ba21382b94a60ac1a5ffa03f521792a67b30773cb42e862a8a02a8bbd41b842e115969c87d1ff1f8c7b5726b9f20772dd57fe6e4ea41f959a2a673ffad8e2f2a472c4c8564f3a5a47568dd75294b1c7180c500f7392a7da231b1fe9e525ea2d7251afe9ca52a17fe54a116cb57baca4f55b9b6de915924d644cba9dade4ccc01939d7935749c008bafc6d3ad01cd72341ce5ddf7a5d7d21cf0465ab7a3233433aef21f9acf2bfcdc5a8cc003adc4d82ac9d72b36eb74e05c9aa6ccf439ac92e6b84a3191f0764dd2a2e0b4cc3baa08782b232ad6ecd3ca6029bc08cc094aef3aebddcaddc30070cb6023a689641de86cfc6341c8817215a4650f844cd2ca60f2f10c6e44cfc5f23912684d4457bf4f599879d30b79bf12ef1ab8d34dddc15672b82e56169d4c770f0a2a7a960b1e8790773f5ff7fce92219808f16d061cc85e053971213676d28fb48925e9232b66533dbd938458eb2cc8358159df7a2a2e4cf87500ede2afb8ce963a845b98978edf26a6948d4932a6b95d022004556d25515fe158092ce9a913b4b4a493281393ca731e8d8e5a3449b9d888fc4e73ffcbb9c6d6d66e88e03cf6e81a0496ede6e4e4172b08c000601993af38f80c7f68c9d5fff9e0e215cff088285bf039ca731744efcb7825a272ca724517736b4890f47e306b200aa2543c363e2c9090bcf3cf56b5b86868a62471c7123a41740392fc1d5ab28da18dca66618e9af7b42b62b23aba907779e73ca03ec60e6ab9e0484b9cae6578e0fddb6386cb3468506bf6420298bf4a690947ab582255551d82487f271101c72e19e54872ab47eae144db66bc2f8194a666a5daec08d12822cb83a61946234f2dfdbd6ca7d8763e6818adee7b401fcdb1ac42f9df1ac5cc5ac131f2869013c8d6cd29d4c4e3d05bccd34ca83366d616296acf854fa05149bfd763a25b9938e96826a037fdcb85545439c76df6beed3bdbd01458f9cf984997cc4f0a7ac3cc3f5e1eeb59c09cadcf5a537f16e444149c8f17d4bdaef16c9fbabc5ef06eb0f0bf3a07a1beddfeacdaf1df5582d6dbd6bb808d6ab31bc22e5d7").unwrap()); - nodes[2].handle_onion_message( - &keys_mgrs[1].1.get_node_id(Recipient::Node).unwrap(), &bob_to_carol_om); - let carol_to_dave_om = nodes[2].next_onion_message_for_peer( - keys_mgrs[3].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[2].messenger.handle_onion_message(&nodes[1].node_id, &bob_to_carol_om); + let carol_to_dave_om = nodes[2].messenger.next_onion_message_for_peer(nodes[3].node_id).unwrap(); assert_eq!(carol_to_dave_om.encode(), >::from_hex("025aaca62db7ce6b46386206ef9930daa32e979a35cb185a41cb951aa7d254b03c055600025550b2910294fa73bda99b9de9c851be9cbb481e23194a1743033630efba546b86e7d838d0f6e9cc0ed088dbf6889f0dceca3bfc745bd77d013a31311fa932a8bf1d28387d9ff521eabc651dee8f861fed609a68551145a451f017ec44978addeee97a423c08445531da488fd1ddc998e9cdbfcea59517b53fbf1833f0bbe6188dba6ca773a247220ec934010daca9cc185e1ceb136803469baac799e27a0d82abe53dc48a06a55d1f643885cc7894677dd20a4e4152577d1ba74b870b9279f065f9b340cedb3ca13b7df218e853e10ccd1b59c42a2acf93f489e170ee4373d30ab158b60fc20d3ba73a1f8c750951d69fb5b9321b968ddc8114936412346aff802df65516e1c09c51ef19849ff36c0199fd88c8bec301a30fef0c7cb497901c038611303f64e4174b5daf42832aa5586b84d2c9b95f382f4269a5d1bd4be898618dc78dfd451170f72ca16decac5b03e60702112e439cadd104fb3bbb3d5023c9b80823fdcd0a212a7e1aaa6eeb027adc7f8b3723031d135a09a979a4802788bb7861c6cc85501fb91137768b70aeab309b27b885686604ffc387004ac4f8c44b101c39bc0597ef7fd957f53fc5051f534b10eb3852100962b5e58254e5558689913c26ad6072ea41f5c5db10077cfc91101d4ae393be274c74297da5cc381cd88d54753aaa7df74b2f9da8d88a72bc9218fcd1f19e4ff4aace182312b9509c5175b6988f044c5756d232af02a451a02ca752f3c52747773acff6fd07d2032e6ce562a2c42105d106eba02d0b1904182cdc8c74875b082d4989d3a7e9f0e73de7c75d357f4af976c28c0b206c5e8123fc2391d078592d0d5ff686fd245c0a2de2e535b7cca99c0a37d432a8657393a9e3ca53eec1692159046ba52cb9bc97107349d8673f74cbc97e231f1108005c8d03e24ca813cea2294b39a7a493bcc062708f1f6cf0074e387e7d50e0666ce784ef4d31cb860f6cad767438d9ea5156ff0ae86e029e0247bf94df75ee0cda4f2006061455cb2eaff513d558863ae334cef7a3d45f55e7cc13153c6719e9901c1d4db6c03f643b69ea4860690305651794284d9e61eb848ccdf5a77794d376f0af62e46d4835acce6fd9eef5df73ebb8ea3bb48629766967f446e744ecc57ff3642c4aa1ccee9a2f72d5caa75fa05787d08b79408fce792485fdecdc25df34820fb061275d70b84ece540b0fc47b2453612be34f2b78133a64e812598fbe225fd85415f8ffe5340ce955b5fd9d67dd88c1c531dde298ed25f96df271558c812c26fa386966c76f03a6ebccbca49ac955916929bd42e134f982dde03f924c464be5fd1ba44f8dc4c3cbc8162755fd1d8f7dc044b15b1a796c53df7d8769bb167b2045b49cc71e08908796c92c16a235717cabc4bb9f60f8f66ff4fff1f9836388a99583acebdff4a7fb20f48eedcd1f4bdcc06ec8b48e35307df51d9bc81d38a94992dd135b30079e1f592da6e98dff496cb1a7776460a26b06395b176f585636ebdf7eab692b227a31d6979f5a6141292698e91346b6c806b90c7c6971e481559cae92ee8f4136f2226861f5c39ddd29bbdb118a35dece03f49a96804caea79a3dacfbf09d65f2611b5622de51d98e18151acb3bb84c09caaa0cc80edfa743a4679f37d6167618ce99e73362fa6f213409931762618a61f1738c071bba5afc1db24fe94afb70c40d731908ab9a505f76f57a7d40e708fd3df0efc5b7cbb2a7b75cd23449e09684a2f0e2bfa0d6176c35f96fe94d92fc9fa4103972781f81cb6e8df7dbeb0fc529c600d768bed3f08828b773d284f69e9a203459d88c12d6df7a75be2455fec128f07a497a2b2bf626cc6272d0419ca663e9dc66b8224227eb796f0246dcae9c5b0b6cfdbbd40c3245a610481c92047c968c9fc92c04b89cc41a0c15355a8f").unwrap()); // Dave handles the onion message but he'll log that he errored while decoding the hop data // because he sees it as an empty onion message (the only contents of the sender's OM is "hello" // with TLV type 1, which Dave ignores because (1) it's odd and he can't understand it and (2) LDK // only attempts to parse custom OM TLVs with type > 64). - nodes[3].handle_onion_message( - &keys_mgrs[2].1.get_node_id(Recipient::Node).unwrap(), &carol_to_dave_om); + nodes[3].messenger.handle_onion_message(&nodes[2].node_id, &carol_to_dave_om); } From 89e630b9183c41ebf9e10913a64becf69097b010 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 29 Nov 2023 21:30:15 -0600 Subject: [PATCH 16/18] Test pending connection onion message buffering Add tests for onion message buffering checking that messages are cleared upon disconnection and timed out after MAX_TIMER_TICKS. Also, checks that ConnectionNeeded events are generated. --- .../src/onion_message/functional_tests.rs | 96 +++++++++++++++++-- lightning/src/onion_message/messenger.rs | 7 +- 2 files changed, 93 insertions(+), 10 deletions(-) diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 82a08504fe4..e8e800d5e92 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -10,8 +10,9 @@ //! Onion message testing and test utilities live here. use crate::blinded_path::BlindedPath; +use crate::events::{Event, EventsProvider}; use crate::ln::features::InitFeatures; -use crate::ln::msgs::{self, DecodeError, OnionMessageHandler}; +use crate::ln::msgs::{self, DecodeError, OnionMessageHandler, SocketAddress}; use crate::sign::{NodeSigner, Recipient}; use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer}; use crate::util::test_utils; @@ -50,7 +51,7 @@ impl MessageRouter for TestMessageRouter { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, - addresses: None, + addresses: Some(vec![SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 }]), }) } } @@ -180,15 +181,30 @@ fn create_nodes_using_secrets(secrets: Vec) -> Vec { }); } for i in 0..nodes.len() - 1 { - let mut features = InitFeatures::empty(); - features.set_onion_messages_optional(); - let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; - nodes[i].messenger.peer_connected(&nodes[i + 1].node_id, &init_msg.clone(), true).unwrap(); - nodes[i + 1].messenger.peer_connected(&nodes[i].node_id, &init_msg.clone(), false).unwrap(); + connect_peers(&nodes[i], &nodes[i + 1]); } nodes } +fn connect_peers(node_a: &MessengerNode, node_b: &MessengerNode) { + let mut features = InitFeatures::empty(); + features.set_onion_messages_optional(); + let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; + node_a.messenger.peer_connected(&node_b.node_id, &init_msg.clone(), true).unwrap(); + node_b.messenger.peer_connected(&node_a.node_id, &init_msg.clone(), false).unwrap(); +} + +fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) { + node_a.messenger.peer_disconnected(&node_b.node_id); + node_b.messenger.peer_disconnected(&node_a.node_id); +} + +fn release_events(node: &MessengerNode) -> Vec { + let events = core::cell::RefCell::new(Vec::new()); + node.messenger.process_pending_events(&|e| events.borrow_mut().push(e)); + events.into_inner() +} + fn pass_along_path(path: &Vec) { let mut prev_node = &path[0]; for node in path.into_iter().skip(1) { @@ -460,6 +476,72 @@ fn many_hops() { pass_along_path(&nodes); } +#[test] +fn requests_peer_connection_for_buffered_messages() { + let nodes = create_nodes(3); + let message = TestCustomMessage::Request; + let secp_ctx = Secp256k1::new(); + let blinded_path = BlindedPath::new_for_message( + &[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx + ).unwrap(); + let destination = Destination::BlindedPath(blinded_path); + + // Buffer an onion message for a connected peer + nodes[0].messenger.send_onion_message(message.clone(), destination.clone(), None).unwrap(); + assert!(release_events(&nodes[0]).is_empty()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); + + // Buffer an onion message for a disconnected peer + disconnect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); + nodes[0].messenger.send_onion_message(message, destination, None).unwrap(); + + // Check that a ConnectionNeeded event for the peer is provided + let events = release_events(&nodes[0]); + assert_eq!(events.len(), 1); + match &events[0] { + Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id), + e => panic!("Unexpected event: {:?}", e), + } + + // Release the buffered onion message when reconnected + connect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); +} + +#[test] +fn drops_buffered_messages_waiting_for_peer_connection() { + let nodes = create_nodes(3); + let message = TestCustomMessage::Request; + let secp_ctx = Secp256k1::new(); + let blinded_path = BlindedPath::new_for_message( + &[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx + ).unwrap(); + let destination = Destination::BlindedPath(blinded_path); + + // Buffer an onion message for a disconnected peer + disconnect_peers(&nodes[0], &nodes[1]); + nodes[0].messenger.send_onion_message(message, destination, None).unwrap(); + + // Release the event so the timer can start ticking + let events = release_events(&nodes[0]); + assert_eq!(events.len(), 1); + match &events[0] { + Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id), + e => panic!("Unexpected event: {:?}", e), + } + + // Drop buffered messages for a disconnected peer after some timer ticks + use crate::onion_message::messenger::MAX_TIMER_TICKS; + for _ in 0..=MAX_TIMER_TICKS { + nodes[0].messenger.timer_tick_occurred(); + } + connect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); +} + #[test] fn spec_test_vector() { let secret_keys = [ diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index c0f1130a45f..b7172452464 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -40,6 +40,8 @@ use crate::io; use crate::sync::{Arc, Mutex}; use crate::prelude::*; +pub(super) const MAX_TIMER_TICKS: usize = 2; + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -166,8 +168,8 @@ enum OnionMessageBuffer { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), - /// Messages for a node that is not yet connected, which are dropped after a certain number of - /// timer ticks defined in [`OnionMessenger::timer_tick_occurred`] and tracked here. + /// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`] + /// and tracked here. PendingConnection(VecDeque, Option>, usize), } @@ -901,7 +903,6 @@ where } fn timer_tick_occurred(&self) { - const MAX_TIMER_TICKS: usize = 2; let mut message_buffers = self.message_buffers.lock().unwrap(); // Drop any pending recipients since the last call to avoid retaining buffered messages for From d46519bbd0edb76a2d6dd8d33dac057d49576a46 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 29 Nov 2023 21:42:48 -0600 Subject: [PATCH 17/18] Remove superfluous space from where clause --- lightning/src/onion_message/messenger.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index b7172452464..8d4fb044f5a 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -151,7 +151,7 @@ where L::Target: Logger, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, - CMH:: Target: CustomOnionMessageHandler, + CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, node_signer: NS, From 0b8311643697cb3dd7c8e6a53ade86003432e10e Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 1 Dec 2023 13:22:43 -0600 Subject: [PATCH 18/18] Rename OnionMessageBuffer to OnionMessageRecipient --- lightning/src/onion_message/messenger.rs | 88 ++++++++++++------------ 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 8d4fb044f5a..05ea7a2853c 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -156,7 +156,7 @@ where entropy_source: ES, node_signer: NS, logger: L, - message_buffers: Mutex>, + message_recipients: Mutex>, secp_ctx: Secp256k1, message_router: MR, offers_handler: OMH, @@ -164,7 +164,7 @@ where } /// [`OnionMessage`]s buffered to be sent. -enum OnionMessageBuffer { +enum OnionMessageRecipient { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), @@ -173,22 +173,22 @@ enum OnionMessageBuffer { PendingConnection(VecDeque, Option>, usize), } -impl OnionMessageBuffer { +impl OnionMessageRecipient { fn pending_connection(addresses: Vec) -> Self { Self::PendingConnection(VecDeque::new(), Some(addresses), 0) } fn pending_messages(&self) -> &VecDeque { match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; pending_messages.push_back(message); @@ -196,8 +196,8 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => { debug_assert!(false); pending_messages }, @@ -209,18 +209,18 @@ impl OnionMessageBuffer { #[cfg(test)] fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages, _, _) = self { + if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); - *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); + *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages); } } } @@ -631,7 +631,7 @@ where OnionMessenger { entropy_source, node_signer, - message_buffers: Mutex::new(HashMap::new()), + message_recipients: Mutex::new(HashMap::new()), secp_ctx, logger, message_router, @@ -687,9 +687,9 @@ where .get_node_id(Recipient::Node) .map_err(|_| SendError::GetNodeIdFailed)?; - let peers = self.message_buffers.lock().unwrap() + let peers = self.message_recipients.lock().unwrap() .iter() - .filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_))) + .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_))) .map(|(node_id, _)| *node_id) .collect(); @@ -708,16 +708,16 @@ where &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &message_buffers) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_recipients) { return Err(SendError::BufferFull); } - match message_buffers.entry(first_node_id) { + match message_recipients.entry(first_node_id) { hash_map::Entry::Vacant(e) => match addresses { None => Err(SendError::InvalidFirstHop(first_node_id)), Some(addresses) => { - e.insert(OnionMessageBuffer::pending_connection(addresses)) + e.insert(OnionMessageRecipient::pending_connection(addresses)) .enqueue_message(onion_message); Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) }, @@ -755,18 +755,18 @@ where #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut message_buffers = self.message_buffers.lock().unwrap(); + let mut message_recipients = self.message_recipients.lock().unwrap(); let mut msgs = HashMap::new(); // We don't want to disconnect the peers by removing them entirely from the original map, so we // release the pending message buffers individually. - for (peer_node_id, buffer) in &mut *message_buffers { - msgs.insert(*peer_node_id, buffer.release_pending_messages()); + for (node_id, recipient) in &mut *message_recipients { + msgs.insert(*node_id, recipient.release_pending_messages()); } msgs } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; @@ -800,8 +800,8 @@ where CMH::Target: CustomOnionMessageHandler, { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { - for (node_id, recipient) in self.message_buffers.lock().unwrap().iter_mut() { - if let OnionMessageBuffer::PendingConnection(_, addresses, _) = recipient { + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); } @@ -852,20 +852,20 @@ where } }, Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &message_buffers) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_recipients) { log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); return } #[cfg(fuzzing)] - message_buffers + message_recipients .entry(next_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())); + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); - match message_buffers.entry(next_node_id) { + match message_recipients.entry(next_node_id) { hash_map::Entry::Occupied(mut e) if matches!( - e.get(), OnionMessageBuffer::ConnectedPeer(..) + e.get(), OnionMessageRecipient::ConnectedPeer(..) ) => { e.get_mut().enqueue_message(onion_message); log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); @@ -884,39 +884,39 @@ where fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .entry(*their_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); } else { - self.message_buffers.lock().unwrap().remove(their_node_id); + self.message_recipients.lock().unwrap().remove(their_node_id); } Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - match self.message_buffers.lock().unwrap().remove(their_node_id) { - Some(OnionMessageBuffer::ConnectedPeer(..)) => {}, + match self.message_recipients.lock().unwrap().remove(their_node_id) { + Some(OnionMessageRecipient::ConnectedPeer(..)) => {}, _ => debug_assert!(false), } } fn timer_tick_occurred(&self) { - let mut message_buffers = self.message_buffers.lock().unwrap(); + let mut message_recipients = self.message_recipients.lock().unwrap(); // Drop any pending recipients since the last call to avoid retaining buffered messages for // too long. - message_buffers.retain(|_, recipient| match recipient { - OnionMessageBuffer::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, - OnionMessageBuffer::PendingConnection(_, Some(_), _) => true, + message_recipients.retain(|_, recipient| match recipient { + OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageRecipient::PendingConnection(_, Some(_), _) => true, _ => true, }); // Increment a timer tick for pending recipients so that their buffered messages are dropped // at MAX_TIMER_TICKS. - for recipient in message_buffers.values_mut() { - if let OnionMessageBuffer::PendingConnection(_, None, ticks) = recipient { + for recipient in message_recipients.values_mut() { + if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient { *ticks += 1; } } @@ -960,7 +960,7 @@ where ); } - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .get_mut(&peer_node_id) .and_then(|buffer| buffer.dequeue_message()) }