Skip to content

Commit 400d5e2

Browse files
committed
Use a message buffer abstraction in OnionMessenger
Onion messages are buffered for sending to the next node. Since the network has limited adoption, connecting directly to a peer may be necessary. Add an OnionMessageBuffer abstraction that can differentiate between connected peers and those are pending a connection. This allows for buffering messages before a connection is established and applying different buffer policies for peers yet to be connected.
1 parent d2242f6 commit 400d5e2

File tree

1 file changed

+91
-31
lines changed

1 file changed

+91
-31
lines changed

lightning/src/onion_message/messenger.rs

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,67 @@ where
148148
entropy_source: ES,
149149
node_signer: NS,
150150
logger: L,
151-
pending_messages: Mutex<HashMap<PublicKey, VecDeque<OnionMessage>>>,
151+
message_buffers: Mutex<HashMap<PublicKey, OnionMessageBuffer>>,
152152
secp_ctx: Secp256k1<secp256k1::All>,
153153
message_router: MR,
154154
offers_handler: OMH,
155155
custom_handler: CMH,
156156
}
157157

158+
/// [`OnionMessage`]s buffered to be sent.
159+
enum OnionMessageBuffer {
160+
/// Messages for a node connected as a peer.
161+
ConnectedPeer(VecDeque<OnionMessage>),
162+
163+
/// Messages for a node that is not yet connected.
164+
PendingConnection(VecDeque<OnionMessage>),
165+
}
166+
167+
impl OnionMessageBuffer {
168+
fn pending_messages(&self) -> &VecDeque<OnionMessage> {
169+
match self {
170+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
171+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
172+
}
173+
}
174+
175+
fn enqueue_message(&mut self, message: OnionMessage) {
176+
let pending_messages = match self {
177+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
178+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
179+
};
180+
181+
pending_messages.push_back(message);
182+
}
183+
184+
fn dequeue_message(&mut self) -> Option<OnionMessage> {
185+
let pending_messages = match self {
186+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
187+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
188+
};
189+
190+
pending_messages.pop_front()
191+
}
192+
193+
#[cfg(test)]
194+
fn release_pending_messages(&mut self) -> VecDeque<OnionMessage> {
195+
let pending_messages = match self {
196+
OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages,
197+
OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages,
198+
};
199+
200+
core::mem::take(pending_messages)
201+
}
202+
203+
fn mark_connected(&mut self) {
204+
if let OnionMessageBuffer::PendingConnection(pending_messages) = self {
205+
let mut new_pending_messages = VecDeque::new();
206+
core::mem::swap(pending_messages, &mut new_pending_messages);
207+
*self = OnionMessageBuffer::ConnectedPeer(new_pending_messages);
208+
}
209+
}
210+
}
211+
158212
/// An [`OnionMessage`] for [`OnionMessenger`] to send.
159213
///
160214
/// These are obtained when released from [`OnionMessenger`]'s handlers after which they are
@@ -500,7 +554,7 @@ where
500554
OnionMessenger {
501555
entropy_source,
502556
node_signer,
503-
pending_messages: Mutex::new(HashMap::new()),
557+
message_buffers: Mutex::new(HashMap::new()),
504558
secp_ctx,
505559
logger,
506560
message_router,
@@ -516,18 +570,21 @@ where
516570
pub fn send_onion_message<T: OnionMessageContents>(
517571
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
518572
) -> Result<(), SendError> {
519-
let (first_node_id, onion_msg) = create_onion_message(
573+
let (first_node_id, onion_message) = create_onion_message(
520574
&self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path
521575
)?;
522576

523-
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
524-
if outbound_buffer_full(&first_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
525-
match pending_per_peer_msgs.entry(first_node_id) {
577+
let mut message_buffers = self.message_buffers.lock().unwrap();
578+
if outbound_buffer_full(&first_node_id, &message_buffers) {
579+
return Err(SendError::BufferFull);
580+
}
581+
582+
match message_buffers.entry(first_node_id) {
526583
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
527584
hash_map::Entry::Occupied(mut e) => {
528-
e.get_mut().push_back(onion_msg);
585+
e.get_mut().enqueue_message(onion_message);
529586
Ok(())
530-
}
587+
},
531588
}
532589
}
533590

@@ -560,7 +617,7 @@ where
560617
}
561618
};
562619

563-
let peers = self.pending_messages.lock().unwrap().keys().copied().collect();
620+
let peers = self.message_buffers.lock().unwrap().keys().copied().collect();
564621
let path = match self.message_router.find_path(sender, peers, destination) {
565622
Ok(path) => path,
566623
Err(()) => {
@@ -573,30 +630,29 @@ where
573630

574631
if let Err(e) = self.send_onion_message(path, contents, reply_path) {
575632
log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e);
576-
return;
577633
}
578634
}
579635

580636
#[cfg(test)]
581637
pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> {
582-
let mut pending_msgs = self.pending_messages.lock().unwrap();
638+
let mut message_buffers = self.message_buffers.lock().unwrap();
583639
let mut msgs = HashMap::new();
584640
// We don't want to disconnect the peers by removing them entirely from the original map, so we
585-
// swap the pending message buffers individually.
586-
for (peer_node_id, pending_messages) in &mut *pending_msgs {
587-
msgs.insert(*peer_node_id, core::mem::take(pending_messages));
641+
// release the pending message buffers individually.
642+
for (peer_node_id, buffer) in &mut *message_buffers {
643+
msgs.insert(*peer_node_id, buffer.release_pending_messages());
588644
}
589645
msgs
590646
}
591647
}
592648

593-
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<OnionMessage>>) -> bool {
649+
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageBuffer>) -> bool {
594650
const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
595651
const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
596652
let mut total_buffered_bytes = 0;
597653
let mut peer_buffered_bytes = 0;
598654
for (pk, peer_buf) in buffer {
599-
for om in peer_buf {
655+
for om in peer_buf.pending_messages() {
600656
let om_len = om.serialized_length();
601657
if pk == peer_node_id {
602658
peer_buffered_bytes += om_len;
@@ -654,24 +710,26 @@ where
654710
}
655711
},
656712
Ok(PeeledOnion::Forward(next_node_id, onion_message)) => {
657-
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
658-
if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
713+
let mut message_buffers = self.message_buffers.lock().unwrap();
714+
if outbound_buffer_full(&next_node_id, &message_buffers) {
659715
log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
660716
return
661717
}
662718

663719
#[cfg(fuzzing)]
664-
pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
720+
message_buffers
721+
.entry(next_node_id)
722+
.or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new()));
665723

666-
match pending_per_peer_msgs.entry(next_node_id) {
724+
match message_buffers.entry(next_node_id) {
667725
hash_map::Entry::Vacant(_) => {
668726
log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);
669727
return
670728
},
671729
hash_map::Entry::Occupied(mut e) => {
672-
e.get_mut().push_back(onion_message);
730+
e.get_mut().enqueue_message(onion_message);
673731
log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
674-
}
732+
},
675733
}
676734
},
677735
Err(e) => {
@@ -682,15 +740,19 @@ where
682740

683741
fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> {
684742
if init.features.supports_onion_messages() {
685-
let mut peers = self.pending_messages.lock().unwrap();
686-
peers.insert(their_node_id.clone(), VecDeque::new());
743+
self.message_buffers.lock().unwrap()
744+
.entry(*their_node_id)
745+
.or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new()))
746+
.mark_connected();
747+
} else {
748+
self.message_buffers.lock().unwrap().remove(their_node_id);
687749
}
750+
688751
Ok(())
689752
}
690753

691754
fn peer_disconnected(&self, their_node_id: &PublicKey) {
692-
let mut pending_msgs = self.pending_messages.lock().unwrap();
693-
pending_msgs.remove(their_node_id);
755+
self.message_buffers.lock().unwrap().remove(their_node_id);
694756
}
695757

696758
fn provided_node_features(&self) -> NodeFeatures {
@@ -731,11 +793,9 @@ where
731793
);
732794
}
733795

734-
let mut pending_msgs = self.pending_messages.lock().unwrap();
735-
if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) {
736-
return msgs.pop_front()
737-
}
738-
None
796+
self.message_buffers.lock().unwrap()
797+
.get_mut(&peer_node_id)
798+
.and_then(|buffer| buffer.dequeue_message())
739799
}
740800
}
741801

0 commit comments

Comments
 (0)