@@ -164,22 +164,27 @@ enum OnionMessageBuffer {
164164 /// Messages for a node connected as a peer.
165165 ConnectedPeer ( VecDeque < OnionMessage > ) ,
166166
167- /// Messages for a node that is not yet connected.
168- PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
167+ /// Messages for a node that is not yet connected, which are dropped after a certain number of
168+ /// timer ticks defined in OnionMessenger::release_pending_connections and tracked here.
169+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
169170}
170171
171172impl OnionMessageBuffer {
173+ fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
174+ Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
175+ }
176+
172177 fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
173178 match self {
174179 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
175- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
180+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
176181 }
177182 }
178183
179184 fn enqueue_message ( & mut self , message : OnionMessage ) {
180185 let pending_messages = match self {
181186 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
182- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
187+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
183188 } ;
184189
185190 pending_messages. push_back ( message) ;
@@ -188,7 +193,7 @@ impl OnionMessageBuffer {
188193 fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
189194 let pending_messages = match self {
190195 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
191- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
196+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
192197 } ;
193198
194199 pending_messages. pop_front ( )
@@ -198,14 +203,14 @@ impl OnionMessageBuffer {
198203 fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
199204 let pending_messages = match self {
200205 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
201- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
206+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
202207 } ;
203208
204209 core:: mem:: take ( pending_messages)
205210 }
206211
207212 fn mark_connected ( & mut self ) {
208- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _) = self {
213+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) = self {
209214 let mut new_pending_messages = VecDeque :: new ( ) ;
210215 core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
211216 * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -667,9 +672,8 @@ where
667672 hash_map:: Entry :: Vacant ( e) => match addresses {
668673 None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
669674 Some ( addresses) => {
670- e. insert (
671- OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
672- ) . enqueue_message ( onion_message) ;
675+ e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
676+ . enqueue_message ( onion_message) ;
673677 Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
674678 } ,
675679 } ,
@@ -771,7 +775,7 @@ where
771775{
772776 fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
773777 for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
774- if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
778+ if let OnionMessageBuffer :: PendingConnection ( _, addresses, _ ) = recipient {
775779 if let Some ( addresses) = addresses. take ( ) {
776780 handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
777781 }
@@ -867,13 +871,24 @@ where
867871 }
868872
869873 fn timer_tick_occurred ( & self ) {
874+ const MAX_TIMER_TICKS : usize = 2 ;
875+ let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
876+
870877 // Drop any pending recipients since the last call to avoid retaining buffered messages for
871878 // too long.
872- self . message_buffers . lock ( ) . unwrap ( ) . retain ( |_, recipient| match recipient {
873- OnionMessageBuffer :: PendingConnection ( _, None ) => false ,
874- OnionMessageBuffer :: PendingConnection ( _, Some ( _) ) => true ,
879+ message_buffers. retain ( |_, recipient| match recipient {
880+ OnionMessageBuffer :: PendingConnection ( _, None , ticks ) => * ticks < MAX_TIMER_TICKS ,
881+ OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _ ) => true ,
875882 _ => true ,
876883 } ) ;
884+
885+ // Increment a timer tick for pending recipients so that their buffered messages are dropped
886+ // at MAX_TIMER_TICKS.
887+ for recipient in message_buffers. values_mut ( ) {
888+ if let OnionMessageBuffer :: PendingConnection ( _, Some ( _) , ticks) = recipient {
889+ * ticks += 1 ;
890+ }
891+ }
877892 }
878893
879894 fn provided_node_features ( & self ) -> NodeFeatures {
0 commit comments