
Commit 3e6ed6d

Add SimpleChunkCache pruning, with tests, and integrate it with the Reductionist via a ChunkCache wrapper whose interface surfaces cache errors as ActiveStorageError.
The cache currently loads and saves its state to disk rather than keeping it in memory; holding the state in memory could be a future performance improvement.
1 parent a414f0c commit 3e6ed6d
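
Below is a minimal, illustrative sketch of the shape the new ChunkCache wrapper appears to take, inferred from the app.rs diff further down. Only the ChunkCache name, the async get/set interface, the use_chunk_cache flag, and the ActiveStorageError::CacheError variant appear in this commit; the inner SimpleChunkCache field, its method signatures, and the constructor body are assumptions for illustration, not the committed code.

// Hypothetical sketch only: the inner cache API and constructor details are guesses.
use bytes::Bytes;

use crate::chunk_cache::SimpleChunkCache; // assumed inner cache type, named in the commit message
use crate::cli::CommandLineArgs;
use crate::error::ActiveStorageError;

/// Facade over the underlying cache; `inner` is `None` when caching is disabled.
pub struct ChunkCache {
    inner: Option<SimpleChunkCache>,
}

impl ChunkCache {
    pub fn new(args: &CommandLineArgs) -> Self {
        // Only build the inner cache when the use_chunk_cache flag was given.
        let inner = args
            .use_chunk_cache
            .then(|| SimpleChunkCache::new(args)); // assumed constructor signature
        Self { inner }
    }

    /// Returns Ok(None) on a miss or when caching is disabled, so callers no
    /// longer need to check args.use_chunk_cache themselves.
    pub async fn get(&self, key: &str) -> Result<Option<Bytes>, ActiveStorageError> {
        match &self.inner {
            // Assumed inner API; errors are converted to ActiveStorageError here.
            Some(cache) => cache
                .get(key)
                .map_err(|e| ActiveStorageError::CacheError { error: format!("{:?}", e) }),
            None => Ok(None),
        }
    }

    /// No-op when caching is disabled; cache errors surface as ActiveStorageError.
    pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> {
        match &self.inner {
            Some(cache) => cache
                .set(key, value)
                .map_err(|e| ActiveStorageError::CacheError { error: format!("{:?}", e) }),
            None => Ok(()),
        }
    }
}

Having get return Ok(None) both on a miss and when caching is disabled is what lets download_object in the diff below drop its use_chunk_cache branches and the extra args parameter.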

File tree

4 files changed: +536 -391 lines changed


src/app.rs

Lines changed: 26 additions & 37 deletions
@@ -1,6 +1,6 @@
 //! Active Storage server API

-use crate::chunk_cache::SelfPruningChunkCache;
+use crate::chunk_cache::ChunkCache;
 use crate::cli::CommandLineArgs;
 use crate::error::ActiveStorageError;
 use crate::filter_pipeline;
@@ -58,7 +58,7 @@ struct AppState {
     resource_manager: ResourceManager,

     /// Object chunk cache
-    chunk_cache: SelfPruningChunkCache,
+    chunk_cache: ChunkCache,
 }

 impl AppState {
@@ -67,7 +67,7 @@ impl AppState {
         let task_limit = args.thread_limit.or_else(|| Some(num_cpus::get() - 1));
         let resource_manager =
             ResourceManager::new(args.s3_connection_limit, args.memory_limit, task_limit);
-        let chunk_cache = SelfPruningChunkCache::new(args);
+        let chunk_cache = ChunkCache::new(args);

         Self {
             args: args.clone(),
@@ -174,36 +174,29 @@ async fn schema() -> &'static str {
 /// * `client`: S3 client object
 /// * `request_data`: RequestData object for the request
 /// * `resource_manager`: ResourceManager object
-/// * `chunk_cache`: SelfPruningChunkCache object
-/// * `args`: CommandLineArgs object
+/// * `chunk_cache`: ChunkCache object
 async fn download_object<'a>(
     client: &s3_client::S3Client,
     request_data: &models::RequestData,
     resource_manager: &'a ResourceManager,
-    chunk_cache: &SelfPruningChunkCache,
-    args: &CommandLineArgs,
+    chunk_cache: &ChunkCache,
 ) -> Result<Bytes, ActiveStorageError> {

     let key = format!("{},{:?}", client, request_data);

-    // If we're using the chunk cache,
-    // check if the key is in the cache and return the cached bytes if so.
-    if args.use_chunk_cache {
-        match chunk_cache.get(&key) {
-            Ok(cache_value) => {
-                if let Some(cached_bytes) = cache_value {
-                    // TODO: remove debug
-                    println!("Cache hit for key: {}", key);
-                    return Ok(cached_bytes);
-                }
-            },
-            Err(e) => {
-                // Propagate any cache error back as an ActiveStorageError
-                return Err(ActiveStorageError::CacheError{ error: format!("{:?}", e) });
+    // If the chunk cache is enabled and we get a hit this returns Some(bytes); it returns None when the cache is disabled.
+    match chunk_cache.get(&key).await {
+        Ok(value) => {
+            if let Some(bytes) = value {
+                // TODO: remove debug
+                println!("Cache hit for key: {}", key);
+                return Ok(bytes);
             }
-        };
+        },
+        Err(e) => {
+            return Err(e);
+        }
     }
-
     // TODO: remove debug
     println!("Cache miss for key: {}", key);

@@ -225,22 +218,19 @@ async fn download_object<'a>(
     )
     .await;

-    // If we're using the chunk cache,
-    // store the data that has been successfully downloaded
-    if args.use_chunk_cache {
-        if let Ok(data_bytes) = &data {
-            match chunk_cache.set(&key, &data_bytes) {
-                Ok(_) => {},
-                Err(e) => {
-                    // Propagate any cache error back as an ActiveStorageError
-                    return Err(ActiveStorageError::CacheError{ error: format!("{:?}", e) });
-                }
+    if let Ok(data_bytes) = &data {
+        // Store the data against this key if the chunk cache is enabled.
+        match chunk_cache.set(&key, data_bytes.clone()).await {
+            Ok(_) => {},
+            Err(e) => {
+                return Err(e);
             }
         }
-        // Increment the prometheus metric for cache misses
-        LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
     }

+    // Increment the prometheus metric for cache misses
+    LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc();
+
     data
 }

@@ -279,12 +269,11 @@ async fn operation_handler<T: operation::Operation>(
         &request_data,
         &state.resource_manager,
         &state.chunk_cache,
-        &state.args,
     )
     .instrument(tracing::Span::current())
     .await?;

-    // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
+    // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
     // Rayon thread pool. Otherwise, execute as normal using Tokio.
     if state.args.use_rayon {
         tokio_rayon::spawn(move || operation::<T>(request_data, data)).await
