diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 5202260ac8..ef99a8a9bd 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -36,6 +36,7 @@ export interface PendingDialTarget { interface DialQueueJobOptions extends PriorityQueueJobOptions, ProgressOptions { peerId?: PeerId multiaddrs: Set + force?: boolean } interface DialerInit { @@ -135,11 +136,12 @@ export class DialQueue { */ async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise { const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr) + const { force } = options // make sure we don't have an existing connection to any of the addresses we // are about to dial const existingConnection = Array.from(this.connections.values()).flat().find(conn => { - if (options.force === true) { + if (force === true) { return false } @@ -220,6 +222,7 @@ export class DialQueue { peerId, priority: options.priority ?? DEFAULT_DIAL_PRIORITY, multiaddrs: new Set(multiaddrs.map(ma => ma.toString())), + force, signal: options.signal ?? AbortSignal.timeout(this.dialTimeout), onProgress: options.onProgress }) @@ -313,6 +316,31 @@ export class DialQueue { this.log.error('could not update last dial failure key for %p', peerId, err) } + const { remotePeer } = conn + + // make sure we don't have an existing connection to the address we dialed + const existingConnection = Array.from(this.connections.values()).flat().find(_conn => { + if (options.force === true) { + return false + } + + if (_conn.remotePeer.equals(remotePeer) && _conn !== conn) { + return true + } + + return false + }) + + if (existingConnection?.status === 'open') { + this.log('already connected to %a', existingConnection.remoteAddr) + options?.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) + + this.log('closing duplicate connection to %p', remotePeer) + await conn.close() + + return existingConnection + } + // dial successful, return the connection return conn } catch (err: any) { diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index 7f078eaa56..21898bbfbd 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -3,6 +3,7 @@ import { generateKeyPair } from '@libp2p/crypto/keys' import { NotFoundError } from '@libp2p/interface' import { peerLogger } from '@libp2p/logger' +import { PeerMap } from '@libp2p/peer-collections' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import { TCP, WebRTC } from '@multiformats/multiaddr-matcher' @@ -381,6 +382,42 @@ describe('dial queue', () => { await expect(dialer.dial(remotePeer)).to.eventually.equal(connection) }) + it('should return existing connection when dialing a multiaddr without a peer id', async () => { + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ip = multiaddr('/ip4/123.123.123.123') + const addr1 = ip.encapsulate('/tcp/123') + const addr2 = ip.encapsulate('/tcp/321') + + const existingConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const newConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const connections = new PeerMap() + connections.set(remotePeer, [existingConnection]) + + components.transportManager.dialTransportForMultiaddr.callsFake(ma => { + return stubInterface() + }) + components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) + dialer = new DialQueue(components, { connections }) + + await expect(dialer.dial(addr2)).to.eventually.equal(existingConnection) + }) + it('should respect user dial signal over default timeout if it is passed', async () => { const dialTimeout = 10 const userTimeout = 500