1
- import { InvalidMessageError , TypedEventEmitter } from '@libp2p/interface'
1
+ import { InvalidMessageError , KEEP_ALIVE , TypedEventEmitter } from '@libp2p/interface'
2
2
import { PeerSet } from '@libp2p/peer-collections'
3
+ import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
3
4
import { PeerQueue } from '@libp2p/utils/peer-queue'
4
5
import { pbStream } from 'it-protobuf-stream'
5
6
import { Message , MessageType } from '../message/dht.js'
6
7
import * as utils from '../utils.js'
7
8
import { KBucket , isLeafBucket , type Bucket , type PingEventDetails } from './k-bucket.js'
8
9
import type { ComponentLogger , CounterGroup , Logger , Metric , Metrics , PeerId , PeerStore , Startable , Stream } from '@libp2p/interface'
9
10
import type { ConnectionManager } from '@libp2p/interface-internal'
11
+ import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'
10
12
11
13
export const KAD_CLOSE_TAG_NAME = 'kad-close'
12
14
export const KAD_CLOSE_TAG_VALUE = 50
13
15
export const KBUCKET_SIZE = 20
14
16
export const PREFIX_LENGTH = 32
15
- export const PING_TIMEOUT = 10000
16
- export const PING_CONCURRENCY = 10
17
+ export const PING_TIMEOUT = 2000
18
+ export const PING_CONCURRENCY = 20
17
19
18
20
export interface RoutingTableInit {
19
21
logPrefix : string
20
22
protocol : string
21
23
prefixLength ?: number
22
24
splitThreshold ?: number
23
25
kBucketSize ?: number
24
- pingTimeout ?: number
26
+ pingTimeout ?: AdaptiveTimeoutInit
25
27
pingConcurrency ?: number
26
28
tagName ?: string
27
29
tagValue ?: number
@@ -53,7 +55,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
53
55
private readonly components : RoutingTableComponents
54
56
private readonly prefixLength : number
55
57
private readonly splitThreshold : number
56
- private readonly pingTimeout : number
58
+ private readonly pingTimeout : AdaptiveTimeout
57
59
private readonly pingConcurrency : number
58
60
private running : boolean
59
61
private readonly protocol : string
@@ -73,7 +75,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
73
75
this . components = components
74
76
this . log = components . logger . forComponent ( `${ init . logPrefix } :routing-table` )
75
77
this . kBucketSize = init . kBucketSize ?? KBUCKET_SIZE
76
- this . pingTimeout = init . pingTimeout ?? PING_TIMEOUT
77
78
this . pingConcurrency = init . pingConcurrency ?? PING_CONCURRENCY
78
79
this . running = false
79
80
this . protocol = init . protocol
@@ -90,6 +91,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
90
91
this . pingQueue . addEventListener ( 'error' , evt => {
91
92
this . log . error ( 'error pinging peer' , evt . detail )
92
93
} )
94
+ this . pingTimeout = new AdaptiveTimeout ( {
95
+ ...( init . pingTimeout ?? { } ) ,
96
+ metrics : this . components . metrics ,
97
+ metricName : `${ init . logPrefix . replaceAll ( ':' , '_' ) } _routing_table_ping_time_milliseconds`
98
+ } )
93
99
94
100
if ( this . components . metrics != null ) {
95
101
this . metrics = {
@@ -177,6 +183,9 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
177
183
tags : {
178
184
[ this . tagName ] : {
179
185
value : this . tagValue
186
+ } ,
187
+ [ KEEP_ALIVE ] : {
188
+ value : 1
180
189
}
181
190
}
182
191
} )
@@ -185,7 +194,8 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
185
194
for ( const peer of removedPeers ) {
186
195
await this . components . peerStore . merge ( peer , {
187
196
tags : {
188
- [ this . tagName ] : undefined
197
+ [ this . tagName ] : undefined ,
198
+ [ KEEP_ALIVE ] : undefined
189
199
}
190
200
} )
191
201
}
@@ -242,10 +252,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
242
252
243
253
return this . pingQueue . add ( async ( ) => {
244
254
let stream : Stream | undefined
255
+ const signal = this . pingTimeout . getTimeoutSignal ( )
245
256
246
257
try {
247
258
const options = {
248
- signal : AbortSignal . timeout ( this . pingTimeout )
259
+ signal
249
260
}
250
261
251
262
this . log ( 'pinging old contact %p' , oldContact . peerId )
@@ -278,6 +289,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
278
289
279
290
return false
280
291
} finally {
292
+ this . pingTimeout . cleanUp ( signal )
281
293
this . updateMetrics ( )
282
294
}
283
295
} , {
0 commit comments