@@ -72,15 +72,15 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
7272use datafusion_expr:: planner:: ContextProvider ;
7373use datafusion_expr:: {
7474 BinaryExpr , CreateMemoryTable , DdlStatement , Expr as DFExpr , ExprSchemable , Extension ,
75- JoinType , LogicalPlanBuilder , Operator , Projection , SubqueryAlias , TryCast ,
76- UserDefinedLogicalNode , and , build_join_schema, is_null, lit, when,
75+ JoinType , LogicalPlanBuilder , Operator , Projection , SubqueryAlias , TryCast , and ,
76+ build_join_schema, is_null, lit, when,
7777} ;
7878use datafusion_iceberg:: DataFusionTable ;
7979use datafusion_iceberg:: catalog:: catalog:: IcebergCatalog ;
8080use datafusion_iceberg:: catalog:: mirror:: Mirror ;
8181use datafusion_iceberg:: catalog:: schema:: IcebergSchema ;
8282use datafusion_iceberg:: table:: DataFusionTableConfigBuilder ;
83- use datafusion_physical_plan:: { ExecutionPlan , collect, displayable } ;
83+ use datafusion_physical_plan:: { ExecutionPlan , collect} ;
8484use df_catalog:: catalog:: CachingCatalog ;
8585use df_catalog:: catalog_list:: CachedEntity ;
8686use df_catalog:: table:: CachingTable ;
@@ -93,9 +93,7 @@ use embucket_functions::visitors::{
9393 table_functions_cte_relation, timestamp, top_limit,
9494 unimplemented:: functions_checker:: visit as unimplemented_functions_checker,
9595} ;
96- use futures:: FutureExt ;
9796use futures:: TryStreamExt ;
98- use futures:: future:: join_all;
9997use iceberg_rust:: catalog:: Catalog ;
10098use iceberg_rust:: catalog:: create:: CreateTableBuilder ;
10199use iceberg_rust:: catalog:: identifier:: Identifier ;
@@ -130,7 +128,6 @@ use std::str::FromStr;
130128use std:: sync:: Arc ;
131129use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
132130use std:: time:: Duration ;
133- use tokio:: task:: AbortHandle ;
134131use tokio:: time:: sleep;
135132use tokio_util:: sync:: CancellationToken ;
136133use tracing:: { Instrument , error} ;
@@ -2430,12 +2427,12 @@ impl UserQuery {
24302427 tokio:: spawn ( async move {
24312428 loop {
24322429 tokio:: select! {
2433- _ = sleep( Duration :: from_secs( 1 ) ) => {
2430+ ( ) = sleep( Duration :: from_secs( 1 ) ) => {
24342431 if let Err ( e) = save_plan_metrics( history_store. clone( ) , query_id, & metrics_plan) . await {
24352432 error!( "Failed to save intermediate plan metrics: {:?}" , e) ;
24362433 }
24372434 }
2438- _ = bg_token. cancelled( ) => {
2435+ ( ) = bg_token. cancelled( ) => {
24392436 if let Err ( e) = save_plan_metrics( history_store. clone( ) , query_id, & metrics_plan) . await {
24402437 error!( "Failed to save final plan metrics: {:?}" , e) ;
24412438 }
@@ -3789,7 +3786,7 @@ pub async fn save_plan_metrics(
37893786 let counter = AtomicUsize :: new ( 0 ) ;
37903787 let mut collected = Vec :: new ( ) ;
37913788
3792- collect_plan_metrics ( query_id, plan, None , 0 , & counter, & mut collected) ;
3789+ collect_plan_metrics ( query_id, plan, None , & counter, & mut collected) ;
37933790
37943791 tracing:: debug!(
37953792 "Collected {} metrics from plan, saving to metrics store..." ,
@@ -3806,7 +3803,6 @@ fn collect_plan_metrics(
38063803 query_id : QueryRecordId ,
38073804 plan : & Arc < dyn ExecutionPlan > ,
38083805 parent : Option < usize > ,
3809- level : usize ,
38103806 counter : & AtomicUsize ,
38113807 out : & mut Vec < QueryMetric > ,
38123808) {
@@ -3831,6 +3827,6 @@ fn collect_plan_metrics(
38313827 ) ) ;
38323828
38333829 for child in plan. children ( ) {
3834- collect_plan_metrics ( query_id, & child, Some ( node_id) , level + 1 , counter, out) ;
3830+ collect_plan_metrics ( query_id, child, Some ( node_id) , counter, out) ;
38353831 }
38363832}
0 commit comments