Skip to content

Commit 58784ab

Browse files
authored
fix: record query metrics properly (#2736)
Using a metric for running queries misses most queries because they complete within the scrape interval. Use a counter instead, along with another counter for errors, and record outgoing RPC messages in the same way as incoming ones.
1 parent d9c7e0f commit 58784ab

File tree

3 files changed

+43
-25
lines changed

3 files changed

+43
-25
lines changed

packages/kad-dht/src/network.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
queryErrorEvent
1212
} from './query/events.js'
1313
import type { KadDHTComponents, QueryEvent } from './index.js'
14-
import type { AbortOptions, Logger, Stream, PeerId, PeerInfo, Startable, RoutingOptions } from '@libp2p/interface'
14+
import type { AbortOptions, Logger, Stream, PeerId, PeerInfo, Startable, RoutingOptions, CounterGroup } from '@libp2p/interface'
1515

1616
export interface NetworkInit {
1717
protocol: string
@@ -32,6 +32,10 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
3232
private running: boolean
3333
private readonly components: KadDHTComponents
3434
private readonly timeout: AdaptiveTimeout
35+
private readonly metrics: {
36+
operations?: CounterGroup
37+
errors?: CounterGroup
38+
}
3539

3640
/**
3741
* Create a new network
@@ -49,6 +53,10 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
4953
metrics: components.metrics,
5054
metricName: `${init.logPrefix.replaceAll(':', '_')}_network_message_send_times_milliseconds`
5155
})
56+
this.metrics = {
57+
operations: components.metrics?.registerCounterGroup(`${init.logPrefix.replaceAll(':', '_')}_outbound_rpc_requests_total`),
58+
errors: components.metrics?.registerCounterGroup(`${init.logPrefix.replaceAll(':', '_')}_outbound_rpc_errors_total`)
59+
}
5260
}
5361

5462
/**
@@ -103,6 +111,8 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
103111
}
104112

105113
try {
114+
this.metrics.operations?.increment({ [type]: true })
115+
106116
const connection = await this.components.connectionManager.openConnection(to, options)
107117
stream = await connection.newStream(this.protocol, options)
108118
const response = await this._writeReadMessage(stream, msg, options)
@@ -121,6 +131,8 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
121131
record: response.record == null ? undefined : Libp2pRecord.deserialize(response.record)
122132
}, options)
123133
} catch (err: any) {
134+
this.metrics.errors?.increment({ [type]: true })
135+
124136
stream?.abort(err)
125137

126138
// only log if the incoming signal was not aborted - this means we were
@@ -162,6 +174,8 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
162174
}
163175

164176
try {
177+
this.metrics.operations?.increment({ [type]: true })
178+
165179
const connection = await this.components.connectionManager.openConnection(to, options)
166180
stream = await connection.newStream(this.protocol, options)
167181

@@ -175,6 +189,8 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
175189

176190
yield peerResponseEvent({ from: to, messageType: type }, options)
177191
} catch (err: any) {
192+
this.metrics.errors?.increment({ [type]: true })
193+
178194
stream?.abort(err)
179195
yield queryErrorEvent({ from: to, error: err }, options)
180196
} finally {

packages/kad-dht/src/query/manager.ts

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { queryPath } from './query-path.js'
1212
import type { QueryFunc } from './types.js'
1313
import type { QueryEvent } from '../index.js'
1414
import type { RoutingTable } from '../routing-table/index.js'
15-
import type { ComponentLogger, Metric, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface'
15+
import type { ComponentLogger, Counter, Metric, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface'
1616
import type { ConnectionManager } from '@libp2p/interface-internal'
1717
import type { DeferredPromise } from 'p-defer'
1818

@@ -52,16 +52,16 @@ export class QueryManager implements Startable {
5252
private readonly alpha: number
5353
private shutDownController: AbortController
5454
private running: boolean
55-
private queries: number
5655
private readonly logger: ComponentLogger
5756
private readonly peerId: PeerId
5857
private readonly connectionManager: ConnectionManager
5958
private readonly routingTable: RoutingTable
6059
private initialQuerySelfHasRun?: DeferredPromise<void>
6160
private readonly logPrefix: string
62-
private readonly metrics?: {
63-
runningQueries: Metric
64-
queryTime: Metric
61+
private readonly metrics: {
62+
queries?: Counter
63+
errors?: Counter
64+
queryTime?: Metric
6565
}
6666

6767
constructor (components: QueryManagerComponents, init: QueryManagerInit) {
@@ -71,18 +71,16 @@ export class QueryManager implements Startable {
7171
this.disjointPaths = disjointPaths ?? K
7272
this.running = false
7373
this.alpha = alpha ?? ALPHA
74-
this.queries = 0
7574
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
7675
this.routingTable = init.routingTable
7776
this.logger = components.logger
7877
this.peerId = components.peerId
7978
this.connectionManager = components.connectionManager
8079

81-
if (components.metrics != null) {
82-
this.metrics = {
83-
runningQueries: components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_running_queries`),
84-
queryTime: components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_query_time_seconds`)
85-
}
80+
this.metrics = {
81+
queries: components.metrics?.registerCounter(`${logPrefix.replaceAll(':', '_')}_queries_total`),
82+
errors: components.metrics?.registerCounter(`${logPrefix.replaceAll(':', '_')}_query_errors_total`),
83+
queryTime: components.metrics?.registerMetric(`${logPrefix.replaceAll(':', '_')}_query_time_seconds`)
8684
}
8785

8886
// allow us to stop queries on shut down
@@ -121,7 +119,7 @@ export class QueryManager implements Startable {
121119
throw new Error('QueryManager not started')
122120
}
123121

124-
const stopQueryTimer = this.metrics?.queryTime.timer()
122+
const stopQueryTimer = this.metrics.queryTime?.timer()
125123

126124
if (options.signal == null) {
127125
// don't let queries run forever
@@ -167,8 +165,7 @@ export class QueryManager implements Startable {
167165
}
168166

169167
log('query:start')
170-
this.queries++
171-
this.metrics?.runningQueries.update(this.queries)
168+
this.metrics?.queries?.increment()
172169

173170
const id = await convertBuffer(key)
174171
const peers = this.routingTable.closestPeers(id)
@@ -223,6 +220,10 @@ export class QueryManager implements Startable {
223220

224221
queryFinished = true
225222
} catch (err: any) {
223+
if (!queryFinished) {
224+
this.metrics?.errors?.increment()
225+
}
226+
226227
if (!this.running && err.name === 'QueryAbortedError') {
227228
// ignore query aborted errors that were thrown during query manager shutdown
228229
} else {
@@ -236,13 +237,7 @@ export class QueryManager implements Startable {
236237

237238
signal.clear()
238239

239-
this.queries--
240-
this.metrics?.runningQueries.update(this.queries)
241-
242-
if (stopQueryTimer != null) {
243-
stopQueryTimer()
244-
}
245-
240+
stopQueryTimer?.()
246241
log('query:done in %dms', Date.now() - startTime)
247242
}
248243
}

packages/kad-dht/src/routing-table/index.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { pbStream } from 'it-protobuf-stream'
55
import { Message, MessageType } from '../message/dht.js'
66
import * as utils from '../utils.js'
77
import { KBucket, isLeafBucket, type Bucket, type PingEventDetails } from './k-bucket.js'
8-
import type { ComponentLogger, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface'
8+
import type { ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface'
99
import type { ConnectionManager } from '@libp2p/interface-internal'
1010

1111
export const KAD_CLOSE_TAG_NAME = 'kad-close'
@@ -64,6 +64,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
6464
routingTableKadBucketTotal: Metric
6565
routingTableKadBucketAverageOccupancy: Metric
6666
routingTableKadBucketMaxDepth: Metric
67+
kadBucketEvents: CounterGroup<'ping' | 'ping_error' | 'peer_added' | 'peer_removed'>
6768
}
6869

6970
constructor (components: RoutingTableComponents, init: RoutingTableInit) {
@@ -95,7 +96,8 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
9596
routingTableSize: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_size`),
9697
routingTableKadBucketTotal: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_total`),
9798
routingTableKadBucketAverageOccupancy: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_average_occupancy`),
98-
routingTableKadBucketMaxDepth: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_max_depth`)
99+
routingTableKadBucketMaxDepth: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_max_depth`),
100+
kadBucketEvents: this.components.metrics.registerCounterGroup(`${init.logPrefix.replaceAll(':', '_')}_kad_bucket_events_total`)
99101
}
100102
}
101103
}
@@ -121,7 +123,10 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
121123

122124
// test whether to evict peers
123125
kBuck.addEventListener('ping', (evt) => {
126+
this.metrics?.kadBucketEvents.increment({ ping: true })
127+
124128
this._onPing(evt).catch(err => {
129+
this.metrics?.kadBucketEvents.increment({ ping_error: true })
125130
this.log.error('could not process k-bucket ping event', err)
126131
})
127132
})
@@ -195,11 +200,13 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
195200
kBuck.addEventListener('added', (evt) => {
196201
updatePeerTags()
197202

203+
this.metrics?.kadBucketEvents.increment({ peer_added: true })
198204
this.safeDispatchEvent('peer:add', { detail: evt.detail.peerId })
199205
})
200206
kBuck.addEventListener('removed', (evt) => {
201207
updatePeerTags()
202208

209+
this.metrics?.kadBucketEvents.increment({ peer_removed: true })
203210
this.safeDispatchEvent('peer:remove', { detail: evt.detail.peerId })
204211
})
205212
}
@@ -271,7 +278,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
271278

272279
return false
273280
} finally {
274-
this.metrics?.routingTableSize.update(this.size)
281+
this.updateMetrics()
275282
}
276283
}, {
277284
peerId: oldContact.peerId

0 commit comments

Comments
 (0)