11//! Active Storage server API
22
3+ use crate :: chunk_cache:: ChunkCache ;
34use crate :: cli:: CommandLineArgs ;
45use crate :: error:: ActiveStorageError ;
56use crate :: filter_pipeline;
6- use crate :: metrics:: { metrics_handler, track_metrics} ;
7+ use crate :: metrics:: { metrics_handler, track_metrics, LOCAL_CACHE_MISSES } ;
78use crate :: models;
89use crate :: operation;
910use crate :: operations;
@@ -14,17 +15,16 @@ use crate::validated_json::ValidatedJson;
1415
1516use axum:: middleware;
1617use axum:: {
17- body:: Bytes ,
1818 extract:: { Path , State } ,
1919 headers:: authorization:: { Authorization , Basic } ,
2020 http:: header,
2121 response:: { IntoResponse , Response } ,
2222 routing:: { get, post} ,
2323 Router , TypedHeader ,
2424} ;
25+ use bytes:: Bytes ;
2526
2627use std:: sync:: Arc ;
27- use tokio:: sync:: SemaphorePermit ;
2828use tower:: Layer ;
2929use tower:: ServiceBuilder ;
3030use tower_http:: normalize_path:: NormalizePathLayer ;
@@ -56,6 +56,9 @@ struct AppState {
5656
5757 /// Resource manager.
5858 resource_manager : ResourceManager ,
59+
60+ /// Object chunk cache
61+ chunk_cache : Option < ChunkCache > ,
5962}
6063
6164impl AppState {
@@ -64,10 +67,17 @@ impl AppState {
6467 let task_limit = args. thread_limit . or_else ( || Some ( num_cpus:: get ( ) - 1 ) ) ;
6568 let resource_manager =
6669 ResourceManager :: new ( args. s3_connection_limit , args. memory_limit , task_limit) ;
70+ let chunk_cache = if args. use_chunk_cache {
71+ Some ( ChunkCache :: new ( args) )
72+ } else {
73+ None
74+ } ;
75+
6776 Self {
6877 args : args. clone ( ) ,
6978 s3_client_map : s3_client:: S3ClientMap :: new ( ) ,
7079 resource_manager,
80+ chunk_cache,
7181 }
7282 }
7383}
@@ -167,27 +177,94 @@ async fn schema() -> &'static str {
167177///
168178/// * `client`: S3 client object
169179/// * `request_data`: RequestData object for the request
170- #[ tracing:: instrument(
171- level = "DEBUG" ,
172- skip( client, request_data, resource_manager, mem_permits)
173- ) ]
174- async fn download_object < ' a > (
180+ /// * `resource_manager`: ResourceManager object
181+ async fn download_s3_object < ' a > (
175182 client : & s3_client:: S3Client ,
176183 request_data : & models:: RequestData ,
177184 resource_manager : & ' a ResourceManager ,
178- mem_permits : & mut Option < SemaphorePermit < ' a > > ,
179185) -> Result < Bytes , ActiveStorageError > {
186+
187+ // If we're given a size in the request data then use this to
188+ // get an initial guess at the required memory resources.
189+ let memory = request_data. size . unwrap_or ( 0 ) ;
190+ let mut mem_permits = resource_manager. memory ( memory) . await ?;
191+
180192 let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
181193 let _conn_permits = resource_manager. s3_connection ( ) . await ?;
194+
182195 client
196+ . download_object (
197+ & request_data. bucket ,
198+ & request_data. object ,
199+ range,
200+ resource_manager,
201+ & mut mem_permits,
202+ )
203+ . await
204+ }
205+
206+ /// Download and cache an object from S3
207+ ///
208+ /// Requests a byte range if `offset` or `size` is specified in the request.
209+ ///
210+ /// # Arguments
211+ ///
212+ /// * `client`: S3 client object
213+ /// * `request_data`: RequestData object for the request
214+ /// * `resource_manager`: ResourceManager object
215+ /// * `chunk_cache`: ChunkCache object
216+ async fn download_and_cache_s3_object < ' a > (
217+ client : & s3_client:: S3Client ,
218+ request_data : & models:: RequestData ,
219+ resource_manager : & ' a ResourceManager ,
220+ chunk_cache : & ChunkCache ,
221+ ) -> Result < Bytes , ActiveStorageError > {
222+
223+ let key = format ! ( "{},{:?}" , client, request_data) ;
224+
225+ match chunk_cache. get ( & key) . await {
226+ Ok ( value) => {
227+ if let Some ( bytes) = value {
228+ return Ok ( bytes) ;
229+ }
230+ } ,
231+ Err ( e) => {
232+ return Err ( e) ;
233+ }
234+ }
235+
236+ // If we're given a size in the request data then use this to
237+ // get an initial guess at the required memory resources.
238+ let memory = request_data. size . unwrap_or ( 0 ) ;
239+ let mut mem_permits = resource_manager. memory ( memory) . await ?;
240+
241+ let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
242+ let _conn_permits = resource_manager. s3_connection ( ) . await ?;
243+
244+ let data = client
183245 . download_object (
184246 & request_data. bucket ,
185247 & request_data. object ,
186248 range,
187249 resource_manager,
188- mem_permits,
250+ & mut mem_permits,
189251 )
190- . await
252+ . await ;
253+
254+ if let Ok ( data_bytes) = & data {
255+ // Store the data against this key if the chunk cache is enabled.
256+ match chunk_cache. set ( & key, data_bytes. clone ( ) ) . await {
257+ Ok ( _) => { } ,
258+ Err ( e) => {
259+ return Err ( e) ;
260+ }
261+ }
262+ }
263+
264+ // Increment the prometheus metric for cache misses
265+ LOCAL_CACHE_MISSES . with_label_values ( & [ "disk" ] ) . inc ( ) ;
266+
267+ data
191268}
192269
193270/// Handler for Active Storage operations
@@ -209,8 +286,6 @@ async fn operation_handler<T: operation::Operation>(
209286 auth : Option < TypedHeader < Authorization < Basic > > > ,
210287 ValidatedJson ( request_data) : ValidatedJson < models:: RequestData > ,
211288) -> Result < models:: Response , ActiveStorageError > {
212- let memory = request_data. size . unwrap_or ( 0 ) ;
213- let mut _mem_permits = state. resource_manager . memory ( memory) . await ?;
214289 let credentials = if let Some ( TypedHeader ( auth) ) = auth {
215290 s3_client:: S3Credentials :: access_key ( auth. username ( ) , auth. password ( ) )
216291 } else {
@@ -221,15 +296,27 @@ async fn operation_handler<T: operation::Operation>(
221296 . get ( & request_data. source , credentials)
222297 . instrument ( tracing:: Span :: current ( ) )
223298 . await ;
224- let data = download_object (
225- & s3_client,
226- & request_data,
227- & state. resource_manager ,
228- & mut _mem_permits,
229- )
230- . instrument ( tracing:: Span :: current ( ) )
231- . await ?;
232- // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
299+
300+ let data = if state. args . use_chunk_cache {
301+ download_and_cache_s3_object (
302+ & s3_client,
303+ & request_data,
304+ & state. resource_manager ,
305+ state. chunk_cache . as_ref ( ) . unwrap ( ) ,
306+ )
307+ . instrument ( tracing:: Span :: current ( ) )
308+ . await ?
309+ } else {
310+ download_s3_object (
311+ & s3_client,
312+ & request_data,
313+ & state. resource_manager ,
314+ )
315+ . instrument ( tracing:: Span :: current ( ) )
316+ . await ?
317+ } ;
318+
319+ // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the
233320 // Rayon thread pool. Otherwise, execute as normal using Tokio.
234321 if state. args . use_rayon {
235322 tokio_rayon:: spawn ( move || operation :: < T > ( request_data, data) ) . await
0 commit comments