@@ -182,33 +182,41 @@ async fn schema() -> &'static str {
182182
183183/// Download an object from S3
184184///
185+ /// Requests a byte range if `offset` or `size` is specified in the request.
186+ ///
185187/// # Arguments
186188///
187189/// * `client`: S3 client object
188- /// * `bucket`: Name of the bucket
189- /// * `key`: Name of the object in the bucket
190- /// * `range`: Optional byte range
190+ /// * `request_data`: RequestData object for the request
191191/// * `resource_manager`: ResourceManager object
192192/// * `mem_permits`: Memory permits for the request
193- #[ tracing:: instrument( level = "DEBUG" , skip( client, bucket , key , range , resource_manager) ) ]
193+ #[ tracing:: instrument( level = "DEBUG" , skip( client, request_data , resource_manager) ) ]
194194async fn download_s3_object < ' a > (
195195 client : & s3_client:: S3Client ,
196- bucket : & str ,
197- key : & str ,
198- range : Option < String > ,
196+ request_data : & models:: RequestData ,
199197 resource_manager : & ' a ResourceManager ,
200198 mut mem_permits : Option < SemaphorePermit < ' a > > ,
201199) -> Result < Bytes , ActiveStorageError > {
200+ // Convert request data to byte range for S3 request
201+ let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
202202 // Acquire connection permit to be freed via drop when this function returns
203203 let _conn_permits = resource_manager. s3_connection ( ) . await ?;
204204
205205 client
206- . download_object ( bucket, key, range, resource_manager, & mut mem_permits)
206+ . download_object (
207+ & request_data. bucket ,
208+ & request_data. object ,
209+ range,
210+ resource_manager,
211+ & mut mem_permits,
212+ )
207213 . await
208214}
209215
210216/// Download and cache an object from S3
211217///
218+ /// Requests a byte range if `offset` or `size` is specified in the request.
219+ ///
212220/// # Arguments
213221///
214222/// * `client`: S3 client object
@@ -232,12 +240,14 @@ async fn download_and_cache_s3_object<'a>(
232240 // which may feasibly indicate a change to the upstream object
233241 // lead to a new cache key.
234242 let key = format ! (
235- "{}-{}-{}-{}-{:?}-{:?}" ,
243+ "{}-{}-{}-{}-{:?}-{:?}-{:?}-{:?} " ,
236244 request_data. source. as_str( ) ,
237245 request_data. bucket,
238246 request_data. object,
239247 request_data. dtype,
240248 request_data. byte_order,
249+ request_data. offset,
250+ request_data. size,
241251 request_data. compression,
242252 ) ;
243253
@@ -279,35 +289,19 @@ async fn download_and_cache_s3_object<'a>(
279289 . instrument ( tracing:: Span :: current ( ) )
280290 . await ?;
281291 if let Some ( bytes) = cache_value {
282- return Ok ( s3_client:: apply_range (
283- bytes,
284- request_data. offset ,
285- request_data. size ,
286- ) ) ;
292+ return Ok ( bytes) ;
287293 }
288294 }
289295
290- let data = download_s3_object (
291- client,
292- & request_data. bucket ,
293- & request_data. object ,
294- None ,
295- resource_manager,
296- mem_permits,
297- )
298- . await ?;
296+ let data = download_s3_object ( client, request_data, resource_manager, mem_permits) . await ?;
299297
300298 // Write data to cache
301299 chunk_cache. set ( & key, & data) . await ?;
302300
303301 // Increment the prometheus metric for cache misses
304302 LOCAL_CACHE_MISSES . with_label_values ( & [ "disk" ] ) . inc ( ) ;
305303
306- Ok ( s3_client:: apply_range (
307- data,
308- request_data. offset ,
309- request_data. size ,
310- ) )
304+ Ok ( data)
311305}
312306
313307/// Handler for Active Storage operations
@@ -351,9 +345,7 @@ async fn operation_handler<T: operation::Operation>(
351345
352346 let data = match ( & state. args . use_chunk_cache , & state. chunk_cache ) {
353347 ( false , _) => {
354- // Convert request data offset and size to byte range for S3 request
355- let range = s3_client:: get_range ( request_data. offset , request_data. size ) ;
356- download_s3_object ( & s3_client, & request_data. bucket , & request_data. object , range, & state. resource_manager , _mem_permits)
348+ download_s3_object ( & s3_client, & request_data, & state. resource_manager , _mem_permits)
357349 . instrument ( tracing:: Span :: current ( ) )
358350 . await ?
359351 }
@@ -405,7 +397,10 @@ fn operation<T: operation::Operation>(
405397 assert_eq ! ( ptr, data. as_ptr( ) ) ;
406398 }
407399 // Convert to a mutable vector to allow in-place byte order conversion.
400+ let ptr = data. as_ptr ( ) ;
408401 let vec: Vec < u8 > = data. into ( ) ;
402+ // Assert that we're using zero-copy.
403+ assert_eq ! ( ptr, vec. as_ptr( ) ) ;
409404 debug_span ! ( "operation" ) . in_scope ( || T :: execute ( & request_data, vec) )
410405}
411406
0 commit comments