Skip to content

Commit d63dab1

Browse files
authored
feat(peer-store): Limit number of peers in memory store
To avoid unbounded memory growth, limit the number of peers stored in the memory store. We use an LRU cache, pushing the peer back if it is updated, but not if it is read from. This is additional to the LRU cache per peer, which stores addresses per peer. Pull-Request: #6150.
1 parent 79fc6f1 commit d63dab1

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

misc/peer-store/src/memory_store.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! ```
1010
1111
use std::{
12-
collections::{HashMap, VecDeque},
12+
collections::VecDeque,
1313
num::NonZeroUsize,
1414
task::{Poll, Waker},
1515
};
@@ -49,10 +49,9 @@ pub enum Event {
4949

5050
/// A in-memory store that uses LRU cache for bounded storage of addresses
5151
/// and a frequency-based ordering of addresses.
52-
#[derive(Default)]
5352
pub struct MemoryStore<T = ()> {
5453
/// The internal store.
55-
records: HashMap<PeerId, PeerRecord<T>>,
54+
records: LruCache<PeerId, PeerRecord<T>>,
5655
/// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm).
5756
pending_events: VecDeque<Event>,
5857
/// Config of the store.
@@ -65,8 +64,8 @@ impl<T> MemoryStore<T> {
6564
/// Create a new [`MemoryStore`] with the given config.
6665
pub fn new(config: Config) -> Self {
6766
Self {
67+
records: LruCache::new(config.peer_capacity().get()),
6868
config,
69-
records: HashMap::new(),
7069
pending_events: VecDeque::default(),
7170
waker: None,
7271
}
@@ -137,7 +136,7 @@ impl<T> MemoryStore<T> {
137136

138137
/// Get a reference to a peer's custom data.
139138
pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
140-
self.records.get(peer).and_then(|r| r.get_custom_data())
139+
self.records.peek(peer).and_then(|r| r.get_custom_data())
141140
}
142141

143142
/// Take ownership of the internal data, leaving `None` in its place.
@@ -240,7 +239,7 @@ impl<T> Store for MemoryStore<T> {
240239
}
241240

242241
fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
243-
self.records.get(peer).map(|record| record.addresses())
242+
self.records.peek(peer).map(|record| record.addresses())
244243
}
245244

246245
fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
@@ -257,26 +256,40 @@ impl<T> Store for MemoryStore<T> {
257256
/// Config for [`MemoryStore`]. The available options are documented via their setters.
258257
#[derive(Debug, Clone)]
259258
pub struct Config {
259+
peer_capacity: NonZeroUsize,
260260
record_capacity: NonZeroUsize,
261261
remove_addr_on_dial_error: bool,
262262
}
263263

264264
impl Default for Config {
265265
fn default() -> Self {
266266
Self {
267+
peer_capacity: NonZeroUsize::try_from(1000).expect("1000 > 0"),
267268
record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"),
268269
remove_addr_on_dial_error: true,
269270
}
270271
}
271272
}
272273

273274
impl Config {
275+
pub fn peer_capacity(&self) -> &NonZeroUsize {
276+
&self.peer_capacity
277+
}
278+
/// The capacity of the address store per peer.
279+
///
280+
/// The least recently updated peer will be discarded to make room for a new peer.
281+
///
282+
/// `1000` by default.
283+
pub fn set_peer_capacity(mut self, capacity: NonZeroUsize) -> Self {
284+
self.peer_capacity = capacity;
285+
self
286+
}
274287
pub fn record_capacity(&self) -> &NonZeroUsize {
275288
&self.record_capacity
276289
}
277-
/// The capacity of an address store.
290+
/// The capacity of the address store per peer.
278291
///
279-
/// The least active address will be discarded to make room for new address.
292+
/// The least active address will be discarded to make room for a new address.
280293
///
281294
/// `8` by default.
282295
pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self {

0 commit comments

Comments
 (0)