@@ -718,19 +718,11 @@ enum MessageBatchImpl {
718718 CommitmentSigned ( Vec < msgs:: CommitmentSigned > ) ,
719719}
720720
721- /// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
722- /// forwarding gossip messages to peers altogether.
723- const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO : usize = 2 ;
724-
725721/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
726722/// we have fewer than this many messages in the outbound buffer again.
727723/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
728724/// refilled as we send bytes.
729725const OUTBOUND_BUFFER_LIMIT_READ_PAUSE : usize = 12 ;
730- /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
731- /// the peer.
732- const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize =
733- OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO ;
734726
735727/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
736728/// the socket receive buffer before receiving the ping.
@@ -754,10 +746,20 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
754746/// process before the next ping.
755747///
756748/// Note that we continue responding to other messages even after we've sent this many messages, so
757- /// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
758- /// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
749+ /// this really limits gossip broadcast, gossip backfill, and onion message relay.
759750const BUFFER_DRAIN_MSGS_PER_TICK : usize = 32 ;
760751
752+ /// The maximum number of bytes which we allow in a peer's outbound buffers before we start
753+ /// dropping outbound gossip forwards.
754+ ///
755+ /// This is currently 128KiB, or two messages at the maximum message size (though in practice we
756+ /// refuse to forward gossip messages which are substantially larger than we expect, so this is
757+ /// closer to ~85 messages if all queued messages are maximum-sized channel announcements).
758+ ///
759+ /// Note that as we always drain the gossip forwarding queue before continuing gossip backfill,
760+ /// the equivalent maximum buffer size for gossip backfill is zero.
761+ const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP : usize = 64 * 1024 * 2 ;
762+
761763struct Peer {
762764 channel_encryptor : PeerChannelEncryptor ,
763765 /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
@@ -889,12 +891,11 @@ impl Peer {
889891
890892 /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
891893 fn buffer_full_drop_gossip_broadcast ( & self ) -> bool {
892- let total_outbound_buffered =
893- self . gossip_broadcast_buffer . len ( ) + self . pending_outbound_buffer . len ( ) ;
894+ let total_outbound_buffered: usize =
895+ self . gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
896+ + self . pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
894897
895- total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
896- || self . msgs_sent_since_pong
897- > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
898+ total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
898899 }
899900
900901 fn set_their_node_id ( & mut self , node_id : PublicKey ) {
@@ -4693,22 +4694,27 @@ mod tests {
46934694 let secp_ctx = Secp256k1 :: new ( ) ;
46944695 let key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
46954696 let msg = channel_announcement ( & key, & key, ChannelFeatures :: empty ( ) , 42 , & secp_ctx) ;
4697+ // The message bufer size is the message length plus two 16-byte MACs plus a 2-byte length
4698+ // and 2-byte type.
4699+ let encoded_size = msg. serialized_length ( ) + 16 * 2 + 2 + 2 ;
46964700 let msg_ev = MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg : None } ;
46974701
46984702 fd_a. hang_writes . store ( true , Ordering :: Relaxed ) ;
46994703
47004704 // Now push an arbitrarily large number of messages and check that only
4701- // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
4702- for _ in 0 ..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
4705+ // `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue.
4706+ for _ in 0 ..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
47034707 cfgs[ 0 ] . routing_handler . pending_events . lock ( ) . unwrap ( ) . push ( msg_ev. clone ( ) ) ;
47044708 peers[ 0 ] . process_events ( ) ;
47054709 }
47064710
47074711 {
47084712 let peer_a_lock = peers[ 0 ] . peers . read ( ) . unwrap ( ) ;
4709- let buf_len =
4710- peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) . gossip_broadcast_buffer . len ( ) ;
4711- assert_eq ! ( buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ) ;
4713+ let peer = peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) ;
4714+ let buf_len = peer. pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
4715+ + peer. gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
4716+ assert ! ( buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size) ;
4717+ assert ! ( buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP ) ;
47124718 }
47134719
47144720 // Check that if a broadcast message comes in from the channel handler (i.e. it is an
@@ -4718,14 +4724,17 @@ mod tests {
47184724
47194725 {
47204726 let peer_a_lock = peers[ 0 ] . peers . read ( ) . unwrap ( ) ;
4721- let buf_len =
4722- peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) . gossip_broadcast_buffer . len ( ) ;
4723- assert_eq ! ( buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1 ) ;
4727+ let peer = peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) ;
4728+ let buf_len = peer. pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
4729+ + peer. gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
4730+ assert ! ( buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP ) ;
4731+ assert ! ( buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size) ;
47244732 }
47254733
47264734 // Finally, deliver all the messages and make sure we got the right count. Note that there
47274735 // was an extra message that had already moved from the broadcast queue to the encrypted
4728- // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
4736+ // message queue so we actually receive `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + 2`
4737+ // message bytes.
47294738 fd_a. hang_writes . store ( false , Ordering :: Relaxed ) ;
47304739 cfgs[ 1 ] . routing_handler . chan_anns_recvd . store ( 0 , Ordering :: Relaxed ) ;
47314740 peers[ 0 ] . write_buffer_space_avail ( & mut fd_a) . unwrap ( ) ;
@@ -4740,7 +4749,7 @@ mod tests {
47404749
47414750 assert_eq ! (
47424751 cfgs[ 1 ] . routing_handler. chan_anns_recvd. load( Ordering :: Relaxed ) ,
4743- OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2
4752+ OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
47444753 ) ;
47454754 }
47464755
0 commit comments