From 0ba12dbb8878f57dc75a4de614b0f51d2f8bb0b5 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 1 Feb 2025 18:37:07 +0000 Subject: [PATCH 1/4] Clean up type aliases in `functional_tests` In 3c3e93e7b5a871e57462cc14e0509d271421c88a we ran rustfmt blindly on `lightning/src/onion_message/functional_tests.rs`. This left the file with a handful of warts that really should have been cleaned up as we went. Here we clean up types a bit by importing structs directly and adding a few type aliases. --- .../src/onion_message/functional_tests.rs | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 10c5bea86f7..2e3b26cc4f4 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -31,7 +31,7 @@ use crate::routing::test_utils::{add_channel, add_or_update_node}; use crate::sign::{NodeSigner, Recipient}; use crate::types::features::{ChannelFeatures, InitFeatures}; use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer}; -use crate::util::test_utils; +use crate::util::test_utils::{TestChainSource, TestKeysInterface, TestLogger, TestNodeSigner}; use bitcoin::hex::FromHex; use bitcoin::network::Network; @@ -45,35 +45,26 @@ use core::ops::Deref; use crate::prelude::*; +type NetGraph = NetworkGraph>; +type MessageRouter = DefaultMessageRouter, Arc, Arc>; + struct MessengerNode { node_id: PublicKey, privkey: SecretKey, - entropy_source: Arc, + entropy_source: Arc, messenger: OnionMessenger< - Arc, - Arc, - Arc, + Arc, + Arc, + Arc, Arc, - Arc< - DefaultMessageRouter< - Arc>>, - Arc, - Arc, - >, - >, + Arc, Arc, Arc, Arc, Arc, >, custom_message_handler: Arc, - gossip_sync: Arc< - P2PGossipSync< - Arc>>, - Arc, - Arc, - >, - >, + gossip_sync: Arc, Arc, Arc>>, } impl Drop for MessengerNode { @@ -278,7 +269,7 @@ impl MessengerCfg { } fn create_nodes_using_cfgs(cfgs: Vec) -> Vec { - let gossip_logger = Arc::new(test_utils::TestLogger::with_id("gossip".to_string())); + let gossip_logger = Arc::new(TestLogger::with_id("gossip".to_string())); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, gossip_logger.clone())); let gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), None, gossip_logger)); @@ -286,10 +277,10 @@ fn create_nodes_using_cfgs(cfgs: Vec) -> Vec { for (i, cfg) in cfgs.into_iter().enumerate() { let secret_key = cfg.secret_override.unwrap_or(SecretKey::from_slice(&[(i + 1) as u8; 32]).unwrap()); - let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); + let logger = Arc::new(TestLogger::with_id(format!("node {}", i))); let seed = [i as u8; 32]; - let entropy_source = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet)); - let node_signer = Arc::new(test_utils::TestNodeSigner::new(secret_key)); + let entropy_source = Arc::new(TestKeysInterface::new(&seed, Network::Testnet)); + let node_signer = Arc::new(TestNodeSigner::new(secret_key)); let node_id_lookup = Arc::new(EmptyNodeIdLookUp {}); let message_router = From f8a5ef5c2c3140724cbe70817e673d0236de4bd4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 1 Feb 2025 18:52:32 +0000 Subject: [PATCH 2/4] Clean up terrible rustfmt-isms in OM `functional_tests.rs` In 3c3e93e7b5a871e57462cc14e0509d271421c88a we ran rustfmt blindly on `lightning/src/onion_message/functional_tests.rs`. This left the file with a handful of warts that really should have been cleaned up as we went. Here we clean up a handful of rustfmt-isms, ranging from terrible to somewhat ugly, introduced in `functional_tests.rs`. --- .../src/onion_message/functional_tests.rs | 199 ++++++------------ 1 file changed, 67 insertions(+), 132 deletions(-) diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 2e3b26cc4f4..bc74ca9f853 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -162,17 +162,13 @@ impl TestCustomMessageHandler { } fn expect_message(&self, message: TestCustomMessage) { - self.expectations - .lock() - .unwrap() - .push_back(OnHandleCustomMessage { expect: message, include_reply_path: false }); + let expectation = OnHandleCustomMessage { expect: message, include_reply_path: false }; + self.expectations.lock().unwrap().push_back(expectation); } fn expect_message_and_response(&self, message: TestCustomMessage) { - self.expectations - .lock() - .unwrap() - .push_back(OnHandleCustomMessage { expect: message, include_reply_path: true }); + let expectation = OnHandleCustomMessage { expect: message, include_reply_path: true }; + self.expectations.lock().unwrap().push_back(expectation); } fn get_next_expectation(&self) -> OnHandleCustomMessage { @@ -208,12 +204,11 @@ impl CustomOnionMessageHandler for TestCustomMessageHandler { } match responder { - Some(responder) if expectation.include_reply_path => Some(( - response, - responder.respond_with_reply_path(MessageContext::Custom( - context.unwrap_or_else(Vec::new), - )), - )), + Some(responder) if expectation.include_reply_path => { + let context = MessageContext::Custom(context.unwrap_or_else(Vec::new)); + let reply = responder.respond_with_reply_path(context); + Some((response, reply)) + }, Some(responder) => Some((response, responder.respond())), None => None, } @@ -412,14 +407,9 @@ fn one_blinded_hop() { let secp_ctx = Secp256k1::new(); let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &[], - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let blinded_path = + BlindedMessagePath::new(&[], nodes[1].node_id, context, entropy, &secp_ctx).unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; nodes[0].messenger.send_onion_message(test_msg, instructions).unwrap(); @@ -436,14 +426,10 @@ fn two_unblinded_two_blinded() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[3].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[4].node_id, - context, - &*nodes[4].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[4].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[4].node_id, context, entropy, &secp_ctx) + .unwrap(); let path = OnionMessagePath { intermediate_nodes: vec![nodes[1].node_id, nodes[2].node_id], destination: Destination::BlindedPath(blinded_path), @@ -466,14 +452,10 @@ fn three_blinded_hops() { MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[3].node_id, - context, - &*nodes[3].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[3].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[3].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -497,14 +479,9 @@ fn async_response_over_one_blinded_hop() { // 3. Simulate the creation of a Blinded Reply path provided by Bob. let secp_ctx = Secp256k1::new(); let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &[], - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let reply_path = + BlindedMessagePath::new(&[], nodes[1].node_id, context, entropy, &secp_ctx).unwrap(); // 4. Create a responder using the reply path for Alice. let responder = Some(Responder::new(reply_path)); @@ -632,14 +609,10 @@ fn we_are_intro_node() { MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -651,14 +624,10 @@ fn we_are_intro_node() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[0].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[1].node_id, - context, - &*nodes[1].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[1].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[1].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -678,14 +647,10 @@ fn invalid_blinded_path_error() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let mut blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let mut blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); blinded_path.clear_blinded_hops(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -711,14 +676,10 @@ fn reply_path() { MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[0].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let reply_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[0].node_id, context, entropy, &secp_ctx) + .unwrap(); nodes[0] .messenger .send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)) @@ -736,28 +697,20 @@ fn reply_path() { MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[3].node_id, - context, - &*nodes[3].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[3].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[3].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let intermediate_nodes = [ MessageForwardNode { node_id: nodes[2].node_id, short_channel_id: None }, MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }, ]; let context = MessageContext::Custom(Vec::new()); - let reply_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[0].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let reply_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[0].node_id, context, entropy, &secp_ctx) + .unwrap(); let instructions = MessageSendInstructions::WithSpecifiedReplyPath { destination, reply_path }; nodes[0].messenger.send_onion_message(test_msg, instructions).unwrap(); @@ -853,14 +806,10 @@ fn requests_peer_connection_for_buffered_messages() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -899,14 +848,10 @@ fn drops_buffered_messages_waiting_for_peer_connection() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[0].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[0].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -961,14 +906,10 @@ fn intercept_offline_peer_oms() { let intermediate_nodes = [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: None }]; let context = MessageContext::Custom(Vec::new()); - let blinded_path = BlindedMessagePath::new( - &intermediate_nodes, - nodes[2].node_id, - context, - &*nodes[2].entropy_source, - &secp_ctx, - ) - .unwrap(); + let entropy = &*nodes[2].entropy_source; + let blinded_path = + BlindedMessagePath::new(&intermediate_nodes, nodes[2].node_id, context, entropy, &secp_ctx) + .unwrap(); let destination = Destination::BlindedPath(blinded_path); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; @@ -990,10 +931,9 @@ fn intercept_offline_peer_oms() { // Ensure that we'll refuse to forward the re-injected OM until after the // outbound peer comes back online. - let err = nodes[1] - .messenger - .forward_onion_message(onion_message.clone(), &final_node_vec[0].node_id) - .unwrap_err(); + let next_node_id = &final_node_vec[0].node_id; + let err = + nodes[1].messenger.forward_onion_message(onion_message.clone(), next_node_id).unwrap_err(); assert_eq!(err, SendError::InvalidFirstHop(final_node_vec[0].node_id)); connect_peers(&nodes[1], &final_node_vec[0]); @@ -1034,17 +974,12 @@ fn spec_test_vector() { let sender_to_alice_packet: Packet = ::read(&mut packet_reader).unwrap(); let secp_ctx = Secp256k1::new(); + + let blinding_key_hex = "6363636363636363636363636363636363636363636363636363636363636363"; + let blinding_key = + SecretKey::from_slice(&>::from_hex(blinding_key_hex).unwrap()).unwrap(); let sender_to_alice_om = msgs::OnionMessage { - blinding_point: PublicKey::from_secret_key( - &secp_ctx, - &SecretKey::from_slice( - &>::from_hex( - "6363636363636363636363636363636363636363636363636363636363636363", - ) - .unwrap(), - ) - .unwrap(), - ), + blinding_point: PublicKey::from_secret_key(&secp_ctx, &blinding_key), onion_routing_packet: sender_to_alice_packet, }; // The spec test vectors prepend the OM message type (513) to the encoded onion message strings, From c45c3a7d4d3743ed0cdceb76e7aef19c37798fd7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 1 Feb 2025 19:25:59 +0000 Subject: [PATCH 3/4] Use constant for buffer size in OM `functional_tests.rs` In 3c3e93e7b5a871e57462cc14e0509d271421c88a we ran rustfmt blindly on `lightning/src/onion_message/functional_tests.rs`. This left the file with a handful of warts that really should have been cleaned up as we went. Here we take the opportunity to clean up the `peer_buffer_full` test which used a constant for the number of messages we can push before the buffer fills, rather than calculating the amount based on the buffer size constant, also removing a comment that got moved by rustfmt into a place where it was no longer understandable. --- lightning/src/onion_message/functional_tests.rs | 3 +-- lightning/src/onion_message/messenger.rs | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index bc74ca9f853..f5f4854dda1 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -765,8 +765,7 @@ fn peer_buffer_full() { let destination = Destination::Node(nodes[1].node_id); let instructions = MessageSendInstructions::WithoutReplyPath { destination }; - for _ in 0..188 { - // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger + for _ in 0..super::messenger::MAX_PER_PEER_BUFFER_SIZE / 1401 + 1 { nodes[0].messenger.send_onion_message(test_msg.clone(), instructions.clone()).unwrap(); } let err = nodes[0].messenger.send_onion_message(test_msg, instructions.clone()).unwrap_err(); diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index f918c998f6d..8ad9e6b7a76 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -1661,11 +1661,12 @@ where } } +const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; +pub(super) const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; + fn outbound_buffer_full( peer_node_id: &PublicKey, buffer: &HashMap, ) -> bool { - const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; - const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; let mut peer_buffered_bytes = 0; for (pk, peer_buf) in buffer { From 04c769b0c5b6915590bff6da00d7c350d97227d1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 1 Feb 2025 20:50:55 +0000 Subject: [PATCH 4/4] Improve readability in `messenger.rs` now that we're rustfmt'ing In ecbab2938626ceb23bfad7af575638a34f1d6959 we ran rustfmt blindly on `lightning/src/onion_message/messenger.rs`. This exposed a few untidy things in the file and made them worse, which we should have cleaned up as we went. Here we address these, removing some unnecessary generics, condensing some lines by adding intermediate variables, and reducing very vertical match statements. --- lightning/src/onion_message/messenger.rs | 82 ++++++++++-------------- 1 file changed, 34 insertions(+), 48 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 8ad9e6b7a76..012b7978053 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -587,16 +587,11 @@ where a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) }); + let entropy = &**entropy_source; let paths = peer_info .into_iter() .map(|(peer, _, _)| { - BlindedMessagePath::new( - &[peer], - recipient, - context.clone(), - &**entropy_source, - secp_ctx, - ) + BlindedMessagePath::new(&[peer], recipient, context.clone(), entropy, secp_ctx) }) .take(MAX_PATHS) .collect::, _>>(); @@ -1050,13 +1045,11 @@ where let blinding_factor = { let mut hmac = HmacEngine::::new(b"blinded_node_id"); hmac.input(control_tlvs_ss.as_ref()); - Hmac::from_engine(hmac).to_byte_array() + let hmac = Hmac::from_engine(hmac).to_byte_array(); + Scalar::from_be_bytes(hmac).unwrap() }; - match node_signer.ecdh( - Recipient::Node, - &msg.onion_routing_packet.public_key, - Some(&Scalar::from_be_bytes(blinding_factor).unwrap()), - ) { + let packet_pubkey = &msg.onion_routing_packet.public_key; + match node_signer.ecdh(Recipient::Node, packet_pubkey, Some(&blinding_factor)) { Ok(ss) => ss.secret_bytes(), Err(()) => { log_trace!(logger, "Failed to compute onion packet shared secret"); @@ -1064,18 +1057,15 @@ where }, } }; - match onion_utils::decode_next_untagged_hop( + let next_hop = onion_utils::decode_next_untagged_hop( onion_decode_ss, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, (control_tlvs_ss, custom_handler.deref(), logger.deref()), - ) { + ); + match next_hop { Ok(( - Payload::Receive::< - ParsedOnionMessageContents< - <::Target as CustomOnionMessageHandler>::CustomMessage, - >, - > { + Payload::Receive { message, control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context }), reply_path, @@ -1117,11 +1107,10 @@ where // unwrapping the onion layers to get to the final payload. Since we don't have the option // of creating blinded paths with dummy hops currently, we should be ok to not handle this // for now. - let new_pubkey = match onion_utils::next_hop_pubkey( - &secp_ctx, - msg.onion_routing_packet.public_key, - &onion_decode_ss, - ) { + let packet_pubkey = msg.onion_routing_packet.public_key; + let new_pubkey_opt = + onion_utils::next_hop_pubkey(&secp_ctx, packet_pubkey, &onion_decode_ss); + let new_pubkey = match new_pubkey_opt { Ok(pk) => pk, Err(e) => { log_trace!(logger, "Failed to compute next hop packet pubkey: {}", e); @@ -1406,14 +1395,14 @@ where .map_err(|_| SendError::GetNodeIdFailed)?; let secp_ctx = &self.secp_ctx; - let peers = self - .message_recipients - .lock() - .unwrap() - .iter() - .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) - .map(|(node_id, _)| *node_id) - .collect::>(); + let peers = { + let message_recipients = self.message_recipients.lock().unwrap(); + message_recipients + .iter() + .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect::>() + }; self.message_router .create_blinded_paths(recipient, context, peers, secp_ctx) @@ -1627,10 +1616,9 @@ where if num_peer_connecteds <= 1 { for event in peer_connecteds { if handler(event).await.is_ok() { - self.pending_peer_connected_events - .lock() - .unwrap() - .drain(..num_peer_connecteds); + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + pending_peer_connected_events.drain(..num_peer_connecteds); } else { // We failed handling the event. Return to have it eventually replayed. self.pending_events_processor.store(false, Ordering::Release); @@ -1988,12 +1976,13 @@ where &self, their_node_id: PublicKey, init: &msgs::Init, _inbound: bool, ) -> Result<(), ()> { if init.features.supports_onion_messages() { - self.message_recipients - .lock() - .unwrap() - .entry(their_node_id) - .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) - .mark_connected(); + { + let mut message_recipients = self.message_recipients.lock().unwrap(); + message_recipients + .entry(their_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) + .mark_connected(); + } if self.intercept_messages_for_offline_peers { let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); @@ -2090,11 +2079,8 @@ where ); } - self.message_recipients - .lock() - .unwrap() - .get_mut(&peer_node_id) - .and_then(|buffer| buffer.dequeue_message()) + let mut message_recipients = self.message_recipients.lock().unwrap(); + message_recipients.get_mut(&peer_node_id).and_then(|buffer| buffer.dequeue_message()) } }