Skip to content

Commit a1acf77

Browse files
BlakeOrthalamb
andauthored
Adds instrumentation to get requests for datafusion-cli (#18016)
* Adds instrumentation to get requests for datafusion-cli - Adds request instrumentation to GET operations when using the InstrumentedObjectStore in datafusion-cli - Adds RequestDetails type to store and format details about instrumented requests - Adds tests for new functionality * Add cli-integration test * Verify the contents of the reqest --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent f210939 commit a1acf77

File tree

6 files changed

+282
-12
lines changed

6 files changed

+282
-12
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ arrow = { workspace = true }
3939
async-trait = { workspace = true }
4040
aws-config = "1.8.6"
4141
aws-credential-types = "1.2.7"
42+
chrono = { workspace = true }
4243
clap = { version = "4.5.47", features = ["derive", "cargo"] }
4344
datafusion = { workspace = true, features = [
4445
"avro",

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@ use std::{
2222
atomic::{AtomicU8, Ordering},
2323
Arc,
2424
},
25+
time::Duration,
2526
};
2627

2728
use async_trait::async_trait;
29+
use chrono::Utc;
2830
use datafusion::{
31+
common::instant::Instant,
2932
error::DataFusionError,
3033
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
3134
};
3235
use futures::stream::BoxStream;
3336
use object_store::{
34-
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
37+
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
3538
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
3639
};
37-
use parking_lot::RwLock;
40+
use parking_lot::{Mutex, RwLock};
3841
use url::Url;
3942

4043
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
@@ -81,6 +84,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
8184
pub struct InstrumentedObjectStore {
8285
inner: Arc<dyn ObjectStore>,
8386
instrument_mode: AtomicU8,
87+
requests: Mutex<Vec<RequestDetails>>,
8488
}
8589

8690
impl InstrumentedObjectStore {
@@ -89,12 +93,46 @@ impl InstrumentedObjectStore {
8993
Self {
9094
inner: object_store,
9195
instrument_mode,
96+
requests: Mutex::new(Vec::new()),
9297
}
9398
}
9499

95100
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
96101
self.instrument_mode.store(mode as u8, Ordering::Relaxed)
97102
}
103+
104+
/// Returns all [`RequestDetails`] accumulated in this [`InstrumentedObjectStore`] and clears
105+
/// the stored requests
106+
pub fn take_requests(&self) -> Vec<RequestDetails> {
107+
let mut req = self.requests.lock();
108+
109+
req.drain(..).collect()
110+
}
111+
112+
async fn instrumented_get_opts(
113+
&self,
114+
location: &Path,
115+
options: GetOptions,
116+
) -> Result<GetResult> {
117+
let timestamp = Utc::now();
118+
let range = options.range.clone();
119+
120+
let start = Instant::now();
121+
let ret = self.inner.get_opts(location, options).await?;
122+
let elapsed = start.elapsed();
123+
124+
self.requests.lock().push(RequestDetails {
125+
op: Operation::Get,
126+
path: location.clone(),
127+
timestamp,
128+
duration: Some(elapsed),
129+
size: Some((ret.range.end - ret.range.start) as usize),
130+
range,
131+
extra_display: None,
132+
});
133+
134+
Ok(ret)
135+
}
98136
}
99137

100138
impl fmt::Display for InstrumentedObjectStore {
@@ -129,6 +167,12 @@ impl ObjectStore for InstrumentedObjectStore {
129167
}
130168

131169
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
170+
if self.instrument_mode.load(Ordering::Relaxed)
171+
!= InstrumentedObjectStoreMode::Disabled as u8
172+
{
173+
return self.instrumented_get_opts(location, options).await;
174+
}
175+
132176
self.inner.get_opts(location, options).await
133177
}
134178

@@ -157,6 +201,55 @@ impl ObjectStore for InstrumentedObjectStore {
157201
}
158202
}
159203

204+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
205+
enum Operation {
206+
_Copy,
207+
_Delete,
208+
Get,
209+
_Head,
210+
_List,
211+
_Put,
212+
}
213+
214+
/// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
215+
#[derive(Debug)]
216+
pub struct RequestDetails {
217+
op: Operation,
218+
path: Path,
219+
timestamp: chrono::DateTime<Utc>,
220+
duration: Option<Duration>,
221+
size: Option<usize>,
222+
range: Option<GetRange>,
223+
extra_display: Option<String>,
224+
}
225+
226+
impl fmt::Display for RequestDetails {
227+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228+
let mut output_parts = vec![format!(
229+
"{} operation={:?}",
230+
self.timestamp.to_rfc3339(),
231+
self.op
232+
)];
233+
234+
if let Some(d) = self.duration {
235+
output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
236+
}
237+
if let Some(s) = self.size {
238+
output_parts.push(format!("size={s}"));
239+
}
240+
if let Some(r) = &self.range {
241+
output_parts.push(format!("range: {r}"));
242+
}
243+
output_parts.push(format!("path={}", self.path));
244+
245+
if let Some(ed) = &self.extra_display {
246+
output_parts.push(ed.clone());
247+
}
248+
249+
write!(f, "{}", output_parts.join(" "))
250+
}
251+
}
252+
160253
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
161254
#[derive(Debug)]
162255
pub struct InstrumentedObjectStoreRegistry {
@@ -275,4 +368,56 @@ mod tests {
275368
assert!(fetched.is_ok());
276369
assert_eq!(reg.stores().len(), 1);
277370
}
371+
372+
#[tokio::test]
373+
async fn instrumented_store() {
374+
let store = Arc::new(object_store::memory::InMemory::new());
375+
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
376+
let instrumented = InstrumentedObjectStore::new(store, mode);
377+
378+
// Load the test store with some data we can read
379+
let path = Path::from("test/data");
380+
let payload = PutPayload::from_static(b"test_data");
381+
instrumented.put(&path, payload).await.unwrap();
382+
383+
// By default no requests should be instrumented/stored
384+
assert!(instrumented.requests.lock().is_empty());
385+
let _ = instrumented.get(&path).await.unwrap();
386+
assert!(instrumented.requests.lock().is_empty());
387+
388+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Enabled);
389+
assert!(instrumented.requests.lock().is_empty());
390+
let _ = instrumented.get(&path).await.unwrap();
391+
assert_eq!(instrumented.requests.lock().len(), 1);
392+
393+
let mut requests = instrumented.take_requests();
394+
assert_eq!(requests.len(), 1);
395+
assert!(instrumented.requests.lock().is_empty());
396+
397+
let request = requests.pop().unwrap();
398+
assert_eq!(request.op, Operation::Get);
399+
assert_eq!(request.path, path);
400+
assert!(request.duration.is_some());
401+
assert_eq!(request.size, Some(9));
402+
assert_eq!(request.range, None);
403+
assert!(request.extra_display.is_none());
404+
}
405+
406+
#[test]
407+
fn request_details() {
408+
let rd = RequestDetails {
409+
op: Operation::Get,
410+
path: Path::from("test"),
411+
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
412+
duration: Some(Duration::new(5, 0)),
413+
size: Some(10),
414+
range: Some((..10).into()),
415+
extra_display: Some(String::from("extra info")),
416+
};
417+
418+
assert_eq!(
419+
format!("{rd}"),
420+
"1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
421+
);
422+
}
278423
}

datafusion-cli/src/print_options.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,16 @@ impl PrintOptions {
193193
{
194194
writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
195195
for store in self.instrumented_registry.stores() {
196-
writeln!(writer, "{store}")?;
196+
let requests = store.take_requests();
197+
198+
if !requests.is_empty() {
199+
writeln!(writer, "{store}")?;
200+
for req in requests.iter() {
201+
writeln!(writer, "{req}")?;
202+
}
203+
// Add an extra blank line to help visually organize the output
204+
writeln!(writer)?;
205+
}
197206
}
198207
}
199208
}

datafusion-cli/tests/cli_integration.rs

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::process::Command;
1919

2020
use rstest::rstest;
2121

22+
use async_trait::async_trait;
2223
use insta::{glob, Settings};
2324
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
2425
use std::path::PathBuf;
@@ -374,8 +375,6 @@ async fn test_s3_url_fallback() {
374375
settings.set_snapshot_suffix("s3_url_fallback");
375376
let _bound = settings.bind_to_scope();
376377

377-
let port = container.get_host_port_ipv4(9000).await.unwrap();
378-
379378
// Create a table using a prefix path (without trailing slash)
380379
// This should trigger the fallback logic where head() fails on the prefix
381380
// and list() is used to discover the actual files
@@ -389,11 +388,73 @@ OPTIONS (
389388
SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
390389
"#;
391390

392-
assert_cmd_snapshot!(cli()
393-
.env_clear()
394-
.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
395-
.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
396-
.env("AWS_ENDPOINT", format!("http://localhost:{port}"))
397-
.env("AWS_ALLOW_HTTP", "true")
398-
.pass_stdin(input));
391+
assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
392+
}
393+
394+
/// Validate object store profiling output
395+
#[tokio::test]
396+
async fn test_object_store_profiling() {
397+
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
398+
eprintln!("Skipping external storages integration tests");
399+
return;
400+
}
401+
402+
let container = setup_minio_container().await;
403+
404+
let mut settings = make_settings();
405+
settings.set_snapshot_suffix("s3_url_fallback");
406+
407+
// as the object store profiling contains timestamps and durations, we must
408+
// filter them out to have stable snapshots
409+
//
410+
// Example line to filter:
411+
// 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s size=1006 path=cars.csv
412+
// Output:
413+
// <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
414+
settings.add_filter(
415+
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
416+
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
417+
);
418+
419+
let _bound = settings.bind_to_scope();
420+
421+
let input = r#"
422+
CREATE EXTERNAL TABLE CARS
423+
STORED AS CSV
424+
LOCATION 's3://data/cars.csv';
425+
426+
-- Initial query should not show any profiling as the object store is not instrumented yet
427+
SELECT * from CARS LIMIT 1;
428+
\object_store_profiling enabled
429+
-- Query again to see the profiling output
430+
SELECT * from CARS LIMIT 1;
431+
\object_store_profiling disabled
432+
-- Final query should not show any profiling as we disabled it again
433+
SELECT * from CARS LIMIT 1;
434+
"#;
435+
436+
assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
437+
}
438+
439+
/// Extension trait to Add the minio connection information to a Command
440+
#[async_trait]
441+
trait MinioCommandExt {
442+
async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
443+
-> &mut Self;
444+
}
445+
446+
#[async_trait]
447+
impl MinioCommandExt for Command {
448+
async fn with_minio(
449+
&mut self,
450+
container: &ContainerAsync<minio::MinIO>,
451+
) -> &mut Self {
452+
let port = container.get_host_port_ipv4(9000).await.unwrap();
453+
454+
self.env_clear()
455+
.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
456+
.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
457+
.env("AWS_ENDPOINT", format!("http://localhost:{port}"))
458+
.env("AWS_ALLOW_HTTP", "true")
459+
}
399460
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args: []
6+
env:
7+
AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
8+
AWS_ALLOW_HTTP: "true"
9+
AWS_ENDPOINT: "http://localhost:55031"
10+
AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
11+
stdin: "\n CREATE EXTERNAL TABLE CARS\nSTORED AS CSV\nLOCATION 's3://data/cars.csv';\n\n-- Initial query should not show any profiling as the object store is not instrumented yet\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling enabled\n-- Query again to see the profiling output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling disabled\n-- Final query should not show any profiling as we disabled it again\nSELECT * from CARS LIMIT 1;\n"
12+
snapshot_kind: text
13+
---
14+
success: true
15+
exit_code: 0
16+
----- stdout -----
17+
[CLI_VERSION]
18+
0 row(s) fetched.
19+
[ELAPSED]
20+
21+
+-----+-------+---------------------+
22+
| car | speed | time |
23+
+-----+-------+---------------------+
24+
| red | 20.0 | 1996-04-12T12:05:03 |
25+
+-----+-------+---------------------+
26+
1 row(s) fetched.
27+
[ELAPSED]
28+
29+
ObjectStore Profile mode set to Enabled
30+
+-----+-------+---------------------+
31+
| car | speed | time |
32+
+-----+-------+---------------------+
33+
| red | 20.0 | 1996-04-12T12:05:03 |
34+
+-----+-------+---------------------+
35+
1 row(s) fetched.
36+
[ELAPSED]
37+
38+
Object Store Profiling
39+
Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
40+
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
41+
42+
ObjectStore Profile mode set to Disabled
43+
+-----+-------+---------------------+
44+
| car | speed | time |
45+
+-----+-------+---------------------+
46+
| red | 20.0 | 1996-04-12T12:05:03 |
47+
+-----+-------+---------------------+
48+
1 row(s) fetched.
49+
[ELAPSED]
50+
51+
\q
52+
53+
----- stderr -----

0 commit comments

Comments
 (0)