Skip to content

Commit 79f212b

Browse files
committed
Use a message buffer abstraction in OnionMessenger
Onion messages are buffered for sending to the next node. Since the network has limited adoption, connecting directly to a peer may be necessary. Add an OnionMessageBuffer abstraction that can differentiate between connected peers and those are pending a connection. This allows for buffering messages before a connection is established and applying different buffer policies for peers yet to be connected.
1 parent 37150b4 commit 79f212b

File tree

2 files changed

+105
-38
lines changed

2 files changed

+105
-38
lines changed

fuzz/src/onion_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ mod tests {
269269
"Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)"
270270
.to_string())), Some(&1));
271271
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
272-
"Sending onion message: TestCustomMessage".to_string())), Some(&1));
272+
"Sending onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1));
273273
}
274274

275275
let two_unblinded_hops_om = "\

lightning/src/onion_message/messenger.rs

Lines changed: 104 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,70 @@ where
150150
entropy_source: ES,
151151
node_signer: NS,
152152
logger: L,
153-
pending_messages: Mutex<HashMap<PublicKey, VecDeque<OnionMessage>>>,
153+
message_buffers: Mutex<HashMap<PublicKey, OnionMessageBuffer>>,
154154
secp_ctx: Secp256k1<secp256k1::All>,
155155
message_router: MR,
156156
offers_handler: OMH,
157157
custom_handler: CMH,
158158
}
159159

160+
/// [`OnionMessage`]s buffered to be sent.
161+
enum OnionMessageBuffer {
162+
/// Messages for a node connected as a peer.
163+
ConnectedPeer(VecDeque<OnionMessage>),
164+
165+
/// Messages for a node that is not yet connected.
166+
PendingConnection(VecDeque<OnionMessage>),
167+
}
168+
169+
impl OnionMessageBuffer {
170+
fn pending_messages(&self) -> &VecDeque<OnionMessage> {
171+
match self {
172+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
173+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
174+
}
175+
}
176+
177+
fn enqueue_message(&mut self, message: OnionMessage) {
178+
let pending_messages = match self {
179+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
180+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
181+
};
182+
183+
pending_messages.push_back(message);
184+
}
185+
186+
fn dequeue_message(&mut self) -> Option<OnionMessage> {
187+
let pending_messages = match self {
188+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
189+
OnionMessageBuffer::PendingConnection(pending_messages) => {
190+
debug_assert!(false);
191+
pending_messages
192+
},
193+
};
194+
195+
pending_messages.pop_front()
196+
}
197+
198+
#[cfg(test)]
199+
fn release_pending_messages(&mut self) -> VecDeque<OnionMessage> {
200+
let pending_messages = match self {
201+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
202+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
203+
};
204+
205+
core::mem::take(pending_messages)
206+
}
207+
208+
fn mark_connected(&mut self) {
209+
if let OnionMessageBuffer::PendingConnection(pending_messages) = self {
210+
let mut new_pending_messages = VecDeque::new();
211+
core::mem::swap(pending_messages, &mut new_pending_messages);
212+
*self = OnionMessageBuffer::ConnectedPeer(new_pending_messages);
213+
}
214+
}
215+
}
216+
160217
/// An [`OnionMessage`] for [`OnionMessenger`] to send.
161218
///
162219
/// These are obtained when released from [`OnionMessenger`]'s handlers after which they are
@@ -502,7 +559,7 @@ where
502559
OnionMessenger {
503560
entropy_source,
504561
node_signer,
505-
pending_messages: Mutex::new(HashMap::new()),
562+
message_buffers: Mutex::new(HashMap::new()),
506563
secp_ctx,
507564
logger,
508565
message_router,
@@ -518,21 +575,23 @@ where
518575
pub fn send_onion_message<T: OnionMessageContents>(
519576
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
520577
) -> Result<(), SendError> {
521-
522578
log_trace!(self.logger, "Sending onion message: {:?}", contents);
523-
524-
let (first_node_id, onion_msg) = create_onion_message(
579+
580+
let (first_node_id, onion_message) = create_onion_message(
525581
&self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path
526582
)?;
527583

528-
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
529-
if outbound_buffer_full(&first_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
530-
match pending_per_peer_msgs.entry(first_node_id) {
584+
let mut message_buffers = self.message_buffers.lock().unwrap();
585+
if outbound_buffer_full(&first_node_id, &message_buffers) {
586+
return Err(SendError::BufferFull);
587+
}
588+
589+
match message_buffers.entry(first_node_id) {
531590
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
532591
hash_map::Entry::Occupied(mut e) => {
533-
e.get_mut().push_back(onion_msg);
592+
e.get_mut().enqueue_message(onion_message);
534593
Ok(())
535-
}
594+
},
536595
}
537596
}
538597

@@ -565,7 +624,7 @@ where
565624
}
566625
};
567626

568-
let peers = self.pending_messages.lock().unwrap().keys().copied().collect();
627+
let peers = self.message_buffers.lock().unwrap().keys().copied().collect();
569628
let path = match self.message_router.find_path(sender, peers, destination) {
570629
Ok(path) => path,
571630
Err(()) => {
@@ -578,30 +637,29 @@ where
578637

579638
if let Err(e) = self.send_onion_message(path, contents, reply_path) {
580639
log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e);
581-
return;
582640
}
583641
}
584642

585643
#[cfg(test)]
586644
pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> {
587-
let mut pending_msgs = self.pending_messages.lock().unwrap();
645+
let mut message_buffers = self.message_buffers.lock().unwrap();
588646
let mut msgs = HashMap::new();
589647
// We don't want to disconnect the peers by removing them entirely from the original map, so we
590-
// swap the pending message buffers individually.
591-
for (peer_node_id, pending_messages) in &mut *pending_msgs {
592-
msgs.insert(*peer_node_id, core::mem::take(pending_messages));
648+
// release the pending message buffers individually.
649+
for (peer_node_id, buffer) in &mut *message_buffers {
650+
msgs.insert(*peer_node_id, buffer.release_pending_messages());
593651
}
594652
msgs
595653
}
596654
}
597655

598-
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<OnionMessage>>) -> bool {
656+
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageBuffer>) -> bool {
599657
const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
600658
const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
601659
let mut total_buffered_bytes = 0;
602660
let mut peer_buffered_bytes = 0;
603661
for (pk, peer_buf) in buffer {
604-
for om in peer_buf {
662+
for om in peer_buf.pending_messages() {
605663
let om_len = om.serialized_length();
606664
if pk == peer_node_id {
607665
peer_buffered_bytes += om_len;
@@ -660,24 +718,28 @@ where
660718
}
661719
},
662720
Ok(PeeledOnion::Forward(next_node_id, onion_message)) => {
663-
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
664-
if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
721+
let mut message_buffers = self.message_buffers.lock().unwrap();
722+
if outbound_buffer_full(&next_node_id, &message_buffers) {
665723
log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
666724
return
667725
}
668726

669727
#[cfg(fuzzing)]
670-
pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
671-
672-
match pending_per_peer_msgs.entry(next_node_id) {
673-
hash_map::Entry::Vacant(_) => {
728+
message_buffers
729+
.entry(next_node_id)
730+
.or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new()));
731+
732+
match message_buffers.entry(next_node_id) {
733+
hash_map::Entry::Occupied(mut e) if matches!(
734+
e.get(), OnionMessageBuffer::ConnectedPeer(..)
735+
) => {
736+
e.get_mut().enqueue_message(onion_message);
737+
log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
738+
},
739+
_ => {
674740
log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);
675741
return
676742
},
677-
hash_map::Entry::Occupied(mut e) => {
678-
e.get_mut().push_back(onion_message);
679-
log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
680-
}
681743
}
682744
},
683745
Err(e) => {
@@ -688,15 +750,22 @@ where
688750

689751
fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> {
690752
if init.features.supports_onion_messages() {
691-
let mut peers = self.pending_messages.lock().unwrap();
692-
peers.insert(their_node_id.clone(), VecDeque::new());
753+
self.message_buffers.lock().unwrap()
754+
.entry(*their_node_id)
755+
.or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new()))
756+
.mark_connected();
757+
} else {
758+
self.message_buffers.lock().unwrap().remove(their_node_id);
693759
}
760+
694761
Ok(())
695762
}
696763

697764
fn peer_disconnected(&self, their_node_id: &PublicKey) {
698-
let mut pending_msgs = self.pending_messages.lock().unwrap();
699-
pending_msgs.remove(their_node_id);
765+
match self.message_buffers.lock().unwrap().remove(their_node_id) {
766+
Some(OnionMessageBuffer::ConnectedPeer(..)) => {},
767+
_ => debug_assert!(false),
768+
}
700769
}
701770

702771
fn provided_node_features(&self) -> NodeFeatures {
@@ -737,11 +806,9 @@ where
737806
);
738807
}
739808

740-
let mut pending_msgs = self.pending_messages.lock().unwrap();
741-
if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) {
742-
return msgs.pop_front()
743-
}
744-
None
809+
self.message_buffers.lock().unwrap()
810+
.get_mut(&peer_node_id)
811+
.and_then(|buffer| buffer.dequeue_message())
745812
}
746813
}
747814

0 commit comments

Comments
 (0)