Skip to content

Commit be9b6a0

Browse files
authored
fix: check for new addresses during dialing (#3003)
Fixes a race condition where two independent dials occur for the same peer id with different addresses (e.g. one IPv4 and one IPv6). If the IPv6 dial is the one currently in progress, the IPv4 dial joins it rather than starting its own attempt, and the joined dial can then fail if the current network doesn't support IPv6 - even though the IPv4 address may have been reachable. Instead, check the list of addresses to dial after each dial attempt to see if new addresses have been added.
1 parent e2f4943 commit be9b6a0

File tree

2 files changed

+163
-76
lines changed

2 files changed

+163
-76
lines changed

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 119 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ export class DialQueue {
203203

204204
options.onProgress?.(new CustomProgressEvent('dial-queue:add-to-dial-queue'))
205205
return this.queue.add(async (options) => {
206-
options?.onProgress?.(new CustomProgressEvent('dial-queue:start-dial'))
206+
options.onProgress?.(new CustomProgressEvent('dial-queue:start-dial'))
207207
// create abort conditions - need to do this before `calculateMultiaddrs` as
208208
// we may be about to resolve a dns addr which can time out
209209
const signal = anySignal([
@@ -212,103 +212,145 @@ export class DialQueue {
212212
])
213213
setMaxListeners(Infinity, signal)
214214

215-
let addrsToDial: Address[]
216-
217215
try {
218-
// load addresses from address book, resolve and dnsaddrs, filter
219-
// undialables, add peer IDs, etc
220-
addrsToDial = await this.calculateMultiaddrs(peerId, options?.multiaddrs, {
221-
...options,
222-
signal
223-
})
216+
return await this.dialPeer(options, signal)
217+
} finally {
218+
// clean up abort signals/controllers
219+
signal.clear()
220+
}
221+
}, {
222+
peerId,
223+
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
224+
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
225+
signal: options.signal ?? AbortSignal.timeout(this.dialTimeout),
226+
onProgress: options.onProgress
227+
})
228+
}
224229

225-
options?.onProgress?.(new CustomProgressEvent<Address[]>('dial-queue:calculated-addresses', addrsToDial))
230+
private async dialPeer (options: DialQueueJobOptions, signal: AbortSignal): Promise<Connection> {
231+
const peerId = options.peerId
232+
const multiaddrs = options.multiaddrs
233+
const failedMultiaddrs = new Set<string>()
226234

227-
addrsToDial.map(({ multiaddr }) => multiaddr.toString()).forEach(addr => {
228-
options?.multiaddrs.add(addr)
229-
})
230-
} catch (err) {
231-
signal.clear()
232-
throw err
235+
// if we have no multiaddrs, only a peer id, set a flag so we will look the
236+
// peer up in the peer routing to obtain multiaddrs
237+
let forcePeerLookup = options.multiaddrs.size === 0
238+
239+
let dialed = 0
240+
let dialIteration = 0
241+
const errors: Error[] = []
242+
243+
this.log('starting dial to %p', peerId)
244+
245+
// repeat this operation in case addresses are added to the dial while we
246+
// resolve multiaddrs, etc
247+
while (forcePeerLookup || multiaddrs.size > 0) {
248+
dialIteration++
249+
250+
// only perform peer lookup once
251+
forcePeerLookup = false
252+
253+
// the addresses we will dial
254+
const addrsToDial: Address[] = []
255+
256+
// copy the addresses into a new set
257+
const addrs = new Set(options.multiaddrs)
258+
259+
// empty the old set - subsequent dial attempts for the same peer id may
260+
// add more addresses to try
261+
multiaddrs.clear()
262+
263+
this.log('calculating addrs to dial %p from %s', peerId, [...addrs])
264+
265+
// load addresses from address book, resolve and dnsaddrs, filter
266+
// undialables, add peer IDs, etc
267+
const calculatedAddrs = await this.calculateMultiaddrs(peerId, addrs, {
268+
...options,
269+
signal
270+
})
271+
272+
for (const addr of calculatedAddrs) {
273+
// skip any addresses we have previously failed to dial
274+
if (failedMultiaddrs.has(addr.multiaddr.toString())) {
275+
this.log.trace('skipping previously failed multiaddr %a while dialing %p', addr.multiaddr, peerId)
276+
continue
277+
}
278+
279+
addrsToDial.push(addr)
233280
}
234281

235-
try {
236-
let dialed = 0
237-
const errors: Error[] = []
282+
this.log('%s dial to %p with %s', dialIteration === 1 ? 'starting' : 'continuing', peerId, addrsToDial.map(ma => ma.multiaddr.toString()))
238283

239-
for (const address of addrsToDial) {
240-
if (dialed === this.maxPeerAddrsToDial) {
241-
this.log('dialed maxPeerAddrsToDial (%d) addresses for %p, not trying any others', dialed, peerId)
284+
options?.onProgress?.(new CustomProgressEvent<Address[]>('dial-queue:calculated-addresses', addrsToDial))
242285

243-
throw new DialError('Peer had more than maxPeerAddrsToDial')
244-
}
286+
for (const address of addrsToDial) {
287+
if (dialed === this.maxPeerAddrsToDial) {
288+
this.log('dialed maxPeerAddrsToDial (%d) addresses for %p, not trying any others', dialed, options.peerId)
245289

246-
dialed++
290+
throw new DialError('Peer had more than maxPeerAddrsToDial')
291+
}
247292

293+
dialed++
294+
295+
try {
296+
// try to dial the address
297+
const conn = await this.components.transportManager.dial(address.multiaddr, {
298+
...options,
299+
signal
300+
})
301+
302+
this.log('dial to %a succeeded', address.multiaddr)
303+
304+
// record the successful dial and the address
248305
try {
249-
const conn = await this.components.transportManager.dial(address.multiaddr, {
250-
...options,
251-
signal
306+
await this.components.peerStore.merge(conn.remotePeer, {
307+
multiaddrs: [
308+
conn.remoteAddr
309+
],
310+
metadata: {
311+
[LAST_DIAL_SUCCESS_KEY]: uint8ArrayFromString(Date.now().toString())
312+
}
252313
})
314+
} catch (err: any) {
315+
this.log.error('could not update last dial failure key for %p', peerId, err)
316+
}
317+
318+
// dial successful, return the connection
319+
return conn
320+
} catch (err: any) {
321+
this.log.error('dial failed to %a', address.multiaddr, err)
253322

254-
this.log('dial to %a succeeded', address.multiaddr)
323+
// ensure we don't dial it again in this attempt
324+
failedMultiaddrs.add(address.multiaddr.toString())
255325

256-
// record the successful dial and the address
326+
if (peerId != null) {
327+
// record the failed dial
257328
try {
258-
await this.components.peerStore.merge(conn.remotePeer, {
259-
multiaddrs: [
260-
conn.remoteAddr
261-
],
329+
await this.components.peerStore.merge(peerId, {
262330
metadata: {
263-
[LAST_DIAL_SUCCESS_KEY]: uint8ArrayFromString(Date.now().toString())
331+
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
264332
}
265333
})
266334
} catch (err: any) {
267335
this.log.error('could not update last dial failure key for %p', peerId, err)
268336
}
337+
}
269338

270-
return conn
271-
} catch (err: any) {
272-
this.log.error('dial failed to %a', address.multiaddr, err)
273-
274-
if (peerId != null) {
275-
// record the failed dial
276-
try {
277-
await this.components.peerStore.merge(peerId, {
278-
metadata: {
279-
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
280-
}
281-
})
282-
} catch (err: any) {
283-
this.log.error('could not update last dial failure key for %p', peerId, err)
284-
}
285-
}
286-
287-
// the user/dial timeout/shutdown controller signal aborted
288-
if (signal.aborted) {
289-
throw new TimeoutError(err.message)
290-
}
291-
292-
errors.push(err)
339+
// the user/dial timeout/shutdown controller signal aborted
340+
if (signal.aborted) {
341+
throw new TimeoutError(err.message)
293342
}
294-
}
295343

296-
if (errors.length === 1) {
297-
throw errors[0]
344+
errors.push(err)
298345
}
299-
300-
throw new AggregateError(errors, 'All multiaddr dials failed')
301-
} finally {
302-
// clean up abort signals/controllers
303-
signal.clear()
304346
}
305-
}, {
306-
peerId,
307-
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
308-
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
309-
signal: options.signal ?? AbortSignal.timeout(this.dialTimeout),
310-
onProgress: options.onProgress
311-
})
347+
}
348+
349+
if (errors.length === 1) {
350+
throw errors[0]
351+
}
352+
353+
throw new AggregateError(errors, 'All multiaddr dials failed')
312354
}
313355

314356
// eslint-disable-next-line complexity
@@ -358,8 +400,10 @@ export class DialQueue {
358400
isCertified: false
359401
})))
360402
} catch (err: any) {
361-
if (err.name !== 'NoPeerRoutersError') {
362-
this.log.error('looking up multiaddrs for %p in the peer routing failed', peerId, err)
403+
if (err.name === 'NoPeerRoutersError') {
404+
this.log('no peer routers configured', peerId)
405+
} else {
406+
this.log.error('looking up multiaddrs for %p in the peer routing failed - %e', peerId, err)
363407
}
364408
}
365409
}

packages/libp2p/test/connection-manager/dial-queue.spec.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { NotFoundError } from '@libp2p/interface'
55
import { peerLogger } from '@libp2p/logger'
66
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
77
import { multiaddr, resolvers } from '@multiformats/multiaddr'
8-
import { WebRTC } from '@multiformats/multiaddr-matcher'
8+
import { TCP, WebRTC } from '@multiformats/multiaddr-matcher'
99
import { expect } from 'aegir/chai'
1010
import delay from 'delay'
1111
import pDefer from 'p-defer'
@@ -377,4 +377,47 @@ describe('dial queue', () => {
377377
expect(all[1].status).to.equal('fulfilled', 'did not respect user dial timeout')
378378
expect(components.transportManager.dial.callCount).to.equal(1, 'should have coalesced multiple dials to same dial')
379379
})
380+
381+
it('should continue dial when new addresses are discovered', async () => {
382+
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
383+
const ma1 = multiaddr(`/ip6/2001:db8:1:2:3:4:5:6/tcp/123/p2p/${remotePeer}`)
384+
const ma2 = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${remotePeer}`)
385+
386+
components.transportManager.dialTransportForMultiaddr.callsFake(ma => {
387+
if (TCP.exactMatch(ma)) {
388+
return stubInterface<Transport>()
389+
}
390+
})
391+
392+
const connection = stubInterface<Connection>({
393+
remotePeer
394+
})
395+
396+
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
397+
if (ma.equals(ma2)) {
398+
await delay(100)
399+
return connection
400+
}
401+
402+
// second dial should take place while this dial is in progress but has
403+
// not yet failed
404+
await delay(500)
405+
throw new Error('Could not dial address')
406+
})
407+
408+
dialer = new DialQueue(components)
409+
410+
// dial peer with address that fails
411+
const dial1 = dialer.dial(ma1)
412+
413+
// let dial begin
414+
await delay(50)
415+
416+
// dial same peer again with address that succeeds
417+
const dial2 = dialer.dial(ma2)
418+
419+
// both dials should coalesce to the same connection
420+
await expect(dial1).to.eventually.equal(connection)
421+
await expect(dial2).to.eventually.equal(connection)
422+
})
380423
})

0 commit comments

Comments
 (0)