@@ -18,6 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
1818use crate :: blinded_path:: BlindedPath ;
1919use crate :: blinded_path:: message:: { advance_path_by_one, ForwardTlvs , ReceiveTlvs } ;
2020use crate :: blinded_path:: utils;
21+ use crate :: events:: { Event , EventHandler , EventsProvider } ;
2122use crate :: sign:: { EntropySource , KeysManager , NodeSigner , Recipient } ;
2223#[ cfg( not( c_bindings) ) ]
2324use crate :: ln:: channelmanager:: { SimpleArcChannelManager , SimpleRefChannelManager } ;
@@ -166,21 +167,21 @@ enum OnionMessageBuffer {
166167 ConnectedPeer ( VecDeque < OnionMessage > ) ,
167168
168169 /// Messages for a node that is not yet connected.
169- PendingConnection ( VecDeque < OnionMessage > ) ,
170+ PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > ) ,
170171}
171172
172173impl OnionMessageBuffer {
173174 fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
174175 match self {
175176 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
176- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
177+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
177178 }
178179 }
179180
180181 fn enqueue_message ( & mut self , message : OnionMessage ) {
181182 let pending_messages = match self {
182183 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
183- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
184+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
184185 } ;
185186
186187 pending_messages. push_back ( message) ;
@@ -189,7 +190,7 @@ impl OnionMessageBuffer {
189190 fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
190191 let pending_messages = match self {
191192 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
192- OnionMessageBuffer :: PendingConnection ( pending_messages) => {
193+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => {
193194 debug_assert ! ( false ) ;
194195 pending_messages
195196 } ,
@@ -202,14 +203,14 @@ impl OnionMessageBuffer {
202203 fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
203204 let pending_messages = match self {
204205 OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
205- OnionMessageBuffer :: PendingConnection ( pending_messages) => pending_messages,
206+ OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) => pending_messages,
206207 } ;
207208
208209 core:: mem:: take ( pending_messages)
209210 }
210211
211212 fn mark_connected ( & mut self ) {
212- if let OnionMessageBuffer :: PendingConnection ( pending_messages) = self {
213+ if let OnionMessageBuffer :: PendingConnection ( pending_messages, _ ) = self {
213214 let mut new_pending_messages = VecDeque :: new ( ) ;
214215 core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
215216 * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
@@ -381,6 +382,8 @@ pub enum SendError {
381382 /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded
382383 /// hops.
383384 TooFewBlindedHops ,
385+ /// The first hop is not a peer and doesn't have a known [`SocketAddress`].
386+ InvalidFirstHop ( PublicKey ) ,
384387 /// A path from the sender to the destination could not be found by the [`MessageRouter`].
385388 PathNotFound ,
386389 /// Onion message contents must have a TLV type >= 64.
@@ -453,12 +456,12 @@ pub enum PeeledOnion<T: OnionMessageContents> {
453456pub fn create_onion_message < ES : Deref , NS : Deref , T : OnionMessageContents > (
454457 entropy_source : & ES , node_signer : & NS , secp_ctx : & Secp256k1 < secp256k1:: All > ,
455458 path : OnionMessagePath , contents : T , reply_path : Option < BlindedPath > ,
456- ) -> Result < ( PublicKey , OnionMessage ) , SendError >
459+ ) -> Result < ( PublicKey , OnionMessage , Option < Vec < SocketAddress > > ) , SendError >
457460where
458461 ES :: Target : EntropySource ,
459462 NS :: Target : NodeSigner ,
460463{
461- let OnionMessagePath { intermediate_nodes, mut destination, .. } = path;
464+ let OnionMessagePath { intermediate_nodes, mut destination, addresses } = path;
462465 if let Destination :: BlindedPath ( BlindedPath { ref blinded_hops, .. } ) = destination {
463466 if blinded_hops. is_empty ( ) {
464467 return Err ( SendError :: TooFewBlindedHops ) ;
@@ -499,10 +502,8 @@ where
499502 let onion_routing_packet = construct_onion_message_packet (
500503 packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
501504
502- Ok ( ( first_node_id, OnionMessage {
503- blinding_point,
504- onion_routing_packet
505- } ) )
505+ let message = OnionMessage { blinding_point, onion_routing_packet } ;
506+ Ok ( ( first_node_id, message, addresses) )
506507}
507508
508509/// Decode one layer of an incoming [`OnionMessage`].
@@ -696,7 +697,7 @@ where
696697 ) -> Result < SendSuccess , SendError > {
697698 log_trace ! ( self . logger, "Constructing onion message {}: {:?}" , log_suffix, contents) ;
698699
699- let ( first_node_id, onion_message) = create_onion_message (
700+ let ( first_node_id, onion_message, addresses ) = create_onion_message (
700701 & self . entropy_source , & self . node_signer , & self . secp_ctx , path, contents, reply_path
701702 ) ?;
702703
@@ -706,10 +707,14 @@ where
706707 }
707708
708709 match message_buffers. entry ( first_node_id) {
709- hash_map:: Entry :: Vacant ( e) => {
710- e. insert ( OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) ) )
711- . enqueue_message ( onion_message) ;
712- Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
710+ hash_map:: Entry :: Vacant ( e) => match addresses {
711+ None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
712+ Some ( addresses) => {
713+ e. insert (
714+ OnionMessageBuffer :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) )
715+ ) . enqueue_message ( onion_message) ;
716+ Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
717+ } ,
713718 } ,
714719 hash_map:: Entry :: Occupied ( mut e) => {
715720 e. get_mut ( ) . enqueue_message ( onion_message) ;
@@ -778,6 +783,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, On
778783 false
779784}
780785
786+ impl < ES : Deref , NS : Deref , L : Deref , MR : Deref , OMH : Deref , CMH : Deref > EventsProvider
787+ for OnionMessenger < ES , NS , L , MR , OMH , CMH >
788+ where
789+ ES :: Target : EntropySource ,
790+ NS :: Target : NodeSigner ,
791+ L :: Target : Logger ,
792+ MR :: Target : MessageRouter ,
793+ OMH :: Target : OffersMessageHandler ,
794+ CMH :: Target : CustomOnionMessageHandler ,
795+ {
796+ fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
797+ for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
798+ if let OnionMessageBuffer :: PendingConnection ( _, addresses) = recipient {
799+ if let Some ( addresses) = addresses. take ( ) {
800+ handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
801+ }
802+ }
803+ }
804+ }
805+ }
806+
781807impl < ES : Deref , NS : Deref , L : Deref , MR : Deref , OMH : Deref , CMH : Deref > OnionMessageHandler
782808for OnionMessenger < ES , NS , L , MR , OMH , CMH >
783809where
0 commit comments