@@ -25,7 +25,8 @@ use crate::duckdb::query::{
 };
 use crate::error::{OperationOn, OperationType};
 use crate::models::{QueryContext, QueryResult};
-use core_history::HistoryStore;
+use core_history::query_metrics::QueryMetric;
+use core_history::{HistoryStore, QueryRecordId};
 use core_metastore::{
     AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
     SchemaIdent as MetastoreSchemaIdent, TableCreateRequest as MetastoreTableCreateRequest,
@@ -71,15 +72,15 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
 use datafusion_expr::planner::ContextProvider;
 use datafusion_expr::{
     BinaryExpr, CreateMemoryTable, DdlStatement, Expr as DFExpr, ExprSchemable, Extension,
-    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast, and,
-    build_join_schema, is_null, lit, when,
+    JoinType, LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast,
+    UserDefinedLogicalNode, and, build_join_schema, is_null, lit, when,
 };
 use datafusion_iceberg::DataFusionTable;
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
 use datafusion_iceberg::catalog::mirror::Mirror;
 use datafusion_iceberg::catalog::schema::IcebergSchema;
 use datafusion_iceberg::table::DataFusionTableConfigBuilder;
-use datafusion_physical_plan::collect;
+use datafusion_physical_plan::{ExecutionPlan, collect, displayable};
 use df_catalog::catalog::CachingCatalog;
 use df_catalog::catalog_list::CachedEntity;
 use df_catalog::table::CachingTable;
@@ -92,7 +93,9 @@ use embucket_functions::visitors::{
     table_functions_cte_relation, timestamp, top_limit,
     unimplemented::functions_checker::visit as unimplemented_functions_checker,
 };
+use futures::FutureExt;
 use futures::TryStreamExt;
+use futures::future::join_all;
 use iceberg_rust::catalog::Catalog;
 use iceberg_rust::catalog::create::CreateTableBuilder;
 use iceberg_rust::catalog::identifier::Identifier;
@@ -108,6 +111,7 @@ use iceberg_rust::spec::values::Value as IcebergValue;
 use iceberg_rust::table::manifest_list::snapshot_partition_bounds;
 use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey as S3Key, resolve_bucket_region};
 use object_store::{ClientOptions, ObjectStore};
+use serde_json::json;
 use snafu::{OptionExt, ResultExt, location};
 use sqlparser::ast::helpers::key_value_options::KeyValueOptions;
 use sqlparser::ast::helpers::stmt_data_loading::StageParamsObject;
@@ -124,7 +128,12 @@ use std::ops::ControlFlow;
 use std::result::Result as StdResult;
 use std::str::FromStr;
 use std::sync::Arc;
-use tracing::Instrument;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::Duration;
+use tokio::task::AbortHandle;
+use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, error};
 use tracing_attributes::instrument;
 use url::Url;
 
@@ -2389,6 +2398,7 @@ impl UserQuery {
     async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
         let session = self.session.clone();
         let query_id = self.query_context.query_id;
+        let history_store = Arc::clone(&self.history_store);
 
         let span = tracing::debug_span!("UserQuery::execute_logical_plan");
 
@@ -2401,11 +2411,42 @@ impl UserQuery {
             .ctx
             .execute_logical_plan(plan)
             .await
-            .context(ex_error::DataFusionSnafu)?
-            .collect()
+            .context(ex_error::DataFusionSnafu)?;
+
+        let plan = records
+            .create_physical_plan()
+            .await
+            .context(ex_error::DataFusionSnafu)?;
+
+        // Run background job to save physical plan metrics
+        let token = CancellationToken::new();
+        let bg_token = token.clone();
+        let metrics_plan = Arc::clone(&plan);
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = sleep(Duration::from_secs(1)) => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save intermediate plan metrics: {:?}", e);
+                        }
+                    }
+                    _ = bg_token.cancelled() => {
+                        if let Err(e) = save_plan_metrics(history_store.clone(), query_id, &metrics_plan).await {
+                            error!("Failed to save final plan metrics: {:?}", e);
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+
+        let task_ctx = session.ctx.task_ctx();
+        let records = collect(plan, task_ctx)
             .instrument(span)
             .await
             .context(ex_error::DataFusionSnafu)?;
+        // Stop metrics background job
+        token.cancel();
         if !records.is_empty() {
             schema = records[0].schema().as_ref().clone();
         }
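
Aside: the hunk above combines `tokio::select!` with a `CancellationToken` so metrics are flushed once per second while `collect` runs, plus one final time when the query finishes. A minimal, self-contained sketch of that pattern, where `flush_metrics`, the tick counter, and the timings are illustrative stand-ins rather than the crate's API:

use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

// Hypothetical stand-in for `save_plan_metrics`; it only logs the tick.
async fn flush_metrics(tick: usize) {
    println!("flushed metrics snapshot #{tick}");
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let bg_token = token.clone();

    // Background job: flush once per second until cancelled, then flush one
    // final time so the last (complete) snapshot is never lost.
    let bg = tokio::spawn(async move {
        let mut tick = 0;
        loop {
            tokio::select! {
                _ = sleep(Duration::from_secs(1)) => {
                    tick += 1;
                    flush_metrics(tick).await;
                }
                _ = bg_token.cancelled() => {
                    flush_metrics(tick + 1).await;
                    break;
                }
            }
        }
    });

    // Stand-in for the `collect(plan, task_ctx)` call that does the real work.
    sleep(Duration::from_millis(2500)).await;

    token.cancel(); // triggers the final flush
    bg.await.unwrap(); // wait for the final flush to complete
}

One deliberate difference: this sketch awaits the background task after cancelling, so the final flush is guaranteed to finish; the diff above calls `token.cancel()` without awaiting the spawned task, so the final write completes asynchronously after the query returns.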
@@ -3735,3 +3776,57 @@ fn normalize_resolved_ref(table_ref: &ResolvedTableReference) -> ResolvedTableRe
         table: Arc::from(table_ref.table.to_ascii_lowercase()),
     }
 }
+
+pub async fn save_plan_metrics(
+    history_store: Arc<dyn HistoryStore>,
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Result<()> {
+    let counter = AtomicUsize::new(0);
+    let mut collected = Vec::new();
+
+    collect_plan_metrics(query_id, plan, None, 0, &counter, &mut collected);
+
+    tracing::debug!(
+        "Collected {} metrics from plan, saving to metrics store...",
+        collected.len()
+    );
+    history_store
+        .add_query_metrics_batch(&collected)
+        .await
+        .context(ex_error::QueryHistoryMetricsSnafu)
+}
+
+/// Recursively collect metrics into a vector (non-async).
+fn collect_plan_metrics(
+    query_id: QueryRecordId,
+    plan: &Arc<dyn ExecutionPlan>,
+    parent: Option<usize>,
+    level: usize,
+    counter: &AtomicUsize,
+    out: &mut Vec<QueryMetric>,
+) {
+    let node_id = counter.fetch_add(1, Ordering::SeqCst);
+
+    let metrics_json = if let Some(metrics) = plan.metrics() {
+        json!({
+            "spill_count": metrics.spill_count(),
+            "output_rows": metrics.output_rows(),
+            "elapsed_compute": metrics.elapsed_compute(),
+        })
+    } else {
+        json!({})
+    };
+
+    out.push(QueryMetric::new(
+        query_id,
+        node_id,
+        parent,
+        plan.name(),
+        metrics_json,
+    ));
+
+    for child in plan.children() {
+        collect_plan_metrics(query_id, &child, Some(node_id), level + 1, counter, out);
+    }
+}
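
Aside: because `collect_plan_metrics` assigns ids in pre-order via the counter and records each node's parent id, the flat batch handed to `add_query_metrics_batch` fully encodes the plan tree. A minimal sketch of rebuilding that hierarchy downstream, where `MetricRow` is a hypothetical stand-in for the `QueryMetric` fields involved:

use std::collections::HashMap;

// Hypothetical stand-in for the subset of `QueryMetric` fields used here.
struct MetricRow {
    node_id: usize,
    parent: Option<usize>,
}

/// Build a child index (parent id -> child node ids) from the flat batch.
fn child_index(rows: &[MetricRow]) -> HashMap<Option<usize>, Vec<usize>> {
    let mut children: HashMap<Option<usize>, Vec<usize>> = HashMap::new();
    for row in rows {
        children.entry(row.parent).or_default().push(row.node_id);
    }
    children
}

fn main() {
    // Pre-order ids as `collect_plan_metrics` would assign them for a plan
    // shaped like: root(0) -> [child(1) -> grandchild(2), child(3)].
    let rows = vec![
        MetricRow { node_id: 0, parent: None },
        MetricRow { node_id: 1, parent: Some(0) },
        MetricRow { node_id: 2, parent: Some(1) },
        MetricRow { node_id: 3, parent: Some(0) },
    ];
    let index = child_index(&rows);
    assert_eq!(index[&None], vec![0]);
    assert_eq!(index[&Some(0)], vec![1, 3]);
    assert_eq!(index[&Some(1)], vec![2]);
}

Note also that the periodic job re-saves the same (query_id, node_id) pairs every second, which suggests the history store's batch write behaves as an upsert; the diff itself does not show that implementation.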