Skip to content

Commit 2bf86e5

Browse files
authored
feat(cubesql): Avoid pushing split down for trivial selects to optimize performance (#7556)
* feat(cubesql): Avoid pushing split down for trivial selects to optimize performance
* feat(cubesql): Pack projection and aggregate members in binary tree to reduce number of iterations required to replace those
1 parent 412213c commit 2bf86e5

File tree

3 files changed

+167
-7
lines changed

3 files changed

+167
-7
lines changed

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use crate::{
33
engine::provider::CubeContext,
44
rewrite::{
55
converter::{is_expr_node, node_to_expr, LogicalPlanToLanguageConverter},
6-
expr_column_name, AliasExprAlias, AllMembersAlias, AllMembersCube, ChangeUserCube,
7-
ColumnExprColumn, DimensionName, FilterMemberMember, FilterMemberOp, LiteralExprValue,
8-
LiteralMemberRelation, LiteralMemberValue, LogicalPlanLanguage, MeasureName,
6+
expr_column_name, AggregateFunctionExprFun, AggregateUDFExprFun, AliasExprAlias,
7+
AllMembersAlias, AllMembersCube, ChangeUserCube, ColumnExprColumn, DimensionName,
8+
FilterMemberMember, FilterMemberOp, LiteralExprValue, LiteralMemberRelation,
9+
LiteralMemberValue, LogicalPlanLanguage, MeasureName, ScalarFunctionExprFun,
910
SegmentName, TableScanSourceTableName, TimeDimensionDateRange,
1011
TimeDimensionGranularity, TimeDimensionName, VirtualFieldCube, VirtualFieldName,
1112
},
@@ -21,6 +22,7 @@ use datafusion::{
2122
},
2223
logical_plan::{Column, DFSchema, Expr},
2324
physical_plan::{
25+
aggregates::AggregateFunction,
2426
functions::{BuiltinScalarFunction, Volatility},
2527
planner::DefaultPhysicalPlanner,
2628
ColumnarValue, PhysicalPlanner,
@@ -34,6 +36,7 @@ use std::{fmt::Debug, ops::Index, sync::Arc};
3436
pub struct LogicalPlanData {
3537
pub original_expr: Option<Expr>,
3638
pub member_name_to_expr: Option<Vec<(Option<String>, Member, Expr)>>,
39+
pub trivial_push_down: Option<usize>,
3740
pub column: Option<Column>,
3841
pub expr_to_alias: Option<Vec<(Expr, String, Option<bool>)>>,
3942
pub referenced_expr: Option<Vec<Expr>>,
@@ -266,6 +269,86 @@ impl LogicalPlanAnalysis {
266269
original_expr
267270
}
268271

272+
fn make_trivial_push_down(
273+
egraph: &EGraph<LogicalPlanLanguage, Self>,
274+
enode: &LogicalPlanLanguage,
275+
) -> Option<usize> {
276+
let trivial_push_down = |id| egraph.index(id).data.trivial_push_down.clone();
277+
match enode {
278+
LogicalPlanLanguage::ColumnExpr(_) => Some(0),
279+
LogicalPlanLanguage::LiteralExpr(_) => Some(0),
280+
LogicalPlanLanguage::AliasExpr(params) => trivial_push_down(params[0]),
281+
LogicalPlanLanguage::ProjectionExpr(params)
282+
| LogicalPlanLanguage::AggregateAggrExpr(params)
283+
| LogicalPlanLanguage::AggregateGroupExpr(params)
284+
| LogicalPlanLanguage::AggregateFunctionExprArgs(params)
285+
| LogicalPlanLanguage::AggregateUDFExprArgs(params) => {
286+
let mut trivial = 0;
287+
for id in params.iter() {
288+
trivial = trivial_push_down(*id)?.max(trivial);
289+
}
290+
Some(trivial)
291+
}
292+
LogicalPlanLanguage::ScalarFunctionExprFun(ScalarFunctionExprFun(fun)) => {
293+
if fun == &BuiltinScalarFunction::DateTrunc {
294+
Some(0)
295+
} else {
296+
None
297+
}
298+
}
299+
LogicalPlanLanguage::ScalarFunctionExpr(params) => {
300+
let mut trivial = 0;
301+
for id in params.iter() {
302+
trivial = trivial_push_down(*id)?.max(trivial);
303+
}
304+
Some(trivial + 1)
305+
}
306+
LogicalPlanLanguage::ScalarFunctionExprArgs(params) => {
307+
let mut trivial = 0;
308+
for id in params.iter() {
309+
trivial = trivial_push_down(*id)?.max(trivial);
310+
}
311+
Some(trivial)
312+
}
313+
LogicalPlanLanguage::AggregateUDFExprFun(AggregateUDFExprFun(fun)) => {
314+
if fun.to_lowercase() == "measure" {
315+
Some(0)
316+
} else {
317+
None
318+
}
319+
}
320+
LogicalPlanLanguage::AggregateFunctionExprFun(AggregateFunctionExprFun(fun)) => {
321+
if matches!(
322+
*fun,
323+
AggregateFunction::Count
324+
| AggregateFunction::Sum
325+
| AggregateFunction::Avg
326+
| AggregateFunction::Min
327+
| AggregateFunction::Max
328+
) {
329+
Some(0)
330+
} else {
331+
None
332+
}
333+
}
334+
LogicalPlanLanguage::AggregateUDFExpr(params) => {
335+
let mut trivial = 0;
336+
for id in params.iter() {
337+
trivial = trivial_push_down(*id)?.max(trivial);
338+
}
339+
Some(trivial + 1)
340+
}
341+
LogicalPlanLanguage::AggregateFunctionExpr(params) => {
342+
let mut trivial = 0;
343+
for id in params.iter() {
344+
trivial = trivial_push_down(*id)?.max(trivial);
345+
}
346+
Some(trivial + 1)
347+
}
348+
_ => None,
349+
}
350+
}
351+
269352
fn make_member_name_to_expr(
270353
egraph: &EGraph<LogicalPlanLanguage, Self>,
271354
enode: &LogicalPlanLanguage,
@@ -1084,6 +1167,7 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
10841167
LogicalPlanData {
10851168
original_expr: Self::make_original_expr(egraph, enode),
10861169
member_name_to_expr: Self::make_member_name_to_expr(egraph, enode),
1170+
trivial_push_down: Self::make_trivial_push_down(egraph, enode),
10871171
column: Self::make_column_name(egraph, enode),
10881172
expr_to_alias: Self::make_expr_to_alias(egraph, enode),
10891173
referenced_expr: Self::make_referenced_expr(egraph, enode),
@@ -1099,6 +1183,7 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
10991183
let (original_expr, b) = self.merge_option_field(a, b, |d| &mut d.original_expr);
11001184
let (member_name_to_expr, b) =
11011185
self.merge_option_field(a, b, |d| &mut d.member_name_to_expr);
1186+
let (trivial_push_down, b) = self.merge_option_field(a, b, |d| &mut d.trivial_push_down);
11021187
let (column_name_to_alias, b) = self.merge_option_field(a, b, |d| &mut d.expr_to_alias);
11031188
let (referenced_columns, b) = self.merge_option_field(a, b, |d| &mut d.referenced_expr);
11041189
let (constant_in_list, b) = self.merge_option_field(a, b, |d| &mut d.constant_in_list);
@@ -1109,6 +1194,7 @@ impl Analysis<LogicalPlanLanguage> for LogicalPlanAnalysis {
11091194
let (column_name, _) = self.merge_option_field(a, b, |d| &mut d.column);
11101195
original_expr
11111196
| member_name_to_expr
1197+
| trivial_push_down
11121198
| column_name_to_alias
11131199
| referenced_columns
11141200
| constant_in_list

rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,34 @@ macro_rules! add_expr_list_node {
8787
}};
8888
}
8989

90+
/// Adds every expression of `$value_expr` to `$graph` via `Self::add_expr` and packs
/// the resulting ids into a binary tree of `$field_variant` nodes, evaluating to the
/// id of the tree root.
///
/// The first conversion error is propagated from the enclosing function with `?`.
///
/// Tree shape by number of ids:
/// * none: a single `$field_variant` node without children;
/// * one: a node holding the id and an empty `$field_variant` node;
/// * two: a node holding both ids;
/// * more: a node holding the trees built from the two halves, split at `len / 2`.
macro_rules! add_binary_expr_list_node {
    ($graph:expr, $value_expr:expr, $field_variant:ident) => {{
        fn to_binary_tree(
            graph: &mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
            list: &[Id],
        ) -> Id {
            // Work out the children first, then add the node itself exactly once.
            let children = match list {
                [] => Vec::new(),
                [only] => {
                    // Pad a lone element with an empty list node.
                    let empty = graph.add(LogicalPlanLanguage::$field_variant(Vec::new()));
                    vec![*only, empty]
                }
                [first, second] => vec![*first, *second],
                _ => {
                    let (head, tail) = list.split_at(list.len() / 2);
                    let left = to_binary_tree(graph, head);
                    let right = to_binary_tree(graph, tail);
                    vec![left, right]
                }
            };
            graph.add(LogicalPlanLanguage::$field_variant(children))
        }
        let expr_ids = $value_expr
            .iter()
            .map(|expr| Self::add_expr($graph, expr))
            .collect::<Result<Vec<_>, _>>()?;
        to_binary_tree($graph, &expr_ids)
    }};
}
117+
90118
macro_rules! add_plan_list_node {
91119
($converter:expr, $value_expr:expr, $field_variant:ident) => {{
92120
let list = $value_expr
@@ -353,7 +381,7 @@ impl LogicalPlanToLanguageConverter {
353381
pub fn add_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Id, CubeError> {
354382
Ok(match plan {
355383
LogicalPlan::Projection(node) => {
356-
let expr = add_expr_list_node!(&mut self.graph, node.expr, ProjectionExpr);
384+
let expr = add_binary_expr_list_node!(&mut self.graph, node.expr, ProjectionExpr);
357385
let input = self.add_logical_plan(node.input.as_ref())?;
358386
let alias = add_data_node!(self, node.alias, ProjectionAlias);
359387
let split = add_data_node!(self, false, ProjectionSplit);
@@ -375,10 +403,13 @@ impl LogicalPlanToLanguageConverter {
375403
}
376404
LogicalPlan::Aggregate(node) => {
377405
let input = self.add_logical_plan(node.input.as_ref())?;
378-
let group_expr =
379-
add_expr_list_node!(&mut self.graph, node.group_expr, AggregateGroupExpr);
406+
let group_expr = add_binary_expr_list_node!(
407+
&mut self.graph,
408+
node.group_expr,
409+
AggregateGroupExpr
410+
);
380411
let aggr_expr =
381-
add_expr_list_node!(&mut self.graph, node.aggr_expr, AggregateAggrExpr);
412+
add_binary_expr_list_node!(&mut self.graph, node.aggr_expr, AggregateAggrExpr);
382413
let split = add_data_node!(self, false, AggregateSplit);
383414
self.graph.add(LogicalPlanLanguage::Aggregate([
384415
input, group_expr, aggr_expr, split,

rust/cubesql/cubesql/src/compile/rewrite/rules/split.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ impl RewriteRules for SplitRules {
8484
"ProjectionSplit:true",
8585
),
8686
self.split_projection_aggregate(
87+
None,
88+
Some("?aggr_expr"),
89+
Some("?group_expr"),
8790
"?alias_to_cube",
8891
"?inner_aggregate_cube",
8992
"?outer_projection_cube",
@@ -132,6 +135,8 @@ impl RewriteRules for SplitRules {
132135
"ProjectionSplit:true",
133136
),
134137
self.split_projection_aggregate(
138+
Some("?expr"),
139+
None, None,
135140
"?alias_to_cube",
136141
"?inner_aggregate_cube",
137142
"?outer_projection_cube",
@@ -180,6 +185,8 @@ impl RewriteRules for SplitRules {
180185
"ProjectionSplit:true",
181186
),
182187
self.split_projection_aggregate(
188+
Some("?expr"),
189+
None, None,
183190
"?alias_to_cube",
184191
"?inner_aggregate_cube",
185192
"?outer_projection_cube",
@@ -284,6 +291,8 @@ impl RewriteRules for SplitRules {
284291
"AggregateSplit:true",
285292
),
286293
self.split_aggregate_aggregate(
294+
"?aggr_expr",
295+
"?group_expr",
287296
"?alias_to_cube",
288297
"?inner_aggregate_cube",
289298
"?outer_aggregate_cube",
@@ -5960,11 +5969,17 @@ impl SplitRules {
59605969

59615970
fn split_projection_aggregate(
59625971
&self,
5972+
projection_expr_var: Option<&'static str>,
5973+
aggr_expr_var: Option<&'static str>,
5974+
group_expr_var: Option<&'static str>,
59635975
alias_to_cube_var: &'static str,
59645976
inner_aggregate_cube_expr_var: &'static str,
59655977
outer_projection_cube_expr_var: &'static str,
59665978
projection_alias_var: &'static str,
59675979
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
5980+
let projection_expr_var = projection_expr_var.map(|v| var!(v));
5981+
let aggr_expr_var = aggr_expr_var.map(|v| var!(v));
5982+
let group_expr_var = group_expr_var.map(|v| var!(v));
59685983
let alias_to_cube_var = var!(alias_to_cube_var);
59695984
let inner_aggregate_cube_expr_var = var!(inner_aggregate_cube_expr_var);
59705985
let outer_projection_cube_expr_var = var!(outer_projection_cube_expr_var);
@@ -5973,6 +5988,20 @@ impl SplitRules {
59735988
for alias_to_cube in
59745989
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned()
59755990
{
5991+
if matches!(
5992+
projection_expr_var.and_then(|v| egraph[subst[v]].data.trivial_push_down),
5993+
Some(0) | Some(1)
5994+
) {
5995+
continue;
5996+
} else if matches!(
5997+
aggr_expr_var
5998+
.and_then(|v| egraph[subst[v]].data.trivial_push_down)
5999+
.zip(group_expr_var.and_then(|v| egraph[subst[v]].data.trivial_push_down))
6000+
.map(|(a, b)| a.max(b)),
6001+
Some(0) | Some(1)
6002+
) {
6003+
continue;
6004+
}
59766005
subst.insert(
59776006
projection_alias_var,
59786007
// Do not put alias on inner projection so table name from cube scan can be reused
@@ -6111,17 +6140,31 @@ impl SplitRules {
61116140

61126141
fn split_aggregate_aggregate(
61136142
&self,
6143+
aggr_expr_var: &'static str,
6144+
group_expr_var: &'static str,
61146145
alias_to_cube_var: &'static str,
61156146
inner_aggregate_cube_expr_var: &'static str,
61166147
outer_aggregate_cube_expr_var: &'static str,
61176148
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
61186149
let alias_to_cube_var = var!(alias_to_cube_var);
6150+
let aggr_expr_var = var!(aggr_expr_var);
6151+
let group_expr_var = var!(group_expr_var);
61196152
let inner_aggregate_cube_expr_var = var!(inner_aggregate_cube_expr_var);
61206153
let outer_aggregate_cube_expr_var = var!(outer_aggregate_cube_expr_var);
61216154
move |egraph, subst| {
61226155
for alias_to_cube in
61236156
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned()
61246157
{
6158+
if matches!(
6159+
egraph[subst[aggr_expr_var]]
6160+
.data
6161+
.trivial_push_down
6162+
.zip(egraph[subst[group_expr_var]].data.trivial_push_down)
6163+
.map(|(a, b)| a.max(b)),
6164+
Some(0) | Some(1)
6165+
) {
6166+
continue;
6167+
}
61256168
subst.insert(
61266169
inner_aggregate_cube_expr_var,
61276170
egraph.add(LogicalPlanLanguage::InnerAggregateSplitReplacerAliasToCube(

0 commit comments

Comments
 (0)