Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit fa7cfc1

Browse files
authored
fix: increase ping concurrency (#341)
After running a js-ipfs node for a week, I noticed the LAN routing table ping queue had 200k items in it (😱). I think this is because we encounter lots of peers that are advertising their private addresses. They get added to the routing table, and then we need to ping existing peers to see if we should evict them; those pings time out because the private addresses aren't routable. We do this with `concurrency: 1`, so each item that times out takes 10s to process, but we are adding items to the queue at a much faster rate than that, so the queue grows out of control. Increasing the concurrency alleviates the queue pressure, so here we increase the default concurrency to 10 and expose it as a config option, along with the timeout value, to allow users to tune both for their specific application. We also add the ping queue size and the number of current in-flight ping requests to the list of tracked metrics for the node.
1 parent 190136a commit fa7cfc1

File tree

6 files changed

+90
-19
lines changed

6 files changed

+90
-19
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
},
134134
"dependencies": {
135135
"@libp2p/crypto": "^0.22.12",
136-
"@libp2p/interfaces": "^2.0.1",
136+
"@libp2p/interfaces": "^2.0.2",
137137
"@libp2p/logger": "^1.1.4",
138138
"@libp2p/peer-id": "^1.1.10",
139139
"@libp2p/record": "^1.0.4",
@@ -167,7 +167,7 @@
167167
"varint": "^6.0.0"
168168
},
169169
"devDependencies": {
170-
"@libp2p/interface-compliance-tests": "^2.0.1",
170+
"@libp2p/interface-compliance-tests": "^2.0.3",
171171
"@libp2p/peer-id-factory": "^1.0.9",
172172
"@libp2p/peer-store": "^1.0.11",
173173
"@types/lodash.random": "^3.2.6",

src/index.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,48 @@ import { DualKadDHT } from './dual-kad-dht.js'
33
import type { Selectors, Validators } from '@libp2p/interfaces/dht'
44

55
export interface KadDHTInit {
6+
/**
7+
* How many peers to store in each kBucket (default 20)
8+
*/
69
kBucketSize?: number
10+
11+
/**
12+
* Whether to start up as a DHT client or server
13+
*/
714
clientMode?: boolean
15+
16+
/**
17+
* Record selectors
18+
*/
819
selectors?: Selectors
20+
21+
/**
22+
* Record validators
23+
*/
924
validators?: Validators
25+
26+
/**
27+
* How often to query our own PeerId in order to ensure we have a
28+
* good view on the KAD address space local to our PeerId
29+
*/
1030
querySelfInterval?: number
11-
lan?: boolean
31+
32+
/**
33+
* A custom protocol prefix to use (default: '/ipfs')
34+
*/
1235
protocolPrefix?: string
36+
37+
/**
38+
* How long to wait in ms when pinging DHT peers to decide if they
39+
* should be evicted from the routing table or not (default 10000)
40+
*/
41+
pingTimeout?: number
42+
43+
/**
44+
* How many peers to ping in parallel when deciding if they should
45+
* be evicted from the routing table or not (default 10)
46+
*/
47+
pingConcurrency?: number
1348
}
1449

1550
export class KadDHT extends DualKadDHT {

src/kad-dht.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ import { validators as recordValidators } from '@libp2p/record/validators'
2626
import { selectors as recordSelectors } from '@libp2p/record/selectors'
2727
import { symbol } from '@libp2p/interfaces/peer-discovery'
2828

29+
export interface SingleKadDHTInit extends KadDHTInit {
30+
/**
31+
* Whether to start up in lan or wan mode
32+
*/
33+
lan?: boolean
34+
}
35+
2936
/**
3037
* A DHT implementation modelled after Kademlia with S/Kademlia modifications.
3138
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
@@ -56,7 +63,7 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
5663
/**
5764
* Create a new KadDHT
5865
*/
59-
constructor (init: KadDHTInit) {
66+
constructor (init: SingleKadDHTInit) {
6067
super()
6168

6269
const {
@@ -66,7 +73,9 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
6673
selectors,
6774
querySelfInterval,
6875
lan,
69-
protocolPrefix
76+
protocolPrefix,
77+
pingTimeout,
78+
pingConcurrency
7079
} = init
7180

7281
this.running = false
@@ -77,7 +86,9 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
7786
this.clientMode = clientMode ?? true
7887
this.routingTable = new RoutingTable({
7988
kBucketSize,
80-
lan: this.lan
89+
lan: this.lan,
90+
pingTimeout,
91+
pingConcurrency
8192
})
8293

8394
this.providers = new Providers()

src/network.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
9797

9898
try {
9999
const connection = await this.components.getConnectionManager().openConnection(to, options)
100-
const streamData = await connection.newStream(this.protocol)
100+
const streamData = await connection.newStream(this.protocol, options)
101101
stream = streamData.stream
102102

103103
const response = await this._writeReadMessage(stream, msg.serialize(), options)
@@ -134,7 +134,7 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
134134

135135
try {
136136
const connection = await this.components.getConnectionManager().openConnection(to, options)
137-
const data = await connection.newStream(this.protocol)
137+
const data = await connection.newStream(this.protocol, options)
138138
stream = data.stream
139139

140140
await this._writeMessage(stream, msg.serialize(), options)

src/routing-table/index.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@ export interface KBucketTree {
3737
}
3838

3939
const METRIC_ROUTING_TABLE_SIZE = 'routing-table-size'
40+
const METRIC_PING_QUEUE_SIZE = 'ping-queue-size'
41+
const METRIC_PING_RUNNING = 'ping-running'
4042

4143
export interface RoutingTableInit {
4244
lan: boolean
4345
kBucketSize?: number
4446
pingTimeout?: number
47+
pingConcurrency?: number
4548
}
4649

4750
/**
@@ -57,18 +60,38 @@ export class RoutingTable implements Startable, Initializable {
5760
private components: Components = new Components()
5861
private readonly lan: boolean
5962
private readonly pingTimeout: number
63+
private readonly pingConcurrency: number
6064
private running: boolean
6165

6266
constructor (init: RoutingTableInit) {
63-
const { kBucketSize, pingTimeout, lan } = init
67+
const { kBucketSize, pingTimeout, lan, pingConcurrency } = init
6468

6569
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`)
6670
this.kBucketSize = kBucketSize ?? 20
6771
this.pingTimeout = pingTimeout ?? 10000
72+
this.pingConcurrency = pingConcurrency ?? 10
6873
this.lan = lan
69-
this.pingQueue = new Queue({ concurrency: 1 })
7074
this.running = false
7175

76+
const updatePingQueueSizeMetric = () => {
77+
this.components.getMetrics()?.updateComponentMetric({
78+
system: 'libp2p',
79+
component: `kad-dht-${this.lan ? 'lan' : 'wan'}`,
80+
metric: METRIC_PING_QUEUE_SIZE,
81+
value: this.pingQueue.size
82+
})
83+
this.components.getMetrics()?.updateComponentMetric({
84+
system: 'libp2p',
85+
component: `kad-dht-${this.lan ? 'lan' : 'wan'}`,
86+
metric: METRIC_PING_RUNNING,
87+
value: this.pingQueue.pending
88+
})
89+
}
90+
91+
this.pingQueue = new Queue({ concurrency: this.pingConcurrency })
92+
this.pingQueue.addListener('add', updatePingQueueSizeMetric)
93+
this.pingQueue.addListener('next', updatePingQueueSizeMetric)
94+
7295
this._onPing = this._onPing.bind(this)
7396
}
7497

@@ -127,12 +150,14 @@ export class RoutingTable implements Startable, Initializable {
127150
try {
128151
timeoutController = new TimeoutController(this.pingTimeout)
129152

130-
this.log('pinging old contact %p', oldContact.peer)
131-
const connection = await this.components.getConnectionManager().openConnection(oldContact.peer, {
153+
const options = {
132154
signal: timeoutController.signal
133-
})
134-
const { stream } = await connection.newStream(PROTOCOL_DHT)
135-
await stream.close()
155+
}
156+
157+
this.log('pinging old contact %p', oldContact.peer)
158+
const connection = await this.components.getConnectionManager().openConnection(oldContact.peer, options)
159+
const { stream } = await connection.newStream(PROTOCOL_DHT, options)
160+
stream.close()
136161
responded++
137162
} catch (err: any) {
138163
if (this.running && this.kb != null) {

src/rpc/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ export class RPC implements Initializable {
9393
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
9494

9595
await pipe(
96-
stream.source,
96+
stream,
9797
lp.decode(),
98-
source => (async function * () {
98+
async function * (source) {
9999
for await (const msg of source) {
100100
// handle the message
101101
const desMessage = Message.deserialize(msg)
@@ -107,9 +107,9 @@ export class RPC implements Initializable {
107107
yield res.serialize()
108108
}
109109
}
110-
})(),
110+
},
111111
lp.encode(),
112-
stream.sink
112+
stream
113113
)
114114
})
115115
.catch(err => {

0 commit comments

Comments
 (0)