Skip to content

Commit 8c9739a

Browse files
committed
Update seqno cache to use dashmap internally
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
1 parent f58df6c commit 8c9739a

File tree

1 file changed

+27
-46
lines changed

1 file changed

+27
-46
lines changed

mycelium/src/seqno_cache.rs

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
//! relevant updates.
44
55
use std::{
6-
collections::HashMap,
7-
sync::{Arc, RwLock},
6+
sync::Arc,
87
time::{Duration, Instant},
98
};
109

10+
use dashmap::DashMap;
1111
use tokio::time::MissedTickBehavior;
1212
use tracing::{debug, trace};
1313

@@ -55,33 +55,31 @@ struct SeqnoForwardInfo {
5555
#[derive(Clone)]
5656
pub struct SeqnoCache {
5757
/// Actual cache wrapped in an Arc to make it sharaeble.
58-
inner: Arc<SeqnoCacheInner>,
59-
}
60-
61-
/// Actual implementation of the cache.
62-
struct SeqnoCacheInner {
63-
/// Actual cache, maps requests to the peers who originated them. The local node is not
64-
/// represented, since it always processes the update.
65-
cache: RwLock<HashMap<SeqnoRequestCacheKey, SeqnoForwardInfo>>,
58+
cache: Arc<DashMap<SeqnoRequestCacheKey, SeqnoForwardInfo, ahash::RandomState>>,
6659
}
6760

6861
impl SeqnoCache {
6962
/// Create a new [`SeqnoCache`].
7063
pub fn new() -> Self {
7164
trace!(capacity = 0, "Creating new seqno cache");
7265

73-
let inner = Arc::new(SeqnoCacheInner::new());
66+
let cache = Arc::new(DashMap::with_hasher_and_shard_amount(
67+
ahash::RandomState::new(),
68+
// This number has been chosen completely at random
69+
1024,
70+
));
71+
let sc = Self { cache };
72+
7473
// Spawn background cleanup task.
75-
tokio::spawn(inner.clone().sweep_entries());
74+
tokio::spawn(sc.clone().sweep_entries());
7675

77-
Self { inner }
76+
sc
7877
}
7978

8079
/// Record a forwarded seqno request to a given target. Also keep track of the origin of the
8180
/// request. If the local node generated the request, source must be [`None`]
8281
pub fn forward(&self, request: SeqnoRequestCacheKey, target: Peer, source: Option<Peer>) {
83-
let mut cache = self.inner.cache.write().unwrap();
84-
let info = cache.entry(request).or_default();
82+
let mut info = self.cache.entry(request).or_default();
8583
info.last_sent = Instant::now();
8684
if !info.targets.contains(&target) {
8785
info.targets.push(target);
@@ -104,36 +102,25 @@ impl SeqnoCache {
104102
/// Get a list of all peers which we've already sent the given seqno request to, as well as
105103
/// when we've last sent a request.
106104
pub fn info(&self, request: &SeqnoRequestCacheKey) -> Option<(Instant, Vec<Peer>)> {
107-
self.inner
108-
.cache
109-
.read()
110-
.unwrap()
105+
self.cache
111106
.get(request)
112107
.map(|info| (info.last_sent, info.targets.clone()))
113108
}
114109

115110
/// Removes forwarding info from the seqno cache. If forwarding info is available, the source
116111
/// peers (peers which requested us to forward this request) are returned.
117112
pub fn remove(&self, request: &SeqnoRequestCacheKey) -> Option<Vec<Peer>> {
118-
self.inner
119-
.cache
120-
.write()
121-
.unwrap()
122-
.remove(request)
123-
.map(|info| info.sources)
113+
self.cache.remove(request).map(|(_, info)| info.sources)
124114
}
125-
}
126115

127-
impl SeqnoCacheInner {
128-
/// Create a new empty `SeqnoCacheInner`.
129-
fn new() -> Self {
130-
Self {
131-
cache: RwLock::new(HashMap::new()),
132-
}
116+
/// Get forwarding info from the seqno cache. If forwarding info is available, the source
117+
/// peers (peers which requested us to forward this request) are returned.
118+
pub fn get(&self, request: &SeqnoRequestCacheKey) -> Option<Vec<Peer>> {
119+
self.cache.get(request).map(|info| info.sources.clone())
133120
}
134121

135122
/// Periodic task to clear old entries for which no reply came in.
136-
async fn sweep_entries(self: Arc<Self>) {
123+
async fn sweep_entries(self) {
137124
let mut interval = tokio::time::interval(SEQNO_DEDUP_TTL);
138125
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
139126

@@ -142,15 +129,15 @@ impl SeqnoCacheInner {
142129

143130
debug!("Cleaning up expired seqno requests from seqno cache");
144131

145-
let mut cache = self.cache.write().unwrap();
146-
let prev_entries = cache.len();
147-
let prev_cap = cache.capacity();
148-
cache.retain(|_, info| info.first_sent.elapsed() <= SEQNO_DEDUP_TTL);
149-
cache.shrink_to_fit();
132+
let prev_entries = self.cache.len();
133+
let prev_cap = self.cache.capacity();
134+
self.cache
135+
.retain(|_, info| info.first_sent.elapsed() <= SEQNO_DEDUP_TTL);
136+
self.cache.shrink_to_fit();
150137

151138
debug!(
152-
cleaned_entries = prev_entries - cache.len(),
153-
removed_capacity = prev_cap - cache.capacity(),
139+
cleaned_entries = prev_entries - self.cache.len(),
140+
removed_capacity = prev_cap - self.cache.capacity(),
154141
"Cleaned up stale seqno request cache entries"
155142
);
156143
}
@@ -163,12 +150,6 @@ impl Default for SeqnoCache {
163150
}
164151
}
165152

166-
impl Default for SeqnoCacheInner {
167-
fn default() -> Self {
168-
Self::new()
169-
}
170-
}
171-
172153
impl Default for SeqnoForwardInfo {
173154
fn default() -> Self {
174155
Self {

0 commit comments

Comments
 (0)