
Commit 32f281a

feat: capture more metrics in distributed_query (#1353)
* add more metrics
* use queue_at

---------

Signed-off-by: Hoang Pham <[email protected]>
1 parent 5017084 commit 32f281a

File tree

ballista/client/tests/context_checks.rs
ballista/core/src/execution_plans/distributed_query.rs

2 files changed: +83 -4 lines changed

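For orientation before the file-by-file diff, here is a minimal sketch (not part of this commit) of how the new timing metrics can be read back after running a query. It assumes a DataFusion SessionContext that is already connected to a Ballista cluster (setup elided) and mirrors what the updated test in ballista/client/tests/context_checks.rs does.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;

// Sketch only: run a query through a SessionContext that is already connected
// to a Ballista cluster (setup elided) and read back the metrics added here.
async fn query_timings(ctx: &SessionContext, sql: &str) -> Result<(usize, usize, usize)> {
    let df = ctx.sql(sql).await?;
    let plan: Arc<dyn ExecutionPlan> = df.create_physical_plan().await?;
    let _batches = collect(plan.clone(), ctx.task_ctx()).await?;

    // The timing gauges are registered on the DistributedQueryExec node,
    // which is the plan root read by the tests below.
    let metrics = plan.metrics().expect("plan should expose metrics");
    let execution_ms = metrics.sum_by_name("job_execution_time_ms").unwrap().as_usize();
    let scheduling_ms = metrics.sum_by_name("job_scheduling_in_ms").unwrap().as_usize();
    let total_ms = metrics.sum_by_name("total_query_time_ms").unwrap().as_usize();
    Ok((execution_ms, scheduling_ms, total_ms))
}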

ballista/client/tests/context_checks.rs

Lines changed: 39 additions & 0 deletions
@@ -123,6 +123,31 @@ mod supported {
             > 0
         );

+        // Verify timing metrics
+        let job_execution_time = metrics.sum_by_name("job_execution_time_ms").unwrap();
+        assert!(
+            job_execution_time.as_usize() > 0,
+            "job_execution_time_ms should be greater than 0"
+        );
+
+        let scheduling_time = metrics.sum_by_name("job_scheduling_in_ms").unwrap();
+        assert!(
+            scheduling_time.as_usize() > 0,
+            "job_scheduling_in_ms should be greater than 0"
+        );
+
+        let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+        assert!(
+            total_time.as_usize() > 0,
+            "total_query_time_ms should be greater than 0"
+        );
+
+        // Total time should be at least as long as execution time
+        assert!(
+            total_time.as_usize() >= job_execution_time.as_usize(),
+            "total_query_time_ms should be >= job_execution_time_ms"
+        );
+
         Ok(())
     }

@@ -182,6 +207,20 @@ mod supported {

         assert_batches_eq!(expected, &result);

+        // Verify timing metrics
+        let metrics = plan.metrics().unwrap();
+        let job_execution_time = metrics.sum_by_name("job_execution_time_ms").unwrap();
+        assert!(
+            job_execution_time.as_usize() > 0,
+            "job_execution_time_ms should be greater than 0"
+        );
+
+        let total_time = metrics.sum_by_name("total_query_time_ms").unwrap();
+        assert!(
+            total_time.as_usize() >= job_execution_time.as_usize(),
+            "total_query_time_ms should be >= job_execution_time_ms"
+        );
+
         Ok(())
     }

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 44 additions & 4 deletions
@@ -71,8 +71,12 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
     /// Plan properties
     properties: PlanProperties,
     /// Execution metrics, currently exposes:
-    /// - row count
-    /// - transferred_bytes
+    /// - output_rows: Total number of rows returned
+    /// - transferred_bytes: Total bytes transferred from executors
+    /// - job_execution_time_ms: Time spent executing on the cluster,
+    ///   measured server-side as ended_at - started_at
+    /// - job_scheduling_in_ms: Time the job waited in the scheduler queue (started_at - queued_at)
+    /// - total_query_time_ms: End-to-end query time as observed by the client
     metrics: ExecutionPlanMetricsSet,
 }
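To make the formulas in the doc comment above concrete, here is a small illustration (not part of the commit; the timestamp values are made up):

// Mirrors the arithmetic performed in execute_query further down: the scheduler
// reports queued_at, started_at and ended_at as millisecond timestamps for a
// successful job, and the two server-side durations are derived from them.
fn server_side_timings(queued_at: u64, started_at: u64, ended_at: u64) -> (u64, u64) {
    let job_scheduling_in_ms = started_at.saturating_sub(queued_at);
    let job_execution_time_ms = ended_at.saturating_sub(started_at);
    (job_scheduling_in_ms, job_execution_time_ms)
}

fn main() {
    // Hypothetical values purely for illustration.
    let (queued_at, started_at, ended_at) = (1_000_u64, 1_250, 4_250);
    let (scheduling_ms, execution_ms) = server_side_timings(queued_at, started_at, ended_at);
    assert_eq!(scheduling_ms, 250); // job_scheduling_in_ms
    assert_eq!(execution_ms, 3_000); // job_execution_time_ms
    // total_query_time_ms is measured on the client with Instant::now(), so it
    // additionally covers client/scheduler round trips on top of queueing and execution.
}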

@@ -234,13 +238,16 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         let metric_row_count = MetricBuilder::new(&self.metrics).output_rows(partition);
         let metric_total_bytes =
             MetricBuilder::new(&self.metrics).counter("transferred_bytes", partition);
+
         let stream = futures::stream::once(
            execute_query(
                self.scheduler_url.clone(),
                self.session_id.clone(),
                query,
                self.config.default_grpc_client_max_message_size(),
                GrpcClientConfig::from(&self.config),
+               Arc::new(self.metrics.clone()),
+               partition,
            )
            .map_err(|e| ArrowError::ExternalError(Box::new(e))),
         )
@@ -278,7 +285,12 @@ async fn execute_query(
     query: ExecuteQueryParams,
     max_message_size: usize,
     grpc_config: GrpcClientConfig,
+    metrics: Arc<ExecutionPlanMetricsSet>,
+    partition: usize,
 ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+    // Capture query submission time for total_query_time_ms
+    let query_start_time = std::time::Instant::now();
+
     info!("Connecting to Ballista scheduler at {scheduler_url}");
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
     let connection = create_grpc_client_connection(scheduler_url, &grpc_config)
@@ -351,15 +363,43 @@ async fn execute_query(
                 break Err(DataFusionError::Execution(msg));
             }
             Some(job_status::Status::Successful(SuccessfulJob {
+                queued_at,
                 started_at,
                 ended_at,
                 partition_location,
                 ..
             })) => {
-                let duration = ended_at.saturating_sub(started_at);
-                let duration = Duration::from_millis(duration);
+                // Calculate job execution time (server-side execution)
+                let job_execution_ms = ended_at.saturating_sub(started_at);
+                let duration = Duration::from_millis(job_execution_ms);

                 info!("Job {job_id} finished executing in {duration:?} ");
+
+                // Calculate scheduling time (server-side queue time)
+                // This includes network latency and actual queue time
+                let scheduling_ms = started_at.saturating_sub(queued_at);
+
+                // Calculate total query time (end-to-end from client perspective)
+                let total_elapsed = query_start_time.elapsed();
+                let total_ms = total_elapsed.as_millis();
+
+                // Set timing metrics
+                let metric_job_execution = MetricBuilder::new(&metrics)
+                    .gauge("job_execution_time_ms", partition);
+                metric_job_execution.set(job_execution_ms as usize);
+
+                let metric_scheduling =
+                    MetricBuilder::new(&metrics).gauge("job_scheduling_in_ms", partition);
+                metric_scheduling.set(scheduling_ms as usize);
+
+                let metric_total_time =
+                    MetricBuilder::new(&metrics).gauge("total_query_time_ms", partition);
+                metric_total_time.set(total_ms as usize);
+
+                // Note: data_transfer_time_ms is not set here because partition fetching
+                // happens lazily when the stream is consumed, not during execute_query.
+                // This could be added in a future enhancement by wrapping the stream.
+
                 let streams = partition_location.into_iter().map(move |partition| {
                     let f = fetch_partition(partition, max_message_size, true)
                         .map_err(|e| ArrowError::ExternalError(Box::new(e)));
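The note at the end of this hunk leaves data_transfer_time_ms as a possible follow-up. Below is a rough sketch of the "wrap the stream" idea it mentions; the wrapper type and metric name are hypothetical and not part of this commit:

use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::Gauge;
use futures::{Stream, StreamExt};

// Hypothetical wrapper: accumulate the time spent polling record batches out of
// the executor streams and publish it as a data_transfer_time_ms gauge once the
// stream is exhausted.
struct TimedBatchStream<S> {
    inner: S,
    transfer: Duration,
    gauge: Gauge,
}

impl<S> Stream for TimedBatchStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let start = Instant::now();
        let polled = self.inner.poll_next_unpin(cx);
        self.transfer += start.elapsed();
        if matches!(&polled, Poll::Ready(None)) {
            // End of stream: report the accumulated transfer time in milliseconds.
            self.gauge.set(self.transfer.as_millis() as usize);
        }
        polled
    }
}

Such a wrapper would sit around the streams produced by fetch_partition, so the gauge would reflect the time actually spent pulling result batches from the executors rather than the scheduler-side durations recorded above.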
