Skip to content

Commit 5b10a68

Browse files
authored
fix(cubesql): Allow aggregation pushdown only for unlimited CubeScan (#8929)
Also disallow pulling up push-to-Cube wrapper over CubeScan with limit
1 parent e9fdf95 commit 5b10a68

File tree

3 files changed

+155
-7
lines changed

3 files changed

+155
-7
lines changed

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,11 @@ impl RewriteRules for MemberRules {
137137
"?old_members",
138138
"?filters",
139139
"?orders",
140-
"?limit",
141-
"?offset",
140+
// If CubeScan already have limit and offset it would be incorrect to push aggregation into it
141+
// Aggregate(CubeScan(limit, offset)) would run aggregation over limited rows
142+
// CubeScan(aggregation, limit, offset) would return limited groups
143+
"CubeScanLimit:None",
144+
"CubeScanOffset:None",
142145
"?split",
143146
"?can_pushdown_join",
144147
"CubeScanWrapped:false",
@@ -164,8 +167,8 @@ impl RewriteRules for MemberRules {
164167
),
165168
"?filters",
166169
"?orders",
167-
"?limit",
168-
"?offset",
170+
"CubeScanLimit:None",
171+
"CubeScanOffset:None",
169172
"?split",
170173
"?new_pushdown_join",
171174
"CubeScanWrapped:false",

rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/cube_scan_wrapper.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use crate::{
33
cube_scan, cube_scan_wrapper, rewrite,
44
rewriter::{CubeEGraph, CubeRewrite},
55
rules::wrapper::WrapperRules,
6-
transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanUngrouped,
7-
LogicalPlanLanguage, WrapperPullupReplacerAliasToCube, WrapperPullupReplacerPushToCube,
6+
transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanLimit,
7+
CubeScanOffset, CubeScanUngrouped, LogicalPlanLanguage, WrapperPullupReplacerAliasToCube,
8+
WrapperPullupReplacerPushToCube,
89
},
910
var, var_iter,
1011
};
@@ -51,6 +52,8 @@ impl WrapperRules {
5152
self.transform_wrap_cube_scan(
5253
"?members",
5354
"?alias_to_cube",
55+
"?limit",
56+
"?offset",
5457
"?ungrouped",
5558
"?alias_to_cube_out",
5659
"?push_to_cube_out",
@@ -77,27 +80,44 @@ impl WrapperRules {
7780
&self,
7881
members_var: &'static str,
7982
alias_to_cube_var: &'static str,
83+
limit_var: &'static str,
84+
offset_var: &'static str,
8085
ungrouped_cube_var: &'static str,
8186
alias_to_cube_var_out: &'static str,
8287
push_to_cube_out_var: &'static str,
8388
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
8489
let members_var = var!(members_var);
8590
let alias_to_cube_var = var!(alias_to_cube_var);
91+
let limit_var = var!(limit_var);
92+
let offset_var = var!(offset_var);
8693
let ungrouped_cube_var = var!(ungrouped_cube_var);
8794
let alias_to_cube_var_out = var!(alias_to_cube_var_out);
8895
let push_to_cube_out_var = var!(push_to_cube_out_var);
8996
move |egraph, subst| {
97+
let mut has_no_limit_or_offset = true;
98+
for limit in var_iter!(egraph[subst[limit_var]], CubeScanLimit).cloned() {
99+
has_no_limit_or_offset &= limit.is_none();
100+
}
101+
for offset in var_iter!(egraph[subst[offset_var]], CubeScanOffset).cloned() {
102+
has_no_limit_or_offset &= offset.is_none();
103+
}
104+
90105
if let Some(_) = egraph[subst[members_var]].data.member_name_to_expr {
91106
for alias_to_cube in
92107
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned()
93108
{
94109
for ungrouped in
95110
var_iter!(egraph[subst[ungrouped_cube_var]], CubeScanUngrouped).cloned()
96111
{
112+
// When CubeScan already has limit or offset, it's unsafe to allow to push
113+
// anything on top to Cube.
114+
// Especially aggregation: aggregate does not commute with limit,
115+
// so it would be incorrect to join them to single CubeScan
116+
let push_to_cube_out = ungrouped && has_no_limit_or_offset;
97117
subst.insert(
98118
push_to_cube_out_var,
99119
egraph.add(LogicalPlanLanguage::WrapperPullupReplacerPushToCube(
100-
WrapperPullupReplacerPushToCube(ungrouped),
120+
WrapperPullupReplacerPushToCube(push_to_cube_out),
101121
)),
102122
);
103123
subst.insert(

rust/cubesql/cubesql/src/compile/test/test_wrapper.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use cubeclient::models::V1LoadRequestQuery;
12
use datafusion::physical_plan::displayable;
23
use pretty_assertions::assert_eq;
34
use serde_json::json;
@@ -1258,3 +1259,127 @@ async fn test_wrapper_filter_flatten() {
12581259
}
12591260
);
12601261
}
1262+
1263+
/// Regular aggregation over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan
1264+
#[tokio::test]
1265+
async fn wrapper_agg_over_limit() {
1266+
if !Rewriter::sql_push_down_enabled() {
1267+
return;
1268+
}
1269+
init_testing_logger();
1270+
1271+
let query_plan = convert_select_to_query_plan(
1272+
// language=PostgreSQL
1273+
r#"
1274+
SELECT
1275+
customer_gender
1276+
FROM (
1277+
SELECT
1278+
customer_gender
1279+
FROM
1280+
KibanaSampleDataEcommerce
1281+
LIMIT 5
1282+
) scan
1283+
GROUP BY
1284+
1
1285+
"#
1286+
.to_string(),
1287+
DatabaseProtocol::PostgreSQL,
1288+
)
1289+
.await;
1290+
1291+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
1292+
println!(
1293+
"Physical plan: {}",
1294+
displayable(physical_plan.as_ref()).indent()
1295+
);
1296+
1297+
let logical_plan = query_plan.as_logical_plan();
1298+
assert_eq!(
1299+
logical_plan.find_cube_scan().request,
1300+
V1LoadRequestQuery {
1301+
measures: Some(vec![]),
1302+
dimensions: Some(vec![]),
1303+
segments: Some(vec![]),
1304+
order: Some(vec![]),
1305+
limit: Some(5),
1306+
ungrouped: Some(true),
1307+
..Default::default()
1308+
}
1309+
);
1310+
1311+
assert!(logical_plan
1312+
.find_cube_scan_wrapper()
1313+
.wrapped_sql
1314+
.unwrap()
1315+
.sql
1316+
.contains("\"limit\": 5"));
1317+
assert!(query_plan
1318+
.as_logical_plan()
1319+
.find_cube_scan_wrapper()
1320+
.wrapped_sql
1321+
.unwrap()
1322+
.sql
1323+
.contains("\"ungrouped\": true"));
1324+
}
1325+
1326+
/// Aggregation(dimension) over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan
1327+
#[tokio::test]
1328+
async fn wrapper_agg_dimension_over_limit() {
1329+
if !Rewriter::sql_push_down_enabled() {
1330+
return;
1331+
}
1332+
init_testing_logger();
1333+
1334+
let query_plan = convert_select_to_query_plan(
1335+
// language=PostgreSQL
1336+
r#"
1337+
SELECT
1338+
MAX(customer_gender)
1339+
FROM (
1340+
SELECT
1341+
customer_gender
1342+
FROM
1343+
KibanaSampleDataEcommerce
1344+
LIMIT 5
1345+
) scan
1346+
"#
1347+
.to_string(),
1348+
DatabaseProtocol::PostgreSQL,
1349+
)
1350+
.await;
1351+
1352+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
1353+
println!(
1354+
"Physical plan: {}",
1355+
displayable(physical_plan.as_ref()).indent()
1356+
);
1357+
1358+
let logical_plan = query_plan.as_logical_plan();
1359+
assert_eq!(
1360+
logical_plan.find_cube_scan().request,
1361+
V1LoadRequestQuery {
1362+
measures: Some(vec![]),
1363+
dimensions: Some(vec![]),
1364+
segments: Some(vec![]),
1365+
order: Some(vec![]),
1366+
limit: Some(5),
1367+
ungrouped: Some(true),
1368+
..Default::default()
1369+
}
1370+
);
1371+
1372+
assert!(logical_plan
1373+
.find_cube_scan_wrapper()
1374+
.wrapped_sql
1375+
.unwrap()
1376+
.sql
1377+
.contains("\"limit\": 5"));
1378+
assert!(query_plan
1379+
.as_logical_plan()
1380+
.find_cube_scan_wrapper()
1381+
.wrapped_sql
1382+
.unwrap()
1383+
.sql
1384+
.contains("\"ungrouped\": true"));
1385+
}

0 commit comments

Comments
 (0)