@@ -1600,7 +1600,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
16001600 }
16011601
16021602 for msg in msgs_to_forward. drain ( ..) {
1603- self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) ) ;
1603+ self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) , false ) ;
16041604 }
16051605
16061606 Ok ( pause_read)
@@ -1946,7 +1946,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19461946 Ok ( should_forward)
19471947 }
19481948
1949- fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1949+ /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
1950+ /// excluding `except_node`.
1951+ ///
1952+ /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
1953+ /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
1954+ /// and delivered no matter the available buffer space.
1955+ fn forward_broadcast_msg (
1956+ & self , peers : & HashMap < Descriptor , Mutex < Peer > > ,
1957+ msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > ,
1958+ except_node : Option < & PublicKey > , allow_large_buffer : bool ,
1959+ ) {
19501960 match msg {
19511961 wire:: Message :: ChannelAnnouncement ( ref msg) => {
19521962 log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
@@ -1961,7 +1971,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19611971 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
19621972 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
19631973 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1964- if peer. buffer_full_drop_gossip_broadcast ( ) {
1974+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
19651975 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
19661976 continue ;
19671977 }
@@ -1989,7 +1999,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19891999 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
19902000 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
19912001 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1992- if peer. buffer_full_drop_gossip_broadcast ( ) {
2002+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
19932003 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
19942004 continue ;
19952005 }
@@ -2017,7 +2027,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20172027 debug_assert ! ( peer. their_node_id. is_some( ) ) ;
20182028 debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
20192029 let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
2020- if peer. buffer_full_drop_gossip_broadcast ( ) {
2030+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
20212031 log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
20222032 continue ;
20232033 }
@@ -2099,6 +2109,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20992109 }
21002110 }
21012111 }
2112+
2113+ // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
2114+ // robustly gossip broadcast events even if a peer's message buffer is full.
21022115 let mut handle_event = |event, from_chan_handler| {
21032116 match event {
21042117 MessageSendEvent :: SendAcceptChannel { ref node_id, ref msg } => {
@@ -2293,31 +2306,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22932306 MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
22942307 log_debug ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
22952308 match self . message_handler . route_handler . handle_channel_announcement ( None , & msg) {
2296- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2297- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelAnnouncement ( msg) , None ) ,
2309+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2310+ let forward = wire:: Message :: ChannelAnnouncement ( msg) ;
2311+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2312+ } ,
22982313 _ => { } ,
22992314 }
23002315 if let Some ( msg) = update_msg {
23012316 match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2302- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2303- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2317+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2318+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2319+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2320+ } ,
23042321 _ => { } ,
23052322 }
23062323 }
23072324 } ,
23082325 MessageSendEvent :: BroadcastChannelUpdate { msg } => {
23092326 log_debug ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}" , msg. contents) ;
23102327 match self . message_handler . route_handler . handle_channel_update ( None , & msg) {
2311- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2312- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2328+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2329+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2330+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2331+ } ,
23132332 _ => { } ,
23142333 }
23152334 } ,
23162335 MessageSendEvent :: BroadcastNodeAnnouncement { msg } => {
23172336 log_debug ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}" , msg. contents. node_id) ;
23182337 match self . message_handler . route_handler . handle_node_announcement ( None , & msg) {
2319- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2320- self . forward_broadcast_msg ( peers, & wire:: Message :: NodeAnnouncement ( msg) , None ) ,
2338+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2339+ let forward = wire:: Message :: NodeAnnouncement ( msg) ;
2340+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2341+ } ,
23212342 _ => { } ,
23222343 }
23232344 } ,
@@ -2689,7 +2710,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
26892710
26902711 log_debug ! ( self . logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler." ) ;
26912712 let _ = self . message_handler . route_handler . handle_node_announcement ( None , & msg) ;
2692- self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None ) ;
2713+ self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None , true ) ;
26932714 }
26942715}
26952716
0 commit comments