1919use std:: collections:: hash_map:: RandomState ;
2020use std:: collections:: { HashMap , HashSet } ;
2121use std:: sync:: Arc ;
22+ use std:: time:: SystemTime ;
2223
2324use async_trait:: async_trait;
2425use datafusion:: arrow:: datatypes:: Field ;
@@ -54,7 +55,7 @@ use crate::queryplanner::topk::{plan_topk, DummyTopKLowerExec};
5455use crate :: queryplanner:: topk:: { ClusterAggregateTopKLower , ClusterAggregateTopKUpper } ;
5556use crate :: queryplanner:: { CubeTableLogical , InfoSchemaTableProvider } ;
5657use crate :: table:: { cmp_same_types, Row } ;
57- use crate :: CubeError ;
58+ use crate :: { app_metrics , CubeError } ;
5859use datafusion:: common;
5960use datafusion:: common:: tree_node:: { TreeNode , TreeNodeRecursion , TreeNodeVisitor } ;
6061use datafusion:: common:: DFSchemaRef ;
@@ -105,6 +106,10 @@ fn de_vec_as_map<'de, D: Deserializer<'de>>(
105106 Vec :: < ( u64 , MultiPartition ) > :: deserialize ( d) . map ( HashMap :: from_iter)
106107}
107108
109+ fn system_time_to_df_error ( e : std:: time:: SystemTimeError ) -> DataFusionError {
110+ DataFusionError :: Execution ( e. to_string ( ) )
111+ }
112+
108113pub async fn choose_index_ext (
109114 p : LogicalPlan ,
110115 metastore : & dyn PlanIndexStore ,
@@ -117,6 +122,7 @@ pub async fn choose_index_ext(
117122
118123 // Consult metastore to choose the index.
119124 // TODO should be single snapshot read to ensure read consistency here
125+ let get_tables_with_indices_start = SystemTime :: now ( ) ;
120126 let tables = metastore
121127 . get_tables_with_indexes (
122128 collector
@@ -131,11 +137,25 @@ pub async fn choose_index_ext(
131137 . collect_vec ( ) ,
132138 )
133139 . await ?;
140+ let time_2 = SystemTime :: now ( ) ;
141+ let get_tables_with_indices_micros = time_2
142+ . duration_since ( get_tables_with_indices_start)
143+ . map_err ( system_time_to_df_error) ?
144+ . as_micros ( ) as i64 ;
145+ let mut cumulative_await_micros = get_tables_with_indices_micros;
146+ app_metrics:: DATA_QUERY_CHOOSE_INDEX_EXT_GET_TABLES_WITH_INDICES_TIME_US
147+ . report ( get_tables_with_indices_micros) ;
134148 assert_eq ! ( tables. len( ) , collector. constraints. len( ) ) ;
135149 let mut candidates = Vec :: new ( ) ;
136150 for ( c, inputs) in collector. constraints . iter ( ) . zip ( tables) {
137- candidates. push ( pick_index ( c, inputs. 0 , inputs. 1 , inputs. 2 ) . await ?)
151+ candidates. push ( pick_index ( c, inputs. 0 , inputs. 1 , inputs. 2 ) ?)
138152 }
153+ app_metrics:: DATA_QUERY_CHOOSE_INDEX_EXT_PICK_INDEX_TIME_US . report (
154+ time_2
155+ . elapsed ( )
156+ . map_err ( system_time_to_df_error) ?
157+ . as_micros ( ) as i64 ,
158+ ) ;
139159
140160 // We pick partitioned index only when all tables request the same one.
141161 let mut indices: Vec < _ > = match all_have_same_partitioned_index ( & candidates) {
@@ -150,12 +170,20 @@ pub async fn choose_index_ext(
150170 . collect :: < Result < _ , DataFusionError > > ( ) ?,
151171 } ;
152172
173+ let get_active_partitions_and_chunks_start = SystemTime :: now ( ) ;
153174 // TODO should be single snapshot read to ensure read consistency here
154175 let partitions = metastore
155176 . get_active_partitions_and_chunks_by_index_id_for_select (
156177 indices. iter ( ) . map ( |i| i. index . get_id ( ) ) . collect_vec ( ) ,
157178 )
158179 . await ?;
180+ let get_active_partitions_and_chunks_micros = get_active_partitions_and_chunks_start
181+ . elapsed ( )
182+ . map_err ( system_time_to_df_error) ?
183+ . as_micros ( ) as i64 ;
184+ app_metrics:: DATA_QUERY_CHOOSE_INDEX_EXT_GET_ACTIVE_PARTITIONS_AND_CHUNKS_BY_INDEX_ID_TIME_US
185+ . report ( get_active_partitions_and_chunks_micros) ;
186+ cumulative_await_micros += get_active_partitions_and_chunks_micros;
159187
160188 assert_eq ! ( partitions. len( ) , indices. len( ) ) ;
161189 for ( ( i, c) , ps) in indices
@@ -187,8 +215,17 @@ pub async fn choose_index_ext(
187215 }
188216 }
189217
218+ let get_multi_partition_subtree_start_time = SystemTime :: now ( ) ;
190219 // TODO should be single snapshot read to ensure read consistency here
191220 let multi_part_subtree = metastore. get_multi_partition_subtree ( multi_parts) . await ?;
221+ let get_multi_partition_subtree_micros = get_multi_partition_subtree_start_time
222+ . elapsed ( )
223+ . map_err ( system_time_to_df_error) ?
224+ . as_micros ( ) as i64 ;
225+ app_metrics:: DATA_QUERY_CHOOSE_INDEX_EXT_GET_MULTI_PARTITION_SUBTREE_TIME_US
226+ . report ( get_multi_partition_subtree_micros) ;
227+ cumulative_await_micros += get_multi_partition_subtree_micros;
228+ app_metrics:: DATA_QUERY_CHOOSE_INDEX_EXT_TOTAL_AWAITING_TIME_US . report ( cumulative_await_micros) ;
192229 Ok ( (
193230 plan,
194231 PlanningMeta {
@@ -1070,7 +1107,7 @@ fn check_aggregates_expr(table: &IdRow<Table>, aggregates: &Vec<Expr>) -> bool {
10701107}
10711108
10721109// Picks the index, but not partitions snapshots.
1073- async fn pick_index (
1110+ fn pick_index (
10741111 c : & IndexConstraints ,
10751112 schema : IdRow < Schema > ,
10761113 table : IdRow < Table > ,
0 commit comments