@@ -166,22 +166,27 @@ enum OnionMessageBuffer {
166
166
/// Messages for a node connected as a peer.
167
167
ConnectedPeer ( VecDeque < OnionMessage > ) ,
168
168
169
- /// Messages for a node that is not yet connected.
170
- PendingConnection ( VecDeque < OnionMessage > , Option < NodeAnnouncement > ) ,
169
+ /// Messages for a node that is not yet connected, which are dropped after a certain number of
170
+ /// timer ticks defined in OnionMessenger::release_pending_connections and tracked here.
171
+ PendingConnection ( VecDeque < OnionMessage > , Option < NodeAnnouncement > , usize ) ,
171
172
}
172
173
173
174
impl OnionMessageBuffer {
175
+ fn pending_connection ( node_announcement : NodeAnnouncement ) -> Self {
176
+ Self :: PendingConnection ( VecDeque :: new ( ) , Some ( node_announcement) , 0 )
177
+ }
178
+
174
179
fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
175
180
match self {
176
181
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
177
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
182
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
178
183
}
179
184
}
180
185
181
186
fn enqueue_message ( & mut self , message : OnionMessage ) {
182
187
let pending_messages = match self {
183
188
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
184
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
189
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
185
190
} ;
186
191
187
192
pending_messages. push_back ( message) ;
@@ -190,7 +195,7 @@ impl OnionMessageBuffer {
190
195
fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
191
196
let pending_messages = match self {
192
197
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
193
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
198
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
194
199
} ;
195
200
196
201
pending_messages. pop_front ( )
@@ -200,14 +205,14 @@ impl OnionMessageBuffer {
200
205
fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
201
206
let pending_messages = match self {
202
207
OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
203
- OnionMessageBuffer :: PendingConnection ( pending_messages, _) => pending_messages,
208
+ OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) => pending_messages,
204
209
} ;
205
210
206
211
core:: mem:: take ( pending_messages)
207
212
}
208
213
209
214
fn mark_connected ( & mut self ) {
210
- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _) = self {
215
+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _ ) = self {
211
216
let mut new_pending_messages = VecDeque :: new ( ) ;
212
217
core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
213
218
* self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -652,7 +657,9 @@ where
652
657
if node_announcement. contents . features . supports_onion_messages ( ) {
653
658
self . message_buffers . lock ( ) . unwrap ( )
654
659
. entry ( first_node_id)
655
- . or_insert_with ( || OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( node_announcement. clone ( ) ) ) )
660
+ . or_insert_with (
661
+ || OnionMessageBuffer :: pending_connection ( node_announcement. clone ( ) )
662
+ )
656
663
. enqueue_message ( onion_message) ;
657
664
log_trace ! (
658
665
self . logger, "Buffered onion message waiting on peer connection {}: {:?}" ,
@@ -797,22 +804,23 @@ where
797
804
}
798
805
799
806
fn release_pending_connections ( & self ) -> Vec < NodeAnnouncement > {
807
+ const MAX_TIMER_TICKS : usize = 2 ;
800
808
let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
801
809
802
- // Drop any pending recipients since the last call to avoid retaining buffered messages for
810
+ // Drop any pending recipients over MAX_TIMER_TICKS to avoid retaining buffered messages for
803
811
// too long.
804
812
message_buffers. retain ( |_, recipient| match recipient {
805
- OnionMessageBuffer :: PendingConnection ( _, None ) => false ,
806
- OnionMessageBuffer :: PendingConnection ( _, Some ( _) ) => true ,
813
+ OnionMessageBuffer :: PendingConnection ( _, _, ticks) => * ticks < MAX_TIMER_TICKS ,
807
814
_ => true ,
808
815
} ) ;
809
816
810
- // Release node announcements for pending recipients so that their buffered messages are
811
- // dropped the next time this method is called .
817
+ // Release node announcements and increment a timer tick for pending recipients so that
818
+ // their buffered messages are dropped at MAX_TIMER_TICKS .
812
819
message_buffers
813
820
. values_mut ( )
814
821
. filter_map ( |recipient| match recipient {
815
- OnionMessageBuffer :: PendingConnection ( _, node_announcement) => {
822
+ OnionMessageBuffer :: PendingConnection ( _, node_announcement, ticks) => {
823
+ * ticks += 1 ;
816
824
node_announcement. take ( )
817
825
} ,
818
826
_ => None ,
0 commit comments