Skip to content

Commit df3334c

Browse files
authored
fix(cubesql): Don't push down aggregate to grouped query with filters
1 parent f90a384 commit df3334c

File tree

2 files changed

+132
-56
lines changed

2 files changed

+132
-56
lines changed

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19367,4 +19367,61 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
1936719367
assert!(sql.contains(" AS DOUBLE PRECISION)"));
1936819368
assert!(sql.contains(" AS DECIMAL(38,10))"));
1936919369
}
19370+
19371+
#[tokio::test]
19372+
async fn test_push_down_to_grouped_query_with_filters() {
19373+
if !Rewriter::sql_push_down_enabled() {
19374+
return;
19375+
}
19376+
init_testing_logger();
19377+
19378+
let logical_plan = convert_select_to_query_plan(
19379+
r#"
19380+
select
19381+
sum(t1.sum)
19382+
from (
19383+
select
19384+
customer_gender,
19385+
measure(sumPrice) as sum
19386+
from KibanaSampleDataEcommerce
19387+
where order_date >= '2024-01-01'
19388+
and order_date <= '2024-02-29'
19389+
group by 1
19390+
having measure(sumPrice) >= 5
19391+
) t1
19392+
;"#
19393+
.to_string(),
19394+
DatabaseProtocol::PostgreSQL,
19395+
)
19396+
.await
19397+
.as_logical_plan();
19398+
19399+
assert_eq!(
19400+
logical_plan.find_cube_scan().request,
19401+
V1LoadRequestQuery {
19402+
measures: Some(vec!["KibanaSampleDataEcommerce.sumPrice".to_string()]),
19403+
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
19404+
segments: Some(vec![]),
19405+
time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
19406+
dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
19407+
granularity: None,
19408+
date_range: Some(json!(vec![
19409+
"2024-01-01".to_string(),
19410+
"2024-02-29".to_string()
19411+
]))
19412+
}]),
19413+
order: None,
19414+
limit: None,
19415+
offset: None,
19416+
filters: Some(vec![V1LoadRequestQueryFilterItem {
19417+
member: Some("KibanaSampleDataEcommerce.sumPrice".to_string()),
19418+
operator: Some("gte".to_string()),
19419+
values: Some(vec!["5".to_string()]),
19420+
or: None,
19421+
and: None
19422+
}]),
19423+
ungrouped: None,
19424+
}
19425+
)
19426+
}
1937019427
}

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

Lines changed: 75 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ impl RewriteRules for MemberRules {
178178
"?can_pushdown_join",
179179
"?member_pushdown_replacer_alias_to_cube",
180180
"?new_pushdown_join",
181+
"?ungrouped",
182+
"?filters",
181183
),
182184
),
183185
transforming_rewrite(
@@ -1451,6 +1453,8 @@ impl MemberRules {
14511453
can_pushdown_join_var: &'static str,
14521454
member_pushdown_replacer_alias_to_cube_var: &'static str,
14531455
new_pushdown_join_var: &'static str,
1456+
ungrouped_var: &'static str,
1457+
filters_var: &'static str,
14541458
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
14551459
let alias_to_cube_var = var!(alias_to_cube_var);
14561460
let group_expr_var = var!(group_expr_var);
@@ -1460,74 +1464,89 @@ impl MemberRules {
14601464
let member_pushdown_replacer_alias_to_cube_var =
14611465
var!(member_pushdown_replacer_alias_to_cube_var);
14621466
let new_pushdown_join_var = var!(new_pushdown_join_var);
1467+
let ungrouped_var = var!(ungrouped_var);
1468+
let filters_var = var!(filters_var);
1469+
let enable_ungrouped = self.enable_ungrouped;
14631470
move |egraph, subst| {
1471+
let Some(referenced_group_expr) =
1472+
&egraph.index(subst[group_expr_var]).data.referenced_expr
1473+
else {
1474+
return false;
1475+
};
1476+
1477+
let Some(referenced_aggr_expr) =
1478+
&egraph.index(subst[aggregate_expr_var]).data.referenced_expr
1479+
else {
1480+
return false;
1481+
};
1482+
1483+
if enable_ungrouped {
1484+
// Pushing down members might eliminate dimensions, so if the query is grouped
1485+
// and contains filters over measures, the results will be incorrect.
1486+
for ungrouped in var_iter!(egraph[subst[ungrouped_var]], CubeScanUngrouped) {
1487+
if *ungrouped {
1488+
continue;
1489+
}
1490+
let Some(empty_filters) = &egraph.index(subst[filters_var]).data.is_empty_list
1491+
else {
1492+
return false;
1493+
};
1494+
if !empty_filters {
1495+
return false;
1496+
}
1497+
}
1498+
}
1499+
1500+
let mut columns = HashSet::new();
1501+
columns.extend(referenced_columns(referenced_group_expr).into_iter());
1502+
columns.extend(referenced_columns(referenced_aggr_expr).into_iter());
1503+
1504+
let new_pushdown_join = referenced_aggr_expr.is_empty();
1505+
14641506
let aliases_to_cube: Vec<_> =
14651507
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube)
14661508
.cloned()
14671509
.collect();
14681510

14691511
for alias_to_cube in aliases_to_cube {
1470-
if let Some(referenced_group_expr) =
1471-
&egraph.index(subst[group_expr_var]).data.referenced_expr
1472-
{
1473-
if let Some(referenced_aggr_expr) =
1474-
&egraph.index(subst[aggregate_expr_var]).data.referenced_expr
1475-
{
1476-
let mut columns = HashSet::new();
1477-
columns.extend(referenced_columns(referenced_group_expr).into_iter());
1478-
columns.extend(referenced_columns(referenced_aggr_expr).into_iter());
1479-
1480-
let new_pushdown_join = referenced_aggr_expr.is_empty();
1512+
let can_pushdown_joins: Vec<_> = var_iter!(
1513+
egraph[subst[can_pushdown_join_var]],
1514+
CubeScanCanPushdownJoin
1515+
)
1516+
.cloned()
1517+
.collect();
14811518

1482-
let can_pushdown_joins: Vec<_> = var_iter!(
1483-
egraph[subst[can_pushdown_join_var]],
1484-
CubeScanCanPushdownJoin
1485-
)
1486-
.cloned()
1487-
.collect();
1519+
for can_pushdown_join in can_pushdown_joins {
1520+
if let Some(member_names_to_expr) = &mut egraph
1521+
.index_mut(subst[members_var])
1522+
.data
1523+
.member_name_to_expr
1524+
{
1525+
// TODO default count member is not in the columns set but it should be there
14881526

1489-
for can_pushdown_join in can_pushdown_joins {
1490-
if let Some(member_names_to_expr) = &mut egraph
1491-
.index_mut(subst[members_var])
1492-
.data
1493-
.member_name_to_expr
1494-
{
1495-
// TODO default count member is not in the columns set but it should be there
1496-
1497-
if columns.iter().all(|c| {
1498-
LogicalPlanData::do_find_member_by_alias(
1499-
member_names_to_expr,
1500-
c,
1501-
)
1502-
.is_some()
1503-
}) {
1504-
let member_pushdown_replacer_alias_to_cube = egraph.add(
1505-
LogicalPlanLanguage::MemberPushdownReplacerAliasToCube(
1506-
MemberPushdownReplacerAliasToCube(
1507-
Self::member_replacer_alias_to_cube(
1508-
&alias_to_cube,
1509-
&None,
1510-
),
1511-
),
1512-
),
1513-
);
1527+
if columns.iter().all(|c| {
1528+
LogicalPlanData::do_find_member_by_alias(member_names_to_expr, c)
1529+
.is_some()
1530+
}) {
1531+
let member_pushdown_replacer_alias_to_cube =
1532+
egraph.add(LogicalPlanLanguage::MemberPushdownReplacerAliasToCube(
1533+
MemberPushdownReplacerAliasToCube(
1534+
Self::member_replacer_alias_to_cube(&alias_to_cube, &None),
1535+
),
1536+
));
15141537

1515-
subst.insert(
1516-
member_pushdown_replacer_alias_to_cube_var,
1517-
member_pushdown_replacer_alias_to_cube,
1518-
);
1538+
subst.insert(
1539+
member_pushdown_replacer_alias_to_cube_var,
1540+
member_pushdown_replacer_alias_to_cube,
1541+
);
15191542

1520-
let new_pushdown_join =
1521-
egraph.add(LogicalPlanLanguage::CubeScanCanPushdownJoin(
1522-
CubeScanCanPushdownJoin(
1523-
new_pushdown_join && can_pushdown_join,
1524-
),
1525-
));
1526-
subst.insert(new_pushdown_join_var, new_pushdown_join);
1543+
let new_pushdown_join =
1544+
egraph.add(LogicalPlanLanguage::CubeScanCanPushdownJoin(
1545+
CubeScanCanPushdownJoin(new_pushdown_join && can_pushdown_join),
1546+
));
1547+
subst.insert(new_pushdown_join_var, new_pushdown_join);
15271548

1528-
return true;
1529-
}
1530-
}
1549+
return true;
15311550
}
15321551
}
15331552
}

0 commit comments

Comments
 (0)