@@ -2725,20 +2725,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
27252725
27262726#[ cfg( test) ]
27272727mod tests {
2728+ use super :: * ;
2729+
27282730 use crate :: sign:: { NodeSigner , Recipient } ;
27292731 use crate :: events;
27302732 use crate :: io;
27312733 use crate :: ln:: types:: ChannelId ;
27322734 use crate :: ln:: features:: { InitFeatures , NodeFeatures } ;
27332735 use crate :: ln:: peer_channel_encryptor:: PeerChannelEncryptor ;
2734- use crate :: ln:: peer_handler:: { CustomMessageHandler , PeerManager , MessageHandler , SocketDescriptor , IgnoringMessageHandler , filter_addresses, ErroringMessageHandler , MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER } ;
27352736 use crate :: ln:: { msgs, wire} ;
27362737 use crate :: ln:: msgs:: { Init , LightningError , SocketAddress } ;
27372738 use crate :: util:: test_utils;
27382739
27392740 use bitcoin:: Network ;
27402741 use bitcoin:: constants:: ChainHash ;
2741- use bitcoin:: secp256k1:: { PublicKey , SecretKey } ;
2742+ use bitcoin:: secp256k1:: { PublicKey , SecretKey , Secp256k1 } ;
27422743
27432744 use crate :: sync:: { Arc , Mutex } ;
27442745 use core:: convert:: Infallible ;
@@ -3196,6 +3197,8 @@ mod tests {
31963197 let cfgs = create_peermgr_cfgs ( 2 ) ;
31973198 cfgs[ 0 ] . routing_handler . request_full_sync . store ( true , Ordering :: Release ) ;
31983199 cfgs[ 1 ] . routing_handler . request_full_sync . store ( true , Ordering :: Release ) ;
3200+ cfgs[ 0 ] . routing_handler . announcement_available_for_sync . store ( true , Ordering :: Release ) ;
3201+ cfgs[ 1 ] . routing_handler . announcement_available_for_sync . store ( true , Ordering :: Release ) ;
31993202 let peers = create_network ( 2 , & cfgs) ;
32003203
32013204 // By calling establish_connect, we trigger do_attempt_write_data between
@@ -3359,6 +3362,79 @@ mod tests {
33593362 assert_eq ! ( peer_b. peers. read( ) . unwrap( ) . len( ) , 0 ) ;
33603363 }
33613364
3365+ #[ test]
3366+ fn test_gossip_flood_pause ( ) {
3367+ use crate :: routing:: test_utils:: channel_announcement;
3368+ use lightning_types:: features:: ChannelFeatures ;
3369+
3370+ // Simple test which connects two nodes to a PeerManager and checks that if we run out of
3371+ // socket buffer space we'll stop forwarding gossip but still push our own gossip.
3372+ let cfgs = create_peermgr_cfgs ( 2 ) ;
3373+ let peers = create_network ( 2 , & cfgs) ;
3374+ let ( mut fd_a, mut fd_b) = establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
3375+
3376+ macro_rules! drain_queues { ( ) => {
3377+ loop {
3378+ peers[ 0 ] . process_events( ) ;
3379+ peers[ 1 ] . process_events( ) ;
3380+
3381+ let msg = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
3382+ if !msg. is_empty( ) {
3383+ assert_eq!( peers[ 1 ] . read_event( & mut fd_b, & msg) . unwrap( ) , false ) ;
3384+ continue ;
3385+ }
3386+ let msg = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
3387+ if !msg. is_empty( ) {
3388+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & msg) . unwrap( ) , false ) ;
3389+ continue ;
3390+ }
3391+ break ;
3392+ }
3393+ } }
3394+
3395+ // First, make sure all pending messages have been processed and queues drained.
3396+ drain_queues ! ( ) ;
3397+
3398+ let secp_ctx = Secp256k1 :: new ( ) ;
3399+ let key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
3400+ let msg = channel_announcement ( & key, & key, ChannelFeatures :: empty ( ) , 42 , & secp_ctx) ;
3401+ let msg_ev = MessageSendEvent :: BroadcastChannelAnnouncement {
3402+ msg,
3403+ update_msg : None ,
3404+ } ;
3405+
3406+ fd_a. hang_writes . store ( true , Ordering :: Relaxed ) ;
3407+
3408+ // Now push an arbitrarily large number of messages and check that only
3409+ // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
3410+ for _ in 0 ..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
3411+ cfgs[ 0 ] . routing_handler . pending_events . lock ( ) . unwrap ( ) . push ( msg_ev. clone ( ) ) ;
3412+ peers[ 0 ] . process_events ( ) ;
3413+ }
3414+
3415+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . get( & fd_a) . unwrap( ) . lock( ) . unwrap( ) . gossip_broadcast_buffer. len( ) ,
3416+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ) ;
3417+
3418+ // Check that if a broadcast message comes in from the channel handler (i.e. it is an
3419+ // announcement for our own channel), it gets queued anyway.
3420+ cfgs[ 0 ] . chan_handler . pending_events . lock ( ) . unwrap ( ) . push ( msg_ev) ;
3421+ peers[ 0 ] . process_events ( ) ;
3422+ assert_eq ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . get( & fd_a) . unwrap( ) . lock( ) . unwrap( ) . gossip_broadcast_buffer. len( ) ,
3423+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1 ) ;
3424+
3425+ // Finally, deliver all the messages and make sure we got the right count. Note that there
3426+ // was an extra message that had already moved from the broadcast queue to the encrypted
3427+ // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
3428+ fd_a. hang_writes . store ( false , Ordering :: Relaxed ) ;
3429+ cfgs[ 1 ] . routing_handler . chan_anns_recvd . store ( 0 , Ordering :: Relaxed ) ;
3430+ peers[ 0 ] . write_buffer_space_avail ( & mut fd_a) . unwrap ( ) ;
3431+
3432+ drain_queues ! ( ) ;
3433+ assert ! ( peers[ 0 ] . peers. read( ) . unwrap( ) . get( & fd_a) . unwrap( ) . lock( ) . unwrap( ) . gossip_broadcast_buffer. is_empty( ) ) ;
3434+ assert_eq ! ( cfgs[ 1 ] . routing_handler. chan_anns_recvd. load( Ordering :: Relaxed ) ,
3435+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2 ) ;
3436+ }
3437+
33623438 #[ test]
33633439 fn test_filter_addresses ( ) {
33643440 // Tests the filter_addresses function.
0 commit comments