@@ -164,22 +164,27 @@ enum OnionMessageBuffer {
164164 /// Messages for a node connected as a peer.
165165 ConnectedPeer ( VecDeque < OnionMessage > ) ,
166166
167- /// Messages for a node that is not yet connected.
168- PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
167+ /// Messages for a node that is not yet connected, which are dropped after a certain number of
168+ /// timer ticks defined in OnionMessenger::release_pending_connections and tracked here.
169+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
169170}
170171
171172impl OnionMessageBuffer {
173+ fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
174+ Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
175+ }
176+
172177 fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
173178 match self {
174179 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
175- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
180+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
176181 }
177182 }
178183
179184 fn enqueue_message ( & mut self , message : OnionMessage ) {
180185 let pending_messages = match self {
181186 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
182- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
187+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
183188 } ;
184189
185190 pending_messages. push_back ( message) ;
@@ -188,7 +193,7 @@ impl OnionMessageBuffer {
188193 fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
189194 let pending_messages = match self {
190195 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
191- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => {
196+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => {
192197 debug_assert ! ( false ) ;
193198 pending_messages
194199 } ,
@@ -201,14 +206,14 @@ impl OnionMessageBuffer {
201206 fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
202207 let pending_messages = match self {
203208 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
204- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
209+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
205210 } ;
206211
207212 core:: mem:: take ( pending_messages)
208213 }
209214
210215 fn mark_connected ( & mut self ) {
211- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _) = self {
216+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) = self {
212217 let mut new_pending_messages = VecDeque :: new ( ) ;
213218 core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
214219 * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -670,9 +675,8 @@ where
670675 hash_map:: Entry :: Vacant ( e) => match addresses {
671676 None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
672677 Some ( addresses) => {
673- e. insert (
674- OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
675- ) . enqueue_message ( onion_message) ;
678+ e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
679+ . enqueue_message ( onion_message) ;
676680 Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
677681 } ,
678682 } ,
@@ -774,7 +778,7 @@ where
774778{
775779 fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
776780 for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
777- if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
781+ if let OnionMessageBuffer :: PendingConnection ( _, addresses, _ ) = recipient {
778782 if let Some ( addresses) = addresses. take ( ) {
779783 handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
780784 }
@@ -872,13 +876,24 @@ where
872876 }
873877
874878 fn timer_tick_occurred ( & self ) {
879+ const MAX_TIMER_TICKS : usize = 2 ;
880+ let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
881+
875882 // Drop any pending recipients since the last call to avoid retaining buffered messages for
876883 // too long.
877- self . message_buffers . lock ( ) . unwrap ( ) . retain ( |_, recipient| match recipient {
878- OnionMessageBuffer :: PendingConnection ( _, None ) => false ,
879- OnionMessageBuffer :: PendingConnection ( _, Some ( _) ) => true ,
884+ message_buffers. retain ( |_, recipient| match recipient {
885+ OnionMessageBuffer :: PendingConnection ( _, None , ticks ) => * ticks < MAX_TIMER_TICKS ,
886+ OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _ ) => true ,
880887 _ => true ,
881888 } ) ;
889+
890+ // Increment a timer tick for pending recipients so that their buffered messages are dropped
891+ // at MAX_TIMER_TICKS.
892+ for recipient in message_buffers. values_mut ( ) {
893+ if let OnionMessageBuffer :: PendingConnection ( _, Some ( _) , ticks) = recipient {
894+ * ticks += 1 ;
895+ }
896+ }
882897 }
883898
884899 fn provided_node_features ( & self ) -> NodeFeatures {
0 commit comments