@@ -166,22 +166,27 @@ enum OnionMessageBuffer {
166166 /// Messages for a node connected as a peer.
167167 ConnectedPeer ( VecDeque < OnionMessage > ) ,
168168
169- /// Messages for a node that is not yet connected.
170- PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
169+ /// Messages for a node that is not yet connected, which are dropped after a certain number of
170+ /// timer ticks defined in [`OnionMessenger::timer_tick_occurred`] and tracked here.
171+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
171172}
172173
173174impl OnionMessageBuffer {
175+ fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
176+ Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
177+ }
178+
174179 fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
175180 match self {
176181 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
177- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
182+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
178183 }
179184 }
180185
181186 fn enqueue_message ( & mut self , message : OnionMessage ) {
182187 let pending_messages = match self {
183188 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
184- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
189+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
185190 } ;
186191
187192 pending_messages. push_back ( message) ;
@@ -190,7 +195,7 @@ impl OnionMessageBuffer {
190195 fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
191196 let pending_messages = match self {
192197 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
193- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => {
198+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => {
194199 debug_assert ! ( false ) ;
195200 pending_messages
196201 } ,
@@ -203,14 +208,14 @@ impl OnionMessageBuffer {
203208 fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
204209 let pending_messages = match self {
205210 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
206- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
211+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
207212 } ;
208213
209214 core:: mem:: take ( pending_messages)
210215 }
211216
212217 fn mark_connected ( & mut self ) {
213- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _) = self {
218+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) = self {
214219 let mut new_pending_messages = VecDeque :: new ( ) ;
215220 core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
216221 * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -710,9 +715,8 @@ where
710715 hash_map:: Entry :: Vacant ( e) => match addresses {
711716 None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
712717 Some ( addresses) => {
713- e. insert (
714- OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
715- ) . enqueue_message ( onion_message) ;
718+ e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
719+ . enqueue_message ( onion_message) ;
716720 Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
717721 } ,
718722 } ,
@@ -795,7 +799,7 @@ where
795799{
796800 fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
797801 for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
798- if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
802+ if let OnionMessageBuffer :: PendingConnection ( _, addresses, _ ) = recipient {
799803 if let Some ( addresses) = addresses. take ( ) {
800804 handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
801805 }
@@ -896,6 +900,27 @@ where
896900 }
897901 }
898902
903+ fn timer_tick_occurred ( & self ) {
904+ const MAX_TIMER_TICKS : usize = 2 ;
905+ let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
906+
907+ // Drop any pending recipients since the last call to avoid retaining buffered messages for
908+ // too long.
909+ message_buffers. retain ( |_, recipient| match recipient {
910+ OnionMessageBuffer :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
911+ OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _) => true ,
912+ _ => true ,
913+ } ) ;
914+
915+ // Increment a timer tick for pending recipients so that their buffered messages are dropped
916+ // at MAX_TIMER_TICKS.
917+ for recipient in message_buffers. values_mut ( ) {
918+ if let OnionMessageBuffer :: PendingConnection ( _, None , ticks) = recipient {
919+ * ticks += 1 ;
920+ }
921+ }
922+ }
923+
899924 fn provided_node_features ( & self ) -> NodeFeatures {
900925 let mut features = NodeFeatures :: empty ( ) ;
901926 features. set_onion_messages_optional ( ) ;
0 commit comments