5353
5454use arti_client:: { TorClient , TorClientBuilder } ;
5555use futures:: future:: BoxFuture ;
56- use futures:: stream:: BoxStream ;
57- use libp2p:: multiaddr:: Protocol ;
5856use libp2p:: {
5957 core:: transport:: { ListenerId , TransportEvent } ,
6058 Multiaddr , Transport , TransportError ,
6159} ;
62- use std:: collections:: HashMap ;
63- use std:: pin:: Pin ;
64- use std:: str:: FromStr ;
65- use std:: sync:: Arc ;
6660use std:: task:: { Context , Poll } ;
61+ use std:: { collections:: HashSet , pin:: Pin } ;
62+ use std:: { collections:: VecDeque , sync:: Arc } ;
6763use thiserror:: Error ;
68- use tor_hsservice:: handle_rend_requests;
69- use tor_hsservice:: status:: OnionServiceStatus ;
70- use tor_hsservice:: StreamRequest ;
7164use tor_rtcompat:: tokio:: TokioRustlsRuntime ;
7265
7366// We only need these imports if the `listen-onion-service` feature is enabled
7467#[ cfg( feature = "listen-onion-service" ) ]
68+ use std:: collections:: HashMap ;
69+ #[ cfg( feature = "listen-onion-service" ) ]
70+ use std:: str:: FromStr ;
71+ #[ cfg( feature = "listen-onion-service" ) ]
7572use tor_cell:: relaycell:: msg:: { Connected , End , EndReason } ;
7673#[ cfg( feature = "listen-onion-service" ) ]
77- use tor_hsservice:: { HsId , OnionServiceConfig , RunningOnionService } ;
74+ use tor_hsservice:: {
75+ handle_rend_requests, status:: OnionServiceStatus , HsId , OnionServiceConfig ,
76+ RunningOnionService , StreamRequest ,
77+ } ;
7878#[ cfg( feature = "listen-onion-service" ) ]
7979use tor_proto:: stream:: IncomingStreamRequest ;
8080
@@ -88,9 +88,9 @@ pub type TorError = arti_client::Error;
8888
8989type PendingUpgrade = BoxFuture < ' static , Result < TokioTorStream , TorTransportError > > ;
9090#[ cfg( feature = "listen-onion-service" ) ]
91- type OnionServiceStream = BoxStream < ' static , StreamRequest > ;
91+ type OnionServiceStream = futures :: stream :: BoxStream < ' static , StreamRequest > ;
9292#[ cfg( feature = "listen-onion-service" ) ]
93- type OnionServiceStatusStream = BoxStream < ' static , OnionServiceStatus > ;
93+ type OnionServiceStatusStream = futures :: stream :: BoxStream < ' static , OnionServiceStatus > ;
9494
9595/// Struct representing an onion address we are listening on for libp2p connections.
9696#[ cfg( feature = "listen-onion-service" ) ]
@@ -107,6 +107,8 @@ struct TorListener {
107107 port : u16 ,
108108 /// The onion address we are listening on
109109 onion_address : Multiaddr ,
110+ /// Whether we have already announced this address
111+ announced : bool ,
110112}
111113
112114/// Mode of address conversion.
@@ -176,8 +178,8 @@ impl TorTransport {
176178 conversion_mode : AddressConversion ,
177179 ) -> Self {
178180 Self {
179- client,
180181 conversion_mode,
182+ client,
181183 #[ cfg( feature = "listen-onion-service" ) ]
182184 listeners : HashMap :: new ( ) ,
183185 #[ cfg( feature = "listen-onion-service" ) ]
@@ -206,6 +208,9 @@ impl TorTransport {
206208 /// # Returns
207209 /// Returns the Multiaddr of the onion address that the transport can be instructed to listen on
208210 /// To actually listen on the address, you need to call [`listen_on`] with the returned address
211+ ///
212+ /// # Errors
213+ /// Returns an error if we cannot get the onion address of the service
209214 #[ cfg( feature = "listen-onion-service" ) ]
210215 pub fn add_onion_service (
211216 & mut self ,
@@ -217,7 +222,7 @@ impl TorTransport {
217222
218223 let multiaddr = service
219224 . onion_name ( )
220- . ok_or_else ( || anyhow:: anyhow!( "Onion service has no nickname " ) ) ?
225+ . ok_or_else ( || anyhow:: anyhow!( "Onion service has no onion address " ) ) ?
221226 . to_multiaddr ( port) ;
222227
223228 self . services . push ( ( service, request_stream) ) ;
@@ -251,63 +256,50 @@ trait HsIdExt {
251256
252257#[ cfg( feature = "listen-onion-service" ) ]
253258impl HsIdExt for HsId {
254- /// Convert an HsId to a Multiaddr
259+ /// Convert an ` HsId` to a ` Multiaddr`
255260 fn to_multiaddr ( & self , port : u16 ) -> Multiaddr {
256261 let onion_domain = self . to_string ( ) ;
257262 let onion_without_dot_onion = onion_domain
258- . split ( "." )
263+ . split ( '.' )
259264 . nth ( 0 )
260265 . expect ( "Display formatting of HsId to contain .onion suffix" ) ;
261- let multiaddress_string = format ! ( "/onion3/{}:{}" , onion_without_dot_onion , port ) ;
266+ let multiaddress_string = format ! ( "/onion3/{onion_without_dot_onion }:{port}" ) ;
262267
263268 Multiaddr :: from_str ( & multiaddress_string)
264269 . expect ( "A valid onion address to be convertible to a Multiaddr" )
265270 }
266271}
267272
268- trait StatusExt {
269- fn is_reachable ( & self ) -> bool ;
270- fn is_broken ( & self ) -> bool ;
271- }
272-
273- impl StatusExt for OnionServiceStatus {
274- /// Returns true if the onion service is reachable
275- fn is_reachable ( & self ) -> bool {
276- match self . state ( ) {
277- tor_hsservice:: status:: State :: Running => true ,
278- tor_hsservice:: status:: State :: DegradedReachable => true ,
279- _ => false ,
280- }
281- }
282-
283- fn is_broken ( & self ) -> bool {
284- matches ! ( self . state( ) , tor_hsservice:: status:: State :: Broken )
285- }
286- }
287-
288273impl Transport for TorTransport {
289274 type Output = TokioTorStream ;
290275 type Error = TorTransportError ;
291276 type Dial = BoxFuture < ' static , Result < Self :: Output , Self :: Error > > ;
292277 type ListenerUpgrade = PendingUpgrade ;
293278
279+ #[ cfg( not( feature = "listen-onion-service" ) ) ]
294280 fn listen_on (
295281 & mut self ,
296- id : ListenerId ,
282+ _id : ListenerId ,
297283 onion_address : Multiaddr ,
298284 ) -> Result < ( ) , TransportError < Self :: Error > > {
299- // If the `listen-onion-service` feature is not enabled, immediately return an error
300- # [ cfg ( not ( feature = "listen-onion-service" ) ) ]
301- return Err ( TransportError :: MultiaddrNotSupported ( onion_address . clone ( ) ) ) ;
285+ // If the `listen-onion-service` feature is not enabled, we do not support listening
286+ Err ( TransportError :: MultiaddrNotSupported ( onion_address . clone ( ) ) )
287+ }
302288
289+ #[ cfg( feature = "listen-onion-service" ) ]
290+ fn listen_on (
291+ & mut self ,
292+ id : ListenerId ,
293+ onion_address : Multiaddr ,
294+ ) -> Result < ( ) , TransportError < Self :: Error > > {
303295 // If the address is not an onion3 address, return an error
304- let Some ( Protocol :: Onion3 ( address) ) = onion_address. into_iter ( ) . nth ( 0 ) else {
296+ let Some ( libp2p:: multiaddr:: Protocol :: Onion3 ( address) ) = onion_address. into_iter ( ) . nth ( 0 )
297+ else {
305298 return Err ( TransportError :: MultiaddrNotSupported ( onion_address. clone ( ) ) ) ;
306299 } ;
307300
308301 // Find the running onion service that matches the requested address
309302 // If we find it, remove it from [`services`] and insert it into [`listeners`]
310-
311303 let position = self
312304 . services
313305 . iter ( )
@@ -330,17 +322,21 @@ impl Transport for TorTransport {
330322 onion_address : onion_address. clone ( ) ,
331323 port : address. port ( ) ,
332324 status_stream,
325+ announced : false ,
333326 } ,
334327 ) ;
335328
336- return Ok ( ( ) ) ;
329+ Ok ( ( ) )
337330 }
338331
339- fn remove_listener ( & mut self , id : ListenerId ) -> bool {
340- // If the `listen-onion-service` feature is not enabled, we do not support listening
341- #[ cfg( not( feature = "listen-onion-service" ) ) ]
342- return false ;
332+ // We do not support removing listeners if the `listen-onion-service` feature is not enabled
333+ #[ cfg( not( feature = "listen-onion-service" ) ) ]
334+ fn remove_listener ( & mut self , _id : ListenerId ) -> bool {
335+ false
336+ }
343337
338+ #[ cfg( feature = "listen-onion-service" ) ]
339+ fn remove_listener ( & mut self , id : ListenerId ) -> bool {
344340 // Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore)
345341 // However, we will not stop the onion service itself because we might want to reuse it later
346342 // The onion service will be stopped when the transport is dropped
@@ -383,37 +379,45 @@ impl Transport for TorTransport {
383379 None
384380 }
385381
382+ #[ cfg( not( feature = "listen-onion-service" ) ) ]
386383 fn poll (
387- mut self : Pin < & mut Self > ,
388- cx : & mut Context < ' _ > ,
384+ self : Pin < & mut Self > ,
385+ _cx : & mut Context < ' _ > ,
389386 ) -> Poll < TransportEvent < Self :: ListenerUpgrade , Self :: Error > > {
390387 // If the `listen-onion-service` feature is not enabled, we do not support listening
391- # [ cfg ( not ( feature = "listen-onion-service" ) ) ]
392- return Poll :: Pending ;
388+ Poll :: Pending
389+ }
393390
394- for ( listener_id, listener) in self . listeners . iter_mut ( ) {
391+ #[ cfg( feature = "listen-onion-service" ) ]
392+ fn poll (
393+ mut self : Pin < & mut Self > ,
394+ cx : & mut Context < ' _ > ,
395+ ) -> Poll < TransportEvent < Self :: ListenerUpgrade , Self :: Error > > {
396+ for ( listener_id, listener) in & mut self . listeners {
395397 // Check if the service has any new statuses
396398 if let Poll :: Ready ( Some ( status) ) = listener. status_stream . as_mut ( ) . poll_next ( cx) {
397399 tracing:: debug!(
398400 status = ?status. state( ) ,
399401 address = listener. onion_address. to_string( ) ,
400402 "Onion service status changed"
401403 ) ;
404+ }
402405
403- if status. is_reachable ( ) {
404- // TODO: We might report the address here multiple time to the swarm. Is this a problem?
405- return Poll :: Ready ( TransportEvent :: NewAddress {
406- listener_id : * listener_id,
407- listen_addr : listener. onion_address . clone ( ) ,
408- } ) ;
409- }
410-
411- if status. is_broken ( ) {
412- return Poll :: Ready ( TransportEvent :: ListenerError {
413- listener_id : * listener_id,
414- error : TorTransportError :: Broken ,
415- } ) ;
416- }
406+ // Check if we have already announced this address, if not, do it now
407+ if !listener. announced {
408+ listener. announced = true ;
409+
410+ // We announce the address here to the swarm even though we technically cannot guarantee
411+ // that the address is reachable yet from the outside. We might not have registered the
412+ // onion service fully yet (introduction points, hsdir, ...)
413+ //
414+ // However, we need to announce it now because otherwise libp2p might not poll the listener
415+ // again and we will not be able to announce it later.
416+ // TODO: Find out why this is the case, if this is intended behaviour or a bug
417+ return Poll :: Ready ( TransportEvent :: NewAddress {
418+ listener_id : * listener_id,
419+ listen_addr : listener. onion_address . clone ( ) ,
420+ } ) ;
417421 }
418422
419423 match listener. request_stream . as_mut ( ) . poll_next ( cx) {
@@ -445,7 +449,8 @@ impl Transport for TorTransport {
445449 } ) ;
446450 }
447451
448- // The stream has ended. Most likely because the service was shut down
452+ // The stream has ended
453+ // This means that the onion service was shut down, and we will not receive any more connections on it
449454 Poll :: Ready ( None ) => {
450455 return Poll :: Ready ( TransportEvent :: ListenerClosed {
451456 listener_id : * listener_id,
0 commit comments