Skip to content

Commit 417968e

Browse files
feat(kad): configurable bucket size
Making bucket size configurable. Currently `K_VALUE` is used by default, and the only way to change the bucket size is to edit the const. Resolves #5389 Pull-Request: #5414.
1 parent 98da34a commit 417968e

File tree

4 files changed

+151
-52
lines changed

4 files changed

+151
-52
lines changed

protocols/kad/CHANGELOG.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
- Included multiaddresses of found peers alongside peer IDs in `GetClosestPeers` query results.
44
See [PR 5475](https://github.com/libp2p/rust-libp2p/pull/5475)
55
- Changed `FIND_NODE` response: now includes a list of closest peers when querying the recipient peer ID. Previously, this request yielded an empty response.
6-
See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270)
7-
- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451)
8-
See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230)
6+
See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270).
7+
- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451).
8+
See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230).
99
- Use default dial conditions more consistently.
1010
See [PR 4957](https://github.com/libp2p/rust-libp2p/pull/4957)
1111
- QueryClose progress whenever closer in range, instead of having to be the closest.
@@ -19,6 +19,8 @@
1919
See [PR 5148](https://github.com/libp2p/rust-libp2p/pull/5148).
2020
- Derive `Copy` for `kbucket::key::Key<T>`.
2121
See [PR 5317](https://github.com/libp2p/rust-libp2p/pull/5317).
22+
`KBucket` size can now be modified without changing the `K_VALUE`.
23+
See [PR 5414](https://github.com/libp2p/rust-libp2p/pull/5414).
2224
- Use `web-time` instead of `instant`.
2325
See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347).
2426
<!-- Update to libp2p-swarm v0.45.0 -->

protocols/kad/src/behaviour.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ mod test;
2424

2525
use crate::addresses::Addresses;
2626
use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
27-
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
27+
use crate::kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus};
2828
use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
2929
use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
3030
use crate::record::{
@@ -172,7 +172,7 @@ pub enum StoreInserts {
172172
/// The configuration is consumed by [`Behaviour::new`].
173173
#[derive(Debug, Clone)]
174174
pub struct Config {
175-
kbucket_pending_timeout: Duration,
175+
kbucket_config: KBucketConfig,
176176
query_config: QueryConfig,
177177
protocol_config: ProtocolConfig,
178178
record_ttl: Option<Duration>,
@@ -215,7 +215,7 @@ impl Config {
215215
/// Builds a new `Config` with the given protocol name.
216216
pub fn new(protocol_name: StreamProtocol) -> Self {
217217
Config {
218-
kbucket_pending_timeout: Duration::from_secs(60),
218+
kbucket_config: KBucketConfig::default(),
219219
query_config: QueryConfig::default(),
220220
protocol_config: ProtocolConfig::new(protocol_name),
221221
record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
@@ -424,6 +424,24 @@ impl Config {
424424
self
425425
}
426426

427+
/// Sets the configuration for the k-buckets.
428+
///
429+
/// * Default to K_VALUE.
430+
pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
431+
self.kbucket_config.set_bucket_size(size);
432+
self
433+
}
434+
435+
/// Sets the timeout duration after creation of a pending entry after which
436+
/// it becomes eligible for insertion into a full bucket, replacing the
437+
/// least-recently (dis)connected node.
438+
///
439+
/// * Default to `60` s.
440+
pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
441+
self.kbucket_config.set_pending_timeout(timeout);
442+
self
443+
}
444+
427445
/// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table.
428446
/// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time".
429447
/// This also allows to wait a little bit for other potential peers to be inserted into the routing table before
@@ -481,7 +499,7 @@ where
481499
Behaviour {
482500
store,
483501
caching: config.caching,
484-
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
502+
kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
485503
kbucket_inserts: config.kbucket_inserts,
486504
protocol_config: config.protocol_config,
487505
record_filtering: config.record_filtering,

protocols/kad/src/kbucket.rs

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,58 @@ mod key;
7575
pub use bucket::NodeStatus;
7676
pub use entry::*;
7777

78-
use arrayvec::ArrayVec;
7978
use bucket::KBucket;
8079
use std::collections::VecDeque;
80+
use std::num::NonZeroUsize;
8181
use std::time::Duration;
8282
use web_time::Instant;
8383

8484
/// Maximum number of k-buckets.
8585
const NUM_BUCKETS: usize = 256;
8686

87+
/// The configuration for `KBucketsTable`.
88+
#[derive(Debug, Clone, Copy)]
89+
pub(crate) struct KBucketConfig {
90+
/// Maximal number of nodes that a bucket can contain.
91+
bucket_size: usize,
92+
/// Specifies the duration after creation of a [`PendingEntry`] after which
93+
/// it becomes eligible for insertion into a full bucket, replacing the
94+
/// least-recently (dis)connected node.
95+
pending_timeout: Duration,
96+
}
97+
98+
impl Default for KBucketConfig {
99+
fn default() -> Self {
100+
KBucketConfig {
101+
bucket_size: K_VALUE.get(),
102+
pending_timeout: Duration::from_secs(60),
103+
}
104+
}
105+
}
106+
107+
impl KBucketConfig {
108+
/// Modifies the maximal number of nodes that a bucket can contain.
109+
pub(crate) fn set_bucket_size(&mut self, bucket_size: NonZeroUsize) {
110+
self.bucket_size = bucket_size.get();
111+
}
112+
113+
/// Modifies the duration after creation of a [`PendingEntry`] after which
114+
/// it becomes eligible for insertion into a full bucket, replacing the
115+
/// least-recently (dis)connected node.
116+
pub(crate) fn set_pending_timeout(&mut self, pending_timeout: Duration) {
117+
self.pending_timeout = pending_timeout;
118+
}
119+
}
120+
87121
/// A `KBucketsTable` represents a Kademlia routing table.
88122
#[derive(Debug, Clone)]
89123
pub(crate) struct KBucketsTable<TKey, TVal> {
90124
/// The key identifying the local peer that owns the routing table.
91125
local_key: TKey,
92126
/// The buckets comprising the routing table.
93127
buckets: Vec<KBucket<TKey, TVal>>,
128+
/// The maximal number of nodes that a bucket can contain.
129+
bucket_size: usize,
94130
/// The list of evicted entries that have been replaced with pending
95131
/// entries since the last call to [`KBucketsTable::take_applied_pending`].
96132
applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
@@ -151,17 +187,12 @@ where
151187
TVal: Clone,
152188
{
153189
/// Creates a new, empty Kademlia routing table with entries partitioned
154-
/// into buckets as per the Kademlia protocol.
155-
///
156-
/// The given `pending_timeout` specifies the duration after creation of
157-
/// a [`PendingEntry`] after which it becomes eligible for insertion into
158-
/// a full bucket, replacing the least-recently (dis)connected node.
159-
pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self {
190+
/// into buckets as per the Kademlia protocol using the provided config.
191+
pub(crate) fn new(local_key: TKey, config: KBucketConfig) -> Self {
160192
KBucketsTable {
161193
local_key,
162-
buckets: (0..NUM_BUCKETS)
163-
.map(|_| KBucket::new(pending_timeout))
164-
.collect(),
194+
buckets: (0..NUM_BUCKETS).map(|_| KBucket::new(config)).collect(),
195+
bucket_size: config.bucket_size,
165196
applied_pending: VecDeque::new(),
166197
}
167198
}
@@ -247,13 +278,16 @@ where
247278
T: AsRef<KeyBytes>,
248279
{
249280
let distance = self.local_key.as_ref().distance(target);
281+
let bucket_size = self.bucket_size;
250282
ClosestIter {
251283
target,
252284
iter: None,
253285
table: self,
254286
buckets_iter: ClosestBucketsIter::new(distance),
255-
fmap: |b: &KBucket<TKey, _>| -> ArrayVec<_, { K_VALUE.get() }> {
256-
b.iter().map(|(n, _)| n.key.clone()).collect()
287+
fmap: move |b: &KBucket<TKey, _>| -> Vec<_> {
288+
let mut vec = Vec::with_capacity(bucket_size);
289+
vec.extend(b.iter().map(|(n, _)| n.key.clone()));
290+
vec
257291
},
258292
}
259293
}
@@ -269,13 +303,15 @@ where
269303
TVal: Clone,
270304
{
271305
let distance = self.local_key.as_ref().distance(target);
306+
let bucket_size = self.bucket_size;
272307
ClosestIter {
273308
target,
274309
iter: None,
275310
table: self,
276311
buckets_iter: ClosestBucketsIter::new(distance),
277-
fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_, { K_VALUE.get() }> {
312+
fmap: move |b: &KBucket<_, TVal>| -> Vec<_> {
278313
b.iter()
314+
.take(bucket_size)
279315
.map(|(n, status)| EntryView {
280316
node: n.clone(),
281317
status,
@@ -324,7 +360,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
324360
/// distance of the local key to the target.
325361
buckets_iter: ClosestBucketsIter,
326362
/// The iterator over the entries in the currently traversed bucket.
327-
iter: Option<arrayvec::IntoIter<TOut, { K_VALUE.get() }>>,
363+
iter: Option<std::vec::IntoIter<TOut>>,
328364
/// The projection function / mapping applied on each bucket as
329365
/// it is encountered, producing the next `iter`ator.
330366
fmap: TMap,
@@ -429,7 +465,7 @@ where
429465
TTarget: AsRef<KeyBytes>,
430466
TKey: Clone + AsRef<KeyBytes>,
431467
TVal: Clone,
432-
TMap: Fn(&KBucket<TKey, TVal>) -> ArrayVec<TOut, { K_VALUE.get() }>,
468+
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
433469
TOut: AsRef<KeyBytes>,
434470
{
435471
type Item = TOut;
@@ -535,11 +571,14 @@ mod tests {
535571
fn arbitrary(g: &mut Gen) -> TestTable {
536572
let local_key = Key::from(PeerId::random());
537573
let timeout = Duration::from_secs(g.gen_range(1..360));
538-
let mut table = TestTable::new(local_key.into(), timeout);
574+
let mut config = KBucketConfig::default();
575+
config.set_pending_timeout(timeout);
576+
let bucket_size = config.bucket_size;
577+
let mut table = TestTable::new(local_key.into(), config);
539578
let mut num_total = g.gen_range(0..100);
540579
for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
541580
let ix = BucketIndex(i);
542-
let num = g.gen_range(0..usize::min(K_VALUE.get(), num_total) + 1);
581+
let num = g.gen_range(0..usize::min(bucket_size, num_total) + 1);
543582
num_total -= num;
544583
for _ in 0..num {
545584
let distance = ix.rand_distance(&mut rand::thread_rng());
@@ -560,7 +599,9 @@ mod tests {
560599
fn buckets_are_non_overlapping_and_exhaustive() {
561600
let local_key = Key::from(PeerId::random());
562601
let timeout = Duration::from_secs(0);
563-
let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);
602+
let mut config = KBucketConfig::default();
603+
config.set_pending_timeout(timeout);
604+
let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), config);
564605

565606
let mut prev_max = U256::from(0);
566607

@@ -577,7 +618,9 @@ mod tests {
577618
fn bucket_contains_range() {
578619
fn prop(ix: u8) {
579620
let index = BucketIndex(ix as usize);
580-
let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
621+
let mut config = KBucketConfig::default();
622+
config.set_pending_timeout(Duration::from_secs(0));
623+
let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
581624
let bucket_ref = KBucketRef {
582625
index,
583626
bucket: &mut bucket,
@@ -623,7 +666,7 @@ mod tests {
623666
let local_key = Key::from(PeerId::random());
624667
let other_id = Key::from(PeerId::random());
625668

626-
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
669+
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
627670
if let Some(Entry::Absent(entry)) = table.entry(&other_id) {
628671
match entry.insert((), NodeStatus::Connected) {
629672
InsertResult::Inserted => (),
@@ -641,15 +684,15 @@ mod tests {
641684
#[test]
642685
fn entry_self() {
643686
let local_key = Key::from(PeerId::random());
644-
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
687+
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
645688

646689
assert!(table.entry(&local_key).is_none())
647690
}
648691

649692
#[test]
650693
fn closest() {
651694
let local_key = Key::from(PeerId::random());
652-
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
695+
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
653696
let mut count = 0;
654697
loop {
655698
if count == 100 {
@@ -684,7 +727,9 @@ mod tests {
684727
#[test]
685728
fn applied_pending() {
686729
let local_key = Key::from(PeerId::random());
687-
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1));
730+
let mut config = KBucketConfig::default();
731+
config.set_pending_timeout(Duration::from_millis(1));
732+
let mut table = KBucketsTable::<_, ()>::new(local_key, config);
688733
let expected_applied;
689734
let full_bucket_index;
690735
loop {

0 commit comments

Comments
 (0)