11//! Active Storage server API
22
3+ use crate :: chunk_cache:: { self , ChunkCache } ;
34use crate :: cli:: CommandLineArgs ;
45use crate :: error:: ActiveStorageError ;
56use crate :: filter_pipeline;
@@ -22,7 +23,7 @@ use axum::{
2223 Router , TypedHeader ,
2324} ;
2425use bytes:: Bytes ;
25- use cached:: { proc_macro :: io_cached , stores :: DiskCacheBuilder } ;
26+ use cached:: IOCached ;
2627
2728use std:: sync:: Arc ;
2829use tower:: Layer ;
@@ -56,6 +57,9 @@ struct AppState {
5657
5758 /// Resource manager.
5859 resource_manager : ResourceManager ,
60+
61+ /// Object chunk cache
62+ chunk_cache : ChunkCache ,
5963}
6064
6165impl AppState {
@@ -64,10 +68,13 @@ impl AppState {
6468 let task_limit = args. thread_limit . or_else ( || Some ( num_cpus:: get ( ) - 1 ) ) ;
6569 let resource_manager =
6670 ResourceManager :: new ( args. s3_connection_limit , args. memory_limit , task_limit) ;
71+ let chunk_cache: ChunkCache = chunk_cache:: build ( args) ;
72+
6773 Self {
6874 args : args. clone ( ) ,
6975 s3_client_map : s3_client:: S3ClientMap :: new ( ) ,
7076 resource_manager,
77+ chunk_cache,
7178 }
7279 }
7380}
@@ -159,27 +166,43 @@ async fn schema() -> &'static str {
159166 "Hello, world!"
160167}
161168
162- /// Download an object from S3
169+ /// Download and optionally cache an object from S3
163170///
164171/// Requests a byte range if `offset` or `size` is specified in the request.
165172///
166173/// # Arguments
167174///
168175/// * `client`: S3 client object
169176/// * `request_data`: RequestData object for the request
170- #[ tracing:: instrument( level = "DEBUG" , skip( client, request_data, resource_manager) ) ]
171- #[ io_cached(
172- map_error = r##"|e| ActiveStorageError::CacheError{ error: format!("{:?}", e) }"## ,
173- disk = true ,
174- create = r##"{ DiskCacheBuilder::new("test-cache").set_disk_directory("./").build().expect("valid disk cache builder") }"## ,
175- key = "String" ,
176- convert = r##"{ format!("{},{:?}", client, request_data) }"##
177- ) ]
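177+ /// * `resource_manager`: ResourceManager object used to acquire S3 connection permits and passed on to the S3 download
178+ /// * `chunk_cache`: ChunkCache object used to look up and store downloaded bytes under a key built from `client` and `request_data`
179+ /// * `args`: CommandLineArgs object; the cache is only read and written when `use_chunk_cache` is set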
178180async fn download_object < ' a > (
179181 client : & s3_client:: S3Client ,
180182 request_data : & models:: RequestData ,
181- resource_manager : & ResourceManager ,
183+ resource_manager : & ' a ResourceManager ,
184+ chunk_cache : & ChunkCache ,
185+ args : & CommandLineArgs ,
182186) -> Result < Bytes , ActiveStorageError > {
187+
188+ let key = format ! ( "{},{:?}" , client, request_data) ;
189+
190+ // If we're using the chunk cache,
191+ // check if the key is in the cache and return the cached bytes if so.
192+ if args. use_chunk_cache {
193+ match chunk_cache. cache_get ( & key) {
194+ Ok ( cache_value_for_key) => {
195+ if let Some ( cached_bytes) = cache_value_for_key {
196+ return Ok ( cached_bytes) ;
197+ }
198+ } ,
199+ Err ( e) => {
200+ // Propagate any cache error back as an ActiveStorageError
201+ return Err ( ActiveStorageError :: CacheError { error : format ! ( "{:?}" , e) } ) ;
202+ }
203+ } ;
204+ }
205+
183206 // If we're given a size in the request data then use this to
184207 // get an initial guess at the required memory resources.
185208 let memory = request_data. size . unwrap_or ( 0 ) ;
@@ -188,18 +211,33 @@ async fn download_object<'a>(
188211 let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
189212 let _conn_permits = resource_manager. s3_connection ( ) . await ?;
190213
191- // Increment the prometheus metric for cache misses
192- LOCAL_CACHE_MISSES . with_label_values ( & [ "disk" ] ) . inc ( ) ;
193-
194- client
214+ let data = client
195215 . download_object (
196216 & request_data. bucket ,
197217 & request_data. object ,
198218 range,
199219 resource_manager,
200220 & mut mem_permits,
201221 )
202- . await
222+ . await ;
223+
224+ // If we're using the chunk cache,
225+ // store the data that has been successfully downloaded
226+ if args. use_chunk_cache {
227+ if let Ok ( data_bytes) = & data {
228+ match chunk_cache. cache_set ( key, data_bytes. clone ( ) ) {
229+ Ok ( _) => { } ,
230+ Err ( e) => {
231+ // Propagate any cache error back as an ActiveStorageError
232+ return Err ( ActiveStorageError :: CacheError { error : format ! ( "{:?}" , e) } ) ;
233+ }
234+ }
235+ }
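236+ // Reaching this point means the lookup above returned no cached bytes, so count a cache miss in the Prometheus metric (this also runs when the download itself failed)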
237+ LOCAL_CACHE_MISSES . with_label_values ( & [ "disk" ] ) . inc ( ) ;
238+ }
239+
240+ data
203241}
204242
205243/// Handler for Active Storage operations
@@ -231,9 +269,17 @@ async fn operation_handler<T: operation::Operation>(
231269 . get ( & request_data. source , credentials)
232270 . instrument ( tracing:: Span :: current ( ) )
233271 . await ;
234- let data = download_object ( & s3_client, & request_data, & state. resource_manager )
235- . instrument ( tracing:: Span :: current ( ) )
236- . await ?;
272+
273+ let data = download_object (
274+ & s3_client,
275+ & request_data,
276+ & state. resource_manager ,
277+ & state. chunk_cache ,
278+ & state. args ,
279+ )
280+ . instrument ( tracing:: Span :: current ( ) )
281+ . await ?;
282+
237283 // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
238284 // Rayon thread pool. Otherwise, execute as normal using Tokio.
239285 if state. args . use_rayon {
0 commit comments