
Commit c6937f1

feat: add lru memcache layer
1 parent c1304d1 commit c6937f1

File tree

6 files changed: +230 -17 lines


Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -22,6 +22,8 @@ serde = {version = "1.0.125", features = ["derive"]}
 tokio = {version = "1.5.0", features = ["fs", "io-util"]}
 bytes = "1.0.1"
 async-trait = "0.1.50"
+lru = "0.6.5"
+parking_lot = "0.11.1"

 [dev-dependencies]
 tokio = {version = "1.5.0", features = ["full"]}

src/builder.rs

Lines changed: 14 additions & 0 deletions
@@ -47,6 +47,9 @@ impl CacheBuilder {
             dir_depth: 2,
             track_access: false,

+            // default to no in-mem lru
+            lru_size: 0,
+
             // default buffer sizes to 8kb
             rbuff_sz: 8192,
             wbuff_sz: 8192,
@@ -71,6 +74,17 @@ impl CacheBuilder {
         self
     }

+    /// Sets the maximum size (in bytes) for the in-memory Least-Recently-Used cache.
+    ///
+    /// **Default is `0`**
+    ///
+    /// Setting this above `0` will create an in-memory store for recently-used entries. This will
+    /// use up more RAM, but will also significantly increase speed on memcache `HIT`s.
+    pub fn memory_lru_max_size(mut self, size: usize) -> Self {
+        self.opts.lru_size = size;
+        self
+    }
+
     /// Changes the in-memory buffer sizes for reading and writing `fs` operations.
     ///
     /// **Default is `8kb` (`8196`)**
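A minimal usage sketch of the new builder option (not part of this commit's diff; it combines `memory_lru_max_size` with the builder flow shown in the crate-level example, and the 16 MiB figure is an arbitrary illustration):

// Sketch: enable a 16 MiB in-memory LRU layer on top of the on-disk cache.
use forceps::Cache;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache = Cache::new("./cache")
        .memory_lru_max_size(16 * 1024 * 1024) // 0 (the default) disables the memcache
        .build()
        .await?;

    cache.write(b"MY_KEY", b"Hello World").await?;
    Ok(())
}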

src/cache.rs

Lines changed: 36 additions & 8 deletions
@@ -1,4 +1,4 @@
-use crate::{ForcepError, MetaDb, Metadata, Result};
+use crate::{mem_cache::MemCache, ForcepError, MetaDb, Metadata, Result};
 use bytes::Bytes;
 use std::io;
 use std::path;
@@ -25,6 +25,9 @@ pub(crate) struct Options {
     pub(crate) dir_depth: u8,
     pub(crate) track_access: bool,

+    // maximum size of the in-memory lru in bytes
+    pub(crate) lru_size: usize,
+
     // read and write buffer sizes
     pub(crate) rbuff_sz: usize,
     pub(crate) wbuff_sz: usize,
@@ -41,6 +44,12 @@ pub(crate) struct Options {
 /// This cache can evict items with a number of different eviction algorithms. To see more, see
 /// [`evict_with`] and the [`evictors`] module.
 ///
+/// # Memory Cache
+///
+/// An in-memory cache can be optionally enabled as a layer over the regular on-disk cache. The
+/// memcache provides fast `HIT`s for recently used entries, circumventing filesystem operations
+/// altogether. To enable, use the [`CacheBuilder`]`::memory_lru_max_size` method.
+///
 /// # Examples
 ///
 /// ```rust
@@ -57,9 +66,11 @@ pub(crate) struct Options {
 ///
 /// [`evict_with`]: #method.evict_with
 /// [`evictors`]: crate::evictors
+/// [`CacheBuilder`]: crate::CacheBuilder
 #[derive(Debug)]
 pub struct Cache {
     meta: MetaDb,
+    mem: MemCache,
     opts: Options,
 }

@@ -97,6 +108,7 @@ impl Cache {
         meta_path.push("index");
         Ok(Self {
             meta: MetaDb::new(&meta_path)?,
+            mem: MemCache::new(opts.lru_size),
             opts,
         })
     }
@@ -120,6 +132,15 @@ impl Cache {
         buf
     }

+    /// Tracks the access for a cache entry if the option is enabled
+    #[inline]
+    fn track_access_for(&self, k: &[u8]) -> Result<()> {
+        if self.opts.track_access {
+            self.meta.track_access_for(k)?;
+        }
+        Ok(())
+    }
+
     /// Reads an entry from the database, returning a vector of bytes that represent the entry.
     ///
     /// # Not Found
@@ -152,9 +173,15 @@ impl Cache {
     /// ```
     pub async fn read<K: AsRef<[u8]>>(&self, key: K) -> Result<Bytes> {
         use tokio::io::AsyncReadExt;
+        let k = key.as_ref();
+
+        // look in the memory cache to see if it's there and return if it is
+        if let Some(val) = self.mem.get(k) {
+            return self.track_access_for(k).map(|_| val);
+        }

         let file = {
-            let path = self.path_from_key(key.as_ref());
+            let path = self.path_from_key(k);
             afs::OpenOptions::new()
                 .read(true)
                 .open(&path)
@@ -175,12 +202,10 @@ impl Cache {
             .await
             .map_err(ForcepError::Io)?;

-        // track this access if the flag is set
-        if self.opts.track_access {
-            self.meta.track_access_for(key.as_ref())?;
-        }
-
-        Ok(Bytes::from(buf))
+        self.track_access_for(k)?;
+        let bytes = Bytes::from(buf);
+        self.mem.put(k, Bytes::clone(&bytes));
+        Ok(bytes)
     }

     /// Writes an entry with the specified key to the cache database. This will replace the
@@ -227,6 +252,9 @@ impl Cache {
             .await
             .map_err(ForcepError::Io)?;

+        if !self.mem.is_nil() {
+            self.mem.put(key, Bytes::from(Vec::from(value)));
+        }
         self.meta.insert_metadata_for(key, value)
     }
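To illustrate the read path above (a sketch, not from the commit): with the memcache enabled, the first `read` of a key falls through to the filesystem and inserts the bytes into the in-memory LRU, while later reads of the same key are served from memory; access tracking runs in both cases.

use forceps::Cache;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache = Cache::new("./cache")
        .memory_lru_max_size(1024 * 1024) // arbitrary 1 MiB memcache for illustration
        .build()
        .await?;

    cache.write(b"MY_KEY", b"Hello World").await?;

    // First read: memcache MISS, loaded from disk and then cached in memory.
    let first = cache.read(b"MY_KEY").await?;
    // Second read: memcache HIT, returned without touching the filesystem.
    let second = cache.read(b"MY_KEY").await?;
    assert_eq!(first, second);
    Ok(())
}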
src/lib.rs

Lines changed: 13 additions & 9 deletions
@@ -13,6 +13,7 @@
 //!
 //! - Asynchronous APIs
 //! - Fast and reliable reading/writing
+//! - Optional memory-cache layer
 //! - Tuned for large-file databases
 //! - Included cache eviction (LRU/FIFO)
 //! - Easily accessible value metadata
@@ -33,18 +34,20 @@
 //! # Examples
 //!
 //! ```rust,no_run
-//! #[tokio::main]
-//! async fn main() {
+//! use std::error::Error;
 //! use forceps::Cache;
 //!
-//! let cache = Cache::new("./cache")
-//!     .build()
-//!     .await
-//!     .unwrap();
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn Error>> {
+//!     let cache = Cache::new("./cache")
+//!         .build()
+//!         .await?;
+//!
+//!     cache.write(b"MY_KEY", b"Hello World").await?;
+//!     let data = cache.read(b"MY_KEY").await?;
+//!     assert_eq!(data.as_ref(), b"Hello World");
 //!
-//! cache.write(b"MY_KEY", b"Hello World").await.unwrap();
-//! let data = cache.read(b"MY_KEY").await.unwrap();
-//! assert_eq!(data.as_ref(), b"Hello World");
+//!     Ok(())
 //! }
 //! ```
@@ -97,6 +100,7 @@ impl error::Error for ForcepError {
     }
 }

+mod mem_cache;
 mod tmp;

 mod builder;

src/mem_cache.rs

Lines changed: 145 additions & 0 deletions
@@ -0,0 +1,145 @@
+use crate::Md5Bytes;
+use bytes::Bytes;
+use lru::LruCache;
+use parking_lot::Mutex;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+type Lru = LruCache<Md5Bytes, Bytes>;
+
+#[inline]
+fn hash_key(k: &[u8]) -> Md5Bytes {
+    md5::compute(k).into()
+}
+
+#[derive(Debug)]
+struct MemCacheInner {
+    cache: Mutex<Lru>,
+
+    cap: usize,
+    current: AtomicUsize,
+}
+
+#[derive(Debug)]
+pub(crate) struct MemCache(Option<MemCacheInner>);
+
+impl MemCacheInner {
+    fn new(cap: usize) -> Self {
+        Self {
+            cache: Mutex::new(Lru::unbounded()),
+            cap,
+            current: AtomicUsize::new(0),
+        }
+    }
+
+    fn get(&self, k: &[u8]) -> Option<Bytes> {
+        let mut guard = self.cache.lock();
+        guard.get(&hash_key(k)).map(Bytes::clone)
+    }
+
+    fn put(&self, k: &[u8], v: Bytes) -> Option<Bytes> {
+        let mut guard = self.cache.lock();
+        let v_sz = v.len();
+        let other = guard.put(hash_key(k), v);
+
+        // sub the last value and add the new one to the total bytes counter
+        if let Some(ref other) = other {
+            self.current.fetch_sub(other.len(), Ordering::Relaxed);
+        }
+        let updated_sz = self.current.fetch_add(v_sz, Ordering::SeqCst) + v_sz;
+
+        // if the new size is greater than the cap, start evicting items
+        println!("{} / {}", updated_sz, self.cap);
+        if updated_sz > self.cap {
+            self.evict(&mut guard, updated_sz);
+        }
+
+        other
+    }
+
+    fn evict(&self, lru: &mut Lru, mut current: usize) -> usize {
+        // pop items until it meets size requirement
+        loop {
+            match lru.pop_lru() {
+                Some((_, b)) => current -= b.len(),
+                None => break,
+            }
+            if current <= self.cap {
+                break;
+            }
+        }
+        self.current.swap(current, Ordering::SeqCst);
+        current
+    }
+
+    #[cfg(test)]
+    fn peek(&self, k: &[u8]) -> Option<Bytes> {
+        let guard = self.cache.lock();
+        guard.peek(&hash_key(k)).map(Bytes::clone)
+    }
+}
+
+impl MemCache {
+    pub(crate) fn new(bytes_cap: usize) -> Self {
+        if bytes_cap == 0 {
+            Self(None)
+        } else {
+            Self(Some(MemCacheInner::new(bytes_cap)))
+        }
+    }
+
+    #[inline]
+    pub(crate) fn is_nil(&self) -> bool {
+        self.0.is_none()
+    }
+
+    #[inline]
+    pub(crate) fn get(&self, k: &[u8]) -> Option<Bytes> {
+        self.0.as_ref().and_then(|c| c.get(k))
+    }
+    #[inline]
+    pub(crate) fn put(&self, k: &[u8], v: Bytes) -> Option<Bytes> {
+        self.0.as_ref().and_then(|c| c.put(k, v))
+    }
+
+    // functions for tests
+    #[cfg(test)]
+    fn peek(&self, k: &[u8]) -> Option<Bytes> {
+        self.0.as_ref().and_then(|c| c.peek(k))
+    }
+    #[cfg(test)]
+    fn current_size(&self) -> Option<usize> {
+        self.0.as_ref().map(|x| x.current.load(Ordering::SeqCst))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    const D: &'static [u8] = &[0; 4096];
+
+    #[test]
+    fn verify_eviction() {
+        let cache = MemCache::new(D.len() * 2);
+        cache.put(b"ENT1", Bytes::from(D));
+        cache.put(b"ENT2", Bytes::from(D));
+        // verify that ENT1 still exists in cache (no eviction yet)
+        assert!(cache.peek(b"ENT1").is_some());
+
+        // with the new put, cache should evict ENT1
+        cache.put(b"ENT3", Bytes::from(D));
+        assert!(cache.peek(b"ENT1").is_none());
+    }
+
+    #[test]
+    fn size_updates() {
+        // make sure current size is being updated after insertion of items
+        let cache = MemCache::new(D.len() * 2);
+        cache.put(b"ENT1", Bytes::from(D));
+        cache.put(b"ENT2", Bytes::from(D));
+        assert_eq!(cache.current_size().unwrap(), D.len() * 2);
+
+        // this will verify that the last replaced entry has its byte count removed
+        cache.put(b"ENT2", Bytes::from(D));
+        assert_eq!(cache.current_size().unwrap(), D.len() * 2);
+    }
+}
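The pattern behind `MemCacheInner` is an unbounded `lru::LruCache` paired with a running byte total: each `put` adds the value's length, and least-recently-used entries are popped until the total fits under the byte cap. A simplified, single-threaded sketch of that pattern follows (illustration only; the `ByteLru` type is hypothetical and drops the md5 hashing, `Mutex`, and atomics used by the crate):

use lru::LruCache;

/// Hypothetical byte-capped LRU, mirroring the eviction strategy above.
struct ByteLru {
    cache: LruCache<Vec<u8>, Vec<u8>>,
    cap: usize,
    current: usize,
}

impl ByteLru {
    fn new(cap: usize) -> Self {
        Self { cache: LruCache::unbounded(), cap, current: 0 }
    }

    fn put(&mut self, k: Vec<u8>, v: Vec<u8>) {
        self.current += v.len();
        if let Some(old) = self.cache.put(k, v) {
            // a replaced entry no longer counts toward the total
            self.current -= old.len();
        }
        // evict least-recently-used entries until the total fits under the cap
        while self.current > self.cap {
            match self.cache.pop_lru() {
                Some((_, evicted)) => self.current -= evicted.len(),
                None => break,
            }
        }
    }

    fn get(&mut self, k: &[u8]) -> Option<&Vec<u8>> {
        self.cache.get(&k.to_vec())
    }
}

fn main() {
    let mut lru = ByteLru::new(8192);
    lru.put(b"ENT1".to_vec(), vec![0; 4096]);
    lru.put(b"ENT2".to_vec(), vec![0; 4096]);
    // a third 4 KiB value pushes the total to 12 KiB, so ENT1 (least recently used) is evicted
    lru.put(b"ENT3".to_vec(), vec![0; 4096]);
    assert!(lru.get(b"ENT1").is_none());
    assert!(lru.get(b"ENT3").is_some());
}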
