Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit eff54bf

Browse files
authored
feat: ping old DHT peers before eviction (#229)
Updates the DHT contact logic to ensure we can still connect to old peers when buckets are full and only evict if the peer does not respond.
1 parent a5ed24c commit eff54bf

File tree

3 files changed

+160
-17
lines changed

3 files changed

+160
-17
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"peer-id": "^0.15.0",
7070
"protobufjs": "^6.10.2",
7171
"streaming-iterables": "^6.0.0",
72+
"timeout-abort-controller": "^1.1.1",
7273
"uint8arrays": "^3.0.0",
7374
"varint": "^6.0.0"
7475
},

src/routing-table/index.js

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ const log = Object.assign(debug('libp2p:dht:routing-table'), {
1414
})
1515
// @ts-ignore
1616
const length = require('it-length')
17+
const { default: Queue } = require('p-queue')
18+
const { PROTOCOL_DHT } = require('../constants')
19+
// @ts-expect-error no types
20+
const TimeoutController = require('timeout-abort-controller')
1721

1822
/**
1923
* @typedef {object} KBucketPeer
@@ -54,12 +58,14 @@ class RoutingTable {
5458
* @param {object} [options]
5559
* @param {number} [options.kBucketSize=20]
5660
* @param {number} [options.refreshInterval=30000]
61+
* @param {number} [options.pingTimeout=10000]
5762
*/
58-
constructor (dht, { kBucketSize, refreshInterval } = {}) {
63+
constructor (dht, { kBucketSize, refreshInterval, pingTimeout } = {}) {
5964
this.peerId = dht.peerId
6065
this.dht = dht
6166
this._kBucketSize = kBucketSize || 20
6267
this._refreshInterval = refreshInterval || 30000
68+
this._pingTimeout = pingTimeout || 10000
6369

6470
/** @type {KBucketTree} */
6571
this.kb = new KBuck({
@@ -72,6 +78,7 @@ class RoutingTable {
7278

7379
this._refreshTable = this._refreshTable.bind(this)
7480
this._onPing = this._onPing.bind(this)
81+
this._pingQueue = new Queue({ concurrency: 1 })
7582
}
7683

7784
async start () {
@@ -85,6 +92,8 @@ class RoutingTable {
8592
if (this._refreshTimeoutId) {
8693
clearTimeout(this._refreshTimeoutId)
8794
}
95+
96+
this._pingQueue.clear()
8897
}
8998

9099
/**
@@ -295,26 +304,58 @@ class RoutingTable {
295304
}
296305

297306
/**
298-
* Called on the `ping` event from `k-bucket`.
299-
* Currently this just removes the oldest contact from
300-
* the list, without actually pinging the individual peers.
301-
* This is the same as go does, but should probably
302-
* be upgraded to actually ping the individual peers.
307+
* Called on the `ping` event from `k-bucket` when a bucket is full
308+
* and cannot split.
309+
*
310+
* `oldContacts.length` is defined by the `numberOfNodesToPing` param
311+
* passed to the `k-bucket` constructor.
312+
*
313+
* `oldContacts` will not be empty and is the list of contacts that
314+
* have not been contacted for the longest.
303315
*
304316
* @param {KBucketPeer[]} oldContacts
305317
* @param {KBucketPeer} newContact
306318
*/
307319
_onPing (oldContacts, newContact) {
308-
// just use the first one (k-bucket sorts from oldest to newest)
309-
const oldest = oldContacts[0]
310-
311-
if (oldest) {
312-
// remove the oldest one
313-
this.kb.remove(oldest.id)
314-
}
320+
// add to a queue so multiple ping requests do not overlap and we don't
321+
// flood the network with ping requests if lots of newContact requests
322+
// are received
323+
this._pingQueue.add(async () => {
324+
let responded = 0
325+
326+
try {
327+
await Promise.all(
328+
oldContacts.map(async oldContact => {
329+
let timeoutController
330+
331+
try {
332+
timeoutController = new TimeoutController(this._pingTimeout)
333+
log(`Pinging old contact ${oldContact.peer.toB58String()}`)
334+
const conn = await this.dht.libp2p.dialProtocol(oldContact.peer, PROTOCOL_DHT, {
335+
signal: timeoutController.signal
336+
})
337+
await conn.close()
338+
responded++
339+
} catch (err) {
340+
log.error('Could not ping peer', err)
341+
log(`Evicting old contact after ping failed ${oldContact.peer.toB58String()}`)
342+
this.kb.remove(oldContact.id)
343+
} finally {
344+
if (timeoutController) {
345+
timeoutController.clear()
346+
}
347+
}
348+
})
349+
)
315350

316-
// add the new one
317-
this.kb.add(newContact)
351+
if (responded < oldContacts.length) {
352+
log(`Adding new contact ${newContact.peer.toB58String()}`)
353+
this.kb.add(newContact)
354+
}
355+
} catch (err) {
356+
log.error('Could not process k-bucket ping event', err)
357+
}
358+
})
318359
}
319360

320361
// -- Public Interface

test/routing-table.spec.js

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,30 @@
44
const { expect } = require('aegir/utils/chai')
55
const PeerId = require('peer-id')
66
const random = require('lodash.random')
7+
const sinon = require('sinon')
78

89
const RoutingTable = require('../src/routing-table')
910
const kadUtils = require('../src/utils')
1011
const createPeerId = require('./utils/create-peer-id')
12+
const { PROTOCOL_DHT } = require('../src/constants')
1113

1214
describe('Routing Table', () => {
1315
let table
1416

1517
beforeEach(async function () {
1618
this.timeout(20 * 1000)
1719

18-
const id = await PeerId.create({ bits: 512 })
19-
table = new RoutingTable(id, 20)
20+
const dht = {
21+
peerId: await PeerId.create({ bits: 512 }),
22+
libp2p: {
23+
dialProtocol: sinon.stub()
24+
}
25+
}
26+
27+
table = new RoutingTable(dht, {
28+
kBucketSize: 20,
29+
refreshInterval: 30000
30+
})
2031
})
2132

2233
it('add', async function () {
@@ -73,4 +84,94 @@ describe('Routing Table', () => {
7384
const key = await kadUtils.convertPeerId(peers[2])
7485
expect(table.closestPeers(key, 15)).to.have.length(15)
7586
})
87+
88+
it('favours old peers that respond to pings', async () => {
89+
let fn
90+
91+
// execute queued functions immediately
92+
table._pingQueue = {
93+
add: (f) => {
94+
fn = f
95+
}
96+
}
97+
98+
const peerIds = [
99+
PeerId.createFromB58String('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi5'),
100+
PeerId.createFromB58String('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi6')
101+
]
102+
103+
const oldPeer = {
104+
id: peerIds[0].toBytes(),
105+
peer: peerIds[0]
106+
}
107+
const newPeer = {
108+
id: peerIds[1].toBytes(),
109+
peer: peerIds[1]
110+
}
111+
112+
table._onPing([oldPeer], newPeer)
113+
114+
// add the old peer
115+
table.kb.add(oldPeer)
116+
117+
// simulate connection succeeding
118+
table.dht.libp2p.dialProtocol.withArgs(oldPeer.peer, PROTOCOL_DHT).resolves({ close: sinon.stub() })
119+
120+
// perform the ping
121+
await fn()
122+
123+
expect(table.dht.libp2p.dialProtocol.callCount).to.equal(1)
124+
expect(table.dht.libp2p.dialProtocol.calledWith(oldPeer.peer)).to.be.true()
125+
126+
// did not add the new peer
127+
expect(table.kb.get(newPeer.id)).to.be.null()
128+
129+
// kept the old peer
130+
expect(table.kb.get(oldPeer.id)).to.not.be.null()
131+
})
132+
133+
it('evicts oldest peer that does not respond to ping', async () => {
134+
let fn
135+
136+
// execute queued functions immediately
137+
table._pingQueue = {
138+
add: (f) => {
139+
fn = f
140+
}
141+
}
142+
143+
const peerIds = [
144+
PeerId.createFromB58String('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi5'),
145+
PeerId.createFromB58String('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi6')
146+
]
147+
148+
const oldPeer = {
149+
id: peerIds[0].toBytes(),
150+
peer: peerIds[0]
151+
}
152+
const newPeer = {
153+
id: peerIds[1].toBytes(),
154+
peer: peerIds[1]
155+
}
156+
157+
table._onPing([oldPeer], newPeer)
158+
159+
// add the old peer
160+
table.kb.add(oldPeer)
161+
162+
// libp2p fails to dial the old peer
163+
table.dht.libp2p.dialProtocol = sinon.stub().withArgs(oldPeer.peer, PROTOCOL_DHT).rejects(new Error('Could not dial peer'))
164+
165+
// perform the ping
166+
await fn()
167+
168+
expect(table.dht.libp2p.dialProtocol.callCount).to.equal(1)
169+
expect(table.dht.libp2p.dialProtocol.calledWith(oldPeer.peer)).to.be.true()
170+
171+
// added the new peer
172+
expect(table.kb.get(newPeer.id)).to.not.be.null()
173+
174+
// evicted the old peer
175+
expect(table.kb.get(oldPeer.id)).to.be.null()
176+
})
76177
})

0 commit comments

Comments
 (0)