Skip to content

Commit 661d658

Browse files
authored
feat!: ping peers before adding to routing table (#2745)
Implements the [check-before-add](https://github.com/libp2p/go-libp2p-kad-dht/blob/master/optimizations.md#checking-before-adding) client optimisation to ping a peer before adding it to the routing table. Adds a "new peer ping queue" to apply a concurrency limit to these pings, because it would be expected for old contacts to be less likely to be online so don't block adding new contacts to unrelated buckets if the connection to an old contact is timing out while being pinged before eviction. BREAKING CHANGE: the routing ping options have been split into "old contact" and "new contact" and renamed according
1 parent 80e798c commit 661d658

File tree

14 files changed

+927
-472
lines changed

14 files changed

+927
-472
lines changed

packages/kad-dht/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@
109109
"lodash.random": "^3.2.0",
110110
"lodash.range": "^3.2.0",
111111
"p-retry": "^6.2.0",
112-
"p-wait-for": "^5.0.2",
113112
"protons": "^7.5.0",
114113
"sinon": "^18.0.0",
115114
"sinon-ts": "^2.0.0",

packages/kad-dht/src/index.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,15 +400,43 @@ export interface KadDHTInit {
400400
* Settings for how long to wait in ms when pinging DHT peers to decide if
401401
* they should be evicted from the routing table or not.
402402
*/
403-
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
403+
pingOldContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
404404

405405
/**
406406
* How many peers to ping in parallel when deciding if they should
407407
* be evicted from the routing table or not
408408
*
409409
* @default 10
410410
*/
411-
pingConcurrency?: number
411+
pingOldContactConcurrency?: number
412+
413+
/**
414+
* How long the queue to ping peers is allowed to grow
415+
*
416+
* @default 100
417+
*/
418+
pingOldContactMaxQueueSize?: number
419+
420+
/**
421+
* Settings for how long to wait in ms when pinging DHT peers to decide if
422+
* they should be added to the routing table or not.
423+
*/
424+
pingNewContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
425+
426+
/**
427+
* How many peers to ping in parallel when deciding if they should be added to
428+
* the routing table or not
429+
*
430+
* @default 10
431+
*/
432+
pingNewContactConcurrency?: number
433+
434+
/**
435+
* How long the queue to ping peers is allowed to grow
436+
*
437+
* @default 100
438+
*/
439+
pingNewContactMaxQueueSize?: number
412440

413441
/**
414442
* How many parallel incoming streams to allow on the DHT protocol per

packages/kad-dht/src/kad-dht.ts

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
138138
querySelfInterval,
139139
protocol,
140140
logPrefix,
141-
pingTimeout,
142-
pingConcurrency,
143141
maxInboundStreams,
144142
maxOutboundStreams,
145143
providers: providersInit
@@ -156,15 +154,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
156154
this.maxInboundStreams = maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
157155
this.maxOutboundStreams = maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
158156
this.peerInfoMapper = init.peerInfoMapper ?? removePrivateAddressesMapper
159-
this.routingTable = new RoutingTable(components, {
160-
kBucketSize,
161-
pingTimeout,
162-
pingConcurrency,
163-
protocol: this.protocol,
164-
logPrefix: loggingPrefix,
165-
prefixLength: init.prefixLength,
166-
splitThreshold: init.kBucketSplitThreshold
167-
})
168157

169158
this.providers = new Providers(components, providersInit ?? {})
170159

@@ -181,6 +170,21 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
181170
logPrefix: loggingPrefix
182171
})
183172

173+
this.routingTable = new RoutingTable(components, {
174+
kBucketSize,
175+
pingOldContactTimeout: init.pingOldContactTimeout,
176+
pingOldContactConcurrency: init.pingOldContactConcurrency,
177+
pingOldContactMaxQueueSize: init.pingOldContactMaxQueueSize,
178+
pingNewContactTimeout: init.pingNewContactTimeout,
179+
pingNewContactConcurrency: init.pingNewContactConcurrency,
180+
pingNewContactMaxQueueSize: init.pingNewContactMaxQueueSize,
181+
protocol: this.protocol,
182+
logPrefix: loggingPrefix,
183+
prefixLength: init.prefixLength,
184+
splitThreshold: init.kBucketSplitThreshold,
185+
network: this.network
186+
})
187+
184188
// all queries should wait for the initial query-self query to run so we have
185189
// some peers and don't force consumers to use arbitrary timeouts
186190
const initialQuerySelfHasRun = pDefer<any>()
@@ -376,11 +380,17 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
376380

377381
await this.components.registrar.unhandle(this.protocol)
378382

383+
// check again after async work
384+
if (mode === this.getMode() && !force) {
385+
this.log('already in %s mode', mode)
386+
return
387+
}
388+
379389
if (mode === 'client') {
380-
this.log('enabling client mode')
390+
this.log('enabling client mode while in %s mode', this.getMode())
381391
this.clientMode = true
382392
} else {
383-
this.log('enabling server mode')
393+
this.log('enabling server mode while in %s mode', this.getMode())
384394
this.clientMode = false
385395
await this.components.registrar.handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc), {
386396
maxInboundStreams: this.maxInboundStreams,
@@ -399,14 +409,18 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
399409
await this.setMode(this.clientMode ? 'client' : 'server', true)
400410

401411
await start(
402-
this.querySelf,
412+
this.routingTable,
403413
this.providers,
404414
this.queryManager,
405415
this.network,
406-
this.routingTable,
407416
this.topologyListener,
408417
this.routingTableRefresh
409418
)
419+
420+
// Query self after other components are configured
421+
await start(
422+
this.querySelf
423+
)
410424
}
411425

412426
/**

packages/kad-dht/src/network.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
8585
}
8686

8787
/**
88-
* Send a request and record RTT for latency measurements
88+
* Send a request and read a response
8989
*/
9090
async * sendRequest (to: PeerId, msg: Partial<Message>, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> {
9191
if (!this.running) {
@@ -204,7 +204,6 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
204204
async _writeMessage (stream: Stream, msg: Partial<Message>, options: AbortOptions): Promise<void> {
205205
const pb = pbStream(stream)
206206
await pb.write(msg, Message, options)
207-
await pb.unwrap().close(options)
208207
}
209208

210209
/**
@@ -219,8 +218,6 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
219218

220219
const message = await pb.read(Message, options)
221220

222-
await pb.unwrap().close(options)
223-
224221
// tell any listeners about new peers we've seen
225222
message.closer.forEach(peerData => {
226223
this.safeDispatchEvent<PeerInfo>('peer', {

packages/kad-dht/src/query-self.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,19 +106,27 @@ export class QuerySelf implements Startable {
106106

107107
if (this.started) {
108108
this.controller = new AbortController()
109-
const timeoutSignal = AbortSignal.timeout(this.queryTimeout)
110-
const signal = anySignal([this.controller.signal, timeoutSignal])
109+
const signals = [this.controller.signal]
111110

112-
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
113-
setMaxListeners(Infinity, signal, this.controller.signal, timeoutSignal)
111+
// add a shorter timeout if we've already run our initial self query
112+
if (this.initialQuerySelfHasRun == null) {
113+
const timeoutSignal = AbortSignal.timeout(this.queryTimeout)
114+
setMaxListeners(Infinity, timeoutSignal)
115+
signals.push(timeoutSignal)
116+
}
117+
118+
const signal = anySignal(signals)
119+
setMaxListeners(Infinity, signal, this.controller.signal)
114120

115121
try {
116122
if (this.routingTable.size === 0) {
117123
this.log('routing table was empty, waiting for some peers before running query')
118-
// wait to discover at least one DHT peer
124+
// wait to discover at least one DHT peer that isn't us
119125
await pEvent(this.routingTable, 'peer:add', {
120-
signal
126+
signal,
127+
filter: (event) => !this.peerId.equals(event.detail)
121128
})
129+
this.log('routing table has peers, continuing with query')
122130
}
123131

124132
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)

0 commit comments

Comments
 (0)