Skip to content

Commit 0cfcc4d

Browse files
authored
fix: limit max dial queue size (#2472)
Add an upper bound to the maximum length of the dial queue. Any attempts to dial when the queue is full will throw.
1 parent 936dbba commit 0cfcc4d

File tree

4 files changed

+54
-3
lines changed

4 files changed

+54
-3
lines changed

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,8 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10
5757
* failed to dial.
5858
*/
5959
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'
60+
61+
/**
62+
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength
63+
*/
64+
export const MAX_DIAL_QUEUE_LENGTH = 500

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import {
1313
DIAL_TIMEOUT,
1414
MAX_PARALLEL_DIALS,
1515
MAX_PEER_ADDRS_TO_DIAL,
16-
LAST_DIAL_FAILURE_KEY
16+
LAST_DIAL_FAILURE_KEY,
17+
MAX_DIAL_QUEUE_LENGTH
1718
} from './constants.js'
1819
import { resolveMultiaddrs } from './utils.js'
1920
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting } from '@libp2p/interface'
@@ -38,6 +39,7 @@ interface DialQueueJobOptions extends QueueAddOptions {
3839
interface DialerInit {
3940
addressSorter?: AddressSorter
4041
maxParallelDials?: number
42+
maxDialQueueLength?: number
4143
maxPeerAddrsToDial?: number
4244
dialTimeout?: number
4345
resolvers?: Record<string, Resolver>
@@ -47,6 +49,7 @@ interface DialerInit {
4749
const defaultOptions = {
4850
addressSorter: defaultAddressSort,
4951
maxParallelDials: MAX_PARALLEL_DIALS,
52+
maxDialQueueLength: MAX_DIAL_QUEUE_LENGTH,
5053
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
5154
dialTimeout: DIAL_TIMEOUT,
5255
resolvers: {
@@ -70,6 +73,7 @@ export class DialQueue {
7073
private readonly components: DialQueueComponents
7174
private readonly addressSorter: AddressSorter
7275
private readonly maxPeerAddrsToDial: number
76+
private readonly maxDialQueueLength: number
7377
private readonly dialTimeout: number
7478
private shutDownController: AbortController
7579
private readonly connections: PeerMap<Connection[]>
@@ -78,6 +82,7 @@ export class DialQueue {
7882
constructor (components: DialQueueComponents, init: DialerInit = {}) {
7983
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
8084
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
85+
this.maxDialQueueLength = init.maxDialQueueLength ?? defaultOptions.maxDialQueueLength
8186
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
8287
this.connections = init.connections ?? new PeerMap()
8388
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
@@ -185,6 +190,10 @@ export class DialQueue {
185190
return existingDial.join(options)
186191
}
187192

193+
if (this.queue.size >= this.maxDialQueueLength) {
194+
throw new CodeError('Dial queue is full', 'ERR_DIAL_QUEUE_FULL')
195+
}
196+
188197
this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString()))
189198

190199
return this.queue.add(async (options) => {

packages/libp2p/src/connection-manager/index.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { codes } from '../errors.js'
88
import { getPeerAddress } from '../get-peer.js'
99
import { AutoDial } from './auto-dial.js'
1010
import { ConnectionPruner } from './connection-pruner.js'
11-
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
11+
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
1212
import { DialQueue } from './dial-queue.js'
1313
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting } from '@libp2p/interface'
1414
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
@@ -81,6 +81,15 @@ export interface ConnectionManagerInit {
8181
*/
8282
maxParallelDials?: number
8383

84+
/**
85+
* The maximum size the dial queue is allowed to grow to. Promises returned
86+
* when dialing peers after this limit is reached will not resolve until the
87+
* queue size falls beneath this size.
88+
*
89+
* @default 500
90+
*/
91+
maxDialQueueLength?: number
92+
8493
/**
8594
* Maximum number of addresses allowed for a given peer before giving up
8695
*
@@ -238,6 +247,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
238247
this.dialQueue = new DialQueue(components, {
239248
addressSorter: init.addressSorter ?? defaultAddressSort,
240249
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
250+
maxDialQueueLength: init.maxDialQueueLength ?? MAX_DIAL_QUEUE_LENGTH,
241251
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
242252
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
243253
resolvers: init.resolvers ?? {

packages/libp2p/test/connection-manager/direct.spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
2727
import { codes as ErrorCodes } from '../../src/errors.js'
2828
import { createLibp2p } from '../../src/index.js'
2929
import { DefaultTransportManager } from '../../src/transport-manager.js'
30-
import type { Libp2p, Connection, PeerId } from '@libp2p/interface'
30+
import type { Libp2p, Connection, PeerId, Transport } from '@libp2p/interface'
3131
import type { TransportManager } from '@libp2p/interface-internal'
3232
import type { Multiaddr } from '@multiformats/multiaddr'
3333

@@ -504,4 +504,31 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
504504
.to.eventually.be.rejected()
505505
.and.to.have.property('code', ErrorCodes.ERR_DIALED_SELF)
506506
})
507+
508+
it('should limit the maximum dial queue size', async () => {
509+
const transport = stubInterface<Transport>({
510+
filter: (ma) => ma,
511+
dial: async () => {
512+
await delay(1000)
513+
return stubInterface<Connection>()
514+
}
515+
})
516+
517+
libp2p = await createLibp2p({
518+
peerId,
519+
transports: [
520+
() => transport
521+
],
522+
connectionManager: {
523+
maxDialQueueLength: 1,
524+
maxParallelDials: 1
525+
}
526+
})
527+
528+
await expect(Promise.all([
529+
libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1234')),
530+
libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1235'))
531+
])).to.eventually.be.rejected
532+
.with.property('code', 'ERR_DIAL_QUEUE_FULL')
533+
})
507534
})

0 commit comments

Comments
 (0)