@@ -5,7 +5,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
55import {
66 RELAY_V2_HOP_CODEC
77} from '../constants.js'
8- import type { ComponentLogger , Logger , Peer , PeerId , PeerStore , Startable , TopologyFilter } from '@libp2p/interface'
8+ import type { ComponentLogger , Libp2pEvents , Logger , Peer , PeerId , PeerInfo , PeerStore , Startable , TopologyFilter , TypedEventTarget } from '@libp2p/interface'
99import type { ConnectionManager , RandomWalk , Registrar , TransportManager } from '@libp2p/interface-internal'
1010
1111export interface RelayDiscoveryEvents {
@@ -19,6 +19,7 @@ export interface RelayDiscoveryComponents {
1919 registrar : Registrar
2020 logger : ComponentLogger
2121 randomWalk : RandomWalk
22+ events : TypedEventTarget < Libp2pEvents >
2223}
2324
2425export interface RelayDiscoveryInit {
@@ -30,10 +31,7 @@ export interface RelayDiscoveryInit {
3031 * peers that support the circuit v2 HOP protocol.
3132 */
3233export class RelayDiscovery extends TypedEventEmitter < RelayDiscoveryEvents > implements Startable {
33- private readonly peerStore : PeerStore
34- private readonly registrar : Registrar
35- private readonly connectionManager : ConnectionManager
36- private readonly randomWalk : RandomWalk
34+ private readonly components : RelayDiscoveryComponents
3735 private started : boolean
3836 private running : boolean
3937 private topologyId ?: string
@@ -46,15 +44,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
4644 super ( )
4745
4846 this . log = components . logger . forComponent ( 'libp2p:circuit-relay:discover-relays' )
47+ this . components = components
4948 this . started = false
5049 this . running = false
51- this . peerStore = components . peerStore
52- this . registrar = components . registrar
53- this . connectionManager = components . connectionManager
54- this . randomWalk = components . randomWalk
5550 this . filter = init . filter
5651 this . discoveryController = new AbortController ( )
5752 setMaxListeners ( Infinity , this . discoveryController . signal )
53+ this . dialPeer = this . dialPeer . bind ( this )
54+ this . onPeer = this . onPeer . bind ( this )
5855 }
5956
6057 isStarted ( ) : boolean {
@@ -64,7 +61,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
6461 async start ( ) : Promise < void > {
6562 // register a topology listener for when new peers are encountered
6663 // that support the hop protocol
67- this . topologyId = await this . registrar . register ( RELAY_V2_HOP_CODEC , {
64+ this . topologyId = await this . components . registrar . register ( RELAY_V2_HOP_CODEC , {
6865 filter : this . filter ,
6966 onConnect : ( peerId ) => {
7067 this . log . trace ( 'discovered relay %p queue (length: %d, active %d)' , peerId , this . queue ?. size , this . queue ?. running )
@@ -77,10 +74,13 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
7774
7875 stop ( ) : void {
7976 if ( this . topologyId != null ) {
80- this . registrar . unregister ( this . topologyId )
77+ this . components . registrar . unregister ( this . topologyId )
78+ }
79+
80+ if ( this . running ) {
81+ this . stopDiscovery ( )
8182 }
8283
83- this . discoveryController ?. abort ( )
8484 this . started = false
8585 }
8686
@@ -90,7 +90,8 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
9090 *
9191 * 1. Check the metadata store for known relays, try to listen on the ones we are already connected to
9292 * 2. Dial and try to listen on the peers we know that support hop but are not connected
93- * 3. Search the network
93+ * 3. Search the network - this requires a peer routing implementation to be configured but will fail gracefully
94+ * 4. Dial any peers discovered - this covers when no peer routing implementation has been configured but some peer discovery mechanism is also present
9495 */
9596 startDiscovery ( ) : void {
9697 if ( this . running ) {
@@ -102,11 +103,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
102103 this . discoveryController = new AbortController ( )
103104 setMaxListeners ( Infinity , this . discoveryController . signal )
104105
106+ // dial any peer we discover
107+ this . components . events . addEventListener ( 'peer:discovery' , this . onPeer )
108+
105109 Promise . resolve ( )
106110 . then ( async ( ) => {
107111 this . log ( 'searching peer store for relays' )
108112
109- const peers = ( await this . peerStore . all ( {
113+ const peers = ( await this . components . peerStore . all ( {
110114 filters : [
111115 // filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
112116 ( peer ) => {
@@ -149,7 +153,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
149153
150154 this . log ( 'start random walk' )
151155
152- for await ( const peer of this . randomWalk . walk ( { signal : this . discoveryController . signal } ) ) {
156+ for await ( const peer of this . components . randomWalk . walk ( { signal : this . discoveryController . signal } ) ) {
153157 this . log . trace ( 'found random peer %p' , peer . id )
154158
155159 if ( queue . has ( peer . id ) ) {
@@ -159,14 +163,14 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
159163 continue
160164 }
161165
162- if ( this . connectionManager . getConnections ( peer . id ) ?. length > 0 ) {
166+ if ( this . components . connectionManager . getConnections ( peer . id ) ?. length > 0 ) {
163167 this . log . trace ( 'random peer %p was already connected' , peer . id )
164168
165169 // skip peers we are already connected to
166170 continue
167171 }
168172
169- if ( ! ( await this . connectionManager . isDialable ( peer . multiaddrs ) ) ) {
173+ if ( ! ( await this . components . connectionManager . isDialable ( peer . multiaddrs ) ) ) {
170174 this . log . trace ( 'random peer %p was not dialable' , peer . id , peer . multiaddrs . map ( ma => ma . toString ( ) ) )
171175
172176 // skip peers we can't dial
@@ -186,16 +190,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
186190
187191 // dial the peer - this will cause identify to run and our topology to
188192 // be notified and we'll attempt to create reservations
189- queue . add ( async ( ) => {
190- const signal = anySignal ( [ this . discoveryController . signal , AbortSignal . timeout ( 5000 ) ] )
191- setMaxListeners ( Infinity , signal )
192-
193- try {
194- await this . connectionManager . openConnection ( peer . id , { signal } )
195- } finally {
196- signal . clear ( )
197- }
198- } , {
193+ queue . add ( this . dialPeer , {
199194 peerId : peer . id ,
200195 signal : this . discoveryController . signal
201196 } )
@@ -219,6 +214,70 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
219214 this . log ( 'stop discovery' )
220215 this . running = false
221216 this . discoveryController ?. abort ( )
217+ this . queue ?. clear ( )
218+
219+ // stop dialing any peer we discover
220+ this . components . events . removeEventListener ( 'peer:discovery' , this . onPeer )
221+ }
222+
223+ onPeer ( evt : CustomEvent < PeerInfo > ) : void {
224+ this . log . trace ( 'maybe dialing discovered peer %p - %e' , evt . detail . id )
225+
226+ this . maybeDialPeer ( evt )
227+ . catch ( err => {
228+ this . log . trace ( 'error dialing discovered peer %p - %e' , evt . detail . id , err )
229+ } )
230+ }
231+
232+ async maybeDialPeer ( evt : CustomEvent < PeerInfo > ) : Promise < void > {
233+ if ( this . queue == null ) {
234+ return
235+ }
236+
237+ const peerId = evt . detail . id
238+ const multiaddrs = evt . detail . multiaddrs
239+
240+ if ( this . queue . has ( peerId ) ) {
241+ this . log . trace ( 'random peer %p was already in queue' , peerId )
242+
243+ // skip peers already in the queue
244+ return
245+ }
246+
247+ if ( this . components . connectionManager . getConnections ( peerId ) ?. length > 0 ) {
248+ this . log . trace ( 'random peer %p was already connected' , peerId )
249+
250+ // skip peers we are already connected to
251+ return
252+ }
253+
254+ if ( ! ( await this . components . connectionManager . isDialable ( multiaddrs ) ) ) {
255+ this . log . trace ( 'random peer %p was not dialable' , peerId )
256+
257+ // skip peers we can't dial
258+ return
259+ }
260+
261+ this . queue ?. add ( this . dialPeer , {
262+ peerId : evt . detail . id ,
263+ signal : this . discoveryController . signal
264+ } )
265+ . catch ( err => {
266+ this . log . error ( 'error opening connection to discovered peer %p' , evt . detail . id , err )
267+ } )
268+ }
269+
270+ async dialPeer ( { peerId, signal } : { peerId : PeerId , signal ?: AbortSignal } ) : Promise < void > {
271+ const combinedSignal = anySignal ( [ AbortSignal . timeout ( 5_000 ) , signal ] )
272+ setMaxListeners ( Infinity , combinedSignal )
273+
274+ try {
275+ await this . components . connectionManager . openConnection ( peerId , {
276+ signal : combinedSignal
277+ } )
278+ } finally {
279+ combinedSignal . clear ( )
280+ }
222281 }
223282}
224283
0 commit comments