@@ -156,15 +156,15 @@ where
156
156
entropy_source : ES ,
157
157
node_signer : NS ,
158
158
logger : L ,
159
- message_buffers : Mutex < HashMap < PublicKey , OnionMessageBuffer > > ,
159
+ message_recipients : Mutex < HashMap < PublicKey , OnionMessageRecipient > > ,
160
160
secp_ctx : Secp256k1 < secp256k1:: All > ,
161
161
message_router : MR ,
162
162
offers_handler : OMH ,
163
163
custom_handler : CMH ,
164
164
}
165
165
166
166
/// [`OnionMessage`]s buffered to be sent.
167
- enum OnionMessageBuffer {
167
+ enum OnionMessageRecipient {
168
168
/// Messages for a node connected as a peer.
169
169
ConnectedPeer ( VecDeque < OnionMessage > ) ,
170
170
@@ -173,31 +173,31 @@ enum OnionMessageBuffer {
173
173
PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
174
174
}
175
175
176
- impl OnionMessageBuffer {
176
+ impl OnionMessageRecipient {
177
177
fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
178
178
Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
179
179
}
180
180
181
181
fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
182
182
match self {
183
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
184
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
183
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
184
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
185
185
}
186
186
}
187
187
188
188
fn enqueue_message ( & mut self , message : OnionMessage ) {
189
189
let pending_messages = match self {
190
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
191
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
190
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
191
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
192
192
} ;
193
193
194
194
pending_messages. push_back ( message) ;
195
195
}
196
196
197
197
fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
198
198
let pending_messages = match self {
199
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
200
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => {
199
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
200
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => {
201
201
debug_assert ! ( false ) ;
202
202
pending_messages
203
203
} ,
@@ -209,18 +209,18 @@ impl OnionMessageBuffer {
209
209
#[ cfg( test) ]
210
210
fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
211
211
let pending_messages = match self {
212
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
213
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
212
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
213
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
214
214
} ;
215
215
216
216
core:: mem:: take ( pending_messages)
217
217
}
218
218
219
219
fn mark_connected ( & mut self ) {
220
- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) = self {
220
+ if let OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) = self {
221
221
let mut new_pending_messages = VecDeque :: new ( ) ;
222
222
core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
223
- * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
223
+ * self = OnionMessageRecipient :: ConnectedPeer ( new_pending_messages) ;
224
224
}
225
225
}
226
226
}
@@ -631,7 +631,7 @@ where
631
631
OnionMessenger {
632
632
entropy_source,
633
633
node_signer,
634
- message_buffers : Mutex :: new ( HashMap :: new ( ) ) ,
634
+ message_recipients : Mutex :: new ( HashMap :: new ( ) ) ,
635
635
secp_ctx,
636
636
logger,
637
637
message_router,
@@ -659,9 +659,9 @@ where
659
659
. get_node_id ( Recipient :: Node )
660
660
. map_err ( |_| SendError :: GetNodeIdFailed ) ?;
661
661
662
- let peers = self . message_buffers . lock ( ) . unwrap ( )
662
+ let peers = self . message_recipients . lock ( ) . unwrap ( )
663
663
. iter ( )
664
- . filter ( |( _, buffer ) | matches ! ( buffer , OnionMessageBuffer :: ConnectedPeer ( _) ) )
664
+ . filter ( |( _, recipient ) | matches ! ( recipient , OnionMessageRecipient :: ConnectedPeer ( _) ) )
665
665
. map ( |( node_id, _) | * node_id)
666
666
. collect ( ) ;
667
667
@@ -704,16 +704,16 @@ where
704
704
& self . entropy_source , & self . node_signer , & self . secp_ctx , path, contents, reply_path
705
705
) ?;
706
706
707
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
708
- if outbound_buffer_full ( & first_node_id, & message_buffers ) {
707
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
708
+ if outbound_buffer_full ( & first_node_id, & message_recipients ) {
709
709
return Err ( SendError :: BufferFull ) ;
710
710
}
711
711
712
- match message_buffers . entry ( first_node_id) {
712
+ match message_recipients . entry ( first_node_id) {
713
713
hash_map:: Entry :: Vacant ( e) => match addresses {
714
714
None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
715
715
Some ( addresses) => {
716
- e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
716
+ e. insert ( OnionMessageRecipient :: pending_connection ( addresses) )
717
717
. enqueue_message ( onion_message) ;
718
718
Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
719
719
} ,
@@ -744,18 +744,18 @@ where
744
744
745
745
#[ cfg( test) ]
746
746
pub ( super ) fn release_pending_msgs ( & self ) -> HashMap < PublicKey , VecDeque < OnionMessage > > {
747
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
747
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
748
748
let mut msgs = HashMap :: new ( ) ;
749
749
// We don't want to disconnect the peers by removing them entirely from the original map, so we
750
750
// release the pending message buffers individually.
751
- for ( peer_node_id , buffer ) in & mut * message_buffers {
752
- msgs. insert ( * peer_node_id , buffer . release_pending_messages ( ) ) ;
751
+ for ( node_id , recipient ) in & mut * message_recipients {
752
+ msgs. insert ( * node_id , recipient . release_pending_messages ( ) ) ;
753
753
}
754
754
msgs
755
755
}
756
756
}
757
757
758
- fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageBuffer > ) -> bool {
758
+ fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageRecipient > ) -> bool {
759
759
const MAX_TOTAL_BUFFER_SIZE : usize = ( 1 << 20 ) * 128 ;
760
760
const MAX_PER_PEER_BUFFER_SIZE : usize = ( 1 << 10 ) * 256 ;
761
761
let mut total_buffered_bytes = 0 ;
@@ -789,8 +789,8 @@ where
789
789
CMH :: Target : CustomOnionMessageHandler ,
790
790
{
791
791
fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
792
- for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
793
- if let OnionMessageBuffer :: PendingConnection ( _, addresses, _) = recipient {
792
+ for ( node_id, recipient) in self . message_recipients . lock ( ) . unwrap ( ) . iter_mut ( ) {
793
+ if let OnionMessageRecipient :: PendingConnection ( _, addresses, _) = recipient {
794
794
if let Some ( addresses) = addresses. take ( ) {
795
795
handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
796
796
}
@@ -841,20 +841,20 @@ where
841
841
}
842
842
} ,
843
843
Ok ( PeeledOnion :: Forward ( next_node_id, onion_message) ) => {
844
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
845
- if outbound_buffer_full ( & next_node_id, & message_buffers ) {
844
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
845
+ if outbound_buffer_full ( & next_node_id, & message_recipients ) {
846
846
log_trace ! ( self . logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full" , next_node_id) ;
847
847
return
848
848
}
849
849
850
850
#[ cfg( fuzzing) ]
851
- message_buffers
851
+ message_recipients
852
852
. entry ( next_node_id)
853
- . or_insert_with ( || OnionMessageBuffer :: ConnectedPeer ( VecDeque :: new ( ) ) ) ;
853
+ . or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) ) ;
854
854
855
- match message_buffers . entry ( next_node_id) {
855
+ match message_recipients . entry ( next_node_id) {
856
856
hash_map:: Entry :: Occupied ( mut e) if matches ! (
857
- e. get( ) , OnionMessageBuffer :: ConnectedPeer ( ..)
857
+ e. get( ) , OnionMessageRecipient :: ConnectedPeer ( ..)
858
858
) => {
859
859
e. get_mut ( ) . enqueue_message ( onion_message) ;
860
860
log_trace ! ( self . logger, "Forwarding an onion message to peer {}" , next_node_id) ;
@@ -873,39 +873,39 @@ where
873
873
874
874
fn peer_connected ( & self , their_node_id : & PublicKey , init : & msgs:: Init , _inbound : bool ) -> Result < ( ) , ( ) > {
875
875
if init. features . supports_onion_messages ( ) {
876
- self . message_buffers . lock ( ) . unwrap ( )
876
+ self . message_recipients . lock ( ) . unwrap ( )
877
877
. entry ( * their_node_id)
878
- . or_insert_with ( || OnionMessageBuffer :: ConnectedPeer ( VecDeque :: new ( ) ) )
878
+ . or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) )
879
879
. mark_connected ( ) ;
880
880
} else {
881
- self . message_buffers . lock ( ) . unwrap ( ) . remove ( their_node_id) ;
881
+ self . message_recipients . lock ( ) . unwrap ( ) . remove ( their_node_id) ;
882
882
}
883
883
884
884
Ok ( ( ) )
885
885
}
886
886
887
887
fn peer_disconnected ( & self , their_node_id : & PublicKey ) {
888
- match self . message_buffers . lock ( ) . unwrap ( ) . remove ( their_node_id) {
889
- Some ( OnionMessageBuffer :: ConnectedPeer ( ..) ) => { } ,
888
+ match self . message_recipients . lock ( ) . unwrap ( ) . remove ( their_node_id) {
889
+ Some ( OnionMessageRecipient :: ConnectedPeer ( ..) ) => { } ,
890
890
_ => debug_assert ! ( false ) ,
891
891
}
892
892
}
893
893
894
894
fn timer_tick_occurred ( & self ) {
895
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
895
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
896
896
897
897
// Drop any pending recipients since the last call to avoid retaining buffered messages for
898
898
// too long.
899
- message_buffers . retain ( |_, recipient| match recipient {
900
- OnionMessageBuffer :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
901
- OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _) => true ,
899
+ message_recipients . retain ( |_, recipient| match recipient {
900
+ OnionMessageRecipient :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
901
+ OnionMessageRecipient :: PendingConnection ( _, Some ( _) , _) => true ,
902
902
_ => true ,
903
903
} ) ;
904
904
905
905
// Increment a timer tick for pending recipients so that their buffered messages are dropped
906
906
// at MAX_TIMER_TICKS.
907
- for recipient in message_buffers . values_mut ( ) {
908
- if let OnionMessageBuffer :: PendingConnection ( _, None , ticks) = recipient {
907
+ for recipient in message_recipients . values_mut ( ) {
908
+ if let OnionMessageRecipient :: PendingConnection ( _, None , ticks) = recipient {
909
909
* ticks += 1 ;
910
910
}
911
911
}
@@ -949,7 +949,7 @@ where
949
949
) ;
950
950
}
951
951
952
- self . message_buffers . lock ( ) . unwrap ( )
952
+ self . message_recipients . lock ( ) . unwrap ( )
953
953
. get_mut ( & peer_node_id)
954
954
. and_then ( |buffer| buffer. dequeue_message ( ) )
955
955
}
0 commit comments