Skip to content

Commit 0c59578

Browse files
authored
fix: split error/operation metrics (#2728)
Splitting metrics into separate counters for operations and errors is considered best practice when there is only a small number of possible outcomes. Refs: https://promlabs.com/blog/2023/09/19/errors-successes-totals-which-metrics-should-i-expose-to-prometheus/
1 parent 442a835 commit 0c59578

File tree

5 files changed

+99
-39
lines changed

5 files changed

+99
-39
lines changed

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* eslint-disable max-depth */
2-
import { TimeoutError, DialError, setMaxListeners } from '@libp2p/interface'
2+
import { TimeoutError, DialError, setMaxListeners, AbortError } from '@libp2p/interface'
33
import { PeerMap } from '@libp2p/peer-collections'
44
import { defaultAddressSort } from '@libp2p/utils/address-sort'
55
import { PriorityQueue, type PriorityQueueJobOptions } from '@libp2p/utils/priority-queue'
@@ -103,7 +103,9 @@ export class DialQueue {
103103
})
104104
// a started job errored
105105
this.queue.addEventListener('error', (event) => {
106-
this.log.error('error in dial queue', event.detail)
106+
if (event.detail.name !== AbortError.name) {
107+
this.log.error('error in dial queue - %e', event.detail)
108+
}
107109
})
108110
}
109111

packages/libp2p/src/connection-manager/index.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface'
1+
import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
22
import { PeerMap } from '@libp2p/peer-collections'
33
import { defaultAddressSort } from '@libp2p/utils/address-sort'
44
import { RateLimiter } from '@libp2p/utils/rate-limiter'
@@ -191,6 +191,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
191191
private readonly metrics?: Metrics
192192
private readonly events: TypedEventTarget<Libp2pEvents>
193193
private readonly log: Logger
194+
private readonly peerId: PeerId
194195

195196
constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) {
196197
this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections
@@ -205,6 +206,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
205206
this.connections = new PeerMap()
206207

207208
this.started = false
209+
this.peerId = components.peerId
208210
this.peerStore = components.peerStore
209211
this.metrics = components.metrics
210212
this.events = components.events
@@ -484,6 +486,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
484486

485487
const { peerId } = getPeerAddress(peerIdOrMultiaddr)
486488

489+
if (this.peerId.equals(peerId)) {
490+
throw new InvalidPeerIdError('Can not dial self')
491+
}
492+
487493
if (peerId != null && options.force !== true) {
488494
this.log('dial %p', peerId)
489495
const existingConnection = this.getConnections(peerId)
@@ -501,6 +507,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
501507
...options,
502508
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
503509
})
510+
511+
if (connection.remotePeer.equals(this.peerId)) {
512+
const err = new InvalidPeerIdError('Can not dial self')
513+
connection.abort(err)
514+
throw err
515+
}
516+
504517
let peerConnections = this.connections.get(connection.remotePeer)
505518

506519
if (peerConnections == null) {
@@ -517,6 +530,14 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
517530
if (conn.id === connection.id) {
518531
trackedConnection = true
519532
}
533+
534+
// make sure we don't already have a connection to this multiaddr
535+
if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) {
536+
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))
537+
538+
// return the existing connection
539+
return conn
540+
}
520541
}
521542

522543
if (!trackedConnection) {

packages/libp2p/src/upgrader.ts

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { createConnection } from './connection/index.js'
77
import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js'
88
import { ConnectionDeniedError, ConnectionInterceptedError, EncryptionFailedError, MuxerUnavailableError } from './errors.js'
99
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
10-
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions } from '@libp2p/interface'
10+
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions, CounterGroup } from '@libp2p/interface'
1111
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
1212

1313
interface CreateConnectionOptions {
@@ -130,6 +130,10 @@ export class DefaultUpgrader implements Upgrader {
130130
private readonly inboundStreamProtocolNegotiationTimeout: number
131131
private readonly outboundStreamProtocolNegotiationTimeout: number
132132
private readonly events: TypedEventTarget<Libp2pEvents>
133+
private readonly metrics: {
134+
dials?: CounterGroup<'inbound' | 'outbound'>
135+
errors?: CounterGroup<'inbound' | 'outbound'>
136+
}
133137

134138
constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
135139
this.components = components
@@ -150,6 +154,10 @@ export class DefaultUpgrader implements Upgrader {
150154
this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
151155
this.outboundStreamProtocolNegotiationTimeout = init.outboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
152156
this.events = components.events
157+
this.metrics = {
158+
dials: components.metrics?.registerCounterGroup('libp2p_connection_manager_dials_total'),
159+
errors: components.metrics?.registerCounterGroup('libp2p_connection_manager_dial_errors_total')
160+
}
153161
}
154162

155163
readonly [Symbol.toStringTag] = '@libp2p/upgrader'
@@ -175,6 +183,10 @@ export class DefaultUpgrader implements Upgrader {
175183
*/
176184
async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
177185
try {
186+
this.metrics.dials?.increment({
187+
inbound: true
188+
})
189+
178190
const accept = await this.components.connectionManager.acceptIncomingConnection(maConn)
179191

180192
if (!accept) {
@@ -183,7 +195,15 @@ export class DefaultUpgrader implements Upgrader {
183195

184196
await this.shouldBlockConnection('denyInboundConnection', maConn)
185197

186-
return await this._performUpgrade(maConn, 'inbound', opts)
198+
const conn = await this._performUpgrade(maConn, 'inbound', opts)
199+
200+
return conn
201+
} catch (err) {
202+
this.metrics.errors?.increment({
203+
inbound: true
204+
})
205+
206+
throw err
187207
} finally {
188208
this.components.connectionManager.afterUpgradeInbound()
189209
}
@@ -193,15 +213,27 @@ export class DefaultUpgrader implements Upgrader {
193213
* Upgrades an outbound connection
194214
*/
195215
async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
196-
const idStr = maConn.remoteAddr.getPeerId()
197-
let remotePeerId: PeerId | undefined
216+
try {
217+
this.metrics.dials?.increment({
218+
outbound: true
219+
})
198220

199-
if (idStr != null) {
200-
remotePeerId = peerIdFromString(idStr)
201-
await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn)
202-
}
221+
const idStr = maConn.remoteAddr.getPeerId()
222+
let remotePeerId: PeerId | undefined
203223

204-
return this._performUpgrade(maConn, 'outbound', opts)
224+
if (idStr != null) {
225+
remotePeerId = peerIdFromString(idStr)
226+
await this.shouldBlockConnection('denyOutboundConnection', remotePeerId, maConn)
227+
}
228+
229+
return await this._performUpgrade(maConn, 'outbound', opts)
230+
} catch (err) {
231+
this.metrics.errors?.increment({
232+
outbound: true
233+
})
234+
235+
throw err
236+
}
205237
}
206238

207239
private async _performUpgrade (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', opts: UpgraderOptions): Promise<Connection> {
@@ -218,7 +250,7 @@ export class DefaultUpgrader implements Upgrader {
218250

219251
this.components.metrics?.trackMultiaddrConnection(maConn)
220252

221-
maConn.log('starting the %s connection upgrade', direction)
253+
maConn.log.trace('starting the %s connection upgrade', direction)
222254

223255
// Protect
224256
let protectedConn = maConn
@@ -292,13 +324,13 @@ export class DefaultUpgrader implements Upgrader {
292324
upgradedConn = multiplexed.stream
293325
}
294326
} catch (err: any) {
295-
maConn.log.error('failed to upgrade inbound connection', err)
327+
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
296328
throw err
297329
}
298330

299331
await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)
300332

301-
maConn.log('successfully %s inbound connection', direction)
333+
maConn.log('successfully upgraded %s connection', direction)
302334

303335
return this._createConnection({
304336
cryptoProtocol,
@@ -399,7 +431,7 @@ export class DefaultUpgrader implements Upgrader {
399431
this._onStream({ connection, stream: muxedStream, protocol })
400432
})
401433
.catch(async err => {
402-
connection.log.error('error handling incoming stream id %s', muxedStream.id, err.message, err.code, err.stack)
434+
connection.log.error('error handling incoming stream id %s - %e', muxedStream.id, err)
403435

404436
if (muxedStream.timeline.close == null) {
405437
await muxedStream.close()
@@ -413,7 +445,7 @@ export class DefaultUpgrader implements Upgrader {
413445
throw new MuxerUnavailableError('Connection is not multiplexed')
414446
}
415447

416-
connection.log('starting new stream for protocols %s', protocols)
448+
connection.log.trace('starting new stream for protocols %s', protocols)
417449
const muxedStream = await muxer.newStream()
418450
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)
419451

@@ -441,7 +473,7 @@ export class DefaultUpgrader implements Upgrader {
441473
yieldBytes: true
442474
})
443475

444-
muxedStream.log('selected protocol %s', protocol)
476+
muxedStream.log.trace('selected protocol %s', protocol)
445477

446478
const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options)
447479
const streamCount = countStreams(protocol, 'outbound', connection)
@@ -484,7 +516,7 @@ export class DefaultUpgrader implements Upgrader {
484516

485517
return muxedStream
486518
} catch (err: any) {
487-
connection.log.error('could not create new stream for protocols %s', protocols, err)
519+
connection.log.error('could not create new outbound stream on connection %s %a for protocols %s - %e', direction === 'inbound' ? 'from' : 'to', opts.maConn.remoteAddr, protocols, err)
488520

489521
if (muxedStream.timeline.close == null) {
490522
muxedStream.abort(err)
@@ -499,7 +531,7 @@ export class DefaultUpgrader implements Upgrader {
499531
muxer.sink(upgradedConn.source),
500532
upgradedConn.sink(muxer.source)
501533
]).catch(err => {
502-
connection.log.error('error piping data through muxer', err)
534+
connection.log.error('error piping data through muxer - %e', err)
503535
})
504536
}
505537

@@ -594,7 +626,6 @@ export class DefaultUpgrader implements Upgrader {
594626
*/
595627
async _encryptInbound (connection: MultiaddrConnection, options?: AbortOptions): Promise<CryptoResult> {
596628
const protocols = Array.from(this.connectionEncrypters.keys())
597-
connection.log('handling inbound crypto protocol selection', protocols)
598629

599630
try {
600631
const { stream, protocol } = await mss.handle(connection, protocols, {
@@ -604,17 +635,17 @@ export class DefaultUpgrader implements Upgrader {
604635
const encrypter = this.connectionEncrypters.get(protocol)
605636

606637
if (encrypter == null) {
607-
throw new Error(`no crypto module found for ${protocol}`)
638+
throw new EncryptionFailedError(`no crypto module found for ${protocol}`)
608639
}
609640

610-
connection.log('encrypting inbound connection using', protocol)
641+
connection.log('encrypting inbound connection to %a using %s', connection.remoteAddr, protocol)
611642

612643
return {
613644
...await encrypter.secureInbound(stream, options),
614645
protocol
615646
}
616647
} catch (err: any) {
617-
connection.log.error('encrypting inbound connection failed', err)
648+
connection.log.error('encrypting inbound connection from %a failed', connection.remoteAddr, err)
618649
throw new EncryptionFailedError(err.message)
619650
}
620651
}
@@ -625,34 +656,29 @@ export class DefaultUpgrader implements Upgrader {
625656
*/
626657
async _encryptOutbound (connection: MultiaddrConnection, options: SecureConnectionOptions): Promise<CryptoResult> {
627658
const protocols = Array.from(this.connectionEncrypters.keys())
628-
connection.log('selecting outbound crypto protocol', protocols)
629659

630660
try {
631661
connection.log.trace('selecting encrypter from %s', protocols)
632662

633-
const {
634-
stream,
635-
protocol
636-
} = await mss.select(connection, protocols, {
663+
const { stream, protocol } = await mss.select(connection, protocols, {
637664
...options,
638665
log: connection.log,
639666
yieldBytes: true
640667
})
641-
642668
const encrypter = this.connectionEncrypters.get(protocol)
643669

644670
if (encrypter == null) {
645-
throw new Error(`no crypto module found for ${protocol}`)
671+
throw new EncryptionFailedError(`no crypto module found for ${protocol}`)
646672
}
647673

648-
connection.log('encrypting outbound connection to %p using %s', options?.remotePeer, encrypter)
674+
connection.log('encrypting outbound connection to %a using %s', connection.remoteAddr, protocol)
649675

650676
return {
651677
...await encrypter.secureOutbound(stream, options),
652678
protocol
653679
}
654680
} catch (err: any) {
655-
connection.log.error('encrypting outbound connection to %p failed', options?.remotePeer, err)
681+
connection.log.error('encrypting outbound connection to %a failed', connection.remoteAddr, err)
656682
throw new EncryptionFailedError(err.message)
657683
}
658684
}

packages/libp2p/test/connection-manager/direct.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
490490

491491
await expect(libp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/1234/ws/p2p/${libp2p.peerId.toString()}`)))
492492
.to.eventually.be.rejected()
493-
.and.to.have.property('name', 'DialError')
493+
.and.to.have.property('name', 'InvalidPeerIdError')
494494
})
495495

496496
it('should limit the maximum dial queue size', async () => {

packages/libp2p/test/connection-manager/index.spec.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,9 @@ describe('Connection Manager', () => {
391391
})
392392
await connectionManager.start()
393393

394-
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
394+
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
395+
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
396+
}))
395397

396398
// max out the connection limit
397399
await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519')))
@@ -450,7 +452,9 @@ describe('Connection Manager', () => {
450452
})
451453
await connectionManager.start()
452454

453-
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
455+
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
456+
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
457+
}))
454458

455459
// max out the connection limit
456460
await connectionManager.openConnection(peerIdFromPrivateKey(await generateKeyPair('Ed25519')))
@@ -477,7 +481,9 @@ describe('Connection Manager', () => {
477481
})
478482
await connectionManager.start()
479483

480-
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>())
484+
sinon.stub(connectionManager.dialQueue, 'dial').resolves(stubInterface<Connection>({
485+
remotePeer: peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
486+
}))
481487

482488
// start the upgrade
483489
const maConn1 = mockMultiaddrConnection({
@@ -523,9 +529,14 @@ describe('Connection Manager', () => {
523529
const existingConnection = stubInterface<Connection>({
524530
limits: {
525531
bytes: 100n
526-
}
532+
},
533+
remotePeer: targetPeer,
534+
remoteAddr: multiaddr(`/ip4/123.123.123.123/tcp/123/p2p-circuit/p2p/${targetPeer}`)
535+
})
536+
const newConnection = stubInterface<Connection>({
537+
remotePeer: targetPeer,
538+
remoteAddr: addr
527539
})
528-
const newConnection = stubInterface<Connection>()
529540

530541
sinon.stub(connectionManager.dialQueue, 'dial')
531542
.withArgs(addr)

0 commit comments

Comments
 (0)