Skip to content

Commit 3be7104

Browse files
fix(kad): improve memory allocation when iterating over kbuckets
Proposal to fix #5712. I have changed the `ClosestIter` structure to allocate only when `kbucket_size` is higher than `K_VALUE`, and only once over the lifetime of a `ClosestIter`. I think I did not break anything, but I would really like some people experienced with Kademlia to take a look (@guillaumemichel 😉). Pull-Request: #5715.
1 parent 54d7f21 commit 3be7104

File tree

3 files changed

+76
-37
lines changed

3 files changed

+76
-37
lines changed

protocols/kad/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
- Expose Distance private field U256 to public.
44
See [PR 5705](https://github.com/libp2p/rust-libp2p/pull/5705).
5+
- Fix systematic memory allocation when iterating over `KBuckets`.
6+
See [PR 5715](https://github.com/libp2p/rust-libp2p/pull/5715).
57

68
## 0.47.0
79

protocols/kad/src/behaviour.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ impl Config {
434434
/// Sets the configuration for the k-buckets.
435435
///
436436
/// * Default to K_VALUE.
437+
///
438+
/// **WARNING**: setting a `size` higher than `K_VALUE` may imply additional memory allocations.
437439
pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
438440
self.kbucket_config.set_bucket_size(size);
439441
self

protocols/kad/src/kbucket.rs

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
7777
use bucket::KBucket;
7878
pub use bucket::NodeStatus;
7979
pub use entry::*;
80+
use smallvec::SmallVec;
8081
use web_time::Instant;
8182

8283
/// Maximum number of k-buckets.
@@ -282,11 +283,8 @@ where
282283
iter: None,
283284
table: self,
284285
buckets_iter: ClosestBucketsIter::new(distance),
285-
fmap: move |b: &KBucket<TKey, _>| -> Vec<_> {
286-
let mut vec = Vec::with_capacity(bucket_size);
287-
vec.extend(b.iter().map(|(n, _)| n.key.clone()));
288-
vec
289-
},
286+
fmap: |(n, _status): (&Node<TKey, TVal>, NodeStatus)| n.key.clone(),
287+
bucket_size,
290288
}
291289
}
292290

@@ -307,15 +305,11 @@ where
307305
iter: None,
308306
table: self,
309307
buckets_iter: ClosestBucketsIter::new(distance),
310-
fmap: move |b: &KBucket<_, TVal>| -> Vec<_> {
311-
b.iter()
312-
.take(bucket_size)
313-
.map(|(n, status)| EntryView {
314-
node: n.clone(),
315-
status,
316-
})
317-
.collect()
308+
fmap: |(n, status): (&Node<TKey, TVal>, NodeStatus)| EntryView {
309+
node: n.clone(),
310+
status,
318311
},
312+
bucket_size,
319313
}
320314
}
321315

@@ -358,10 +352,12 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
358352
/// distance of the local key to the target.
359353
buckets_iter: ClosestBucketsIter,
360354
/// The iterator over the entries in the currently traversed bucket.
361-
iter: Option<std::vec::IntoIter<TOut>>,
355+
iter: Option<ClosestIterBuffer<TOut>>,
362356
/// The projection function / mapping applied on each bucket as
363357
/// it is encountered, producing the next `iter`ator.
364358
fmap: TMap,
359+
/// The maximal number of nodes that a bucket can contain.
360+
bucket_size: usize,
365361
}
366362

367363
/// An iterator over the bucket indices, in the order determined by the `Distance` of
@@ -463,41 +459,80 @@ where
463459
TTarget: AsRef<KeyBytes>,
464460
TKey: Clone + AsRef<KeyBytes>,
465461
TVal: Clone,
466-
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
462+
TMap: Fn((&Node<TKey, TVal>, NodeStatus)) -> TOut,
467463
TOut: AsRef<KeyBytes>,
468464
{
469465
type Item = TOut;
470466

471467
fn next(&mut self) -> Option<Self::Item> {
472468
loop {
473-
match &mut self.iter {
474-
Some(iter) => match iter.next() {
475-
Some(k) => return Some(k),
476-
None => self.iter = None,
477-
},
478-
None => {
479-
if let Some(i) = self.buckets_iter.next() {
480-
let bucket = &mut self.table.buckets[i.get()];
481-
if let Some(applied) = bucket.apply_pending() {
482-
self.table.applied_pending.push_back(applied)
483-
}
484-
let mut v = (self.fmap)(bucket);
485-
v.sort_by(|a, b| {
486-
self.target
487-
.as_ref()
488-
.distance(a.as_ref())
489-
.cmp(&self.target.as_ref().distance(b.as_ref()))
490-
});
491-
self.iter = Some(v.into_iter());
492-
} else {
493-
return None;
494-
}
469+
let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() {
470+
if let Some(next) = iter.next() {
471+
self.iter = Some(iter);
472+
return Some(next);
495473
}
474+
475+
let bucket_index = self.buckets_iter.next()?;
476+
477+
// Reuse the same buffer so that, if any allocation was needed, it only happens once over
478+
// a `ClosestIter`'s lifetime.
479+
iter.buffer.clear();
480+
481+
(iter.buffer, bucket_index)
482+
} else {
483+
let bucket_index = self.buckets_iter.next()?;
484+
485+
// Allocation only occurs if `kbucket_size` is greater than `K_VALUE`.
486+
(SmallVec::with_capacity(self.bucket_size), bucket_index)
487+
};
488+
489+
let bucket = &mut self.table.buckets[bucket_index.get()];
490+
if let Some(applied) = bucket.apply_pending() {
491+
self.table.applied_pending.push_back(applied)
496492
}
493+
494+
buffer.extend(
495+
bucket
496+
.iter()
497+
.take(self.bucket_size)
498+
.map(|e| (self.fmap)(e))
499+
.map(Some),
500+
);
501+
buffer.sort_by(|a, b| {
502+
let a = a.as_ref().expect("just initialized");
503+
let b = b.as_ref().expect("just initialized");
504+
self.target
505+
.as_ref()
506+
.distance(a.as_ref())
507+
.cmp(&self.target.as_ref().distance(b.as_ref()))
508+
});
509+
510+
self.iter = Some(ClosestIterBuffer::new(buffer));
497511
}
498512
}
499513
}
500514

515+
struct ClosestIterBuffer<TOut> {
516+
buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>,
517+
index: usize,
518+
}
519+
520+
impl<TOut> ClosestIterBuffer<TOut> {
521+
fn new(buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>) -> Self {
522+
Self { buffer, index: 0 }
523+
}
524+
}
525+
526+
impl<TOut> Iterator for ClosestIterBuffer<TOut> {
527+
type Item = TOut;
528+
529+
fn next(&mut self) -> Option<Self::Item> {
530+
let entry = self.buffer.get_mut(self.index)?;
531+
self.index += 1;
532+
entry.take()
533+
}
534+
}
535+
501536
/// A reference to a bucket.
502537
pub struct KBucketRef<'a, TKey, TVal> {
503538
index: BucketIndex,

0 commit comments

Comments
 (0)