1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use std:: { fmt, sync:: Arc } ;
18+ use std:: {
19+ fmt,
20+ str:: FromStr ,
21+ sync:: {
22+ atomic:: { AtomicU8 , Ordering } ,
23+ Arc ,
24+ } ,
25+ } ;
1926
2027use async_trait:: async_trait;
21- use datafusion:: execution:: object_store:: ObjectStoreRegistry ;
28+ use datafusion:: { error :: DataFusionError , execution:: object_store:: ObjectStoreRegistry } ;
2229use futures:: stream:: BoxStream ;
2330use object_store:: {
2431 path:: Path , GetOptions , GetResult , ListResult , MultipartUpload , ObjectMeta ,
2532 ObjectStore , PutMultipartOptions , PutOptions , PutPayload , PutResult , Result ,
2633} ;
2734use url:: Url ;
2835
36+ /// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect
37+ /// profiling data. Collecting profiling data will have a small negative impact on both CPU and
38+ /// memory usage. Default is `Disabled`
39+ #[ derive( Copy , Clone , Debug , Default , PartialEq , Eq ) ]
40+ pub enum InstrumentedObjectStoreMode {
41+ /// Disable collection of profiling data
42+ #[ default]
43+ Disabled ,
44+ /// Enable collection of profiling data
45+ Enabled ,
46+ }
47+
48+ impl fmt:: Display for InstrumentedObjectStoreMode {
49+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
50+ write ! ( f, "{:?}" , self )
51+ }
52+ }
53+
54+ impl FromStr for InstrumentedObjectStoreMode {
55+ type Err = DataFusionError ;
56+
57+ fn from_str ( s : & str ) -> std:: result:: Result < Self , Self :: Err > {
58+ match s. to_lowercase ( ) . as_str ( ) {
59+ "disabled" => Ok ( Self :: Disabled ) ,
60+ "enabled" => Ok ( Self :: Enabled ) ,
61+ _ => Err ( DataFusionError :: Execution ( format ! ( "Unrecognized mode {s}" ) ) ) ,
62+ }
63+ }
64+ }
65+
66+ impl From < u8 > for InstrumentedObjectStoreMode {
67+ fn from ( value : u8 ) -> Self {
68+ match value {
69+ 1 => InstrumentedObjectStoreMode :: Enabled ,
70+ _ => InstrumentedObjectStoreMode :: Disabled ,
71+ }
72+ }
73+ }
74+
2975/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
3076/// inner [`ObjectStore`]
3177#[ derive( Debug ) ]
3278struct InstrumentedObjectStore {
3379 inner : Arc < dyn ObjectStore > ,
80+ instrument_mode : AtomicU8 ,
3481}
3582
3683impl InstrumentedObjectStore {
3784 /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
38- fn new ( object_store : Arc < dyn ObjectStore > ) -> Self {
85+ fn new ( object_store : Arc < dyn ObjectStore > , instrument_mode : AtomicU8 ) -> Self {
3986 Self {
4087 inner : object_store,
88+ instrument_mode,
4189 }
4290 }
4391}
4492
4593impl fmt:: Display for InstrumentedObjectStore {
4694 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
47- write ! ( f, "Instrumented Object Store: {}" , self . inner)
95+ let mode: InstrumentedObjectStoreMode =
96+ self . instrument_mode . load ( Ordering :: Relaxed ) . into ( ) ;
97+ write ! (
98+ f,
99+ "Instrumented Object Store: instrument_mode: {mode}, inner: {}" ,
100+ self . inner
101+ )
48102 }
49103}
50104
@@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore {
100154#[ derive( Debug ) ]
101155pub struct InstrumentedObjectStoreRegistry {
102156 inner : Arc < dyn ObjectStoreRegistry > ,
157+ instrument_mode : InstrumentedObjectStoreMode ,
103158}
104159
105160impl InstrumentedObjectStoreRegistry {
106161 /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
107162 /// [`ObjectStoreRegistry`]
108- pub fn new ( registry : Arc < dyn ObjectStoreRegistry > ) -> Self {
109- Self { inner : registry }
163+ pub fn new (
164+ registry : Arc < dyn ObjectStoreRegistry > ,
165+ default_mode : InstrumentedObjectStoreMode ,
166+ ) -> Self {
167+ Self {
168+ inner : registry,
169+ instrument_mode : default_mode,
170+ }
110171 }
111172}
112173
@@ -116,7 +177,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
116177 url : & Url ,
117178 store : Arc < dyn ObjectStore > ,
118179 ) -> Option < Arc < dyn ObjectStore > > {
119- let instrumented = Arc :: new ( InstrumentedObjectStore :: new ( store) ) ;
180+ let mode = AtomicU8 :: new ( self . instrument_mode as u8 ) ;
181+ let instrumented = Arc :: new ( InstrumentedObjectStore :: new ( store, mode) ) ;
120182 self . inner . register_store ( url, instrumented)
121183 }
122184
@@ -131,11 +193,36 @@ mod tests {
131193
132194 use super :: * ;
133195
196+ #[ test]
197+ fn instrumented_mode ( ) {
198+ assert ! ( matches!(
199+ InstrumentedObjectStoreMode :: default ( ) ,
200+ InstrumentedObjectStoreMode :: Disabled
201+ ) ) ;
202+
203+ assert ! ( matches!(
204+ "dIsABleD" . parse( ) . unwrap( ) ,
205+ InstrumentedObjectStoreMode :: Disabled
206+ ) ) ;
207+ assert ! ( matches!(
208+ "EnABlEd" . parse( ) . unwrap( ) ,
209+ InstrumentedObjectStoreMode :: Enabled
210+ ) ) ;
211+ assert ! ( "does_not_exist"
212+ . parse:: <InstrumentedObjectStoreMode >( )
213+ . is_err( ) ) ;
214+
215+ assert ! ( matches!( 0 . into( ) , InstrumentedObjectStoreMode :: Disabled ) ) ;
216+ assert ! ( matches!( 1 . into( ) , InstrumentedObjectStoreMode :: Enabled ) ) ;
217+ assert ! ( matches!( 2 . into( ) , InstrumentedObjectStoreMode :: Disabled ) ) ;
218+ }
219+
134220 #[ test]
135221 fn instrumented_registry ( ) {
136- let reg = Arc :: new ( InstrumentedObjectStoreRegistry :: new ( Arc :: new (
137- DefaultObjectStoreRegistry :: new ( ) ,
138- ) ) ) ;
222+ let reg = Arc :: new ( InstrumentedObjectStoreRegistry :: new (
223+ Arc :: new ( DefaultObjectStoreRegistry :: new ( ) ) ,
224+ InstrumentedObjectStoreMode :: default ( ) ,
225+ ) ) ;
139226 let store = object_store:: memory:: InMemory :: new ( ) ;
140227
141228 let url = "mem://test" . parse ( ) . unwrap ( ) ;
0 commit comments