Skip to content

Commit f210939

Browse files
authored
Adds Object Store Profiling options/commands to CLI (#18004)
* Adds Object Store Profiling options/commands to CLI
  - Adds a CLI option and command to datafusion-cli to enable or disable object store profiling
  - Integrates the command with the instrumented object stores to allow user input to change the mode of the instrumented stores
  - Adds tests to exercise the expected behavior of the commands
  - Adds user docs for the commands/CLI options
  - Updates visibility of `InstrumentedObjectStore` now that it needs to be interacted with outside of its module
* Improves InstrumentedObjectStoreRegistry ergonomics
  - Adds better methods to build an InstrumentedObjectStoreRegistry to reduce code duplication in common usage
  - Enhances test success criteria
  - Normalizes method names
1 parent 8772411 commit f210939

File tree

6 files changed

+249
-36
lines changed

6 files changed

+249
-36
lines changed

datafusion-cli/examples/cli-session-context.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use datafusion::{
2828
prelude::SessionContext,
2929
};
3030
use datafusion_cli::{
31-
cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions,
31+
cli_context::CliSessionContext, exec::exec_from_repl,
32+
object_storage::instrumented::InstrumentedObjectStoreRegistry,
33+
print_options::PrintOptions,
3234
};
3335
use object_store::ObjectStore;
3436

@@ -89,6 +91,7 @@ pub async fn main() {
8991
quiet: false,
9092
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
9193
color: true,
94+
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
9295
};
9396

9497
exec_from_repl(&my_ctx, &mut print_options).await.unwrap();

datafusion-cli/src/command.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub enum Command {
4646
SearchFunctions(String),
4747
QuietMode(Option<bool>),
4848
OutputFormat(Option<String>),
49+
ObjectStoreProfileMode(Option<String>),
4950
}
5051

5152
pub enum OutputFormat {
@@ -122,6 +123,29 @@ impl Command {
122123
Self::OutputFormat(_) => exec_err!(
123124
"Unexpected change output format, this should be handled outside"
124125
),
126+
Self::ObjectStoreProfileMode(mode) => {
127+
if let Some(mode) = mode {
128+
let profile_mode = mode
129+
.parse()
130+
.map_err(|_|
131+
exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, enabled")
132+
)?;
133+
print_options
134+
.instrumented_registry
135+
.set_instrument_mode(profile_mode);
136+
println!(
137+
"ObjectStore Profile mode set to {}",
138+
print_options.instrumented_registry.instrument_mode()
139+
);
140+
} else {
141+
println!(
142+
"ObjectStore Profile mode is {}",
143+
print_options.instrumented_registry.instrument_mode()
144+
);
145+
}
146+
147+
Ok(())
148+
}
125149
}
126150
}
127151

@@ -140,11 +164,15 @@ impl Command {
140164
Self::OutputFormat(_) => {
141165
("\\pset [NAME [VALUE]]", "set table output option\n(format)")
142166
}
167+
Self::ObjectStoreProfileMode(_) => (
168+
"\\object_store_profiling (disabled|enabled)",
169+
"print or set object store profile mode",
170+
),
143171
}
144172
}
145173
}
146174

147-
const ALL_COMMANDS: [Command; 9] = [
175+
const ALL_COMMANDS: [Command; 10] = [
148176
Command::ListTables,
149177
Command::DescribeTableStmt(String::new()),
150178
Command::Quit,
@@ -154,6 +182,7 @@ const ALL_COMMANDS: [Command; 9] = [
154182
Command::SearchFunctions(String::new()),
155183
Command::QuietMode(None),
156184
Command::OutputFormat(None),
185+
Command::ObjectStoreProfileMode(None),
157186
];
158187

159188
fn all_commands_info() -> RecordBatch {
@@ -204,6 +233,10 @@ impl FromStr for Command {
204233
Self::OutputFormat(Some(subcommand.to_string()))
205234
}
206235
("pset", None) => Self::OutputFormat(None),
236+
("object_store_profiling", Some(mode)) => {
237+
Self::ObjectStoreProfileMode(Some(mode.to_string()))
238+
}
239+
("object_store_profiling", None) => Self::ObjectStoreProfileMode(None),
207240
_ => return Err(()),
208241
})
209242
}
@@ -244,3 +277,53 @@ impl OutputFormat {
244277
}
245278
}
246279
}
280+
281+
#[cfg(test)]
282+
mod tests {
283+
use datafusion::prelude::SessionContext;
284+
285+
use crate::{
286+
object_storage::instrumented::{
287+
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
288+
},
289+
print_options::MaxRows,
290+
};
291+
292+
use super::*;
293+
294+
#[tokio::test]
295+
async fn command_execute_profile_mode() {
296+
let ctx = SessionContext::new();
297+
298+
let mut print_options = PrintOptions {
299+
format: PrintFormat::Automatic,
300+
quiet: false,
301+
maxrows: MaxRows::Unlimited,
302+
color: true,
303+
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
304+
};
305+
306+
let mut cmd: Command = "object_store_profiling"
307+
.parse()
308+
.expect("expected parse to succeed");
309+
assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
310+
assert_eq!(
311+
print_options.instrumented_registry.instrument_mode(),
312+
InstrumentedObjectStoreMode::default()
313+
);
314+
315+
cmd = "object_store_profiling enabled"
316+
.parse()
317+
.expect("expected parse to succeed");
318+
assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
319+
assert_eq!(
320+
print_options.instrumented_registry.instrument_mode(),
321+
InstrumentedObjectStoreMode::Enabled
322+
);
323+
324+
cmd = "object_store_profiling does_not_exist"
325+
.parse()
326+
.expect("expected parse to succeed");
327+
assert!(cmd.execute(&ctx, &mut print_options).await.is_err());
328+
}
329+
}

datafusion-cli/src/main.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use datafusion::execution::context::SessionConfig;
2727
use datafusion::execution::memory_pool::{
2828
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
2929
};
30-
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
3130
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3231
use datafusion::logical_expr::ExplainFormat;
3332
use datafusion::prelude::SessionContext;
@@ -149,6 +148,13 @@ struct Args {
149148
value_parser(extract_disk_limit)
150149
)]
151150
disk_limit: Option<usize>,
151+
152+
#[clap(
153+
long,
154+
help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]",
155+
default_value_t = InstrumentedObjectStoreMode::Disabled
156+
)]
157+
object_store_profiling: InstrumentedObjectStoreMode,
152158
}
153159

154160
#[tokio::main]
@@ -210,10 +216,10 @@ async fn main_inner() -> Result<()> {
210216
rt_builder = rt_builder.with_disk_manager_builder(builder);
211217
}
212218

213-
let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(
214-
Arc::new(DefaultObjectStoreRegistry::new()),
215-
InstrumentedObjectStoreMode::default(),
216-
));
219+
let instrumented_registry = Arc::new(
220+
InstrumentedObjectStoreRegistry::new()
221+
.with_profile_mode(args.object_store_profiling),
222+
);
217223
rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
218224

219225
let runtime_env = rt_builder.build_arc()?;
@@ -243,6 +249,7 @@ async fn main_inner() -> Result<()> {
243249
quiet: args.quiet,
244250
maxrows: args.maxrows,
245251
color: args.color,
252+
instrumented_registry: Arc::clone(&instrumented_registry),
246253
};
247254

248255
let commands = args.command;

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@ use std::{
2525
};
2626

2727
use async_trait::async_trait;
28-
use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry};
28+
use datafusion::{
29+
error::DataFusionError,
30+
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
31+
};
2932
use futures::stream::BoxStream;
3033
use object_store::{
3134
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
3235
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
3336
};
37+
use parking_lot::RwLock;
3438
use url::Url;
3539

36-
/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect
37-
/// profiling data. Collecting profiling data will have a small negative impact on both CPU and
38-
/// memory usage. Default is `Disabled`
40+
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
41+
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
3942
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
4043
pub enum InstrumentedObjectStoreMode {
4144
/// Disable collection of profiling data
@@ -75,7 +78,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
7578
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
7679
/// inner [`ObjectStore`]
7780
#[derive(Debug)]
78-
struct InstrumentedObjectStore {
81+
pub struct InstrumentedObjectStore {
7982
inner: Arc<dyn ObjectStore>,
8083
instrument_mode: AtomicU8,
8184
}
@@ -88,6 +91,10 @@ impl InstrumentedObjectStore {
8891
instrument_mode,
8992
}
9093
}
94+
95+
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
96+
self.instrument_mode.store(mode as u8, Ordering::Relaxed)
97+
}
9198
}
9299

93100
impl fmt::Display for InstrumentedObjectStore {
@@ -150,23 +157,53 @@ impl ObjectStore for InstrumentedObjectStore {
150157
}
151158
}
152159

153-
/// Provides access to [`ObjectStore`] instances that record requests for reporting
160+
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
154161
#[derive(Debug)]
155162
pub struct InstrumentedObjectStoreRegistry {
156163
inner: Arc<dyn ObjectStoreRegistry>,
157-
instrument_mode: InstrumentedObjectStoreMode,
164+
instrument_mode: AtomicU8,
165+
stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
166+
}
167+
168+
impl Default for InstrumentedObjectStoreRegistry {
169+
fn default() -> Self {
170+
Self::new()
171+
}
158172
}
159173

160174
impl InstrumentedObjectStoreRegistry {
161175
/// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
162176
/// [`ObjectStoreRegistry`]
163-
pub fn new(
164-
registry: Arc<dyn ObjectStoreRegistry>,
165-
default_mode: InstrumentedObjectStoreMode,
166-
) -> Self {
177+
pub fn new() -> Self {
167178
Self {
168-
inner: registry,
169-
instrument_mode: default_mode,
179+
inner: Arc::new(DefaultObjectStoreRegistry::new()),
180+
instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
181+
stores: RwLock::new(Vec::new()),
182+
}
183+
}
184+
185+
pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
186+
self.instrument_mode.store(mode as u8, Ordering::Relaxed);
187+
self
188+
}
189+
190+
/// Provides access to all of the [`InstrumentedObjectStore`]s managed by this
191+
/// [`InstrumentedObjectStoreRegistry`]
192+
pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
193+
self.stores.read().clone()
194+
}
195+
196+
/// Returns the current [`InstrumentedObjectStoreMode`] for this
197+
/// [`InstrumentedObjectStoreRegistry`]
198+
pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
199+
self.instrument_mode.load(Ordering::Relaxed).into()
200+
}
201+
202+
/// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`]
203+
pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
204+
self.instrument_mode.store(mode as u8, Ordering::Relaxed);
205+
for s in self.stores.read().iter() {
206+
s.set_instrument_mode(mode)
170207
}
171208
}
172209
}
@@ -177,8 +214,10 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
177214
url: &Url,
178215
store: Arc<dyn ObjectStore>,
179216
) -> Option<Arc<dyn ObjectStore>> {
180-
let mode = AtomicU8::new(self.instrument_mode as u8);
181-
let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode));
217+
let mode = self.instrument_mode.load(Ordering::Relaxed);
218+
let instrumented =
219+
Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
220+
self.stores.write().push(Arc::clone(&instrumented));
182221
self.inner.register_store(url, instrumented)
183222
}
184223

@@ -189,8 +228,6 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
189228

190229
#[cfg(test)]
191230
mod tests {
192-
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
193-
194231
use super::*;
195232

196233
#[test]
@@ -219,17 +256,23 @@ mod tests {
219256

220257
#[test]
221258
fn instrumented_registry() {
222-
let reg = Arc::new(InstrumentedObjectStoreRegistry::new(
223-
Arc::new(DefaultObjectStoreRegistry::new()),
224-
InstrumentedObjectStoreMode::default(),
225-
));
226-
let store = object_store::memory::InMemory::new();
259+
let mut reg = InstrumentedObjectStoreRegistry::new();
260+
assert!(reg.stores().is_empty());
261+
assert_eq!(
262+
reg.instrument_mode(),
263+
InstrumentedObjectStoreMode::default()
264+
);
227265

266+
reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Enabled);
267+
assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Enabled);
268+
269+
let store = object_store::memory::InMemory::new();
228270
let url = "mem://test".parse().unwrap();
229271
let registered = reg.register_store(&url, Arc::new(store));
230272
assert!(registered.is_none());
231273

232274
let fetched = reg.get_store(&url);
233-
assert!(fetched.is_ok())
275+
assert!(fetched.is_ok());
276+
assert_eq!(reg.stores().len(), 1);
234277
}
235278
}

0 commit comments

Comments
 (0)