@@ -25,7 +25,8 @@ use crate::duckdb::query::{
 };
 use crate::error::{OperationOn, OperationType};
 use crate::models::{QueryContext, QueryResult};
-use core_history::HistoryStore;
+use core_history::query_metrics::QueryMetric;
+use core_history::{HistoryStore, QueryRecordId};
 use core_metastore::{
     AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
     SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
@@ -71,15 +72,15 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
 use datafusion_expr::planner::ContextProvider;
 use datafusion_expr::{
     BinaryExpr, CreateMemoryTable, DdlStatement, Expr as DFExpr, ExprSchemable, Extension,
-    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast, and,
-    build_join_schema, is_null, lit, when,
+    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast,
+    UserDefinedLogicalNode, and, build_join_schema, is_null, lit, when,
 };
 use datafusion_iceberg::DataFusionTable;
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
 use datafusion_iceberg::catalog::mirror::Mirror;
 use datafusion_iceberg::catalog::schema::IcebergSchema;
 use datafusion_iceberg::table::DataFusionTableConfigBuilder;
-use datafusion_physical_plan::collect;
+use datafusion_physical_plan::{ExecutionPlan, collect, displayable};
 use df_catalog::catalog::CachingCatalog;
 use df_catalog::catalog_list::CachedEntity;
 use df_catalog::table::CachingTable;
@@ -92,7 +93,9 @@ use embucket_functions::visitors::{
     table_functions_cte_relation, timestamp, top_limit,
     unimplemented::functions_checker::visit as unimplemented_functions_checker,
 };
+use futures::FutureExt;
 use futures::TryStreamExt;
+use futures::future::join_all;
 use iceberg_rust::catalog::Catalog;
 use iceberg_rust::catalog::create::CreateTableBuilder;
 use iceberg_rust::catalog::identifier::Identifier;
@@ -108,6 +111,7 @@ use iceberg_rust::spec::values::Value as IcebergValue;
 use iceberg_rust::table::manifest_list::snapshot_partition_bounds;
 use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey as S3Key, resolve_bucket_region};
 use object_store::{ClientOptions, ObjectStore};
+use serde_json::json;
 use snafu::{OptionExt, ResultExt, location};
 use sqlparser::ast::helpers::key_value_options::KeyValueOptions;
 use sqlparser::ast::helpers::stmt_data_loading::StageParamsObject;
@@ -124,7 +128,12 @@ use std::ops::ControlFlow;
 use std::result::Result as StdResult;
 use std::str::FromStr;
 use std::sync::Arc;
-use tracing::Instrument;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+use tokio::task::AbortHandle;
+use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, error};
 use tracing_attributes::instrument;
 use url::Url;
 
@@ -2394,6 +2403,7 @@ impl UserQuery {
     async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
         let session = self.session.clone();
         let query_id = self.query_context.query_id;
+        let history_store = Arc::clone(&self.history_store);
 
         let span = tracing::debug_span!("UserQuery::execute_logical_plan");
 
@@ -2406,11 +2416,42 @@ impl UserQuery {
             .ctx
             .execute_logical_plan(plan)
             .await
-            .context(ex_error::DataFusionSnafu)?
-            .collect()
+            .context(ex_error::DataFusionSnafu)?;
+
+        let plan = records
+            .create_physical_plan()
+            .await
+            .context(ex_error::DataFusionSnafu)?;
+
+        // Run background job to save physical plan metrics.
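+        // The job flushes a metrics snapshot every second while the query runs,
+        // then writes one final snapshot when the token is cancelled after `collect`.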
+        let token = CancellationToken::new();
+        let bg_token = token.clone();
+        let metrics_plan = Arc::clone(&plan);
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = sleep(Duration::from_secs(1)) => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save intermediate plan metrics: {:?}", e);
+                        }
+                    }
+                    _ = bg_token.cancelled() => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save final plan metrics: {:?}", e);
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+
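+        // Execute the physical plan to completion; the sampler above observes
+        // its live metrics through the shared `Arc` while batches are produced.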
+        let task_ctx = session.ctx.task_ctx();
+        let records = collect(plan, task_ctx)
             .instrument(span)
             .await
             .context(ex_error::DataFusionSnafu)?;
+        // Stop metrics background job
+        token.cancel();
         if !records.is_empty() {
             schema = records[0].schema().as_ref().clone();
         }
@@ -3739,3 +3780,57 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
         table: Arc::from(table_ref.table.to_ascii_lowercase()),
     }
 }
+
+pub async fn save_plan_metrics(
+    history_store: Arc<dyn HistoryStore>,
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Result<()> {
+    let counter = AtomicUsize::new(0);
+    let mut collected = Vec::new();
+
+    collect_plan_metrics(query_id, plan, None, 0, &counter, &mut collected);
+
+    tracing::debug!(
+        "Collected {} metrics from plan, saving to metrics store...",
+        collected.len()
+    );
+    history_store
+        .add_query_metrics_batch(&collected)
+        .await
+        .context(ex_error::QueryHistoryMetricsSnafu)
+}
+
+/// Recursively collect metrics into a vector (non-async).
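+/// Nodes are numbered in pre-order via the shared counter, so each row carries
+/// its parent's id and the plan tree can be rebuilt from the flat batch.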
+fn collect_plan_metrics(
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+    parent: Option<usize>,
+    level: usize,
+    counter: &AtomicUsize,
+    out: &mut Vec<QueryMetric>,
+) {
+    let node_id = counter.fetch_add(1, Ordering::SeqCst);
+
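+    // Not every operator exposes metrics (and none do before execution starts);
+    // store an empty JSON object in that case.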
+    let metrics_json = if let Some(metrics) = plan.metrics() {
+        json!({
+            "spill_count": metrics.spill_count(),
+            "output_rows": metrics.output_rows(),
+            "elapsed_compute": metrics.elapsed_compute(),
+        })
+    } else {
+        json!({})
+    };
+
+    out.push(QueryMetric::new(
+        query_id,
+        node_id,
+        parent,
+        plan.name(),
+        metrics_json,
+    ));
+
+    for child in plan.children() {
+        collect_plan_metrics(query_id, &child, Some(node_id), level + 1, counter, out);
+    }
+}
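
Since the batch is just flat rows linked by `parent`, a consumer can rebuild the operator tree in a single pass: pre-order numbering guarantees every parent id is smaller than its children's ids. A minimal sketch of that invariant, using a hypothetical `MetricRow` standing in for the `(node_id, parent, name)` fields that `QueryMetric::new` receives (the real `core_history::query_metrics::QueryMetric` may differ):

```rust
// Hypothetical flat row; stands in for core_history's QueryMetric.
struct MetricRow {
    node_id: usize,
    parent: Option<usize>,
    name: String,
}

/// Render flat rows as an indented operator tree. Pre-order numbering means
/// a parent's depth is always known before any of its children are visited.
fn render_tree(rows: &[MetricRow]) -> String {
    let mut depth = vec![0usize; rows.len()];
    let mut out = String::new();
    for row in rows {
        if let Some(p) = row.parent {
            depth[row.node_id] = depth[p] + 1;
        }
        out.push_str(&"  ".repeat(depth[row.node_id]));
        out.push_str(&row.name);
        out.push('\n');
    }
    out
}

fn main() {
    let rows = vec![
        MetricRow { node_id: 0, parent: None, name: "ProjectionExec".into() },
        MetricRow { node_id: 1, parent: Some(0), name: "FilterExec".into() },
        MetricRow { node_id: 2, parent: Some(1), name: "ParquetExec".into() },
    ];
    print!("{}", render_tree(&rows));
}
```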