@@ -71,8 +71,12 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
7171 /// Plan properties
7272 properties : PlanProperties ,
7373 /// Execution metrics, currently exposes:
74- /// - row count
75- /// - transferred_bytes
74+ /// - output_rows: Total number of rows returned
75+ /// - transferred_bytes: Total bytes transferred from executors
76+ /// - job_execution_time_ms: Time spent executing on the cluster (server-side)
77+ /// - job_scheduling_in_ms: Time from query submission to job start (includes queue time)
78+ /// - total_query_time_ms: Total end-to-end query latency from client perspective
79+ /// - data_transfer_time_ms: Time spent fetching results (not yet implemented)
7680 metrics : ExecutionPlanMetricsSet ,
7781}
7882
@@ -234,13 +238,16 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
234238 let metric_row_count = MetricBuilder :: new ( & self . metrics ) . output_rows ( partition) ;
235239 let metric_total_bytes =
236240 MetricBuilder :: new ( & self . metrics ) . counter ( "transferred_bytes" , partition) ;
241+
237242 let stream = futures:: stream:: once (
238243 execute_query (
239244 self . scheduler_url . clone ( ) ,
240245 self . session_id . clone ( ) ,
241246 query,
242247 self . config . default_grpc_client_max_message_size ( ) ,
243248 GrpcClientConfig :: from ( & self . config ) ,
249+ Arc :: new ( self . metrics . clone ( ) ) ,
250+ partition,
244251 )
245252 . map_err ( |e| ArrowError :: ExternalError ( Box :: new ( e) ) ) ,
246253 )
@@ -278,7 +285,12 @@ async fn execute_query(
278285 query : ExecuteQueryParams ,
279286 max_message_size : usize ,
280287 grpc_config : GrpcClientConfig ,
288+ metrics : Arc < ExecutionPlanMetricsSet > ,
289+ partition : usize ,
281290) -> Result < impl Stream < Item = Result < RecordBatch > > + Send > {
291+ // Capture client-side start time (taken before the scheduler connection is created) for total_query_time_ms
292+ let query_start_time = std:: time:: Instant :: now ( ) ;
293+
282294 info ! ( "Connecting to Ballista scheduler at {scheduler_url}" ) ;
283295 // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
284296 let connection = create_grpc_client_connection ( scheduler_url, & grpc_config)
@@ -289,6 +301,8 @@ async fn execute_query(
289301 . max_encoding_message_size ( max_message_size)
290302 . max_decoding_message_size ( max_message_size) ;
291303
304+ let query_submit_time = std:: time:: Instant :: now ( ) ;
305+
292306 let query_result = scheduler
293307 . execute_query ( query)
294308 . await
@@ -356,10 +370,40 @@ async fn execute_query(
356370 partition_location,
357371 ..
358372 } ) ) => {
359- let duration = ended_at. saturating_sub ( started_at) ;
360- let duration = Duration :: from_millis ( duration) ;
373+ // Calculate job execution time (server-side execution)
374+ let job_execution_ms = ended_at. saturating_sub ( started_at) ;
375+ let duration = Duration :: from_millis ( job_execution_ms) ;
361376
362377 info ! ( "Job {job_id} finished executing in {duration:?} " ) ;
378+
379+ // Approximate scheduling time: client-perceived elapsed time since submission, minus server-side execution time
380+ // Note this also folds in network latency and any time spent polling for job completion, not just queue time
381+ let scheduling_elapsed = query_submit_time. elapsed ( ) ;
382+ let scheduling_ms = scheduling_elapsed
383+ . as_millis ( )
384+ . saturating_sub ( job_execution_ms as u128 ) ;
385+
386+ // Calculate total query time (end-to-end from client perspective)
387+ let total_elapsed = query_start_time. elapsed ( ) ;
388+ let total_ms = total_elapsed. as_millis ( ) ;
389+
390+ // Set timing metrics
391+ let metric_job_execution = MetricBuilder :: new ( & metrics)
392+ . gauge ( "job_execution_time_ms" , partition) ;
393+ metric_job_execution. set ( job_execution_ms as usize ) ;
394+
395+ let metric_scheduling =
396+ MetricBuilder :: new ( & metrics) . gauge ( "job_scheduling_in_ms" , partition) ;
397+ metric_scheduling. set ( scheduling_ms as usize ) ;
398+
399+ let metric_total_time =
400+ MetricBuilder :: new ( & metrics) . gauge ( "total_query_time_ms" , partition) ;
401+ metric_total_time. set ( total_ms as usize ) ;
402+
403+ // Note: data_transfer_time_ms is not set here because partition fetching
404+ // happens lazily when the stream is consumed, not during execute_query.
405+ // This could be added in a future enhancement by wrapping the stream.
406+
363407 let streams = partition_location. into_iter ( ) . map ( move |partition| {
364408 let f = fetch_partition ( partition, max_message_size, true )
365409 . map_err ( |e| ArrowError :: ExternalError ( Box :: new ( e) ) ) ;