@@ -2729,20 +2729,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
 
 #[cfg(test)]
 mod tests {
+	use super::*;
+
 	use crate::sign::{NodeSigner, Recipient};
 	use crate::events;
 	use crate::io;
 	use crate::ln::types::ChannelId;
 	use crate::ln::features::{InitFeatures, NodeFeatures};
 	use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
-	use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
 	use crate::ln::{msgs, wire};
 	use crate::ln::msgs::{Init, LightningError, SocketAddress};
 	use crate::util::test_utils;
 
 	use bitcoin::Network;
 	use bitcoin::constants::ChainHash;
-	use bitcoin::secp256k1::{PublicKey, SecretKey};
+	use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
 
 	use crate::sync::{Arc, Mutex};
 	use core::convert::Infallible;
@@ -3200,6 +3201,8 @@ mod tests {
 		let cfgs = create_peermgr_cfgs(2);
 		cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
 		cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+		cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
+		cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
 		let peers = create_network(2, &cfgs);
 
 		// By calling establish_connect, we trigger do_attempt_write_data between
@@ -3363,6 +3366,79 @@ mod tests {
 		assert_eq!(peer_b.peers.read().unwrap().len(), 0);
 	}
 
+	#[test]
+	fn test_gossip_flood_pause() {
+		use crate::routing::test_utils::channel_announcement;
+		use lightning_types::features::ChannelFeatures;
+
+		// Simple test which connects two nodes to a PeerManager and checks that if we run out of
+		// socket buffer space we'll stop forwarding gossip but still push our own gossip.
+		let cfgs = create_peermgr_cfgs(2);
+		let peers = create_network(2, &cfgs);
+		let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+
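+		// Exchange any pending bytes between the two peers, processing events on both sides,
+		// until neither has anything left to write.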
+		macro_rules! drain_queues { () => {
+			loop {
+				peers[0].process_events();
+				peers[1].process_events();
+
+				let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
+				if !msg.is_empty() {
+					assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
+					continue;
+				}
+				let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
+				if !msg.is_empty() {
+					assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
+					continue;
+				}
+				break;
+			}
+		} }
+
+		// First, make sure all pending messages have been processed and queues drained.
+		drain_queues!();
+
+		let secp_ctx = Secp256k1::new();
+		let key = SecretKey::from_slice(&[1; 32]).unwrap();
+		let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
+		let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
+			msg,
+			update_msg: None,
+		};
+
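+		// Simulate the outbound socket buffer filling up so that further messages accumulate in
+		// the PeerManager's internal queues instead of being flushed to fd_a.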
+		fd_a.hang_writes.store(true, Ordering::Relaxed);
+
+		// Now push an arbitrarily large number of messages and check that only
+		// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
+		for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
+			cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
+			peers[0].process_events();
+		}
+
+		assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+			OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
+
+		// Check that if a broadcast message comes in from the channel handler (i.e. it is an
+		// announcement for our own channel), it gets queued anyway.
+		cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
+		peers[0].process_events();
+		assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+			OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
+
+		// Finally, deliver all the messages and make sure we got the right count. Note that there
+		// was an extra message that had already moved from the broadcast queue to the encrypted
+		// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
+		fd_a.hang_writes.store(false, Ordering::Relaxed);
+		cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
+		peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
+
+		drain_queues!();
+		assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
+		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
+			OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
+	}
+
 	#[test]
 	fn test_filter_addresses() {
 		// Tests the filter_addresses function.