
Commit f64ef3d

Misc fixes
1 parent 9a7023e commit f64ef3d

File tree

2 files changed: +82 -81 lines changed


src/app.rs

Lines changed: 27 additions & 36 deletions
@@ -71,8 +71,7 @@ impl AppState {
             let path = args
                 .chunk_cache_path
                 .as_ref()
-                .expect("The chunk cache path must be specified when the chunk cache is enabled")
-                .clone();
+                .expect("The chunk cache path must be specified when the chunk cache is enabled");
             Some(ChunkCache::new(
                 path,
                 args.chunk_cache_age,
@@ -189,6 +188,7 @@ async fn schema() -> &'static str {
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+#[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))]
 async fn download_s3_object<'a>(
     client: &s3_client::S3Client,
     request_data: &models::RequestData,
@@ -223,6 +223,10 @@ async fn download_s3_object<'a>(
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
 /// * `chunk_cache`: ChunkCache object
+#[tracing::instrument(
+    level = "DEBUG",
+    skip(client, request_data, resource_manager, chunk_cache)
+)]
 async fn download_and_cache_s3_object<'a>(
     client: &s3_client::S3Client,
     request_data: &models::RequestData,
@@ -231,33 +235,20 @@ async fn download_and_cache_s3_object<'a>(
 ) -> Result<Bytes, ActiveStorageError> {
     let key = format!("{},{:?}", client, request_data);

-    match chunk_cache.get(&key).await {
-        Ok(value) => {
-            if let Some(bytes) = value {
-                return Ok(bytes);
-            }
-        }
-        Err(e) => {
-            return Err(e);
-        }
+    let cache_value = chunk_cache.get(&key).await?;
+    if let Some(bytes) = cache_value {
+        return Ok(bytes);
     }

-    let data = download_s3_object(client, request_data, resource_manager).await;
+    let data = download_s3_object(client, request_data, resource_manager).await?;

-    if let Ok(data_bytes) = &data {
-        // Store the data against this key if the chunk cache is enabled.
-        match chunk_cache.set(&key, data_bytes.clone()).await {
-            Ok(_) => {}
-            Err(e) => {
-                return Err(e);
-            }
-        }
-    }
+    // Write data to cache
+    chunk_cache.set(&key, data.clone()).await?;

     // Increment the prometheus metric for cache misses
     LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();

-    data
+    Ok(data)
 }

 /// Handler for Active Storage operations
@@ -290,22 +281,22 @@ async fn operation_handler<T: operation::Operation>(
         .instrument(tracing::Span::current())
         .await;

-    let data = if state.args.use_chunk_cache {
-        download_and_cache_s3_object(
-            &s3_client,
-            &request_data,
-            &state.resource_manager,
-            state.chunk_cache.as_ref().unwrap(),
-        )
-        .instrument(tracing::Span::current())
-        .await?
-    } else {
-        download_s3_object(&s3_client, &request_data, &state.resource_manager)
-            .instrument(tracing::Span::current())
-            .await?
+    let data = match (&state.args.use_chunk_cache, &state.chunk_cache) {
+        (false, _) => {
+            download_s3_object(&s3_client, &request_data, &state.resource_manager)
+                .instrument(tracing::Span::current())
+                .await?
+        }
+        (true, Some(cache)) => {
+            download_and_cache_s3_object(&s3_client, &request_data, &state.resource_manager, cache)
+                .await?
+        }
+        (true, None) => panic!(
+            "Chunk cache enabled but no chunk cache provided.\nThis is a bug. Please report it to the application developers."
+        ),
     };

-    // All remaining work i s synchronous. If the use_rayon argument was specified, delegate to the
+    // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
     // Rayon thread pool. Otherwise, execute as normal using Tokio.
     if state.args.use_rayon {
         tokio_rayon::spawn(move || operation::<T>(request_data, data)).await
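
Note: the hunks above replace hand-written match/if-let error plumbing with the `?` operator and replace `state.chunk_cache.as_ref().unwrap()` with a match over the (flag, optional cache) pair. A minimal, standalone sketch of that `?`-based shape is below; the names here (`Cache`, `fetch`, `load_from_backend`) are illustrative only and are not part of this repository.

use std::collections::HashMap;

type CacheError = String;

struct Cache {
    entries: HashMap<String, Vec<u8>>,
}

impl Cache {
    // Ok(None) is a cache miss; Err is an I/O-style failure.
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, CacheError> {
        Ok(self.entries.get(key).cloned())
    }
}

fn load_from_backend(key: &str) -> Result<Vec<u8>, CacheError> {
    // Stand-in for the real S3 download.
    Ok(key.as_bytes().to_vec())
}

fn fetch(cache: &Cache, key: &str) -> Result<Vec<u8>, CacheError> {
    // `?` returns any Err to the caller, so no explicit `Err(e) => return Err(e)` arms.
    if let Some(hit) = cache.get(key)? {
        return Ok(hit);
    }
    let data = load_from_backend(key)?;
    Ok(data)
}

fn main() {
    let cache = Cache { entries: HashMap::new() };
    println!("{:?}", fetch(&cache, "chunk-0"));
}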

src/chunk_cache.rs

Lines changed: 55 additions & 45 deletions
@@ -5,27 +5,28 @@ use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
-    ops::Add,
     path::PathBuf,
+    process::exit,
     sync::Arc,
     time::{SystemTime, UNIX_EPOCH},
 };
 use tokio::{fs, spawn, sync::mpsc};

-/// ChunkKeyValue stores a chunk ready to be cached.
-struct ChunkKeyValue {
+/// ChunkCacheEntry stores a chunk ready to be cached.
+struct ChunkCacheEntry {
     /// Key to uniquely identify the chunk in the cache.
     key: String,
     /// Bytes to be cached.
     value: Bytes,
 }

-impl ChunkKeyValue {
-    /// Return a ChunkKeyValue object
-    fn new(key: String, value: Bytes) -> Self {
+impl ChunkCacheEntry {
+    /// Return a ChunkCacheEntry object
+    fn new(key: &str, value: Bytes) -> Self {
+        let key = key.to_owned();
         // Make sure we own the `Bytes` so we don't see unexpected, but not incorrect,
         // behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy.
-        let value = Bytes::from(value.to_vec());
+        let value = Bytes::copy_from_slice(&value);
         Self { key, value }
     }
 }
@@ -43,7 +44,7 @@ pub struct ChunkCache {
     /// The underlying cache object.
     cache: Arc<SimpleDiskCache>,
     /// Sync primitive for managing write access to the cache.
-    sender: mpsc::Sender<ChunkKeyValue>,
+    sender: mpsc::Sender<ChunkCacheEntry>,
 }

 impl ChunkCache {
@@ -57,7 +58,7 @@ impl ChunkCache {
     /// * `max_size_bytes`: An optional maximum cache size expressed as a string, i.e. "100GB"
     /// * `buffer_size`: An optional size for the chunk write buffer
     pub fn new(
-        path: String,
+        path: &str,
         ttl_seconds: u64,
         prune_interval_seconds: u64,
         max_size_bytes: Option<String>,
@@ -73,7 +74,7 @@ impl ChunkCache {
         };
         let cache = Arc::new(SimpleDiskCache::new(
             "chunk_cache",
-            &path,
+            path,
             ttl_seconds,
             prune_interval_seconds,
             max_size_bytes,
@@ -86,13 +87,10 @@ impl ChunkCache {
         // A download request storing to the cache need only wait for the chunk
         // to be sent to the channel.
         let buffer_size = buffer_size.unwrap_or(num_cpus::get() - 1);
-        let (sender, mut receiver) = mpsc::channel::<ChunkKeyValue>(buffer_size);
+        let (sender, mut receiver) = mpsc::channel::<ChunkCacheEntry>(buffer_size);
         spawn(async move {
             while let Some(message) = receiver.recv().await {
-                cache_clone
-                    .set(message.key.as_str(), message.value)
-                    .await
-                    .unwrap();
+                cache_clone.set(&message.key, message.value).await.unwrap();
             }
         });

@@ -105,15 +103,11 @@ impl ChunkCache {
     ///
     /// * `key`: Unique key identifying the chunk
     /// * `value`: Chunk `Bytes` to be cached
-    pub async fn set(&self, key: &str, value: Bytes) -> Result<Option<Bytes>, ActiveStorageError> {
-        match self
-            .sender
-            .send(ChunkKeyValue::new(String::from(key), value))
-            .await
-        {
-            Ok(_) => Ok(None),
+    pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
+        match self.sender.send(ChunkCacheEntry::new(key, value)).await {
+            Ok(_) => Ok(()),
             Err(e) => Err(ActiveStorageError::ChunkCacheError {
-                error: format!("{:?}", e),
+                error: format!("{}", e),
             }),
         }
     }
@@ -136,7 +130,7 @@ impl ChunkCache {
 /// Metadata stored against each cache chunk.
 #[derive(Debug, Serialize, Deserialize)]
 struct Metadata {
-    /// Seconds after unix epoch for ache item expiry.
+    /// Seconds after unix epoch for cache item expiry.
     expires: u64,
     /// Cache value size.
     size_bytes: usize,
@@ -147,9 +141,10 @@ impl Metadata {
     fn new(size: usize, ttl: u64) -> Self {
         let expires = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            // Only panics if 'now' is before epoch
+            .expect("System time to be set correctly")
             .as_secs()
-            .add(ttl);
+            + ttl;
         Metadata {
             expires,
             size_bytes: size,
@@ -178,7 +173,8 @@ impl State {
     fn new(prune_interval_secs: u64) -> Self {
         let next_prune = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            // Only panics if 'now' is before epoch
+            .expect("System time to be set correctly")
             .as_secs()
             + prune_interval_secs;
         State {
@@ -192,9 +188,10 @@ impl State {
 /// The SimpleDiskCache takes chunks of `Bytes` data, identified by an unique key,
 /// storing each chunk as a separate file on disk. Keys are stored in a hashmap
 /// serialised to a JSON state file on disk.
-/// Each chunk stored has a TTL, time to live, stored as a number seconds from epoch,
-/// after which the chunk will have expired and can be pruned from the cache.
-/// Pruning takes place at time intervals or when the total size of the cache
+/// Each chunk stored has a 'time to live' (TTL) stored as a number of seconds from
+/// the unix epoch, after which the chunk will have expired and can be pruned from
+/// the cache.
+/// Pruning takes place periodically or when the total size of the cache
 /// reaches a maximum size threshold.
 /// The decision whether to prune the cache is made when chunks are stored.
 #[derive(Debug)]
@@ -229,10 +226,27 @@ impl SimpleDiskCache {
         if !dir.as_path().exists() {
             panic!("Cache parent dir {} must exist", dir.to_str().unwrap())
         } else if path.exists() {
-            panic!("Cache folder {} already exists", path.to_str().unwrap())
-        } else {
-            std::fs::create_dir(&path).unwrap();
+            let stdin = std::io::stdin();
+            println!(
+                "Cache folder {} already exists. Do you want to wipe it? (y/n)",
+                path.to_str().unwrap()
+            );
+            for line in stdin.lines() {
+                match line {
+                    Ok(response) => match response.to_lowercase().as_str() {
+                        "y" | "yes" => {
+                            std::fs::remove_dir_all(&path).expect("failed to delete cache dir");
+                            println!("Cache dir cleared");
+                            break;
+                        }
+                        "n" | "no" => exit(0),
+                        _ => println!("Please enter 'y' for yes or 'n' for no"),
+                    },
+                    Err(e) => panic!("{}", e),
+                };
+            }
         }
+        std::fs::create_dir(&path).expect("failed to create cache dir");
         SimpleDiskCache {
             name,
             dir,
@@ -344,18 +358,13 @@ impl SimpleDiskCache {
     /// * `key`: Unique key identifying the chunk
     async fn remove(&self, key: &str) {
         let mut state = self.load_state().await;
-        let (mut remove, mut size_bytes) = (false, 0);
-        if let Some(data) = state.metadata.get(key) {
-            (remove, size_bytes) = (true, data.size_bytes);
-        }
-        if remove {
+        if let Some(data) = state.metadata.remove(key) {
             let path = self
                 .dir
                 .join(&self.name)
                 .join(self.filename_for_key(key).await);
             fs::remove_file(path).await.unwrap();
-            state.metadata.remove(key);
-            state.current_size_bytes -= size_bytes;
+            state.current_size_bytes -= data.size_bytes;
             self.save_state(state).await;
         }
     }
@@ -365,7 +374,7 @@ impl SimpleDiskCache {
         let state = self.load_state().await;
         let timestamp = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            .expect("System time to be set correctly")
             .as_secs();
         for (key, data) in state.metadata.iter() {
             if data.expires <= timestamp {
@@ -386,6 +395,7 @@ impl SimpleDiskCache {
     /// * `headroom_bytes`: specifies additional free space that must be left available
     async fn prune_disk_space(&self, headroom_bytes: usize) -> Result<(), String> {
         if let Some(max_size_bytes) = self.max_size_bytes {
+            // TODO: Make this a std::io::ErrorKind::QuotaExceeded error once MSRV is 1.85
             if headroom_bytes > max_size_bytes {
                 return Err("Chunk cannot fit within cache maximum size threshold".to_string());
             }
@@ -428,7 +438,7 @@ impl SimpleDiskCache {
         // We also prune at time intervals.
         let timestamp = SystemTime::now()
             .duration_since(UNIX_EPOCH)
-            .unwrap()
+            .expect("System time to be set correctly")
             .as_secs();
         prune_expired |= state.next_prune <= timestamp;
         // Prune if either of the above thresholds were crossed.
@@ -605,7 +615,7 @@ mod tests {
             tmp_dir.path().to_str().unwrap(),
             ttl, // ttl for cache entries
             1000, // purge expired interval set large to not trigger expiry on "set"
-            Some(size * 2), // max cache size accomodates two entries
+            Some(size * 2), // max cache size accommodates two entries
         );

         // Action: populate cache with large entry
@@ -647,7 +657,7 @@ mod tests {
             tmp_dir.path().to_str().unwrap(),
             ttl, // ttl for cache entries
             1000, // purge expired interval set large to not trigger expiry on "set"
-            Some(size * 2), // max cache size accomodates two entries
+            Some(size * 2), // max cache size accommodates two entries
         );

         // Action: populate cache with large entry
@@ -760,7 +770,7 @@ mod tests {

     #[tokio::test]
     async fn test_simple_disk_cache_chunk_too_big() {
-        // Setup the cache with a size limit so small it can't accomodate our test data.
+        // Setup the cache with a size limit so small it can't accommodate our test data.
         let max_size_bytes = 100;
         let tmp_dir = TempDir::new().unwrap();
         let cache = SimpleDiskCache::new(
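
Note on the `ChunkCacheEntry::new` change above: `Bytes` is a reference-counted, zero-copy buffer, so `clone()` only bumps a reference count and shares the backing memory, while `Bytes::copy_from_slice` allocates a fresh buffer. The diff's comment ("let us choose when to copy") relies on that distinction; `copy_from_slice(&value)` is simply the more direct idiom than `Bytes::from(value.to_vec())`. A small standalone sketch of the difference, assuming only the `bytes` crate this file already depends on:

use bytes::Bytes;

fn main() {
    let original = Bytes::from_static(b"chunk data");

    // Zero-copy: shares the same underlying buffer as `original`.
    let shared = original.clone();

    // Deep copy: allocates a new buffer owned by this handle.
    let owned = Bytes::copy_from_slice(&original);

    // Contents compare equal either way; only the ownership of the backing memory differs.
    assert_eq!(shared, owned);
    println!("shared = {:?}, owned = {:?}", shared, owned);
}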
