Skip to content

Commit d9c7e0f

Browse files
authored
fix: record outbound pending connections metric (#2737)
This is similar to the dial queue size but is reported in a way that's analogous to inbound pending connections metric making it a bit easier to digest.
1 parent d7312ae commit d9c7e0f

File tree

1 file changed

+54
-45
lines changed
  • packages/libp2p/src/connection-manager

1 file changed

+54
-45
lines changed

packages/libp2p/src/connection-manager/index.ts

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
180180
private readonly deny: Multiaddr[]
181181
private readonly maxIncomingPendingConnections: number
182182
private incomingPendingConnections: number
183+
private outboundPendingConnections: number
183184
private readonly maxConnections: number
184185

185186
public readonly dialQueue: DialQueue
@@ -220,6 +221,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
220221

221222
this.incomingPendingConnections = 0
222223
this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections
224+
this.outboundPendingConnections = 0
223225

224226
// controls individual peers trying to dial us too quickly
225227
this.inboundConnectionRateLimiter = new RateLimiter({
@@ -276,7 +278,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
276278
const metric = {
277279
inbound: 0,
278280
'inbound pending': this.incomingPendingConnections,
279-
outbound: 0
281+
outbound: 0,
282+
'outbound pending': this.outboundPendingConnections
280283
}
281284

282285
for (const conns of this.connections.values()) {
@@ -482,67 +485,73 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
482485
throw new NotStartedError('Not started')
483486
}
484487

485-
options.signal?.throwIfAborted()
488+
this.outboundPendingConnections++
486489

487-
const { peerId } = getPeerAddress(peerIdOrMultiaddr)
490+
try {
491+
options.signal?.throwIfAborted()
488492

489-
if (this.peerId.equals(peerId)) {
490-
throw new InvalidPeerIdError('Can not dial self')
491-
}
493+
const { peerId } = getPeerAddress(peerIdOrMultiaddr)
494+
495+
if (this.peerId.equals(peerId)) {
496+
throw new InvalidPeerIdError('Can not dial self')
497+
}
492498

493-
if (peerId != null && options.force !== true) {
494-
this.log('dial %p', peerId)
495-
const existingConnection = this.getConnections(peerId)
496-
.find(conn => conn.limits == null)
499+
if (peerId != null && options.force !== true) {
500+
this.log('dial %p', peerId)
501+
const existingConnection = this.getConnections(peerId)
502+
.find(conn => conn.limits == null)
497503

498-
if (existingConnection != null) {
499-
this.log('had an existing non-limited connection to %p', peerId)
504+
if (existingConnection != null) {
505+
this.log('had an existing non-limited connection to %p', peerId)
500506

501-
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
502-
return existingConnection
507+
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
508+
return existingConnection
509+
}
503510
}
504-
}
505511

506-
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
507-
...options,
508-
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
509-
})
512+
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
513+
...options,
514+
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
515+
})
510516

511-
if (connection.status !== 'open') {
512-
throw new ConnectionClosedError('Remote closed connection during opening')
513-
}
517+
if (connection.status !== 'open') {
518+
throw new ConnectionClosedError('Remote closed connection during opening')
519+
}
514520

515-
let peerConnections = this.connections.get(connection.remotePeer)
521+
let peerConnections = this.connections.get(connection.remotePeer)
516522

517-
if (peerConnections == null) {
518-
peerConnections = []
519-
this.connections.set(connection.remotePeer, peerConnections)
520-
}
523+
if (peerConnections == null) {
524+
peerConnections = []
525+
this.connections.set(connection.remotePeer, peerConnections)
526+
}
521527

522-
// we get notified of connections via the Upgrader emitting "connection"
523-
// events, double check we aren't already tracking this connection before
524-
// storing it
525-
let trackedConnection = false
528+
// we get notified of connections via the Upgrader emitting "connection"
529+
// events, double check we aren't already tracking this connection before
530+
// storing it
531+
let trackedConnection = false
526532

527-
for (const conn of peerConnections) {
528-
if (conn.id === connection.id) {
529-
trackedConnection = true
530-
}
533+
for (const conn of peerConnections) {
534+
if (conn.id === connection.id) {
535+
trackedConnection = true
536+
}
531537

532-
// make sure we don't already have a connection to this multiaddr
533-
if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) {
534-
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))
538+
// make sure we don't already have a connection to this multiaddr
539+
if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) {
540+
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))
535541

536-
// return the existing connection
537-
return conn
542+
// return the existing connection
543+
return conn
544+
}
538545
}
539-
}
540546

541-
if (!trackedConnection) {
542-
peerConnections.push(connection)
543-
}
547+
if (!trackedConnection) {
548+
peerConnections.push(connection)
549+
}
544550

545-
return connection
551+
return connection
552+
} finally {
553+
this.outboundPendingConnections--
554+
}
546555
}
547556

548557
async closeConnections (peerId: PeerId, options: AbortOptions = {}): Promise<void> {

0 commit comments

Comments
 (0)