diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 10c5bea86f7..f5f4854dda1 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -31,7 +31,7 @@ use crate::routing::test_utils::{add_channel, add_or_update_node}; use crate::sign::{NodeSigner, Recipient}; use crate::types::features::{ChannelFeatures, InitFeatures}; use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer}; -use crate::util::test_utils; +use crate::util::test_utils::{TestChainSource, TestKeysInterface, TestLogger, TestNodeSigner}; use bitcoin::hex::FromHex; use bitcoin::network::Network; @@ -45,35 +45,26 @@ use core::ops::Deref; use crate::prelude::*; +type NetGraph = NetworkGraph>; +type MessageRouter = DefaultMessageRouter, Arc, Arc>; + struct MessengerNode { node_id: PublicKey, privkey: SecretKey, - entropy_source: Arc, + entropy_source: Arc, messenger: OnionMessenger< - Arc, - Arc, - Arc, + Arc, + Arc, + Arc, Arc, - Arc< - DefaultMessageRouter< - Arc>>, - Arc, - Arc, - >, - >, + Arc, Arc, Arc, Arc, Arc, >, custom_message_handler: Arc, - gossip_sync: Arc< - P2PGossipSync< - Arc>>, - Arc, - Arc, - >, - >, + gossip_sync: Arc, Arc, Arc>>, } impl Drop for MessengerNode { @@ -171,17 +162,13 @@ impl TestCustomMessageHandler { } fn expect_message(&self, message: TestCustomMessage) { - self.expectations - .lock() - .unwrap() - .push_back(OnHandleCustomMessage { expect: message, include_reply_path: false }); + let expectation = OnHandleCustomMessage { expect: message, include_reply_path: false }; + self.expectations.lock().unwrap().push_back(expectation); } fn expect_message_and_response(&self, message: TestCustomMessage) { - self.expectations - .lock() - .unwrap() - .push_back(OnHandleCustomMessage { expect: message, include_reply_path: true }); + let expectation = OnHandleCustomMessage { expect: message, include_reply_path: true }; + self.expectations.lock().unwrap().push_back(expectation); } fn get_next_expectation(&self) -> OnHandleCustomMessage { @@ -217,12 +204,11 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler { } match responder { - Some(responder) if expectation.include_reply_path => Some(( - response, - responder.respond_with_reply_path(MessageContext::Custom( - context.unwrap_or_else(Vec::new), - )), - )), + Some(responder) if expectation.include_reply_path => { + let context = MessageContext::Custom(context.unwrap_or_else(Vec::new)); + let reply = responder.respond_with_reply_path(context); + Some((response, reply)) + }, Some(responder) => Some((response, responder.respond())), None => None, } @@ -278,7 +264,7 @@ impl MessengerCfg { } fn create_nodes_using_cfgs(cfgs: Vec) -> Vec { - let gossip_logger = Arc::new(test_utils::TestLogger::with_id("gossip".to_string())); + let gossip_logger = Arc::new(TestLogger::with_id("gossip".to_string())); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, gossip_logger.clone())); let gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), None, gossip_logger)); @@ -286,10 +272,10 @@ fn create_nodes_using_cfgs(cfgs: Vec) -> Vec { for (i, cfg) in cfgs.into_iter().enumerate() { let secret_key = cfg.secret_override.unwrap_or(SecretKey::from_slice(&[(i + 1) as u8; 32]).unwrap()); - let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); + let logger = Arc::new(TestLogger::with_id(format!("node {}", i))); let seed = [i as u8; 32]; - let entropy_source = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); - let node_signer = Arc::new(test_utils::TestNodeSigner::new(secret_key)); + let entropy_source = Arc::new(TestKeysInterface::new(&seed, Network::Testnet)); + let node_signer = Arc::new(TestNodeSigner::new(secret_key)); let node_id_lookup = Arc::new(EmptyNodeIdLookUp {}); let message_router = @@ -421,14 +407,9 @@ fn one_blinded_hop() { let secp_ctx = Secp256k1::new(); let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &[], - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let blinded_path = + BlindedMessagePath::new(&[], nodes[1].node_id, context, entropy, &secp_ctx).unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; nodes[0].messenger.send_onion_message(test_msg, instructions).unwrap(); @@ -445,14 +426,10 @@ fn two_unblinded_two_blinded() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[3].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[4].node_id, - context, - &*nodes[4].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[4].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[4].node_id, context, entropy, &secp_ctx) + .unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], destination: Destination::BlindedPath(blinded_path), @@ -475,14 +452,10 @@ fn three_blinded_hops() { MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[3].node_id, - context, - &*nodes[3].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[3].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[3].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -506,14 +479,9 @@ fn async_response_over_one_blinded_hop() { // 3. Simulate the creation of a Blinded Reply path provided by Bob. let secp_ctx = Secp256k1::new(); let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &[], - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let reply_path = + BlindedMessagePath::new(&[], nodes[1].node_id, context, entropy, &secp_ctx).unwrap(); // 4. Create a responder using the reply path for Alice. let responder = Some(Responder::new(reply_path)); @@ -641,14 +609,10 @@ fn we_are_intro_node() { MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -660,14 +624,10 @@ fn we_are_intro_node() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[0].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[1].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -687,14 +647,10 @@ fn invalid_blinded_path_error() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let mut blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let mut blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); blinded_path.clear_blinded_hops(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -720,14 +676,10 @@ fn reply_path() { MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[0].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let reply_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[0].node_id, context, entropy, &secp_ctx) + .unwrap(); nodes[0] .messenger .send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)) @@ -745,28 +697,20 @@ fn reply_path() { MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[3].node_id, - context, - &*nodes[3].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[3].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[3].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let intermediate_nodes = [ MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[0].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let reply_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[0].node_id, context, entropy, &secp_ctx) + .unwrap(); let instructions = MessageSendInstructions::WithSpecifiedReplyPath { destination, reply_path }; nodes[0].messenger.send_onion_message(test_msg, instructions).unwrap(); @@ -821,8 +765,7 @@ fn peer_buffer_full() { let destination = Destination::Node(nodes[1].node_id); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; - for _ in 0..188 { - // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger + for _ in 0..super::messenger::MAX_PER_PEER_BUFFER_SIZE / 1401 + 1 { nodes[0].messenger.send_onion_message(test_msg.clone(), instructions.clone()).unwrap(); } let err = nodes[0].messenger.send_onion_message(test_msg, instructions.clone()).unwrap_err(); @@ -862,14 +805,10 @@ fn requests_peer_connection_for_buffered_messages() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -908,14 +847,10 @@ fn drops_buffered_messages_waiting_for_peer_connection() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -970,14 +905,10 @@ fn intercept_offline_peer_oms() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -999,10 +930,9 @@ fn intercept_offline_peer_oms() { // Ensure that we'll refuse to forward the re-injected OM until after the // outbound peer comes back online. - let err = nodes[1] - .messenger - .forward_onion_message(onion_message.clone(), &final_node_vec[0].node_id) - .unwrap_err(); + let next_node_id = &final_node_vec[0].node_id; + let err = + nodes[1].messenger.forward_onion_message(onion_message.clone(), next_node_id).unwrap_err(); assert_eq!(err, SendError::InvalidFirstHop(final_node_vec[0].node_id)); connect_peers(&nodes[1], &final_node_vec[0]); @@ -1043,17 +973,12 @@ fn spec_test_vector() { let sender_to_alice_packet: Packet = ::read(&mut packet_reader).unwrap(); let secp_ctx = Secp256k1::new(); + + let blinding_key_hex = "6363636363636363636363636363636363636363636363636363636363636363"; + let blinding_key = + SecretKey::from_slice(&>::from_hex(blinding_key_hex).unwrap()).unwrap(); let sender_to_alice_om = msgs::OnionMessage { - blinding_point: PublicKey::from_secret_key( - &secp_ctx, - &SecretKey::from_slice( - &>::from_hex( - "6363636363636363636363636363636363636363636363636363636363636363", - ) - .unwrap(), - ) - .unwrap(), - ), + blinding_point: PublicKey::from_secret_key(&secp_ctx, &blinding_key), onion_routing_packet: sender_to_alice_packet, }; // The spec test vectors prepend the OM message type (513) to the encoded onion message strings, diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index f918c998f6d..012b7978053 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -587,16 +587,11 @@ where a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) }); + let entropy = &**entropy_source; let paths = peer_info .into_iter() .map(|(peer, _, _)| { - BlindedMessagePath::new( - &[peer], - recipient, - context.clone(), - &**entropy_source, - secp_ctx, - ) + BlindedMessagePath::new(&[peer], recipient, context.clone(), entropy, secp_ctx) }) .take(MAX_PATHS) .collect::, _>>(); @@ -1050,13 +1045,11 @@ where let blinding_factor = { let mut hmac = HmacEngine::::new(b"blinded_node_id"); hmac.input(control_tlvs_ss.as_ref()); - Hmac::from_engine(hmac).to_byte_array() + let hmac = Hmac::from_engine(hmac).to_byte_array(); + Scalar::from_be_bytes(hmac).unwrap() }; - match node_signer.ecdh( - Recipient::Node, - &msg.onion_routing_packet.public_key, - Some(&Scalar::from_be_bytes(blinding_factor).unwrap()), - ) { + let packet_pubkey = &msg.onion_routing_packet.public_key; + match node_signer.ecdh(Recipient::Node, packet_pubkey, Some(&blinding_factor)) { Ok(ss) => ss.secret_bytes(), Err(()) => { log_trace!(logger, "Failed to compute onion packet shared secret"); @@ -1064,18 +1057,15 @@ where }, } }; - match onion_utils::decode_next_untagged_hop( + let next_hop = onion_utils::decode_next_untagged_hop( onion_decode_ss, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, (control_tlvs_ss, custom_handler.deref(), logger.deref()), - ) { + ); + match next_hop { Ok(( - Payload::Receive::< - ParsedOnionMessageContents< - <::Target as CustomOnionMessageHandler>::CustomMessage, - >, - > { + Payload::Receive { message, control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context }), reply_path, @@ -1117,11 +1107,10 @@ where // unwrapping the onion layers to get to the final payload. Since we don't have the option // of creating blinded paths with dummy hops currently, we should be ok to not handle this // for now. - let new_pubkey = match onion_utils::next_hop_pubkey( - &secp_ctx, - msg.onion_routing_packet.public_key, - &onion_decode_ss, - ) { + let packet_pubkey = msg.onion_routing_packet.public_key; + let new_pubkey_opt = + onion_utils::next_hop_pubkey(&secp_ctx, packet_pubkey, &onion_decode_ss); + let new_pubkey = match new_pubkey_opt { Ok(pk) => pk, Err(e) => { log_trace!(logger, "Failed to compute next hop packet pubkey: {}", e); @@ -1406,14 +1395,14 @@ where .map_err(|_| SendError::GetNodeIdFailed)?; let secp_ctx = &self.secp_ctx; - let peers = self - .message_recipients - .lock() - .unwrap() - .iter() - .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) - .map(|(node_id, _)| *node_id) - .collect::>(); + let peers = { + let message_recipients = self.message_recipients.lock().unwrap(); + message_recipients + .iter() + .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect::>() + }; self.message_router .create_blinded_paths(recipient, context, peers, secp_ctx) @@ -1627,10 +1616,9 @@ where if num_peer_connecteds <= 1 { for event in peer_connecteds { if handler(event).await.is_ok() { - self.pending_peer_connected_events - .lock() - .unwrap() - .drain(..num_peer_connecteds); + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + pending_peer_connected_events.drain(..num_peer_connecteds); } else { // We failed handling the event. Return to have it eventually replayed. self.pending_events_processor.store(false, Ordering::Release); @@ -1661,11 +1649,12 @@ where } } +const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; +pub(super) const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; + fn outbound_buffer_full( peer_node_id: &PublicKey, buffer: &HashMap, ) -> bool { - const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; - const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; let mut peer_buffered_bytes = 0; for (pk, peer_buf) in buffer { @@ -1987,12 +1976,13 @@ where &self, their_node_id: PublicKey, init: &msgs::Init, _inbound: bool, ) -> Result<(), ()> { if init.features.supports_onion_messages() { - self.message_recipients - .lock() - .unwrap() - .entry(their_node_id) - .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) - .mark_connected(); + { + let mut message_recipients = self.message_recipients.lock().unwrap(); + message_recipients + .entry(their_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) + .mark_connected(); + } if self.intercept_messages_for_offline_peers { let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); @@ -2089,11 +2079,8 @@ where ); } - self.message_recipients - .lock() - .unwrap() - .get_mut(&peer_node_id) - .and_then(|buffer| buffer.dequeue_message()) + let mut message_recipients = self.message_recipients.lock().unwrap(); + message_recipients.get_mut(&peer_node_id).and_then(|buffer| buffer.dequeue_message()) } }