Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 7b70fa7

Browse files
jacobheunvasco-santos
authored andcommitted
feat: add delay support to random walk (#101)
1 parent c1517a0 commit 7b70fa7

File tree

4 files changed

+60
-41
lines changed

4 files changed

+60
-41
lines changed

src/constants.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,6 @@ exports.defaultRandomWalk = {
4444
enabled: true,
4545
queriesPerPeriod: 1,
4646
interval: 5 * minute,
47-
timeout: 10 * second
47+
timeout: 10 * second,
48+
delay: 10 * second
4849
}

src/index.js

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ const Message = require('./message')
2424
const RandomWalk = require('./random-walk')
2525
const QueryManager = require('./query-manager')
2626
const assert = require('assert')
27-
const mergeOptions = require('merge-options')
2827

2928
/**
3029
* A DHT implementation modeled after Kademlia with S/Kademlia modifications.
@@ -40,6 +39,7 @@ class KadDHT extends EventEmitter {
4039
* @property {number} queriesPerPeriod how many queries to run per period (default: 1)
4140
* @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000)
4241
* @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 10000)
42+
* @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000)
4343
*/
4444

4545
/**
@@ -59,7 +59,6 @@ class KadDHT extends EventEmitter {
5959
options = options || {}
6060
options.validators = options.validators || {}
6161
options.selectors = options.selectors || {}
62-
options.randomWalk = mergeOptions(c.defaultRandomWalk, options.randomWalk)
6362

6463
/**
6564
* Local reference to the libp2p-switch instance
@@ -126,15 +125,7 @@ class KadDHT extends EventEmitter {
126125
*
127126
* @type {RandomWalk}
128127
*/
129-
this.randomWalk = new RandomWalk(this)
130-
131-
/**
132-
* Random walk state, default true
133-
*/
134-
this.randomWalkEnabled = Boolean(options.randomWalk.enabled)
135-
this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod)
136-
this.randomWalkInterval = parseInt(options.randomWalk.interval)
137-
this.randomWalkTimeout = parseInt(options.randomWalk.timeout)
128+
this.randomWalk = new RandomWalk(this, options.randomWalk)
138129

139130
/**
140131
* Keeps track of running queries
@@ -167,8 +158,8 @@ class KadDHT extends EventEmitter {
167158
return callback(err)
168159
}
169160

170-
// Start random walk if enabled
171-
this.randomWalkEnabled && this.randomWalk.start(this.randomWalkQueriesPerPeriod, this.randomWalkInterval, this.randomWalkTimeout)
161+
// Start random walk, it will not run if it's disabled
162+
this.randomWalk.start()
172163
callback()
173164
})
174165
}

src/random-walk.js

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,45 +8,56 @@ const multihashing = require('multihashing-async')
88
const PeerId = require('peer-id')
99
const assert = require('assert')
1010
const c = require('./constants')
11+
const { logger } = require('./utils')
1112

1213
const errcode = require('err-code')
1314

1415
class RandomWalk {
15-
constructor (kadDHT) {
16-
assert(kadDHT, 'Random Walk needs an instance of the Kademlia DHT')
16+
/**
17+
* @constructor
18+
* @param {DHT} dht
19+
* @param {object} options
20+
* @param {randomWalkOptions.enabled} options.enabled
21+
* @param {randomWalkOptions.queriesPerPeriod} options.queriesPerPeriod
22+
* @param {randomWalkOptions.interval} options.interval
23+
* @param {randomWalkOptions.timeout} options.timeout
24+
* @param {randomWalkOptions.delay} options.delay
25+
* @param {DHT} options.dht
26+
*/
27+
constructor (dht, options) {
28+
this._options = { ...c.defaultRandomWalk, ...options }
29+
assert(dht, 'Random Walk needs an instance of the Kademlia DHT')
1730
this._runningHandle = null
18-
this._kadDHT = kadDHT
31+
this._kadDHT = dht
32+
this.log = logger(dht.peerInfo.id, 'random-walk')
1933
}
2034

2135
/**
2236
* Start the Random Walk process. This means running a number of queries
2337
* every interval requesting random data. This is done to keep the dht
2438
* healthy over time.
2539
*
26-
* @param {number} [queries=1] - how many queries to run per period
27-
* @param {number} [period=300000] - how often to run the the random-walk process, in milliseconds (5min)
28-
* @param {number} [timeout=10000] - how long to wait for the the random-walk query to run, in milliseconds (10s)
2940
* @returns {void}
3041
*/
31-
start (queries = c.defaultRandomWalk.queriesPerPeriod, period = c.defaultRandomWalk.interval, timeout = c.defaultRandomWalk.timeout) {
42+
start () {
3243
// Don't run twice
33-
if (this._running) { return }
44+
if (this._running || !this._options.enabled) { return }
3445

3546
// Create running handle
3647
const runningHandle = {
3748
_onCancel: null,
3849
_timeoutId: null,
39-
runPeriodically: (fn, period) => {
50+
runPeriodically: (walk, period) => {
4051
runningHandle._timeoutId = setTimeout(() => {
4152
runningHandle._timeoutId = null
4253

43-
fn((nextPeriod) => {
54+
walk((nextPeriod) => {
4455
// Was walk cancelled while fn was being called?
4556
if (runningHandle._onCancel) {
4657
return runningHandle._onCancel()
4758
}
4859
// Schedule next
49-
runningHandle.runPeriodically(fn, nextPeriod)
60+
runningHandle.runPeriodically(walk, nextPeriod)
5061
})
5162
}, period)
5263
},
@@ -61,10 +72,15 @@ class RandomWalk {
6172
}
6273
}
6374

64-
// Start runner
65-
runningHandle.runPeriodically((done) => {
66-
this._walk(queries, timeout, () => done(period))
67-
}, period)
75+
// Start doing random walks after `this._options.delay`
76+
runningHandle._timeoutId = setTimeout(() => {
77+
// Start runner immediately
78+
runningHandle.runPeriodically((done) => {
79+
// Each subsequent walk should run on a `this._options.interval` interval
80+
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
81+
}, 0)
82+
}, this._options.delay)
83+
6884
this._runningHandle = runningHandle
6985
}
7086

@@ -96,7 +112,7 @@ class RandomWalk {
96112
* @private
97113
*/
98114
_walk (queries, walkTimeout, callback) {
99-
this._kadDHT._log('random-walk:start')
115+
this.log('start')
100116

101117
times(queries, (i, cb) => {
102118
waterfall([
@@ -106,11 +122,11 @@ class RandomWalk {
106122
}, walkTimeout)(cb)
107123
], (err) => {
108124
if (err) {
109-
this._kadDHT._log.error('random-walk:error', err)
125+
this.log.error('query finished with error', err)
110126
return callback(err)
111127
}
112128

113-
this._kadDHT._log('random-walk:done')
129+
this.log('done')
114130
callback(null)
115131
})
116132
})
@@ -126,7 +142,7 @@ class RandomWalk {
126142
* @private
127143
*/
128144
_query (id, callback) {
129-
this._kadDHT._log('random-walk:query:%s', id.toB58String())
145+
this.log('query:%s', id.toB58String())
130146

131147
this._kadDHT.findPeer(id, (err, peer) => {
132148
if (err.code === 'ERR_NOT_FOUND') {
@@ -136,7 +152,7 @@ class RandomWalk {
136152
if (err) {
137153
return callback(err)
138154
}
139-
this._kadDHT._log('random-walk:query:found', err, peer)
155+
this.log('query:found', peer)
140156

141157
// wait what, there was something found? Lucky day!
142158
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER'))

test/kad-dht.spec.js

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ function connect (a, b, callback) {
7474

7575
function bootstrap (dhts) {
7676
dhts.forEach((dht) => {
77-
// dht.randomWalk._walk(3, 10000, () => {}) // don't need to know when it finishes
78-
dht.randomWalk.start(1, 1000) // don't need to know when it finishes
77+
dht.randomWalk._walk(1, 1000, () => {})
7978
})
8079
}
8180

@@ -212,7 +211,7 @@ describe('KadDHT', () => {
212211
(cb) => dht.start(cb),
213212
(cb) => {
214213
expect(dht.network.start.calledOnce).to.equal(true)
215-
expect(dht.randomWalk.start.calledOnce).to.equal(false)
214+
expect(dht.randomWalk._runningHandle).to.not.exist()
216215

217216
cb()
218217
},
@@ -231,7 +230,11 @@ describe('KadDHT', () => {
231230
sw.transport.add('tcp', new TCP())
232231
sw.connection.addStreamMuxer(Mplex)
233232
sw.connection.reuse()
234-
const dht = new KadDHT(sw, { enabledDiscovery: false })
233+
const dht = new KadDHT(sw, {
234+
randomWalk: {
235+
enabled: false
236+
}
237+
})
235238

236239
series([
237240
(cb) => dht.start(cb),
@@ -247,7 +250,11 @@ describe('KadDHT', () => {
247250
sw.transport.add('tcp', new TCP())
248251
sw.connection.addStreamMuxer(Mplex)
249252
sw.connection.reuse()
250-
const dht = new KadDHT(sw, { enabledDiscovery: false })
253+
const dht = new KadDHT(sw, {
254+
randomWalk: {
255+
enabled: false
256+
}
257+
})
251258

252259
series([
253260
(cb) => dht.stop(cb)
@@ -567,13 +574,17 @@ describe('KadDHT', () => {
567574
})
568575

569576
it('random-walk', function (done) {
570-
this.timeout(40 * 1000)
577+
this.timeout(10 * 1000)
571578

572579
const nDHTs = 20
573580
const tdht = new TestDHT()
574581

575582
// random walk disabled for a manual usage
576-
tdht.spawn(nDHTs, { enabledDiscovery: false }, (err, dhts) => {
583+
tdht.spawn(nDHTs, {
584+
randomWalk: {
585+
enabled: false
586+
}
587+
}, (err, dhts) => {
577588
expect(err).to.not.exist()
578589

579590
series([

0 commit comments

Comments
 (0)