@@ -23,6 +23,7 @@ use axum::{
2323 Router , TypedHeader ,
2424} ;
2525use bytes:: Bytes ;
26+ use tokio:: sync:: SemaphorePermit ;
2627
2728use std:: sync:: Arc ;
2829use tower:: Layer ;
@@ -193,13 +194,11 @@ async fn download_s3_object<'a>(
193194 client : & s3_client:: S3Client ,
194195 request_data : & models:: RequestData ,
195196 resource_manager : & ' a ResourceManager ,
197+ mut mem_permits : Option < SemaphorePermit < ' a > > ,
196198) -> Result < Bytes , ActiveStorageError > {
197- // If we're given a size in the request data then use this to
198- // get an initial guess at the required memory resources.
199- let memory = request_data. size . unwrap_or ( 0 ) ;
200- let mut mem_permits = resource_manager. memory ( memory) . await ?;
201-
199+ // Convert request data to byte range for S3 request
202200 let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
201+ // Acquire connection permit to be freed via drop when this function returns
203202 let _conn_permits = resource_manager. s3_connection ( ) . await ?;
204203
205204 client
@@ -231,16 +230,37 @@ async fn download_and_cache_s3_object<'a>(
231230 client : & s3_client:: S3Client ,
232231 request_data : & models:: RequestData ,
233232 resource_manager : & ' a ResourceManager ,
233+ mut mem_permits : Option < SemaphorePermit < ' a > > ,
234234 chunk_cache : & ChunkCache ,
235235) -> Result < Bytes , ActiveStorageError > {
236236 let key = format ! ( "{},{:?}" , client, request_data) ;
237237
238- let cache_value = chunk_cache. get ( & key) . await ?;
239- if let Some ( bytes) = cache_value {
240- return Ok ( bytes) ;
238+ if let Some ( metadata) = chunk_cache. get_metadata ( & key) . await {
239+ // Update memory requested from resource manager to account for actual
240+ // size of data if we were previously unable to guess the size from request
241+ // data's size + offset parameters.
242+ // FIXME: how to account for compressed data?
243+ let mem_permits = & mut mem_permits;
244+ match mem_permits {
245+ None => {
246+ * mem_permits = resource_manager. memory ( metadata. size_bytes ) . await ?;
247+ }
248+ Some ( permits) => {
249+ if permits. num_permits ( ) == 0 {
250+ * mem_permits = resource_manager. memory ( metadata. size_bytes ) . await ?;
251+ }
252+ }
253+ }
254+ // We only want to get chunks for which the metadata check succeeded too,
255+ // otherwise chunks which are missing metadata could bypass the resource
256+ // manager and exhaust system resources
257+ let cache_value = chunk_cache. get ( & key) . await ?;
258+ if let Some ( bytes) = cache_value {
259+ return Ok ( bytes) ;
260+ }
241261 }
242262
243- let data = download_s3_object ( client, request_data, resource_manager) . await ?;
263+ let data = download_s3_object ( client, request_data, resource_manager, mem_permits ) . await ?;
244264
245265 // Write data to cache
246266 chunk_cache. set ( & key, data. clone ( ) ) . await ?;
@@ -270,6 +290,15 @@ async fn operation_handler<T: operation::Operation>(
270290 auth : Option < TypedHeader < Authorization < Basic > > > ,
271291 ValidatedJson ( request_data) : ValidatedJson < models:: RequestData > ,
272292) -> Result < models:: Response , ActiveStorageError > {
293+ // NOTE(sd109): We acquire memory permits semaphore here so that
294+ // they are owned by this top-level function and not freed until
295+ // the permits are dropped when this function returns.
296+
297+ // If we're given a size in the request data then use this to
298+ // get an initial guess at the required memory resources.
299+ let memory = request_data. size . unwrap_or ( 0 ) ;
300+ let mut _mem_permits = state. resource_manager . memory ( memory) . await ?;
301+
273302 let credentials = if let Some ( TypedHeader ( auth) ) = auth {
274303 s3_client:: S3Credentials :: access_key ( auth. username ( ) , auth. password ( ) )
275304 } else {
@@ -283,12 +312,12 @@ async fn operation_handler<T: operation::Operation>(
283312
284313 let data = match ( & state. args . use_chunk_cache , & state. chunk_cache ) {
285314 ( false , _) => {
286- download_s3_object ( & s3_client, & request_data, & state. resource_manager )
315+ download_s3_object ( & s3_client, & request_data, & state. resource_manager , _mem_permits )
287316 . instrument ( tracing:: Span :: current ( ) )
288317 . await ?
289318 }
290319 ( true , Some ( cache) ) => {
291- download_and_cache_s3_object ( & s3_client, & request_data, & state. resource_manager , cache)
320+ download_and_cache_s3_object ( & s3_client, & request_data, & state. resource_manager , _mem_permits , cache)
292321 . await ?
293322 }
294323 ( true , None ) => panic ! (
0 commit comments