@@ -35,11 +35,14 @@ use datafusion::{
3535 error:: DataFusionError ,
3636 execution:: object_store:: { DefaultObjectStoreRegistry , ObjectStoreRegistry } ,
3737} ;
38- use futures:: stream:: { BoxStream , Stream } ;
38+ use futures:: {
39+ StreamExt ,
40+ stream:: { BoxStream , Stream } ,
41+ } ;
3942use object_store:: {
40- GetOptions , GetRange , GetResult , ListResult , MultipartUpload , ObjectMeta ,
41- ObjectStore , PutMultipartOptions , PutOptions , PutPayload , PutResult , Result ,
42- path:: Path ,
43+ CopyOptions , GetOptions , GetRange , GetResult , ListResult , MultipartUpload ,
44+ ObjectMeta , ObjectStore , PutMultipartOptions , PutOptions , PutPayload , PutResult ,
45+ Result , path:: Path ,
4346} ;
4447use parking_lot:: { Mutex , RwLock } ;
4548use url:: Url ;
@@ -228,42 +231,56 @@ impl InstrumentedObjectStore {
228231 options : GetOptions ,
229232 ) -> Result < GetResult > {
230233 let timestamp = Utc :: now ( ) ;
234+ let operation = if options. head {
235+ Operation :: Head
236+ } else {
237+ Operation :: Get
238+ } ;
231239 let range = options. range . clone ( ) ;
232240
233241 let start = Instant :: now ( ) ;
234242 let ret = self . inner . get_opts ( location, options) . await ?;
235243 let elapsed = start. elapsed ( ) ;
236244
237245 self . requests . lock ( ) . push ( RequestDetails {
238- op : Operation :: Get ,
246+ op : operation ,
239247 path : location. clone ( ) ,
240248 timestamp,
241249 duration : Some ( elapsed) ,
242- size : Some ( ( ret. range . end - ret. range . start ) as usize ) ,
250+ size : ( operation == Operation :: Get )
251+ . then_some ( ( ret. range . end - ret. range . start ) as usize ) ,
243252 range,
244253 extra_display : None ,
245254 } ) ;
246255
247256 Ok ( ret)
248257 }
249258
250- async fn instrumented_delete ( & self , location : & Path ) -> Result < ( ) > {
259+ fn instrumented_delete_stream (
260+ & self ,
261+ locations : BoxStream < ' static , Result < Path > > ,
262+ ) -> BoxStream < ' static , Result < Path > > {
251263 let timestamp = Utc :: now ( ) ;
252264 let start = Instant :: now ( ) ;
253- self . inner . delete ( location) . await ?;
254- let elapsed = start. elapsed ( ) ;
255-
256- self . requests . lock ( ) . push ( RequestDetails {
257- op : Operation :: Delete ,
258- path : location. clone ( ) ,
259- timestamp,
260- duration : Some ( elapsed) ,
261- size : None ,
262- range : None ,
263- extra_display : None ,
264- } ) ;
265-
266- Ok ( ( ) )
265+ let requests = Arc :: clone ( & self . requests ) ;
266+
267+ self . inner
268+ . delete_stream ( locations)
269+ . inspect ( move |result| {
270+ if let Ok ( path) = result {
271+ let elapsed = start. elapsed ( ) ;
272+ requests. lock ( ) . push ( RequestDetails {
273+ op : Operation :: Delete ,
274+ path : path. clone ( ) ,
275+ timestamp,
276+ duration : Some ( elapsed) ,
277+ size : None ,
278+ range : None ,
279+ extra_display : None ,
280+ } ) ;
281+ }
282+ } )
283+ . boxed ( )
267284 }
268285
269286 fn instrumented_list (
@@ -320,33 +337,15 @@ impl InstrumentedObjectStore {
320337 Ok ( ret)
321338 }
322339
323- async fn instrumented_copy ( & self , from : & Path , to : & Path ) -> Result < ( ) > {
324- let timestamp = Utc :: now ( ) ;
325- let start = Instant :: now ( ) ;
326- self . inner . copy ( from, to) . await ?;
327- let elapsed = start. elapsed ( ) ;
328-
329- self . requests . lock ( ) . push ( RequestDetails {
330- op : Operation :: Copy ,
331- path : from. clone ( ) ,
332- timestamp,
333- duration : Some ( elapsed) ,
334- size : None ,
335- range : None ,
336- extra_display : Some ( format ! ( "copy_to: {to}" ) ) ,
337- } ) ;
338-
339- Ok ( ( ) )
340- }
341-
342- async fn instrumented_copy_if_not_exists (
340+ async fn instrumented_copy_opts (
343341 & self ,
344342 from : & Path ,
345343 to : & Path ,
344+ options : CopyOptions ,
346345 ) -> Result < ( ) > {
347346 let timestamp = Utc :: now ( ) ;
348347 let start = Instant :: now ( ) ;
349- self . inner . copy_if_not_exists ( from, to) . await ?;
348+ self . inner . copy_opts ( from, to, options ) . await ?;
350349 let elapsed = start. elapsed ( ) ;
351350
352351 self . requests . lock ( ) . push ( RequestDetails {
@@ -361,25 +360,6 @@ impl InstrumentedObjectStore {
361360
362361 Ok ( ( ) )
363362 }
364-
365- async fn instrumented_head ( & self , location : & Path ) -> Result < ObjectMeta > {
366- let timestamp = Utc :: now ( ) ;
367- let start = Instant :: now ( ) ;
368- let ret = self . inner . head ( location) . await ?;
369- let elapsed = start. elapsed ( ) ;
370-
371- self . requests . lock ( ) . push ( RequestDetails {
372- op : Operation :: Head ,
373- path : location. clone ( ) ,
374- timestamp,
375- duration : Some ( elapsed) ,
376- size : None ,
377- range : None ,
378- extra_display : None ,
379- } ) ;
380-
381- Ok ( ret)
382- }
383363}
384364
385365impl fmt:: Display for InstrumentedObjectStore {
@@ -429,14 +409,6 @@ impl ObjectStore for InstrumentedObjectStore {
429409 self . inner . get_opts ( location, options) . await
430410 }
431411
432- async fn delete ( & self , location : & Path ) -> Result < ( ) > {
433- if self . enabled ( ) {
434- return self . instrumented_delete ( location) . await ;
435- }
436-
437- self . inner . delete ( location) . await
438- }
439-
440412 fn list ( & self , prefix : Option < & Path > ) -> BoxStream < ' static , Result < ObjectMeta > > {
441413 if self . enabled ( ) {
442414 return self . instrumented_list ( prefix) ;
@@ -453,28 +425,28 @@ impl ObjectStore for InstrumentedObjectStore {
453425 self . inner . list_with_delimiter ( prefix) . await
454426 }
455427
456- async fn copy ( & self , from : & Path , to : & Path ) -> Result < ( ) > {
457- if self . enabled ( ) {
458- return self . instrumented_copy ( from, to) . await ;
459- }
460-
461- self . inner . copy ( from, to) . await
462- }
463-
464- async fn copy_if_not_exists ( & self , from : & Path , to : & Path ) -> Result < ( ) > {
428+ fn delete_stream (
429+ & self ,
430+ locations : BoxStream < ' static , Result < Path > > ,
431+ ) -> BoxStream < ' static , Result < Path > > {
465432 if self . enabled ( ) {
466- return self . instrumented_copy_if_not_exists ( from , to ) . await ;
433+ return self . instrumented_delete_stream ( locations ) ;
467434 }
468435
469- self . inner . copy_if_not_exists ( from , to ) . await
436+ self . inner . delete_stream ( locations )
470437 }
471438
472- async fn head ( & self , location : & Path ) -> Result < ObjectMeta > {
439+ async fn copy_opts (
440+ & self ,
441+ from : & Path ,
442+ to : & Path ,
443+ options : CopyOptions ,
444+ ) -> Result < ( ) > {
473445 if self . enabled ( ) {
474- return self . instrumented_head ( location ) . await ;
446+ return self . instrumented_copy_opts ( from , to , options ) . await ;
475447 }
476448
477- self . inner . head ( location ) . await
449+ self . inner . copy_opts ( from , to , options ) . await
478450 }
479451}
480452
@@ -824,6 +796,7 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
824796#[ cfg( test) ]
825797mod tests {
826798 use futures:: StreamExt ;
799+ use object_store:: ObjectStoreExt ;
827800 use object_store:: WriteMultipart ;
828801
829802 use super :: * ;
0 commit comments