Skip to content

Commit 18097a7

Browse files
committed
chore(cubesql): Reaggregation improvements
1 parent 7109039 commit 18097a7

File tree

5 files changed

+233
-75
lines changed

5 files changed

+233
-75
lines changed

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16279,4 +16279,78 @@ ORDER BY \"COUNT(count)\" DESC"
1627916279
}
1628016280
)
1628116281
}
16282+
16283+
// Test: a projection that mixes plain measure/dimension columns with a
// `SUBSTRING(customer_gender FROM 1 FOR 1234)` expression still produces a
// cube scan whose request lists only the underlying members.
// NOTE(review): the "postaggr" in the name suggests the SUBSTRING is meant to
// be evaluated on top of the scan result — confirm against the split rules.
#[tokio::test]
16284+
async fn test_metabase_substring_postaggr() {
16285+
init_logger();
16286+
16287+
// Plan the query using the PostgreSQL protocol and take its logical plan.
let logical_plan = convert_select_to_query_plan(
16288+
r#"
16289+
SELECT
16290+
avgPrice avgPrice,
16291+
countDistinct countDistinct,
16292+
customer_gender customer_gender,
16293+
SUBSTRING(customer_gender FROM 1 FOR 1234) substring_400
16294+
FROM KibanaSampleDataEcommerce
16295+
"#
16296+
.to_string(),
16297+
DatabaseProtocol::PostgreSQL,
16298+
)
16299+
.await
16300+
.as_logical_plan();
16301+
16302+
// The expected request contains the two measures and the single dimension;
// `customer_gender` appears once even though the query references it twice,
// and no time dimensions, ordering, limit, offset or filters are expected.
assert_eq!(
16303+
logical_plan.find_cube_scan().request,
16304+
V1LoadRequestQuery {
16305+
measures: Some(vec![
16306+
"KibanaSampleDataEcommerce.avgPrice".to_string(),
16307+
"KibanaSampleDataEcommerce.countDistinct".to_string(),
16308+
]),
16309+
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
16310+
segments: Some(vec![]),
16311+
time_dimensions: None,
16312+
order: None,
16313+
limit: None,
16314+
offset: None,
16315+
filters: None,
16316+
}
16317+
)
16318+
}
16319+
16320+
// Test: a projection whose expressions carry no explicit aliases
// (`EXTRACT(YEAR FROM order_date)`, `CHAR_LENGTH(customer_gender)` and the
// bare `count` measure) still resolves to a single cube scan request.
#[tokio::test]
16321+
async fn test_reaggregate_without_aliases() {
16322+
init_logger();
16323+
16324+
// Plan the query using the PostgreSQL protocol and take its logical plan.
let logical_plan = convert_select_to_query_plan(
16325+
r#"
16326+
SELECT
16327+
EXTRACT(YEAR FROM order_date),
16328+
CHAR_LENGTH(customer_gender),
16329+
count
16330+
FROM KibanaSampleDataEcommerce
16331+
"#
16332+
.to_string(),
16333+
DatabaseProtocol::PostgreSQL,
16334+
)
16335+
.await
16336+
.as_logical_plan();
16337+
16338+
// Expected request: `count` as the measure, `customer_gender` as the
// dimension, and `order_date` as a time dimension with "year" granularity
// and no date range; ordering, limit, offset and filters are all unset.
assert_eq!(
16339+
logical_plan.find_cube_scan().request,
16340+
V1LoadRequestQuery {
16341+
measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
16342+
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
16343+
segments: Some(vec![]),
16344+
time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
16345+
dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
16346+
granularity: Some("year".to_string()),
16347+
date_range: None,
16348+
}]),
16349+
order: None,
16350+
limit: None,
16351+
offset: None,
16352+
filters: None,
16353+
}
16354+
)
16355+
}
1628216356
}

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct LogicalPlanData {
3030
pub original_expr: Option<Expr>,
3131
pub member_name_to_expr: Option<Vec<(Option<String>, Expr)>>,
3232
pub column: Option<Column>,
33-
pub expr_to_alias: Option<Vec<(Expr, String)>>,
33+
pub expr_to_alias: Option<Vec<(Expr, String, Option<bool>)>>,
3434
pub referenced_expr: Option<Vec<Expr>>,
3535
pub constant: Option<ConstantFolding>,
3636
pub constant_in_list: Option<Vec<ScalarValue>>,
@@ -227,7 +227,7 @@ impl LogicalPlanAnalysis {
227227
fn make_expr_to_alias(
228228
egraph: &EGraph<LogicalPlanLanguage, Self>,
229229
enode: &LogicalPlanLanguage,
230-
) -> Option<Vec<(Expr, String)>> {
230+
) -> Option<Vec<(Expr, String, Option<bool>)>> {
231231
let original_expr = |id| egraph.index(id).data.original_expr.clone();
232232
let id_to_column_name = |id| egraph.index(id).data.column.clone();
233233
let column_name_to_alias = |id| egraph.index(id).data.expr_to_alias.clone();
@@ -240,16 +240,27 @@ impl LogicalPlanAnalysis {
240240
.next()
241241
.unwrap()
242242
.to_string(),
243+
Some(true),
243244
));
244245
Some(map)
245246
}
246247
LogicalPlanLanguage::ProjectionExpr(params) => {
247248
for id in params.iter() {
248249
if let Some(col_name) = id_to_column_name(*id) {
249-
map.push((original_expr(*id)?, col_name.name.to_string()));
250-
} else {
251-
map.extend(column_name_to_alias(*id)?.into_iter());
250+
map.push((original_expr(*id)?, col_name.name.to_string(), None));
251+
continue;
252+
}
253+
if let Some(expr) = original_expr(*id) {
254+
match expr {
255+
Expr::Alias(_, _) => (),
256+
expr @ _ => {
257+
let expr_name = expr.name(&DFSchema::empty());
258+
map.push((expr, expr_name.ok()?, Some(false)));
259+
continue;
260+
}
261+
};
252262
}
263+
map.extend(column_name_to_alias(*id)?.into_iter());
253264
}
254265
Some(map)
255266
}

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,7 @@ impl MemberRules {
13061306
let column_name_to_alias = expr_to_alias
13071307
.clone()
13081308
.into_iter()
1309-
.map(|(e, a)| (expr_column_name_with_relation(e, &mut relation), a))
1309+
.map(|(e, a, _)| (expr_column_name_with_relation(e, &mut relation), a))
13101310
.collect::<Vec<_>>();
13111311
if let Some(member_name_to_expr) = egraph
13121312
.index(subst[members_var])

rust/cubesql/cubesql/src/compile/rewrite/rules/split.rs

Lines changed: 132 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
};
2626
use datafusion::{
2727
arrow::datatypes::DataType as ArrowDataType,
28-
logical_plan::{Column, DFSchema, Operator},
28+
logical_plan::{Column, DFSchema, Expr, Operator},
2929
physical_plan::aggregates::AggregateFunction,
3030
scalar::ScalarValue,
3131
};
@@ -128,7 +128,6 @@ impl RewriteRules for SplitRules {
128128
"?projection_alias",
129129
),
130130
),
131-
// TODO: reaggregate rule requires aliases for all exprs in projection
132131
transforming_rewrite(
133132
"split-reaggregate-projection",
134133
projection(
@@ -2059,6 +2058,27 @@ impl RewriteRules for SplitRules {
20592058
),
20602059
inner_aggregate_split_replacer("?expr", "?cube"),
20612060
),
2061+
rewrite(
2062+
"split-push-down-substr-outer-replacer-metabase",
2063+
// Reaggregation may not be possible in all cases and won't change the final result
2064+
// for SUBSTRING(column, 1, 1234) issued by Metabase
2065+
outer_projection_split_replacer(
2066+
fun_expr("Substr", vec![
2067+
column_expr("?column"),
2068+
literal_int(1),
2069+
literal_int(1234),
2070+
]),
2071+
"?alias_to_cube",
2072+
),
2073+
fun_expr(
2074+
"Substr",
2075+
vec![
2076+
outer_projection_split_replacer(column_expr("?column"), "?alias_to_cube"),
2077+
literal_int(1),
2078+
literal_int(1234),
2079+
],
2080+
),
2081+
),
20622082
// Alias
20632083
rewrite(
20642084
"split-push-down-alias-inner-replacer",
@@ -3676,8 +3696,7 @@ impl RewriteRules for SplitRules {
36763696
)
36773697
},
36783698
|_, _| true,
3679-
// TODO: change to false after post-aggregation improvements
3680-
true,
3699+
false,
36813700
false,
36823701
true,
36833702
Some(vec![("?expr", column_expr("?column"))]),
@@ -4522,30 +4541,27 @@ impl SplitRules {
45224541
for column in var_iter!(egraph[subst[column_var]], ColumnExprColumn).cloned() {
45234542
if let Some((_, cube)) = meta.find_cube_by_column(&alias_to_cube, &column) {
45244543
if let Some(measure) = cube.lookup_measure(&column.name) {
4525-
if measure.agg_type.is_none() {
4526-
continue;
4544+
if let Some(agg_type) = &measure.agg_type {
4545+
if let Some(output_fun) = utils::reaggragate_fun(agg_type) {
4546+
subst.insert(
4547+
output_fun_var,
4548+
egraph.add(
4549+
LogicalPlanLanguage::AggregateFunctionExprFun(
4550+
AggregateFunctionExprFun(output_fun),
4551+
),
4552+
),
4553+
);
4554+
subst.insert(
4555+
distinct_var,
4556+
egraph.add(
4557+
LogicalPlanLanguage::AggregateFunctionExprDistinct(
4558+
AggregateFunctionExprDistinct(false),
4559+
),
4560+
),
4561+
);
4562+
return true;
4563+
}
45274564
}
4528-
4529-
let output_fun = match measure.agg_type.as_ref().unwrap().as_str() {
4530-
"count" => AggregateFunction::Sum,
4531-
"sum" => AggregateFunction::Sum,
4532-
"min" => AggregateFunction::Min,
4533-
"max" => AggregateFunction::Max,
4534-
_ => continue,
4535-
};
4536-
subst.insert(
4537-
output_fun_var,
4538-
egraph.add(LogicalPlanLanguage::AggregateFunctionExprFun(
4539-
AggregateFunctionExprFun(output_fun),
4540-
)),
4541-
);
4542-
subst.insert(
4543-
distinct_var,
4544-
egraph.add(LogicalPlanLanguage::AggregateFunctionExprDistinct(
4545-
AggregateFunctionExprDistinct(false),
4546-
)),
4547-
);
4548-
return true;
45494565
}
45504566
}
45514567
}
@@ -4615,6 +4631,7 @@ impl SplitRules {
46154631
let group_aggregate_cube_var = var!(group_aggregate_cube_var);
46164632
let new_expr_var = var!(new_expr_var);
46174633
let inner_projection_alias_var = var!(inner_projection_alias_var);
4634+
let meta = self.cube_context.meta.clone();
46184635
move |egraph, subst| {
46194636
if let Some(expr_to_alias) =
46204637
&egraph.index(subst[projection_expr_var]).data.expr_to_alias
@@ -4623,55 +4640,102 @@ impl SplitRules {
46234640
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned()
46244641
{
46254642
// Replace outer projection columns with unqualified variants
4626-
let expr = expr_to_alias
4643+
if let Some(expr_name_to_alias) = expr_to_alias
46274644
.clone()
46284645
.into_iter()
4629-
.map(|(_, a)| {
4630-
let column = Column::from_name(a);
4631-
let column_expr_column = egraph.add(
4632-
LogicalPlanLanguage::ColumnExprColumn(ColumnExprColumn(column)),
4633-
);
4634-
egraph.add(LogicalPlanLanguage::ColumnExpr([column_expr_column]))
4646+
.map(|(expr, alias, explicit)| {
4647+
let default_alias = Some((alias.clone(), None));
4648+
if explicit == Some(true) {
4649+
return default_alias;
4650+
}
4651+
if let Expr::Column(column) = &expr {
4652+
if let Some((_, cube)) =
4653+
meta.find_cube_by_column(&alias_to_cube, column)
4654+
{
4655+
if let Some(measure) = cube.lookup_measure(&column.name) {
4656+
if let Some(agg_type) = &measure.agg_type {
4657+
let aggr_expr = Expr::AggregateFunction {
4658+
fun: utils::reaggragate_fun(&agg_type)?,
4659+
args: vec![expr],
4660+
distinct: false,
4661+
};
4662+
let expr_name =
4663+
aggr_expr.name(&DFSchema::empty()).ok()?;
4664+
return Some((expr_name, Some(alias)));
4665+
}
4666+
}
4667+
}
4668+
}
4669+
default_alias
46354670
})
4636-
.collect::<Vec<_>>();
4637-
let mut projection_expr =
4638-
egraph.add(LogicalPlanLanguage::ProjectionExpr(vec![]));
4639-
for i in expr.into_iter().rev() {
4640-
projection_expr = egraph.add(LogicalPlanLanguage::ProjectionExpr(vec![
4641-
i,
4642-
projection_expr,
4643-
]));
4644-
}
4645-
subst.insert(new_expr_var, projection_expr);
4671+
.collect::<Option<Vec<_>>>()
4672+
{
4673+
let expr = expr_name_to_alias
4674+
.into_iter()
4675+
.map(|(name, alias)| {
4676+
let column = Column::from_name(name);
4677+
let column_expr_column = egraph.add(
4678+
LogicalPlanLanguage::ColumnExprColumn(ColumnExprColumn(column)),
4679+
);
4680+
let column_expr = egraph
4681+
.add(LogicalPlanLanguage::ColumnExpr([column_expr_column]));
4682+
if let Some(alias) = alias {
4683+
let alias_expr_alias = egraph.add(
4684+
LogicalPlanLanguage::AliasExprAlias(AliasExprAlias(alias)),
4685+
);
4686+
return egraph.add(LogicalPlanLanguage::AliasExpr([
4687+
column_expr,
4688+
alias_expr_alias,
4689+
]));
4690+
}
4691+
column_expr
4692+
})
4693+
.collect::<Vec<_>>();
46464694

4647-
subst.insert(
4648-
inner_projection_alias_var,
4649-
// Do not put alias on inner projection so table name from cube scan can be reused
4650-
egraph.add(LogicalPlanLanguage::ProjectionAlias(ProjectionAlias(None))),
4651-
);
4695+
let mut projection_expr =
4696+
egraph.add(LogicalPlanLanguage::ProjectionExpr(vec![]));
4697+
for i in expr.into_iter().rev() {
4698+
projection_expr =
4699+
egraph.add(LogicalPlanLanguage::ProjectionExpr(vec![
4700+
i,
4701+
projection_expr,
4702+
]));
4703+
}
4704+
subst.insert(new_expr_var, projection_expr);
46524705

4653-
subst.insert(
4654-
inner_aggregate_cube_var,
4655-
egraph.add(LogicalPlanLanguage::InnerAggregateSplitReplacerAliasToCube(
4656-
InnerAggregateSplitReplacerAliasToCube(alias_to_cube.clone()),
4657-
)),
4658-
);
4706+
subst.insert(
4707+
inner_projection_alias_var,
4708+
// Do not put alias on inner projection so table name from cube scan can be reused
4709+
egraph.add(LogicalPlanLanguage::ProjectionAlias(ProjectionAlias(None))),
4710+
);
46594711

4660-
subst.insert(
4661-
group_expr_cube_var,
4662-
egraph.add(LogicalPlanLanguage::GroupExprSplitReplacerAliasToCube(
4663-
GroupExprSplitReplacerAliasToCube(alias_to_cube.clone()),
4664-
)),
4665-
);
4712+
subst.insert(
4713+
inner_aggregate_cube_var,
4714+
egraph.add(
4715+
LogicalPlanLanguage::InnerAggregateSplitReplacerAliasToCube(
4716+
InnerAggregateSplitReplacerAliasToCube(alias_to_cube.clone()),
4717+
),
4718+
),
4719+
);
46664720

4667-
subst.insert(
4668-
group_aggregate_cube_var,
4669-
egraph.add(LogicalPlanLanguage::GroupAggregateSplitReplacerAliasToCube(
4670-
GroupAggregateSplitReplacerAliasToCube(alias_to_cube.clone()),
4671-
)),
4672-
);
4721+
subst.insert(
4722+
group_expr_cube_var,
4723+
egraph.add(LogicalPlanLanguage::GroupExprSplitReplacerAliasToCube(
4724+
GroupExprSplitReplacerAliasToCube(alias_to_cube.clone()),
4725+
)),
4726+
);
46734727

4674-
return true;
4728+
subst.insert(
4729+
group_aggregate_cube_var,
4730+
egraph.add(
4731+
LogicalPlanLanguage::GroupAggregateSplitReplacerAliasToCube(
4732+
GroupAggregateSplitReplacerAliasToCube(alias_to_cube.clone()),
4733+
),
4734+
),
4735+
);
4736+
4737+
return true;
4738+
}
46754739
}
46764740
}
46774741
false

0 commit comments

Comments
 (0)