@@ -1600,7 +1600,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
16001600 }
16011601
16021602 for msg in msgs_to_forward. drain ( ..) {
1603- self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) ) ;
1603+ self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) , false ) ;
16041604 }
16051605
16061606 Ok ( pause_read)
@@ -1946,7 +1946,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19461946 Ok ( should_forward)
19471947 }
19481948
1949- fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1949+ /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
1950+ /// excluding `except_node`.
1951+ ///
1952+ /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
1953+ /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
1954+ /// and delivered no matter the available buffer space.
1955+ fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > , allow_large_buffer : bool ) {
19501956 match msg {
19511957 wire:: Message :: ChannelAnnouncement ( ref msg) => {
19521958 log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
@@ -1961,7 +1967,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19611967 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
19621968 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
19631969 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1964- if peer. buffer_full_drop_gossip_broadcast ( ) {
1970+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
19651971 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
19661972 continue ;
19671973 }
@@ -1989,7 +1995,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19891995 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
19901996 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
19911997 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1992- if peer. buffer_full_drop_gossip_broadcast ( ) {
1998+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
19931999 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
19942000 continue ;
19952001 }
@@ -2017,7 +2023,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20172023 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
20182024 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
20192025 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
2020- if peer. buffer_full_drop_gossip_broadcast ( ) {
2026+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
20212027 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
20222028 continue ;
20232029 }
@@ -2099,6 +2105,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20992105 }
21002106 }
21012107 }
2108+
2109+ // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
2110+ // robustly gossip broadcast events even if a peer's message buffer is full.
21022111 let mut handle_event = |event, from_chan_handler| {
21032112 match event {
21042113 MessageSendEvent :: SendAcceptChannel { ref node_id, ref msg } => {
@@ -2293,31 +2302,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22932302 MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
22942303 log_debug ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
22952304 match self . message_handler . route_handler . handle_channel_announcement ( None , & msg) {
2296- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2297- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelAnnouncement ( msg) , None ) ,
2305+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2306+ let forward = wire:: Message :: ChannelAnnouncement ( msg) ;
2307+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2308+ } ,
22982309 _ => { } ,
22992310 }
23002311 if let Some ( msg) = update_msg {
23012312 match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2302- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2303- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2313+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2314+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2315+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2316+ } ,
23042317 _ => { } ,
23052318 }
23062319 }
23072320 } ,
23082321 MessageSendEvent :: BroadcastChannelUpdate { msg } => {
23092322 log_debug ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}" , msg. contents) ;
23102323 match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2311- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2312- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2324+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2325+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2326+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2327+ } ,
23132328 _ => { } ,
23142329 }
23152330 } ,
23162331 MessageSendEvent :: BroadcastNodeAnnouncement { msg } => {
23172332 log_debug ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}" , msg. contents. node_id) ;
23182333 match self . message_handler . route_handler . handle_node_announcement ( None , & msg) {
2319- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2320- self . forward_broadcast_msg ( peers, & wire:: Message :: NodeAnnouncement ( msg) , None ) ,
2334+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2335+ let forward = wire:: Message :: NodeAnnouncement ( msg) ;
2336+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2337+ } ,
23212338 _ => { } ,
23222339 }
23232340 } ,
@@ -2689,7 +2706,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
26892706
26902707 log_debug ! ( self . logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler." ) ;
26912708 let _ = self . message_handler . route_handler . handle_node_announcement ( None , & msg) ;
2692- self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None ) ;
2709+ self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None , true ) ;
26932710 }
26942711}
26952712
0 commit comments