diff --git a/fuzz/src/onion_message.rs b/fuzz/src/onion_message.rs index de7b8b6b4c6..76cd98cea2d 100644 --- a/fuzz/src/onion_message.rs +++ b/fuzz/src/onion_message.rs @@ -79,6 +79,7 @@ impl MessageRouter for TestMessageRouter { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, + addresses: None, }) } } @@ -269,7 +270,9 @@ mod tests { "Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)" .to_string())), Some(&1)); assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), - "Sending onion message: TestCustomMessage".to_string())), Some(&1)); + "Constructing onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1)); + assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), + "Buffered onion message when responding to Custom onion message with path_id None".to_string())), Some(&1)); } let two_unblinded_hops_om = "\ diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 95796e86086..5885281a427 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; #[cfg(test)] const PING_TIMER: u64 = 1; +#[cfg(not(test))] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; +#[cfg(test)] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; + /// Prune the network graph of stale entries hourly. const NETWORK_PRUNE_TIMER: u64 = 60 * 60; @@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri } macro_rules! define_run_body { - ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr) - => { { + ( + $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, + $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, + $timer_elapsed: expr, $check_slow_await: expr + ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); @@ -291,6 +299,7 @@ macro_rules! define_run_body { loop { $process_channel_manager_events; $process_chain_monitor_events; + $process_onion_message_handler_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -334,6 +343,11 @@ macro_rules! define_run_body { $channel_manager.timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } + if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + $peer_manager.onion_message_handler().timer_tick_occurred(); + last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); + } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. @@ -603,8 +617,7 @@ pub async fn process_events_async< CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -627,6 +640,7 @@ where L::Target: 'static + Logger, P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let mut should_break = false; let async_event_handler = |event| { @@ -650,10 +664,12 @@ where event_handler(event).await; } }; - define_run_body!(persister, - chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + define_run_body!( + persister, chain_monitor, + chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - gossip_sync, peer_manager, logger, scorer, should_break, { + peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, + gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -673,7 +689,29 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform) + }, mobile_interruptable_platform + ) +} + +#[cfg(feature = "futures")] +async fn process_onion_message_handler_events_async< + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, + PM: 'static + Deref + Send + Sync, +>( + peer_manager: &PM, handler: EventHandler +) +where + PM::Target: APeerManager + Send + Sync, +{ + use lightning::events::EventsProvider; + + let events = core::cell::RefCell::new(Vec::new()); + peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); + + for event in events.into_inner() { + handler(event).await + } } #[cfg(feature = "std")] @@ -742,8 +780,7 @@ impl BackgroundProcessor { CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -763,6 +800,7 @@ impl BackgroundProcessor { L::Target: 'static + Logger, P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -782,14 +820,18 @@ impl BackgroundProcessor { } event_handler.handle_event(event); }; - define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + define_run_body!( + persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, + peer_manager.onion_message_handler().process_pending_events(&event_handler), + gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -1362,9 +1404,11 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, - // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and - // `PeerManager::timer_tick_occurred` every `PING_TIMER`. + // Test that: + // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, + // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and + // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1375,9 +1419,11 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); + let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() { + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { break } } diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index 4e04a3634e1..76e5f25c0e5 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -530,6 +530,25 @@ pub enum Event { /// serialized prior to LDK version 0.0.117. sender_intended_total_msat: Option, }, + /// Indicates that a peer connection with a node is needed in order to send an [`OnionMessage`]. + /// + /// Typically, this happens when a [`MessageRouter`] is unable to find a complete path to a + /// [`Destination`]. Once a connection is established, any messages buffered by an + /// [`OnionMessageHandler`] may be sent. + /// + /// This event will not be generated for onion message forwards; only for sends including + /// replies. Handlers should connect to the node otherwise any buffered messages may be lost. + /// + /// [`OnionMessage`]: msgs::OnionMessage + /// [`MessageRouter`]: crate::onion_message::MessageRouter + /// [`Destination`]: crate::onion_message::Destination + /// [`OnionMessageHandler`]: crate::ln::msgs::OnionMessageHandler + ConnectionNeeded { + /// The node id for the node needing a connection. + node_id: PublicKey, + /// Sockets for connecting to the node. + addresses: Vec, + }, /// Indicates a request for an invoice failed to yield a response in a reasonable amount of time /// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an /// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed. @@ -1190,6 +1209,10 @@ impl Writeable for Event { (0, payment_id, required), }) }, + &Event::ConnectionNeeded { .. } => { + 35u8.write(writer)?; + // Never write ConnectionNeeded events as buffered onion messages aren't serialized. + }, // Note that, going forward, all new events must only write data inside of // `write_tlv_fields`. Versions 0.0.101+ will ignore odd-numbered events that write // data via `write_tlv_fields`. @@ -1200,8 +1223,7 @@ impl Writeable for Event { impl MaybeReadable for Event { fn read(reader: &mut R) -> Result, msgs::DecodeError> { match Readable::read(reader)? { - // Note that we do not write a length-prefixed TLV for FundingGenerationReady events, - // unlike all other events, thus we return immediately here. + // Note that we do not write a length-prefixed TLV for FundingGenerationReady events. 0u8 => Ok(None), 1u8 => { let f = || { @@ -1588,6 +1610,8 @@ impl MaybeReadable for Event { }; f() }, + // Note that we do not write a length-prefixed TLV for ConnectionNeeded events. + 35u8 => Ok(None), // Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue. // Version 0.0.100 failed to properly ignore odd types, possibly resulting in corrupt // reads. diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 2d871b354a2..b877565e017 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -52,7 +52,7 @@ use core::fmt::Display; use crate::io::{self, Cursor, Read}; use crate::io_extras::read_to_end; -use crate::events::MessageSendEventsProvider; +use crate::events::{EventsProvider, MessageSendEventsProvider}; use crate::util::chacha20poly1305rfc::ChaChaPolyReadAdapter; use crate::util::logger; use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize}; @@ -1631,7 +1631,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { } /// A handler for received [`OnionMessage`]s and for providing generated ones to send. -pub trait OnionMessageHandler { +pub trait OnionMessageHandler: EventsProvider { /// Handle an incoming `onion_message` message from the given peer. fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); @@ -1650,6 +1650,10 @@ pub trait OnionMessageHandler { /// drop and refuse to forward onion messages to this peer. fn peer_disconnected(&self, their_node_id: &PublicKey); + /// Performs actions that should happen roughly every ten seconds after startup. Allows handlers + /// to drop any buffered onion messages intended for prospective peers. + fn timer_tick_occurred(&self); + // Handler information: /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f061772890b..c78ca879fdb 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use crate::sign::{KeysManager, NodeSigner, Recipient}; -use crate::events::{MessageSendEvent, MessageSendEventsProvider}; +use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; @@ -89,6 +89,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} +impl EventsProvider for IgnoringMessageHandler { + fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} +} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } @@ -115,6 +118,7 @@ impl OnionMessageHandler for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn timer_tick_occurred(&self) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() @@ -680,6 +684,8 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; + /// Returns the peer manager's [`OnionMessageHandler`]. + fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -705,6 +711,9 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } + fn onion_message_handler(&self) -> &Self::OMT { + self.message_handler.onion_message_handler.deref() + } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index c43b218df7b..e8e800d5e92 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -10,8 +10,9 @@ //! Onion message testing and test utilities live here. use crate::blinded_path::BlindedPath; +use crate::events::{Event, EventsProvider}; use crate::ln::features::InitFeatures; -use crate::ln::msgs::{self, DecodeError, OnionMessageHandler}; +use crate::ln::msgs::{self, DecodeError, OnionMessageHandler, SocketAddress}; use crate::sign::{NodeSigner, Recipient}; use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer}; use crate::util::test_utils; @@ -28,10 +29,11 @@ use crate::sync::{Arc, Mutex}; use crate::prelude::*; struct MessengerNode { - keys_manager: Arc, + node_id: PublicKey, + entropy_source: Arc, messenger: OnionMessenger< Arc, - Arc, + Arc, Arc, Arc, Arc, @@ -40,12 +42,6 @@ struct MessengerNode { custom_message_handler: Arc, } -impl MessengerNode { - fn get_node_pk(&self) -> PublicKey { - self.keys_manager.get_node_id(Recipient::Node).unwrap() - } -} - struct TestMessageRouter {} impl MessageRouter for TestMessageRouter { @@ -55,6 +51,7 @@ impl MessageRouter for TestMessageRouter { Ok(OnionMessagePath { intermediate_nodes: vec![], destination, + addresses: Some(vec![SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 }]), }) } } @@ -155,44 +152,69 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler { } fn create_nodes(num_messengers: u8) -> Vec { + let secrets = (1..=num_messengers) + .into_iter() + .map(|i| SecretKey::from_slice(&[i; 32]).unwrap()) + .collect(); + create_nodes_using_secrets(secrets) +} + +fn create_nodes_using_secrets(secrets: Vec) -> Vec { let mut nodes = Vec::new(); - for i in 0..num_messengers { + for (i, secret_key) in secrets.into_iter().enumerate() { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let seed = [i as u8; 32]; - let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); + let entropy_source = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); + let node_signer = Arc::new(test_utils::TestNodeSigner::new(secret_key)); + let message_router = Arc::new(TestMessageRouter {}); let offers_message_handler = Arc::new(TestOffersMessageHandler {}); let custom_message_handler = Arc::new(TestCustomMessageHandler::new()); nodes.push(MessengerNode { - keys_manager: keys_manager.clone(), + node_id: node_signer.get_node_id(Recipient::Node).unwrap(), + entropy_source: entropy_source.clone(), messenger: OnionMessenger::new( - keys_manager.clone(), keys_manager, logger.clone(), message_router, + entropy_source, node_signer, logger.clone(), message_router, offers_message_handler, custom_message_handler.clone() ), custom_message_handler, }); } - for idx in 0..num_messengers - 1 { - let i = idx as usize; - let mut features = InitFeatures::empty(); - features.set_onion_messages_optional(); - let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; - nodes[i].messenger.peer_connected(&nodes[i + 1].get_node_pk(), &init_msg.clone(), true).unwrap(); - nodes[i + 1].messenger.peer_connected(&nodes[i].get_node_pk(), &init_msg.clone(), false).unwrap(); + for i in 0..nodes.len() - 1 { + connect_peers(&nodes[i], &nodes[i + 1]); } nodes } +fn connect_peers(node_a: &MessengerNode, node_b: &MessengerNode) { + let mut features = InitFeatures::empty(); + features.set_onion_messages_optional(); + let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; + node_a.messenger.peer_connected(&node_b.node_id, &init_msg.clone(), true).unwrap(); + node_b.messenger.peer_connected(&node_a.node_id, &init_msg.clone(), false).unwrap(); +} + +fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) { + node_a.messenger.peer_disconnected(&node_b.node_id); + node_b.messenger.peer_disconnected(&node_a.node_id); +} + +fn release_events(node: &MessengerNode) -> Vec { + let events = core::cell::RefCell::new(Vec::new()); + node.messenger.process_pending_events(&|e| events.borrow_mut().push(e)); + events.into_inner() +} + fn pass_along_path(path: &Vec) { let mut prev_node = &path[0]; for node in path.into_iter().skip(1) { let events = prev_node.messenger.release_pending_msgs(); let onion_msg = { - let msgs = events.get(&node.get_node_pk()).unwrap(); + let msgs = events.get(&node.node_id).unwrap(); assert_eq!(msgs.len(), 1); msgs[0].clone() }; - node.messenger.handle_onion_message(&prev_node.get_node_pk(), &onion_msg); + node.messenger.handle_onion_message(&prev_node.node_id, &onion_msg); prev_node = node; } } @@ -204,9 +226,10 @@ fn one_unblinded_hop() { let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -217,10 +240,11 @@ fn two_unblinded_hops() { let test_msg = TestCustomMessage::Response; let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk()], - destination: Destination::Node(nodes[2].get_node_pk()), + intermediate_nodes: vec![nodes[1].node_id], + destination: Destination::Node(nodes[2].node_id), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -231,12 +255,13 @@ fn one_blinded_hop() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk()], &*nodes[1].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id], &*nodes[1].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -247,13 +272,14 @@ fn two_unblinded_two_blinded() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[3].get_node_pk(), nodes[4].get_node_pk()], &*nodes[4].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[3].node_id, nodes[4].node_id], &*nodes[4].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], + intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[4].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -264,13 +290,14 @@ fn three_blinded_hops() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id, nodes[3].node_id], &*nodes[3].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } @@ -281,13 +308,14 @@ fn too_big_packet_error() { let nodes = create_nodes(2); let test_msg = TestCustomMessage::Response; - let hop_node_id = nodes[1].get_node_pk(); + let hop_node_id = nodes[1].node_id; let hops = vec![hop_node_id; 400]; let path = OnionMessagePath { intermediate_nodes: hops, destination: Destination::Node(hop_node_id), + addresses: None, }; - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::TooBigPacket); } @@ -299,23 +327,25 @@ fn we_are_intro_node() { let test_msg = TestCustomMessage::Response; let secp_ctx = Secp256k1::new(); - let blinded_path = BlindedPath::new_for_message(&[nodes[0].get_node_pk(), nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[0].node_id, nodes[1].node_id, nodes[2].node_id], &*nodes[2].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap(); nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); // Try with a two-hop blinded path where we are the introduction node. - let blinded_path = BlindedPath::new_for_message(&[nodes[0].get_node_pk(), nodes[1].get_node_pk()], &*nodes[1].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[0].node_id, nodes[1].node_id], &*nodes[1].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response); nodes.remove(2); pass_along_path(&nodes); @@ -329,13 +359,14 @@ fn invalid_blinded_path_error() { // 0 hops let secp_ctx = Secp256k1::new(); - let mut blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap(); + let mut blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id], &*nodes[2].entropy_source, &secp_ctx).unwrap(); blinded_path.blinded_hops.clear(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - let err = nodes[0].messenger.send_onion_message(path, test_msg.clone(), None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), None).unwrap_err(); assert_eq!(err, SendError::TooFewBlindedHops); } @@ -347,11 +378,12 @@ fn reply_path() { // Destination::Node let path = OnionMessagePath { - intermediate_nodes: vec![nodes[1].get_node_pk(), nodes[2].get_node_pk()], - destination: Destination::Node(nodes[3].get_node_pk()), + intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], + destination: Destination::Node(nodes[3].node_id), + addresses: None, }; - let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); - nodes[0].messenger.send_onion_message(path, test_msg.clone(), Some(reply_path)).unwrap(); + let reply_path = BlindedPath::new_for_message(&[nodes[2].node_id, nodes[1].node_id, nodes[0].node_id], &*nodes[0].entropy_source, &secp_ctx).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); pass_along_path(&nodes); // Make sure the last node successfully decoded the reply path. @@ -360,14 +392,15 @@ fn reply_path() { pass_along_path(&nodes); // Destination::BlindedPath - let blinded_path = BlindedPath::new_for_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap(); + let blinded_path = BlindedPath::new_for_message(&[nodes[1].node_id, nodes[2].node_id, nodes[3].node_id], &*nodes[3].entropy_source, &secp_ctx).unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![], destination: Destination::BlindedPath(blinded_path), + addresses: None, }; - let reply_path = BlindedPath::new_for_message(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap(); + let reply_path = BlindedPath::new_for_message(&[nodes[2].node_id, nodes[1].node_id, nodes[0].node_id], &*nodes[0].entropy_source, &secp_ctx).unwrap(); - nodes[0].messenger.send_onion_message(path, test_msg, Some(reply_path)).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, Some(reply_path)).unwrap(); nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request); pass_along_path(&nodes); @@ -397,9 +430,10 @@ fn invalid_custom_message_type() { let test_msg = InvalidCustomMessage {}; let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), + addresses: None, }; - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::InvalidMessage); } @@ -409,12 +443,13 @@ fn peer_buffer_full() { let test_msg = TestCustomMessage::Request; let path = OnionMessagePath { intermediate_nodes: vec![], - destination: Destination::Node(nodes[1].get_node_pk()), + destination: Destination::Node(nodes[1].node_id), + addresses: None, }; for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger - nodes[0].messenger.send_onion_message(path.clone(), test_msg.clone(), None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path.clone(), test_msg.clone(), None).unwrap(); } - let err = nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap_err(); + let err = nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap_err(); assert_eq!(err, SendError::BufferFull); } @@ -428,51 +463,97 @@ fn many_hops() { let mut intermediate_nodes = vec![]; for i in 1..(num_nodes-1) { - intermediate_nodes.push(nodes[i].get_node_pk()); + intermediate_nodes.push(nodes[i].node_id); } let path = OnionMessagePath { intermediate_nodes, - destination: Destination::Node(nodes[num_nodes-1].get_node_pk()), + destination: Destination::Node(nodes[num_nodes-1].node_id), + addresses: None, }; - nodes[0].messenger.send_onion_message(path, test_msg, None).unwrap(); + nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap(); nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response); pass_along_path(&nodes); } #[test] -fn spec_test_vector() { - let keys_mgrs = vec![ - (Arc::new(test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet)), // Alice - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4141414141414141414141414141414141414141414141414141414141414141").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[1; 32], Network::Testnet)), // Bob - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4242424242424242424242424242424242424242424242424242424242424242").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[2; 32], Network::Testnet)), // Carol - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4343434343434343434343434343434343434343434343434343434343434343").unwrap()).unwrap()))), - (Arc::new(test_utils::TestKeysInterface::new(&[3; 32], Network::Testnet)), // Dave - Arc::new(test_utils::TestNodeSigner::new(SecretKey::from_slice(&>::from_hex("4444444444444444444444444444444444444444444444444444444444444444").unwrap()).unwrap()))), - ]; - let message_router = Arc::new(TestMessageRouter {}); - let offers_message_handler = Arc::new(TestOffersMessageHandler {}); - let custom_message_handler = Arc::new(TestCustomMessageHandler::new()); - let mut nodes = Vec::new(); - for (idx, (entropy_source, node_signer)) in keys_mgrs.iter().enumerate() { - let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", idx))); - nodes.push(OnionMessenger::new( - entropy_source.clone(), node_signer.clone(), logger.clone(), message_router.clone(), - offers_message_handler.clone(), custom_message_handler.clone() - )); +fn requests_peer_connection_for_buffered_messages() { + let nodes = create_nodes(3); + let message = TestCustomMessage::Request; + let secp_ctx = Secp256k1::new(); + let blinded_path = BlindedPath::new_for_message( + &[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx + ).unwrap(); + let destination = Destination::BlindedPath(blinded_path); + + // Buffer an onion message for a connected peer + nodes[0].messenger.send_onion_message(message.clone(), destination.clone(), None).unwrap(); + assert!(release_events(&nodes[0]).is_empty()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); + + // Buffer an onion message for a disconnected peer + disconnect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); + nodes[0].messenger.send_onion_message(message, destination, None).unwrap(); + + // Check that a ConnectionNeeded event for the peer is provided + let events = release_events(&nodes[0]); + assert_eq!(events.len(), 1); + match &events[0] { + Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id), + e => panic!("Unexpected event: {:?}", e), + } + + // Release the buffered onion message when reconnected + connect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some()); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); +} + +#[test] +fn drops_buffered_messages_waiting_for_peer_connection() { + let nodes = create_nodes(3); + let message = TestCustomMessage::Request; + let secp_ctx = Secp256k1::new(); + let blinded_path = BlindedPath::new_for_message( + &[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx + ).unwrap(); + let destination = Destination::BlindedPath(blinded_path); + + // Buffer an onion message for a disconnected peer + disconnect_peers(&nodes[0], &nodes[1]); + nodes[0].messenger.send_onion_message(message, destination, None).unwrap(); + + // Release the event so the timer can start ticking + let events = release_events(&nodes[0]); + assert_eq!(events.len(), 1); + match &events[0] { + Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id), + e => panic!("Unexpected event: {:?}", e), } - for idx in 0..nodes.len() - 1 { - let i = idx as usize; - let mut features = InitFeatures::empty(); - features.set_onion_messages_optional(); - let init_msg = msgs::Init { features, networks: None, remote_network_address: None }; - nodes[i].peer_connected( - &keys_mgrs[i + 1].1.get_node_id(Recipient::Node).unwrap(), &init_msg.clone(), true).unwrap(); - nodes[i + 1].peer_connected( - &keys_mgrs[i].1.get_node_id(Recipient::Node).unwrap(), &init_msg.clone(), false).unwrap(); + + // Drop buffered messages for a disconnected peer after some timer ticks + use crate::onion_message::messenger::MAX_TIMER_TICKS; + for _ in 0..=MAX_TIMER_TICKS { + nodes[0].messenger.timer_tick_occurred(); } + connect_peers(&nodes[0], &nodes[1]); + assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none()); +} + +#[test] +fn spec_test_vector() { + let secret_keys = [ + "4141414141414141414141414141414141414141414141414141414141414141", // Alice + "4242424242424242424242424242424242424242424242424242424242424242", // Bob + "4343434343434343434343434343434343434343434343434343434343434343", // Carol + "4444444444444444444444444444444444444444444444444444444444444444", // Dave + ] + .iter() + .map(|secret| SecretKey::from_slice(&>::from_hex(secret).unwrap()).unwrap()) + .collect(); + let nodes = create_nodes_using_secrets(secret_keys); // Hardcode the sender->Alice onion message, because it includes an unknown TLV of type 1, which // LDK doesn't support constructing. @@ -491,24 +572,18 @@ fn spec_test_vector() { // which is why the asserted strings differ slightly from the spec. assert_eq!(sender_to_alice_om.encode(), >::from_hex("031195a8046dcbb8e17034bca630065e7a0982e4e36f6f7e5a8d4554e4846fcd9905560002531fe6068134503d2723133227c867ac8fa6c83c537e9a44c3c5bdbdcb1fe33793b828776d70aabbd8cef1a5b52d5a397ae1a20f20435ff6057cd8be339d5aee226660ef73b64afa45dbf2e6e8e26eb96a259b2db5aeecda1ce2e768bbc35d389d7f320ca3d2bd14e2689bef2f5ac0307eaaabc1924eb972c1563d4646ae131accd39da766257ed35ea36e4222527d1db4fa7b2000aab9eafcceed45e28b5560312d4e2299bd8d1e7fe27d10925966c28d497aec400b4630485e82efbabc00550996bdad5d6a9a8c75952f126d14ad2cff91e16198691a7ef2937de83209285f1fb90944b4e46bca7c856a9ce3da10cdf2a7d00dc2bf4f114bc4d3ed67b91cbde558ce9af86dc81fbdc37f8e301b29e23c1466659c62bdbf8cff5d4c20f0fb0851ec72f5e9385dd40fdd2e3ed67ca4517117825665e50a3e26f73c66998daf18e418e8aef9ce2d20da33c3629db2933640e03e7b44c2edf49e9b482db7b475cfd4c617ae1d46d5c24d697846f9f08561eac2b065f9b382501f6eabf07343ed6c602f61eab99cdb52adf63fd44a8db2d3016387ea708fc1c08591e19b4d9984ebe31edbd684c2ea86526dd8c7732b1d8d9117511dc1b643976d356258fce8313b1cb92682f41ab72dedd766f06de375f9edacbcd0ca8c99b865ea2b7952318ea1fd20775a28028b5cf59dece5de14f615b8df254eee63493a5111ea987224bea006d8f1b60d565eef06ac0da194dba2a6d02e79b2f2f34e9ca6e1984a507319d86e9d4fcaeea41b4b9144e0b1826304d4cc1da61cfc5f8b9850697df8adc5e9d6f3acb3219b02764b4909f2b2b22e799fd66c383414a84a7d791b899d4aa663770009eb122f90282c8cb9cda16aba6897edcf9b32951d0080c0f52be3ca011fbec3fb16423deb47744645c3b05fdbd932edf54ba6efd26e65340a8e9b1d1216582e1b30d64524f8ca2d6c5ba63a38f7120a3ed71bed8960bcac2feee2dd41c90be48e3c11ec518eb3d872779e4765a6cc28c6b0fa71ab57ced73ae963cc630edae4258cba2bf25821a6ae049fec2fca28b5dd1bb004d92924b65701b06dcf37f0ccd147a13a03f9bc0f98b7d78fe9058089756931e2cd0e0ed92ec6759d07b248069526c67e9e6ce095118fd3501ba0f858ef030b76c6f6beb11a09317b5ad25343f4b31aef02bc555951bc7791c2c289ecf94d5544dcd6ad3021ed8e8e3db34b2a73e1eedb57b578b068a5401836d6e382110b73690a94328c404af25e85a8d6b808893d1b71af6a31fadd8a8cc6e31ecc0d9ff7e6b91fd03c274a5c1f1ccd25b61150220a3fddb04c91012f5f7a83a5c90deb2470089d6e38cd5914b9c946eca6e9d31bbf8667d36cf87effc3f3ff283c21dd4137bd569fe7cf758feac94053e4baf7338bb592c8b7c291667fadf4a9bf9a2a154a18f612cbc7f851b3f8f2070e0a9d180622ee4f8e81b0ab250d504cef24116a3ff188cc829fcd8610b56343569e8dc997629410d1967ca9dd1d27eec5e01e4375aad16c46faba268524b154850d0d6fe3a76af2c6aa3e97647c51036049ac565370028d6a439a2672b6face56e1b171496c0722cfa22d9da631be359661617c5d5a2d286c5e19db9452c1e21a0107b6400debda2decb0c838f342dd017cdb2dccdf1fe97e3df3f881856b546997a3fed9e279c720145101567dd56be21688fed66bf9759e432a9aa89cbbd225d13cdea4ca05f7a45cfb6a682a3d5b1e18f7e6cf934fae5098108bae9058d05c3387a01d8d02a656d2bfff67e9f46b2d8a6aac28129e52efddf6e552214c3f8a45bc7a912cca9a7fec1d7d06412c6972cb9e3dc518983f56530b8bffe7f92c4b6eb47d4aef59fb513c4653a42de61bc17ad7728e7fc7590ff05a9e991de03f023d0aaf8688ed6170def5091c66576a424ac1cb").unwrap()); let sender_dummy_node_id = PublicKey::from_slice(&[2; 33]).unwrap(); - nodes[0].handle_onion_message(&sender_dummy_node_id, &sender_to_alice_om); - let alice_to_bob_om = nodes[0].next_onion_message_for_peer( - keys_mgrs[1].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[0].messenger.handle_onion_message(&sender_dummy_node_id, &sender_to_alice_om); + let alice_to_bob_om = nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).unwrap(); assert_eq!(alice_to_bob_om.encode(), >::from_hex("031b84c5567b126440995d3ed5aaba0565d71e1834604819ff9c17f5e9d5dd078f05560002536d53f93796cad550b6c68662dca41f7e8c221c31022c64dd1a627b2df3982b25eac261e88369cfc66e1e3b6d9829cb3dcd707046e68a7796065202a7904811bf2608c5611cf74c9eb5371c7eb1a4428bb39a041493e2a568ddb0b2482a6cc6711bc6116cef144ebf988073cb18d9dd4ce2d3aa9de91a7dc6d7c6f11a852024626e66b41ba1158055505dff9cb15aa51099f315564d9ee3ed6349665dc3e209eedf9b5805ee4f69d315df44c80e63d0e2efbdab60ec96f44a3447c6a6ddb1efb6aa4e072bde1dab974081646bfddf3b02daa2b83847d74dd336465e76e9b8fecc2b0414045eeedfc39939088a76820177dd1103c99939e659beb07197bab9f714b30ba8dc83738e9a6553a57888aaeda156c68933a2f4ff35e3f81135076b944ed9856acbfee9c61299a5d1763eadd14bf5eaf71304c8e165e590d7ecbcd25f1650bf5b6c2ad1823b2dc9145e168974ecf6a2273c94decff76d94bc6708007a17f22262d63033c184d0166c14f41b225a956271947aae6ce65890ed8f0d09c6ffe05ec02ee8b9de69d7077a0c5adeb813aabcc1ba8975b73ab06ddea5f4db3c23a1de831602de2b83f990d4133871a1a81e53f86393e6a7c3a7b73f0c099fa72afe26c3027bb9412338a19303bd6e6591c04fb4cde9b832b5f41ae199301ea8c303b5cef3aca599454273565de40e1148156d1f97c1aa9e58459ab318304075e034f5b7899c12587b86776a18a1da96b7bcdc22864fccc4c41538ebce92a6f054d53bf46770273a70e75fe0155cd6d2f2e937465b0825ce3123b8c206fac4c30478fa0f08a97ade7216dce11626401374993213636e93545a31f500562130f2feb04089661ad8c34d5a4cbd2e4e426f37cb094c786198a220a2646ecadc38c04c29ee67b19d662c209a7b30bfecc7fe8bf7d274de0605ee5df4db490f6d32234f6af639d3fce38a2801bcf8d51e9c090a6c6932355a83848129a378095b34e71cb8f51152dc035a4fe8e802fec8de221a02ba5afd6765ce570bef912f87357936ea0b90cb2990f56035e89539ec66e8dbd6ed50835158614096990e019c3eba3d7dd6a77147641c6145e8b17552cd5cf7cd163dd40b9eaeba8c78e03a2cd8c0b7997d6f56d35f38983a202b4eb8a54e14945c4de1a6dde46167e11708b7a5ff5cb9c0f7fc12fae49a012aa90bb1995c038130b749c48e6f1ffb732e92086def42af10fbc460d94abeb7b2fa744a5e9a491d62a08452be8cf2fdef573deedc1fe97098bce889f98200b26f9bb99da9aceddda6d793d8e0e44a2601ef4590cfbb5c3d0197aac691e3d31c20fd8e38764962ca34dabeb85df28feabaf6255d4d0df3d814455186a84423182caa87f9673df770432ad8fdfe78d4888632d460d36d2719e8fa8e4b4ca10d817c5d6bc44a8b2affab8c2ba53b8bf4994d63286c2fad6be04c28661162fa1a67065ecda8ba8c13aee4a8039f4f0110e0c0da2366f178d8903e19136dad6df9d8693ce71f3a270f9941de2a93d9b67bc516207ac1687bf6e00b29723c42c7d9c90df9d5e599dbeb7b73add0a6a2b7aba82f98ac93cb6e60494040445229f983a81c34f7f686d166dfc98ec23a6318d4a02a311ac28d655ea4e0f9c3014984f31e621ef003e98c373561d9040893feece2e0fa6cd2dd565e6fbb2773a2407cb2c3273c306cf71f427f2e551c4092e067cf9869f31ac7c6c80dd52d4f85be57a891a41e34be0d564e39b4af6f46b85339254a58b205fb7e10e7d0470ee73622493f28c08962118c23a1198467e72c4ae1cd482144b419247a5895975ea90d135e2a46ef7e5794a1551a447ff0a0d299b66a7f565cd86531f5e7af5408d85d877ce95b1df12b88b7d5954903a5296325ba478ba1e1a9d1f30a2d5052b2e2889bbd64f72c72bc71d8817288a2").unwrap()); - nodes[1].handle_onion_message( - &keys_mgrs[0].1.get_node_id(Recipient::Node).unwrap(), &alice_to_bob_om); - let bob_to_carol_om = nodes[1].next_onion_message_for_peer( - keys_mgrs[2].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[1].messenger.handle_onion_message(&nodes[0].node_id, &alice_to_bob_om); + let bob_to_carol_om = nodes[1].messenger.next_onion_message_for_peer(nodes[2].node_id).unwrap(); assert_eq!(bob_to_carol_om.encode(), >::from_hex("02b684babfd400c8dd48b367e9754b8021a3594a34dc94d7101776c7f6a86d0582055600029a77e8523162efa1f4208f4f2050cd5c386ddb6ce6d36235ea569d217ec52209fb85fdf7dbc4786c373eebdba0ddc184cfbe6da624f610e93f62c70f2c56be1090b926359969f040f932c03f53974db5656233bd60af375517d4323002937d784c2c88a564bcefe5c33d3fc21c26d94dfacab85e2e19685fd2ff4c543650958524439b6da68779459aee5ffc9dc543339acec73ff43be4c44ddcbe1c11d50e2411a67056ba9db7939d780f5a86123fdd3abd6f075f7a1d78ab7daf3a82798b7ec1e9f1345bc0d1e935098497067e2ae5a51ece396fcb3bb30871ad73aee51b2418b39f00c8e8e22be4a24f4b624e09cb0414dd46239de31c7be035f71e8da4f5a94d15b44061f46414d3f355069b5c5b874ba56704eb126148a22ec873407fe118972127e63ff80e682e410f297f23841777cec0517e933eaf49d7e34bd203266b42081b3a5193b51ccd34b41342bc67cf73523b741f5c012ba2572e9dda15fbe131a6ac2ff24dc2a7622d58b9f3553092cfae7fae3c8864d95f97aa49ec8edeff5d9f5782471160ee412d82ff6767030fc63eec6a93219a108cd41433834b26676a39846a944998796c79cd1cc460531b8ded659cedfd8aecefd91944f00476f1496daafb4ea6af3feacac1390ea510709783c2aa81a29de27f8959f6284f4684102b17815667cbb0645396ac7d542b878d90c42a1f7f00c4c4eedb2a22a219f38afadb4f1f562b6e000a94e75cc38f535b43a3c0384ccef127fde254a9033a317701c710b2b881065723486e3f4d3eea5e12f374a41565fe43fa137c1a252c2153dde055bb343344c65ad0529010ece29bbd405effbebfe3ba21382b94a60ac1a5ffa03f521792a67b30773cb42e862a8a02a8bbd41b842e115969c87d1ff1f8c7b5726b9f20772dd57fe6e4ea41f959a2a673ffad8e2f2a472c4c8564f3a5a47568dd75294b1c7180c500f7392a7da231b1fe9e525ea2d7251afe9ca52a17fe54a116cb57baca4f55b9b6de915924d644cba9dade4ccc01939d7935749c008bafc6d3ad01cd72341ce5ddf7a5d7d21cf0465ab7a3233433aef21f9acf2bfcdc5a8cc003adc4d82ac9d72b36eb74e05c9aa6ccf439ac92e6b84a3191f0764dd2a2e0b4cc3baa08782b232ad6ecd3ca6029bc08cc094aef3aebddcaddc30070cb6023a689641de86cfc6341c8817215a4650f844cd2ca60f2f10c6e44cfc5f23912684d4457bf4f599879d30b79bf12ef1ab8d34dddc15672b82e56169d4c770f0a2a7a960b1e8790773f5ff7fce92219808f16d061cc85e053971213676d28fb48925e9232b66533dbd938458eb2cc8358159df7a2a2e4cf87500ede2afb8ce963a845b98978edf26a6948d4932a6b95d022004556d25515fe158092ce9a913b4b4a493281393ca731e8d8e5a3449b9d888fc4e73ffcbb9c6d6d66e88e03cf6e81a0496ede6e4e4172b08c000601993af38f80c7f68c9d5fff9e0e215cff088285bf039ca731744efcb7825a272ca724517736b4890f47e306b200aa2543c363e2c9090bcf3cf56b5b86868a62471c7123a41740392fc1d5ab28da18dca66618e9af7b42b62b23aba907779e73ca03ec60e6ab9e0484b9cae6578e0fddb6386cb3468506bf6420298bf4a690947ab582255551d82487f271101c72e19e54872ab47eae144db66bc2f8194a666a5daec08d12822cb83a61946234f2dfdbd6ca7d8763e6818adee7b401fcdb1ac42f9df1ac5cc5ac131f2869013c8d6cd29d4c4e3d05bccd34ca83366d616296acf854fa05149bfd763a25b9938e96826a037fdcb85545439c76df6beed3bdbd01458f9cf984997cc4f0a7ac3cc3f5e1eeb59c09cadcf5a537f16e444149c8f17d4bdaef16c9fbabc5ef06eb0f0bf3a07a1beddfeacdaf1df5582d6dbd6bb808d6ab31bc22e5d7").unwrap()); - nodes[2].handle_onion_message( - &keys_mgrs[1].1.get_node_id(Recipient::Node).unwrap(), &bob_to_carol_om); - let carol_to_dave_om = nodes[2].next_onion_message_for_peer( - keys_mgrs[3].1.get_node_id(Recipient::Node).unwrap()).unwrap(); + nodes[2].messenger.handle_onion_message(&nodes[1].node_id, &bob_to_carol_om); + let carol_to_dave_om = nodes[2].messenger.next_onion_message_for_peer(nodes[3].node_id).unwrap(); assert_eq!(carol_to_dave_om.encode(), >::from_hex("025aaca62db7ce6b46386206ef9930daa32e979a35cb185a41cb951aa7d254b03c055600025550b2910294fa73bda99b9de9c851be9cbb481e23194a1743033630efba546b86e7d838d0f6e9cc0ed088dbf6889f0dceca3bfc745bd77d013a31311fa932a8bf1d28387d9ff521eabc651dee8f861fed609a68551145a451f017ec44978addeee97a423c08445531da488fd1ddc998e9cdbfcea59517b53fbf1833f0bbe6188dba6ca773a247220ec934010daca9cc185e1ceb136803469baac799e27a0d82abe53dc48a06a55d1f643885cc7894677dd20a4e4152577d1ba74b870b9279f065f9b340cedb3ca13b7df218e853e10ccd1b59c42a2acf93f489e170ee4373d30ab158b60fc20d3ba73a1f8c750951d69fb5b9321b968ddc8114936412346aff802df65516e1c09c51ef19849ff36c0199fd88c8bec301a30fef0c7cb497901c038611303f64e4174b5daf42832aa5586b84d2c9b95f382f4269a5d1bd4be898618dc78dfd451170f72ca16decac5b03e60702112e439cadd104fb3bbb3d5023c9b80823fdcd0a212a7e1aaa6eeb027adc7f8b3723031d135a09a979a4802788bb7861c6cc85501fb91137768b70aeab309b27b885686604ffc387004ac4f8c44b101c39bc0597ef7fd957f53fc5051f534b10eb3852100962b5e58254e5558689913c26ad6072ea41f5c5db10077cfc91101d4ae393be274c74297da5cc381cd88d54753aaa7df74b2f9da8d88a72bc9218fcd1f19e4ff4aace182312b9509c5175b6988f044c5756d232af02a451a02ca752f3c52747773acff6fd07d2032e6ce562a2c42105d106eba02d0b1904182cdc8c74875b082d4989d3a7e9f0e73de7c75d357f4af976c28c0b206c5e8123fc2391d078592d0d5ff686fd245c0a2de2e535b7cca99c0a37d432a8657393a9e3ca53eec1692159046ba52cb9bc97107349d8673f74cbc97e231f1108005c8d03e24ca813cea2294b39a7a493bcc062708f1f6cf0074e387e7d50e0666ce784ef4d31cb860f6cad767438d9ea5156ff0ae86e029e0247bf94df75ee0cda4f2006061455cb2eaff513d558863ae334cef7a3d45f55e7cc13153c6719e9901c1d4db6c03f643b69ea4860690305651794284d9e61eb848ccdf5a77794d376f0af62e46d4835acce6fd9eef5df73ebb8ea3bb48629766967f446e744ecc57ff3642c4aa1ccee9a2f72d5caa75fa05787d08b79408fce792485fdecdc25df34820fb061275d70b84ece540b0fc47b2453612be34f2b78133a64e812598fbe225fd85415f8ffe5340ce955b5fd9d67dd88c1c531dde298ed25f96df271558c812c26fa386966c76f03a6ebccbca49ac955916929bd42e134f982dde03f924c464be5fd1ba44f8dc4c3cbc8162755fd1d8f7dc044b15b1a796c53df7d8769bb167b2045b49cc71e08908796c92c16a235717cabc4bb9f60f8f66ff4fff1f9836388a99583acebdff4a7fb20f48eedcd1f4bdcc06ec8b48e35307df51d9bc81d38a94992dd135b30079e1f592da6e98dff496cb1a7776460a26b06395b176f585636ebdf7eab692b227a31d6979f5a6141292698e91346b6c806b90c7c6971e481559cae92ee8f4136f2226861f5c39ddd29bbdb118a35dece03f49a96804caea79a3dacfbf09d65f2611b5622de51d98e18151acb3bb84c09caaa0cc80edfa743a4679f37d6167618ce99e73362fa6f213409931762618a61f1738c071bba5afc1db24fe94afb70c40d731908ab9a505f76f57a7d40e708fd3df0efc5b7cbb2a7b75cd23449e09684a2f0e2bfa0d6176c35f96fe94d92fc9fa4103972781f81cb6e8df7dbeb0fc529c600d768bed3f08828b773d284f69e9a203459d88c12d6df7a75be2455fec128f07a497a2b2bf626cc6272d0419ca663e9dc66b8224227eb796f0246dcae9c5b0b6cfdbbd40c3245a610481c92047c968c9fc92c04b89cc41a0c15355a8f").unwrap()); // Dave handles the onion message but he'll log that he errored while decoding the hop data // because he sees it as an empty onion message (the only contents of the sender's OM is "hello" // with TLV type 1, which Dave ignores because (1) it's odd and he can't understand it and (2) LDK // only attempts to parse custom OM TLVs with type > 64). - nodes[3].handle_onion_message( - &keys_mgrs[2].1.get_node_id(Recipient::Node).unwrap(), &carol_to_dave_om); + nodes[3].messenger.handle_onion_message(&nodes[2].node_id, &carol_to_dave_om); } diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index c7f01ae5978..05ea7a2853c 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -18,13 +18,15 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::BlindedPath; use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; +use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::ln::features::{InitFeatures, NodeFeatures}; -use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; +use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; use crate::ln::peer_handler::IgnoringMessageHandler; +use crate::routing::gossip::{NetworkGraph, NodeId}; pub use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; @@ -38,6 +40,8 @@ use crate::io; use crate::sync::{Arc, Mutex}; use crate::prelude::*; +pub(super) const MAX_TIMER_TICKS: usize = 2; + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -76,7 +80,15 @@ use crate::prelude::*; /// # struct FakeMessageRouter {} /// # impl MessageRouter for FakeMessageRouter { /// # fn find_path(&self, sender: PublicKey, peers: Vec, destination: Destination) -> Result { -/// # unimplemented!() +/// # let secp_ctx = Secp256k1::new(); +/// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); +/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); +/// # let hop_node_id2 = hop_node_id1; +/// # Ok(OnionMessagePath { +/// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], +/// # destination, +/// # addresses: None, +/// # }) /// # } /// # } /// # let seed = [42u8; 32]; @@ -86,7 +98,7 @@ use crate::prelude::*; /// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); /// # let secp_ctx = Secp256k1::new(); /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); -/// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1); +/// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1); /// # let destination_node_id = hop_node_id1; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; @@ -113,13 +125,10 @@ use crate::prelude::*; /// } /// } /// // Send a custom onion message to a node id. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::Node(destination_node_id), -/// }; +/// let destination = Destination::Node(destination_node_id); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; @@ -127,13 +136,10 @@ use crate::prelude::*; /// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::BlindedPath(blinded_path), -/// }; +/// let destination = Destination::BlindedPath(blinded_path); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// ``` /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest @@ -145,18 +151,80 @@ where L::Target: Logger, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, - CMH:: Target: CustomOnionMessageHandler, + CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, node_signer: NS, logger: L, - pending_messages: Mutex>>, + message_recipients: Mutex>, secp_ctx: Secp256k1, message_router: MR, offers_handler: OMH, custom_handler: CMH, } +/// [`OnionMessage`]s buffered to be sent. +enum OnionMessageRecipient { + /// Messages for a node connected as a peer. + ConnectedPeer(VecDeque), + + /// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`] + /// and tracked here. + PendingConnection(VecDeque, Option>, usize), +} + +impl OnionMessageRecipient { + fn pending_connection(addresses: Vec) -> Self { + Self::PendingConnection(VecDeque::new(), Some(addresses), 0) + } + + fn pending_messages(&self) -> &VecDeque { + match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + } + } + + fn enqueue_message(&mut self, message: OnionMessage) { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + }; + + pending_messages.push_back(message); + } + + fn dequeue_message(&mut self) -> Option { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => { + debug_assert!(false); + pending_messages + }, + }; + + pending_messages.pop_front() + } + + #[cfg(test)] + fn release_pending_messages(&mut self) -> VecDeque { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + }; + + core::mem::take(pending_messages) + } + + fn mark_connected(&mut self) { + if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self { + let mut new_pending_messages = VecDeque::new(); + core::mem::swap(pending_messages, &mut new_pending_messages); + *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages); + } + } +} + /// An [`OnionMessage`] for [`OnionMessenger`] to send. /// /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are @@ -198,16 +266,48 @@ pub trait MessageRouter { } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. -pub struct DefaultMessageRouter; +pub struct DefaultMessageRouter>, L: Deref> +where + L::Target: Logger, +{ + network_graph: G, +} + +impl>, L: Deref> DefaultMessageRouter +where + L::Target: Logger, +{ + /// Creates a [`DefaultMessageRouter`] using the given [`NetworkGraph`]. + pub fn new(network_graph: G) -> Self { + Self { network_graph } + } +} -impl MessageRouter for DefaultMessageRouter { +impl>, L: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, +{ fn find_path( &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { - if peers.contains(&destination.first_node()) { - Ok(OnionMessagePath { intermediate_nodes: vec![], destination }) + let first_node = destination.first_node(); + if peers.contains(&first_node) { + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses: None }) } else { - Err(()) + let network_graph = self.network_graph.deref().read_only(); + let node_announcement = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) + .map(|node_announcement| &node_announcement.contents); + + match node_announcement { + Some(node_announcement) if node_announcement.features.supports_onion_messages() => { + let addresses = Some(node_announcement.addresses.clone()); + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses }) + }, + _ => Err(()), + } } } } @@ -220,6 +320,22 @@ pub struct OnionMessagePath { /// The recipient of the message. pub destination: Destination, + + /// Addresses that may be used to connect to [`OnionMessagePath::first_node`]. + /// + /// Only needs to be set if a connection to the node is required. [`OnionMessenger`] may use + /// this to initiate such a connection. + pub addresses: Option>, +} + +impl OnionMessagePath { + /// Returns the first node in the path. + pub fn first_node(&self) -> PublicKey { + self.intermediate_nodes + .first() + .copied() + .unwrap_or_else(|| self.destination.first_node()) + } } /// The destination of an onion message. @@ -247,6 +363,19 @@ impl Destination { } } +/// Result of successfully [sending an onion message]. +/// +/// [sending an onion message]: OnionMessenger::send_onion_message +#[derive(Debug, PartialEq, Eq)] +pub enum SendSuccess { + /// The message was buffered and will be sent once it is processed by + /// [`OnionMessageHandler::next_onion_message_for_peer`]. + Buffered, + /// The message was buffered and will be sent once the node is connected as a peer and it is + /// processed by [`OnionMessageHandler::next_onion_message_for_peer`]. + BufferedAwaitingConnection(PublicKey), +} + /// Errors that may occur when [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message @@ -260,8 +389,10 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, - /// Our next-hop peer was offline or does not support onion message forwarding. - InvalidFirstHop, + /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. + InvalidFirstHop(PublicKey), + /// A path from the sender to the destination could not be found by the [`MessageRouter`]. + PathNotFound, /// Onion message contents must have a TLV type >= 64. InvalidMessage, /// Our next-hop peer's buffer was full or our total outbound buffer was full. @@ -332,12 +463,12 @@ pub enum PeeledOnion { pub fn create_onion_message( entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, reply_path: Option, -) -> Result<(PublicKey, OnionMessage), SendError> +) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, { - let OnionMessagePath { intermediate_nodes, mut destination } = path; + let OnionMessagePath { intermediate_nodes, mut destination, addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); @@ -378,10 +509,8 @@ where let onion_routing_packet = construct_onion_message_packet( packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; - Ok((first_node_id, OnionMessage { - blinding_point, - onion_routing_packet - })) + let message = OnionMessage { blinding_point, onion_routing_packet }; + Ok((first_node_id, message, addresses)) } /// Decode one layer of an incoming [`OnionMessage`]. @@ -502,7 +631,7 @@ where OnionMessenger { entropy_source, node_signer, - pending_messages: Mutex::new(HashMap::new()), + message_recipients: Mutex::new(HashMap::new()), secp_ctx, logger, message_router, @@ -511,38 +640,109 @@ where } } - /// Sends an [`OnionMessage`] with the given `contents` for sending to the destination of - /// `path`. + /// Sends an [`OnionMessage`] with the given `contents` to `destination`. /// /// See [`OnionMessenger`] for example usage. pub fn send_onion_message( - &self, path: OnionMessagePath, contents: T, reply_path: Option - ) -> Result<(), SendError> { + &self, contents: T, destination: Destination, reply_path: Option + ) -> Result { + self.find_path_and_enqueue_onion_message( + contents, destination, reply_path, format_args!("") + ) + } - log_trace!(self.logger, "Sending onion message: {:?}", contents); - - let (first_node_id, onion_msg) = create_onion_message( + fn find_path_and_enqueue_onion_message( + &self, contents: T, destination: Destination, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + let result = self.find_path(destination) + .and_then(|path| self.enqueue_onion_message(path, contents, reply_path, log_suffix)); + + match result.as_ref() { + Err(SendError::GetNodeIdFailed) => { + log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); + }, + Err(SendError::PathNotFound) => { + log_trace!(self.logger, "Failed to find path {}", log_suffix); + }, + Err(e) => { + log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); + }, + Ok(SendSuccess::Buffered) => { + log_trace!(self.logger, "Buffered onion message {}", log_suffix); + }, + Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => { + log_trace!( + self.logger, "Buffered onion message waiting on peer connection {}: {:?}", + log_suffix, node_id + ); + }, + } + + result + } + + fn find_path(&self, destination: Destination) -> Result { + let sender = self.node_signer + .get_node_id(Recipient::Node) + .map_err(|_| SendError::GetNodeIdFailed)?; + + let peers = self.message_recipients.lock().unwrap() + .iter() + .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect(); + + self.message_router + .find_path(sender, peers, destination) + .map_err(|_| SendError::PathNotFound) + } + + fn enqueue_onion_message( + &self, path: OnionMessagePath, contents: T, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); + + let (first_node_id, onion_message, addresses) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) } - match pending_per_peer_msgs.entry(first_node_id) { - hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(first_node_id) { + hash_map::Entry::Vacant(e) => match addresses { + None => Err(SendError::InvalidFirstHop(first_node_id)), + Some(addresses) => { + e.insert(OnionMessageRecipient::pending_connection(addresses)) + .enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + }, + }, hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_msg); - Ok(()) - } + e.get_mut().enqueue_message(onion_message); + Ok(SendSuccess::Buffered) + }, } } + #[cfg(test)] + pub(super) fn send_onion_message_using_path( + &self, path: OnionMessagePath, contents: T, reply_path: Option + ) -> Result { + self.enqueue_onion_message(path, contents, reply_path, format_args!("")) + } + fn handle_onion_message_response( &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments ) { if let Some(response) = response { match reply_path { Some(reply_path) => { - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( response, Destination::BlindedPath(reply_path), None, log_suffix ); }, @@ -553,55 +753,26 @@ where } } - fn find_path_and_enqueue_onion_message( - &self, contents: T, destination: Destination, reply_path: Option, - log_suffix: fmt::Arguments - ) { - let sender = match self.node_signer.get_node_id(Recipient::Node) { - Ok(node_id) => node_id, - Err(_) => { - log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); - return; - } - }; - - let peers = self.pending_messages.lock().unwrap().keys().copied().collect(); - let path = match self.message_router.find_path(sender, peers, destination) { - Ok(path) => path, - Err(()) => { - log_trace!(self.logger, "Failed to find path {}", log_suffix); - return; - }, - }; - - log_trace!(self.logger, "Sending onion message {}: {:?}", log_suffix, contents); - - if let Err(e) = self.send_onion_message(path, contents, reply_path) { - log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); - return; - } - } - #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut pending_msgs = self.pending_messages.lock().unwrap(); + let mut message_recipients = self.message_recipients.lock().unwrap(); let mut msgs = HashMap::new(); // We don't want to disconnect the peers by removing them entirely from the original map, so we - // swap the pending message buffers individually. - for (peer_node_id, pending_messages) in &mut *pending_msgs { - msgs.insert(*peer_node_id, core::mem::take(pending_messages)); + // release the pending message buffers individually. + for (node_id, recipient) in &mut *message_recipients { + msgs.insert(*node_id, recipient.release_pending_messages()); } msgs } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap>) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; let mut peer_buffered_bytes = 0; for (pk, peer_buf) in buffer { - for om in peer_buf { + for om in peer_buf.pending_messages() { let om_len = om.serialized_length(); if pk == peer_node_id { peer_buffered_bytes += om_len; @@ -618,6 +789,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider +for OnionMessenger +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + } + } + } + } +} + impl OnionMessageHandler for OnionMessenger where @@ -660,24 +852,28 @@ where } }, Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_recipients) { log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); return } #[cfg(fuzzing)] - pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new); - - match pending_per_peer_msgs.entry(next_node_id) { - hash_map::Entry::Vacant(_) => { + message_recipients + .entry(next_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); + + match message_recipients.entry(next_node_id) { + hash_map::Entry::Occupied(mut e) if matches!( + e.get(), OnionMessageRecipient::ConnectedPeer(..) + ) => { + e.get_mut().enqueue_message(onion_message); + log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); + }, + _ => { log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id); return }, - hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_message); - log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); - } } }, Err(e) => { @@ -688,15 +884,42 @@ where fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - let mut peers = self.pending_messages.lock().unwrap(); - peers.insert(their_node_id.clone(), VecDeque::new()); + self.message_recipients.lock().unwrap() + .entry(*their_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) + .mark_connected(); + } else { + self.message_recipients.lock().unwrap().remove(their_node_id); } + Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - let mut pending_msgs = self.pending_messages.lock().unwrap(); - pending_msgs.remove(their_node_id); + match self.message_recipients.lock().unwrap().remove(their_node_id) { + Some(OnionMessageRecipient::ConnectedPeer(..)) => {}, + _ => debug_assert!(false), + } + } + + fn timer_tick_occurred(&self) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + + // Drop any pending recipients since the last call to avoid retaining buffered messages for + // too long. + message_recipients.retain(|_, recipient| match recipient { + OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageRecipient::PendingConnection(_, Some(_), _) => true, + _ => true, + }); + + // Increment a timer tick for pending recipients so that their buffered messages are dropped + // at MAX_TIMER_TICKS. + for recipient in message_recipients.values_mut() { + if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient { + *ticks += 1; + } + } } fn provided_node_features(&self) -> NodeFeatures { @@ -721,7 +944,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending OffersMessage") ); } @@ -732,16 +955,14 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending CustomMessage") ); } - let mut pending_msgs = self.pending_messages.lock().unwrap(); - if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { - return msgs.pop_front() - } - None + self.message_recipients.lock().unwrap() + .get_mut(&peer_node_id) + .and_then(|buffer| buffer.dequeue_message()) } } @@ -759,7 +980,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, - Arc, + Arc>>, Arc>>, Arc>, IgnoringMessageHandler >; @@ -778,7 +999,7 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter, + &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L>, &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >;