Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 4df2c3f

Browse files
authored
fix: add default query timeouts (#266)
If an `AbortSignal` is not passed to the query manager, create a default timeout to prevent queries from running forever. Also add configurable default timeouts for self-query and table refresh.
1 parent 92dc892 commit 4df2c3f

File tree

5 files changed

+42
-11
lines changed

5 files changed

+42
-11
lines changed

src/constants.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,14 @@ exports.ALPHA = 3
3939
// How often we look for our closest DHT neighbours
4040
exports.QUERY_SELF_INTERVAL = Number(5 * minute)
4141

42+
// How long to look for our closest DHT neighbours for
43+
exports.QUERY_SELF_TIMEOUT = Number(30 * second)
44+
4245
// How often we try to find new peers
4346
exports.TABLE_REFRESH_INTERVAL = Number(5 * minute)
47+
48+
// How how long to look for new peers for
49+
exports.TABLE_REFRESH_QUERY_TIMEOUT = Number(30 * second)
50+
51+
// When a timeout is not specified, run a query for this long
52+
exports.DEFAULT_QUERY_TIMEOUT = Number(30 * second)

src/kad-dht.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ class KadDHT extends EventEmitter {
363363
this._queryManager.start(),
364364
this._network.start(),
365365
this._routingTable.start(),
366-
this._routingTableRefresh.start(),
367366
this._topologyListener.start(),
368367
this._querySelf.start()
369368
])
@@ -372,6 +371,7 @@ class KadDHT extends EventEmitter {
372371
this._bootstrapPeers.map(peerData => this._routingTable.add(peerData.id))
373372
)
374373

374+
await this._routingTableRefresh.start()
375375
await this.refreshRoutingTable()
376376
}
377377

src/query-self.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
const { EventEmitter } = require('events')
44
const take = require('it-take')
55
const length = require('it-length')
6-
const { QUERY_SELF_INTERVAL, K } = require('./constants')
6+
const { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } = require('./constants')
77
const utils = require('./utils')
8+
const { TimeoutController } = require('timeout-abort-controller')
9+
const { anySignal } = require('any-signal')
810

911
/**
1012
* Receives notifications of new peers joining the network that support the DHT protocol
@@ -18,17 +20,19 @@ class QuerySelf extends EventEmitter {
1820
* @param {import('./peer-routing').PeerRouting} params.peerRouting
1921
* @param {number} [params.count] - how many peers to find
2022
* @param {number} [params.interval] - how often to find them
23+
* @param {number} [params.queryTimeout] - how long to let queries run
2124
* @param {boolean} params.lan
2225
*/
23-
constructor ({ peerId, peerRouting, lan, count = K, interval = QUERY_SELF_INTERVAL }) {
26+
constructor ({ peerId, peerRouting, lan, count = K, interval = QUERY_SELF_INTERVAL, queryTimeout = QUERY_SELF_TIMEOUT }) {
2427
super()
2528

2629
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`)
2730
this._running = false
2831
this._peerId = peerId
2932
this._peerRouting = peerRouting
30-
this._count = count
31-
this._interval = interval
33+
this._count = count || K
34+
this._interval = interval || QUERY_SELF_INTERVAL
35+
this._queryTimeout = queryTimeout || QUERY_SELF_TIMEOUT
3236
}
3337

3438
/**
@@ -59,18 +63,20 @@ class QuerySelf extends EventEmitter {
5963
}
6064

6165
async _querySelf () {
66+
const timeoutController = new TimeoutController(this._queryTimeout)
67+
6268
try {
6369
this._controller = new AbortController()
64-
6570
const found = await length(await take(this._peerRouting.getClosestPeers(this._peerId.toBytes(), {
66-
signal: this._controller.signal
71+
signal: anySignal([this._controller.signal, timeoutController.signal])
6772
}), this._count))
6873

6974
this._log('query ran successfully - found %d peers', found)
7075
} catch (/** @type {any} */ err) {
7176
this._log('query error', err)
7277
} finally {
7378
this._timeoutId = setTimeout(this._querySelf.bind(this), this._interval)
79+
timeoutController.clear()
7480
}
7581
}
7682
}

src/query/manager.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
'use strict'
22

33
const { AbortController } = require('native-abort-controller')
4+
const { TimeoutController } = require('timeout-abort-controller')
45
const { anySignal } = require('any-signal')
56
const {
6-
ALPHA, K
7+
ALPHA, K, DEFAULT_QUERY_TIMEOUT
78
} = require('../constants')
89
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
910
const { logger } = require('../utils')
@@ -78,6 +79,14 @@ class QueryManager {
7879
throw new Error('QueryManager not started')
7980
}
8081

82+
let timeoutController
83+
84+
if (!options.signal) {
85+
// don't let queries run forever
86+
timeoutController = new TimeoutController(DEFAULT_QUERY_TIMEOUT)
87+
options.signal = timeoutController.signal
88+
}
89+
8190
// allow us to stop queries on shut down
8291
const abortController = new AbortController()
8392
this._controllers.add(abortController)
@@ -139,6 +148,11 @@ class QueryManager {
139148
}
140149
} finally {
141150
this._controllers.delete(abortController)
151+
152+
if (timeoutController) {
153+
timeoutController.clear()
154+
}
155+
142156
cleanUp.emit('cleanup')
143157
log(`query:done in ${Date.now() - (startTime || 0)}ms`)
144158
}

src/routing-table/refresh.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const PeerId = require('peer-id')
88
const utils = require('../utils')
99
const length = require('it-length')
1010
const { TimeoutController } = require('timeout-abort-controller')
11-
const { TABLE_REFRESH_INTERVAL } = require('../constants')
11+
const { TABLE_REFRESH_INTERVAL, TABLE_REFRESH_QUERY_TIMEOUT } = require('../constants')
1212

1313
/**
1414
* @typedef {import('./types').KBucketPeer} KBucketPeer
@@ -32,12 +32,14 @@ class RoutingTableRefresh {
3232
* @param {import('./').RoutingTable} params.routingTable
3333
* @param {boolean} params.lan
3434
* @param {number} [params.refreshInterval]
35+
* @param {number} [params.refreshQueryTimeout]
3536
*/
36-
constructor ({ peerRouting, routingTable, refreshInterval, lan }) {
37+
constructor ({ peerRouting, routingTable, refreshInterval, refreshQueryTimeout, lan }) {
3738
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table:refresh`)
3839
this._peerRouting = peerRouting
3940
this._routingTable = routingTable
4041
this._refreshInterval = refreshInterval || TABLE_REFRESH_INTERVAL
42+
this._refreshQueryTimeout = refreshQueryTimeout || TABLE_REFRESH_QUERY_TIMEOUT
4143

4244
/** @type {Date[]} */
4345
this.commonPrefixLengthRefreshedAt = []
@@ -135,7 +137,7 @@ class RoutingTableRefresh {
135137

136138
this._log('starting refreshing cpl %s with key %p (routing table size was %s)', cpl, peerId, this._routingTable.kb.count())
137139

138-
const controller = new TimeoutController(60000)
140+
const controller = new TimeoutController(this._refreshQueryTimeout)
139141

140142
try {
141143
const peers = await length(this._peerRouting.getClosestPeers(peerId.toBytes(), { signal: controller.signal }))

0 commit comments

Comments
 (0)