13 changes: 7 additions & 6 deletions src/app.rs
@@ -189,6 +189,7 @@ async fn schema() -> &'static str {
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+/// * `mem_permits`: Memory permits for the request
 #[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))]
 async fn download_s3_object<'a>(
     client: &s3_client::S3Client,
@@ -221,6 +222,7 @@ async fn download_s3_object<'a>(
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
+/// * `mem_permits`: Memory permits for the request
 /// * `chunk_cache`: ChunkCache object
 #[tracing::instrument(
     level = "DEBUG",
@@ -234,17 +236,16 @@ async fn download_and_cache_s3_object<'a>(
     chunk_cache: &ChunkCache,
     allow_cache_auth_bypass: bool,
 ) -> Result<Bytes, ActiveStorageError> {
-    // We chose a cache key such that any changes to request data
+    // We choose a cache key such that any changes to request data
     // which may feasibly indicate a change to the upstream object
     // lead to a new cache key.
     let key = format!(
-        "{}-{}-{}-{}-{:?}-{:?}",
+        "{}-{}-{}-{:?}-{:?}",
         request_data.source.as_str(),
         request_data.bucket,
         request_data.object,
-        request_data.dtype,
-        request_data.byte_order,
-        request_data.compression,
+        request_data.offset,
+        request_data.size,
     );

     if let Some(metadata) = chunk_cache.get_metadata(&key).await {
@@ -292,7 +293,7 @@ async fn download_and_cache_s3_object<'a>(
     let data = download_s3_object(client, request_data, resource_manager, mem_permits).await?;

     // Write data to cache
-    chunk_cache.set(&key, data.clone()).await?;
+    chunk_cache.set(&key, &data).await?;

     // Increment the prometheus metric for cache misses
     LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
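As an aside, a minimal sketch (not part of the PR) of the key scheme in the hunk above. `RequestData` here is a hypothetical stand-in for the crate's type, and `offset` and `size` are assumed to be `Option<usize>`; any change to one of the fields produces a distinct key, so a cached chunk can never be served for a different object or byte range.

```rust
// Hypothetical stand-in for the crate's RequestData; field types are assumed.
struct RequestData {
    source: String,
    bucket: String,
    object: String,
    offset: Option<usize>,
    size: Option<usize>,
}

// Mirror of the key construction above: each field is baked into the key.
fn cache_key(d: &RequestData) -> String {
    format!(
        "{}-{}-{}-{:?}-{:?}",
        d.source, d.bucket, d.object, d.offset, d.size
    )
}

fn main() {
    let d = RequestData {
        source: "https://s3.example.org".to_string(),
        bucket: "my-bucket".to_string(),
        object: "data/array-0".to_string(),
        offset: Some(0),
        size: Some(1024),
    };
    // Prints e.g. "https://s3.example.org-my-bucket-data/array-0-Some(0)-Some(1024)"
    println!("{}", cache_key(&d));
}
```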
85 changes: 66 additions & 19 deletions src/chunk_cache.rs
@@ -22,11 +22,11 @@ struct ChunkCacheEntry {

 impl ChunkCacheEntry {
     /// Return a ChunkCacheEntry object
-    fn new(key: &str, value: Bytes) -> Self {
+    fn new(key: &str, value: &Bytes) -> Self {
         let key = key.to_owned();
         // Make sure we own the `Bytes` so we don't see unexpected, but not incorrect,
         // behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy.
-        let value = Bytes::copy_from_slice(&value);
+        let value = Bytes::copy_from_slice(value);
         Self { key, value }
     }
 }
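As background for the comment in the hunk above: `bytes::Bytes` is reference-counted, so `clone()` is zero-copy and shares the underlying buffer, while `copy_from_slice` makes an independent allocation. A standalone sketch of the distinction, assuming only the `bytes` crate, which this file already uses:

```rust
use bytes::Bytes;

fn main() {
    let original = Bytes::from(vec![1u8, 2, 3, 4]);

    // Zero-copy: `shared` is a second handle onto the same buffer, so the
    // original allocation stays alive as long as any handle does.
    let shared = original.clone();

    // Deep copy: `owned` has its own allocation, independent of `original`.
    let owned = Bytes::copy_from_slice(&original);

    // Contents compare equal either way; only the ownership differs.
    assert_eq!(shared, owned);
}
```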
@@ -103,7 +103,7 @@ impl ChunkCache {
     ///
     /// * `key`: Unique key identifying the chunk
     /// * `value`: Chunk `Bytes` to be cached
-    pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
+    pub async fn set(&self, key: &str, value: &Bytes) -> Result<(), ActiveStorageError> {
         match self.sender.send(ChunkCacheEntry::new(key, value)).await {
             Ok(_) => Ok(()),
             Err(e) => Err(ActiveStorageError::ChunkCacheError {
@@ -118,8 +118,7 @@ impl ChunkCache {
     ///
     /// * `key`: Unique key identifying the chunk
     pub async fn get_metadata(&self, key: &str) -> Option<Metadata> {
-        let state = self.cache.load_state().await;
-        state.metadata.get(key).cloned()
+        self.cache.get_metadata(key).await
     }

     /// Retrieves chunk `Bytes` from the cache for an unique key.
@@ -320,6 +319,26 @@ impl SimpleDiskCache {
         }
     }

+    /// Retrieves chunk metadata from the cache for an unique key.
[Review comment (Member)] Nit:

Suggested change:
-    /// Retrieves chunk metadata from the cache for an unique key.
+    /// Retrieves chunk metadata from the cache for a unique key.
+    /// The metadata simply needs to exist on disk to be returned.
+    /// This function does not modify the state of the cache and is thread safe.
+    ///
+    /// # Arguments
+    ///
+    /// * `key`: Unique key identifying the chunk
+    async fn get_metadata(&self, key: &str) -> Option<Metadata> {
+        match fs::read_to_string(
+            self.dir
+                .join(&self.name)
+                .join(self.filename_for_key(key).await + ".meta"),
+        )
+        .await
+        {
+            Ok(content) => Some(serde_json::from_str(content.as_str()).unwrap()),
+            _ => None,
+        }
+    }
+
     /// Stores chunk `Bytes` in the cache against an unique key.
     /// The cache is checked and if necessary pruned before storing the chunk.
     /// Where a maximum size limit has been set the check will take into account the size
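The layout introduced here keeps each cached chunk beside a JSON sidecar file named `<chunk>.meta`. Below is a hedged sketch (not the crate's code) of the read side, mirroring `get_metadata` above; the `Metadata` shape and its `expires` type are assumptions, and unlike the diff it treats an unparsable sidecar as a cache miss instead of panicking:

```rust
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;

// Assumed shape: the real Metadata has at least these fields.
#[derive(Serialize, Deserialize, Debug)]
struct Metadata {
    size_bytes: usize,
    expires: Option<u64>, // expiry representation is an assumption
}

// Read a "<chunk>.meta" sidecar; a missing or unparsable file is a miss.
async fn read_sidecar(path: &Path) -> Option<Metadata> {
    let content = fs::read_to_string(path).await.ok()?;
    serde_json::from_str(&content).ok()
}

#[tokio::main]
async fn main() {
    let meta = read_sidecar(Path::new("/tmp/chunk-0.meta")).await;
    println!("cache hit: {}", meta.is_some());
}
```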
@@ -334,18 +353,24 @@ impl SimpleDiskCache {
         let size = value.len();
         // Run the prune before storing to ensure we have sufficient space
         self.prune(/* headroom */ size).await?;
-        // Write the cache value and then update the metadata
-        let path = self
-            .dir
-            .join(&self.name)
-            .join(self.filename_for_key(key).await);
-        if let Err(e) = fs::write(path, value).await {
+        // Write the cache value to a file
+        let path = self.dir.join(&self.name);
+        if let Err(e) = fs::write(path.join(self.filename_for_key(key).await), value).await {
             return Err(format!("{:?}", e));
         }
+        // Write the metadata to a separate file
+        let metadata = Metadata::new(size, self.ttl_seconds);
+        if let Err(e) = fs::write(
+            path.join(self.filename_for_key(key).await + ".meta"),
+            serde_json::to_string(&metadata).unwrap(),
+        )
+        .await
+        {
+            return Err(format!("{:?}", e));
+        }
+        // Update the global state
         let mut state = self.load_state().await;
-        state
-            .metadata
-            .insert(key.to_owned(), Metadata::new(size, self.ttl_seconds));
+        state.metadata.insert(key.to_owned(), metadata);
         state.current_size_bytes += size;
         self.save_state(state).await;
         Ok(())
@@ -359,11 +384,16 @@ impl SimpleDiskCache {
     async fn remove(&self, key: &str) {
         let mut state = self.load_state().await;
         if let Some(data) = state.metadata.remove(key) {
-            let path = self
-                .dir
-                .join(&self.name)
-                .join(self.filename_for_key(key).await);
-            fs::remove_file(path).await.unwrap();
+            let path = self.dir.join(&self.name);
+            // Remove the chunk file
+            fs::remove_file(path.join(self.filename_for_key(key).await))
+                .await
+                .unwrap();
+            // Remove the metadata file
+            fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
+                .await
+                .unwrap();
[Review comment (Member)] Is there anything we can do to avoid the unwraps here? Or we could at least log the error for easier debugging with something like:

Suggested change:
-            fs::remove_file(path.join(self.filename_for_key(key).await))
-                .await
-                .unwrap();
-            // Remove the metadata file
-            fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
-                .await
-                .unwrap();
+            fs::remove_file(path.join(self.filename_for_key(key).await))
+                .await
+                .map_err(|e| log::error!("Failed to remove cache data with: {}", e))
+                .unwrap();
+            // Remove the metadata file
+            fs::remove_file(path.join(self.filename_for_key(key).await + ".meta"))
+                .await
+                .map_err(|e| log::error!("Failed to remove cache metadata with: {}", e))
+                .unwrap();

N.B. We might also want to add this `map_err` pattern anywhere else in the code where we need to use unwrap / expect.

+            // Update the global state
             state.current_size_bytes -= data.size_bytes;
             self.save_state(state).await;
         }
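For reference, a minimal compiling sketch (not part of the PR) of the logging idea in the comment above, assuming the `log` crate with a logger installed elsewhere; using `if let Err` rather than `map_err(...).unwrap()` also avoids the panic the reviewer is asking about:

```rust
use std::path::Path;
use tokio::fs;

// Log a failed removal instead of panicking; callers carry on either way.
async fn remove_logging_errors(path: &Path) {
    if let Err(e) = fs::remove_file(path).await {
        log::error!("Failed to remove cache file {}: {}", path.display(), e);
    }
}

#[tokio::main]
async fn main() {
    remove_logging_errors(Path::new("/tmp/does-not-exist")).await;
}
```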
@@ -491,6 +521,14 @@ mod tests {
         assert_eq!(metadata.len(), 1);
         assert_eq!(metadata.get(key_1).unwrap().size_bytes, value_1.len());
         assert_eq!(cache_item_1.unwrap(), Some(value_1));
+        assert_eq!(
+            cache.get_metadata(key_1).await.unwrap().expires,
+            metadata.get(key_1).unwrap().expires
+        );
+        assert_eq!(
+            cache.get_metadata(key_1).await.unwrap().size_bytes,
+            metadata.get(key_1).unwrap().size_bytes
+        );

         // Act
         let key_2 = "item-2";
@@ -503,6 +541,14 @@ mod tests {
         assert_eq!(metadata.len(), 2);
         assert_eq!(metadata.get(key_2).unwrap().size_bytes, value_2.len());
         assert_eq!(cache_item_2.unwrap(), Some(value_2));
+        assert_eq!(
+            cache.get_metadata(key_2).await.unwrap().expires,
+            metadata.get(key_2).unwrap().expires
+        );
+        assert_eq!(
+            cache.get_metadata(key_2).await.unwrap().size_bytes,
+            metadata.get(key_2).unwrap().size_bytes
+        );

         // Act
         cache.remove(key_1).await;
@@ -514,6 +560,7 @@ mod tests {
         assert!(!metadata.contains_key(key_1));
         assert!(metadata.contains_key(key_2));
         assert_eq!(cache_item_1.unwrap(), None);
+        assert!(cache.get_metadata(key_1).await.is_none());
     }

     #[tokio::test]