Skip to content

Commit 587aa29

Browse files
committed
Fix: the chunk cache state file was being loaded by every request-processing task to determine whether a chunk is cached and, if so, its size.
There's an MPSC channel used to buffer all cache write requests to a single task responsible for cache updates, but the update task could write the state file at the same time as request-processing tasks read it. Reading the state file whilst it's being written results in a serde parsing error. Instead of adding some type of thread safety around the load/save of the state file, a simpler solution is to store each chunk's metadata in its own metadata file. This mirrors the chunk get, which ignores the state file and simply retrieves files from disk.
1 parent 57df55d commit 587aa29

File tree

2 files changed

+63
-17
lines changed

2 files changed

+63
-17
lines changed

src/app.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ async fn download_and_cache_s3_object<'a>(
247247
request_data.offset,
248248
request_data.size,
249249
);
250-
let key = format!("{:?}", md5::compute(key));
251250

252251
if let Some(metadata) = chunk_cache.get_metadata(&key).await {
253252
if !allow_cache_auth_bypass {

src/chunk_cache.rs

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ impl ChunkCache {
118118
///
119119
/// * `key`: Unique key identifying the chunk
120120
pub async fn get_metadata(&self, key: &str) -> Option<Metadata> {
121-
let state = self.cache.load_state().await;
122-
state.metadata.get(key).cloned()
121+
self.cache.get_metadata(key).await
123122
}
124123

125124
/// Retrieves chunk `Bytes` from the cache for an unique key.
@@ -320,6 +319,26 @@ impl SimpleDiskCache {
320319
}
321320
}
322321

322+
/// Retrieves chunk metadata from the cache for an unique key.
323+
/// The metadata simply needs to exist on disk to be returned.
324+
/// This function does not modify the state of the cache and is thread safe.
325+
///
326+
/// # Arguments
327+
///
328+
/// * `key`: Unique key identifying the chunk
329+
async fn get_metadata(&self, key: &str) -> Option<Metadata> {
330+
match fs::read_to_string(
331+
self.dir
332+
.join(&self.name)
333+
.join(self.filename_for_key(key).await + ".meta"),
334+
)
335+
.await
336+
{
337+
Ok(content) => Some(serde_json::from_str(content.as_str()).unwrap()),
338+
_ => None,
339+
}
340+
}
341+
323342
/// Stores chunk `Bytes` in the cache against an unique key.
324343
/// The cache is checked and if necessary pruned before storing the chunk.
325344
/// Where a maximum size limit has been set the check will take into account the size
@@ -334,18 +353,24 @@ impl SimpleDiskCache {
334353
let size = value.len();
335354
// Run the prune before storing to ensure we have sufficient space
336355
self.prune(/* headroom */ size).await?;
337-
// Write the cache value and then update the metadata
338-
let path = self
339-
.dir
340-
.join(&self.name)
341-
.join(self.filename_for_key(key).await);
342-
if let Err(e) = fs::write(path, value).await {
356+
// Write the cache value to a file
357+
let path = self.dir.join(&self.name);
358+
if let Err(e) = fs::write(path.join(self.filename_for_key(key).await), value).await {
343359
return Err(format!("{:?}", e));
344360
}
361+
// Write the metadata to a separate file
362+
let metadata = Metadata::new(size, self.ttl_seconds);
363+
if let Err(e) = fs::write(
364+
path.join(self.filename_for_key(key).await + ".meta"),
365+
serde_json::to_string(&metadata).unwrap(),
366+
)
367+
.await
368+
{
369+
return Err(format!("{:?}", e));
370+
}
371+
// Update the global state
345372
let mut state = self.load_state().await;
346-
state
347-
.metadata
348-
.insert(key.to_owned(), Metadata::new(size, self.ttl_seconds));
373+
state.metadata.insert(key.to_owned(), metadata);
349374
state.current_size_bytes += size;
350375
self.save_state(state).await;
351376
Ok(())
@@ -359,11 +384,16 @@ impl SimpleDiskCache {
359384
async fn remove(&self, key: &str) {
360385
let mut state = self.load_state().await;
361386
if let Some(data) = state.metadata.remove(key) {
362-
let path = self
363-
.dir
364-
.join(&self.name)
365-
.join(self.filename_for_key(key).await);
366-
fs::remove_file(path).await.unwrap();
387+
let path = self.dir.join(&self.name);
388+
// Remove the chunk file
389+
fs::remove_file(path.join(self.filename_for_key(key).await))
390+
.await
391+
.unwrap();
392+
// Remove the metadata file
393+
fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
394+
.await
395+
.unwrap();
396+
// Update the global state
367397
state.current_size_bytes -= data.size_bytes;
368398
self.save_state(state).await;
369399
}
@@ -491,6 +521,14 @@ mod tests {
491521
assert_eq!(metadata.len(), 1);
492522
assert_eq!(metadata.get(key_1).unwrap().size_bytes, value_1.len());
493523
assert_eq!(cache_item_1.unwrap(), Some(value_1));
524+
assert_eq!(
525+
cache.get_metadata(key_1).await.unwrap().expires,
526+
metadata.get(key_1).unwrap().expires
527+
);
528+
assert_eq!(
529+
cache.get_metadata(key_1).await.unwrap().size_bytes,
530+
metadata.get(key_1).unwrap().size_bytes
531+
);
494532

495533
// Act
496534
let key_2 = "item-2";
@@ -503,6 +541,14 @@ mod tests {
503541
assert_eq!(metadata.len(), 2);
504542
assert_eq!(metadata.get(key_2).unwrap().size_bytes, value_2.len());
505543
assert_eq!(cache_item_2.unwrap(), Some(value_2));
544+
assert_eq!(
545+
cache.get_metadata(key_2).await.unwrap().expires,
546+
metadata.get(key_2).unwrap().expires
547+
);
548+
assert_eq!(
549+
cache.get_metadata(key_2).await.unwrap().size_bytes,
550+
metadata.get(key_2).unwrap().size_bytes
551+
);
506552

507553
// Act
508554
cache.remove(key_1).await;
@@ -514,6 +560,7 @@ mod tests {
514560
assert!(!metadata.contains_key(key_1));
515561
assert!(metadata.contains_key(key_2));
516562
assert_eq!(cache_item_1.unwrap(), None);
563+
assert!(cache.get_metadata(key_1).await.is_none());
517564
}
518565

519566
#[tokio::test]

0 commit comments

Comments
 (0)