Skip to content

Commit 568f1a7

Browse files
committed
Reliably deliver gossip messages from our ChannelMessageHandler
When our `ChannelMessageHandler` creates gossip broadcast `MessageSendEvent`s, we generally want these to be reliably delivered to all our peers, even if there's not much buffer space available. Here we do this by passing an extra flag to `forward_broadcast_msg` which indicates where the message came from, then ignoring the buffer-full criteria when the flag is set.
1 parent 69d41d8 commit 568f1a7

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,7 +1597,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
15971597
}
15981598

15991599
for msg in msgs_to_forward.drain(..) {
1600-
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk));
1600+
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk), false);
16011601
}
16021602

16031603
Ok(pause_read)
@@ -1940,7 +1940,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19401940
Ok(should_forward)
19411941
}
19421942

1943-
fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
1943+
fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>, allow_large_buffer: bool) {
19441944
match msg {
19451945
wire::Message::ChannelAnnouncement(ref msg) => {
19461946
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
@@ -1955,7 +1955,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19551955
debug_assert!(peer.their_node_id.is_some());
19561956
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
19571957
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
1958-
if peer.buffer_full_drop_gossip_broadcast() {
1958+
if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
19591959
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
19601960
continue;
19611961
}
@@ -1983,7 +1983,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19831983
debug_assert!(peer.their_node_id.is_some());
19841984
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
19851985
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
1986-
if peer.buffer_full_drop_gossip_broadcast() {
1986+
if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
19871987
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
19881988
continue;
19891989
}
@@ -2011,7 +2011,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20112011
debug_assert!(peer.their_node_id.is_some());
20122012
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
20132013
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
2014-
if peer.buffer_full_drop_gossip_broadcast() {
2014+
if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
20152015
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
20162016
continue;
20172017
}
@@ -2287,31 +2287,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22872287
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
22882288
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
22892289
match self.message_handler.route_handler.handle_channel_announcement(&msg) {
2290-
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
2291-
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
2290+
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
2291+
let forward = wire::Message::ChannelAnnouncement(msg);
2292+
self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
2293+
},
22922294
_ => {},
22932295
}
22942296
if let Some(msg) = update_msg {
22952297
match self.message_handler.route_handler.handle_channel_update(&msg) {
2296-
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
2297-
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
2298+
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
2299+
let forward = wire::Message::ChannelUpdate(msg);
2300+
self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
2301+
},
22982302
_ => {},
22992303
}
23002304
}
23012305
},
23022306
MessageSendEvent::BroadcastChannelUpdate { msg } => {
23032307
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents);
23042308
match self.message_handler.route_handler.handle_channel_update(&msg) {
2305-
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
2306-
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
2309+
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
2310+
let forward = wire::Message::ChannelUpdate(msg);
2311+
self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
2312+
},
23072313
_ => {},
23082314
}
23092315
},
23102316
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
23112317
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
23122318
match self.message_handler.route_handler.handle_node_announcement(&msg) {
2313-
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
2314-
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
2319+
Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
2320+
let forward = wire::Message::NodeAnnouncement(msg);
2321+
self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
2322+
},
23152323
_ => {},
23162324
}
23172325
},
@@ -2682,7 +2690,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
26822690

26832691
log_debug!(self.logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler.");
26842692
let _ = self.message_handler.route_handler.handle_node_announcement(&msg);
2685-
self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None);
2693+
self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None, true);
26862694
}
26872695
}
26882696

0 commit comments

Comments
 (0)