Skip to content

Commit ab8ee1e

Browse files
committed
chore(cubestore): Upgrade DF: Add choose_index_ext metrics
1 parent 2c11416 commit ab8ee1e

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ pub static DATA_QUERY_LOGICAL_PLAN_IS_DATA_SELECT_QUERY_US: Histogram =
3131

3232
pub static DATA_QUERY_CHOOSE_INDEX_AND_WORKERS_TIME_US: Histogram =
3333
metrics::histogram("cs.sql.query.data.planning.choose_index_and_workers.us");
34+
pub static DATA_QUERY_CHOOSE_INDEX_EXT_GET_TABLES_WITH_INDICES_TIME_US: Histogram =
35+
metrics::histogram("cs.sql.query.data.planning.choose_index_ext.get_tables_with_indices.us");
36+
pub static DATA_QUERY_CHOOSE_INDEX_EXT_PICK_INDEX_TIME_US: Histogram =
37+
metrics::histogram("cs.sql.query.data.planning.choose_index_ext.pick_index.us");
38+
pub static DATA_QUERY_CHOOSE_INDEX_EXT_GET_ACTIVE_PARTITIONS_AND_CHUNKS_BY_INDEX_ID_TIME_US: Histogram =
39+
metrics::histogram("cs.sql.query.data.planning.choose_index_ext.get_active_partitions_and_chunks_by_index_id.us");
40+
pub static DATA_QUERY_CHOOSE_INDEX_EXT_GET_MULTI_PARTITION_SUBTREE_TIME_US: Histogram =
41+
metrics::histogram("cs.sql.query.data.planning.choose_index_ext.get_multi_partition_subtree.us");
42+
pub static DATA_QUERY_CHOOSE_INDEX_EXT_TOTAL_AWAITING_TIME_US: Histogram =
43+
metrics::histogram("cs.sql.query.data.planning.choose_index_ext.total_awaiting.us");
44+
3445
pub static DATA_QUERY_TO_SERIALIZED_PLAN_TIME_US: Histogram =
3546
metrics::histogram("cs.sql.query.data.planning.to_serialized_plan.us");
3647
pub static DATA_QUERY_CREATE_ROUTER_PHYSICAL_PLAN_US: Histogram =

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use std::collections::hash_map::RandomState;
2020
use std::collections::{HashMap, HashSet};
2121
use std::sync::Arc;
22+
use std::time::SystemTime;
2223

2324
use async_trait::async_trait;
2425
use datafusion::arrow::datatypes::Field;
@@ -54,7 +55,7 @@ use crate::queryplanner::topk::{plan_topk, DummyTopKLowerExec};
5455
use crate::queryplanner::topk::{ClusterAggregateTopKLower, ClusterAggregateTopKUpper};
5556
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
5657
use crate::table::{cmp_same_types, Row};
57-
use crate::CubeError;
58+
use crate::{app_metrics, CubeError};
5859
use datafusion::common;
5960
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6061
use datafusion::common::DFSchemaRef;
@@ -105,6 +106,10 @@ fn de_vec_as_map<'de, D: Deserializer<'de>>(
105106
Vec::<(u64, MultiPartition)>::deserialize(d).map(HashMap::from_iter)
106107
}
107108

109+
fn system_time_to_df_error(e: std::time::SystemTimeError) -> DataFusionError {
110+
DataFusionError::Execution(e.to_string())
111+
}
112+
108113
pub async fn choose_index_ext(
109114
p: LogicalPlan,
110115
metastore: &dyn PlanIndexStore,
@@ -117,6 +122,7 @@ pub async fn choose_index_ext(
117122

118123
// Consult metastore to choose the index.
119124
// TODO should be single snapshot read to ensure read consistency here
125+
let get_tables_with_indices_start = SystemTime::now();
120126
let tables = metastore
121127
.get_tables_with_indexes(
122128
collector
@@ -131,11 +137,16 @@ pub async fn choose_index_ext(
131137
.collect_vec(),
132138
)
133139
.await?;
140+
let time_2 = SystemTime::now();
141+
let get_tables_with_indices_micros = time_2.duration_since(get_tables_with_indices_start).map_err(system_time_to_df_error)?.as_micros() as i64;
142+
let mut cumulative_await_micros = get_tables_with_indices_micros;
143+
app_metrics::DATA_QUERY_CHOOSE_INDEX_EXT_GET_TABLES_WITH_INDICES_TIME_US.report(get_tables_with_indices_micros);
134144
assert_eq!(tables.len(), collector.constraints.len());
135145
let mut candidates = Vec::new();
136146
for (c, inputs) in collector.constraints.iter().zip(tables) {
137-
candidates.push(pick_index(c, inputs.0, inputs.1, inputs.2).await?)
147+
candidates.push(pick_index(c, inputs.0, inputs.1, inputs.2)?)
138148
}
149+
app_metrics::DATA_QUERY_CHOOSE_INDEX_EXT_PICK_INDEX_TIME_US.report(time_2.elapsed().map_err(system_time_to_df_error)?.as_micros() as i64);
139150

140151
// We pick partitioned index only when all tables request the same one.
141152
let mut indices: Vec<_> = match all_have_same_partitioned_index(&candidates) {
@@ -150,12 +161,16 @@ pub async fn choose_index_ext(
150161
.collect::<Result<_, DataFusionError>>()?,
151162
};
152163

164+
let get_active_partitions_and_chunks_start = SystemTime::now();
153165
// TODO should be single snapshot read to ensure read consistency here
154166
let partitions = metastore
155167
.get_active_partitions_and_chunks_by_index_id_for_select(
156168
indices.iter().map(|i| i.index.get_id()).collect_vec(),
157169
)
158170
.await?;
171+
let get_active_partitions_and_chunks_micros = get_active_partitions_and_chunks_start.elapsed().map_err(system_time_to_df_error)?.as_micros() as i64;
172+
app_metrics::DATA_QUERY_CHOOSE_INDEX_EXT_GET_ACTIVE_PARTITIONS_AND_CHUNKS_BY_INDEX_ID_TIME_US.report(get_active_partitions_and_chunks_micros);
173+
cumulative_await_micros += get_active_partitions_and_chunks_micros;
159174

160175
assert_eq!(partitions.len(), indices.len());
161176
for ((i, c), ps) in indices
@@ -187,8 +202,13 @@ pub async fn choose_index_ext(
187202
}
188203
}
189204

205+
let get_multi_partition_subtree_start_time = SystemTime::now();
190206
// TODO should be single snapshot read to ensure read consistency here
191207
let multi_part_subtree = metastore.get_multi_partition_subtree(multi_parts).await?;
208+
let get_multi_partition_subtree_micros = get_multi_partition_subtree_start_time.elapsed().map_err(system_time_to_df_error)?.as_micros() as i64;
209+
app_metrics::DATA_QUERY_CHOOSE_INDEX_EXT_GET_MULTI_PARTITION_SUBTREE_TIME_US.report(get_multi_partition_subtree_micros);
210+
cumulative_await_micros += get_multi_partition_subtree_micros;
211+
app_metrics::DATA_QUERY_CHOOSE_INDEX_EXT_TOTAL_AWAITING_TIME_US.report(cumulative_await_micros);
192212
Ok((
193213
plan,
194214
PlanningMeta {
@@ -1070,7 +1090,7 @@ fn check_aggregates_expr(table: &IdRow<Table>, aggregates: &Vec<Expr>) -> bool {
10701090
}
10711091

10721092
// Picks the index, but not partitions snapshots.
1073-
async fn pick_index(
1093+
fn pick_index(
10741094
c: &IndexConstraints,
10751095
schema: IdRow<Schema>,
10761096
table: IdRow<Table>,

0 commit comments

Comments
 (0)