Commit feaf03b

fix(cubesql): Make cube join check stricter (#9043)
Now it should disallow any plan with an ungrouped CubeScan inside, such as Join(CubeScan, Projection(CubeScan(ungrouped=true))).

test_join_cubes_with_postprocessing was broken: it joined a grouped CubeScan with an ungrouped CubeScan on __cubeJoinField. test_join_cubes_with_postprocessing_and_no_cubejoinfield should already be covered by ungrouped-grouped join pushdown.
1 parent 0e95f18 commit feaf03b
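
To illustrate the stricter check, here is a minimal, self-contained Rust sketch. The `Plan` enum, `has_ungrouped_scan`, and `check_join` below are hypothetical stand-ins for illustration only, not the actual cubesql types or API: a join is rejected whenever an ungrouped CubeScan is reachable anywhere inside either input, even through a wrapping node such as a Projection.

// Toy model only: these types and helpers are illustrative stand-ins,
// not cubesql's real LogicalPlan handling.
enum Plan {
    CubeScan { ungrouped: bool },
    Projection(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
}

// Recursively look for an ungrouped scan anywhere in the (sub)plan.
fn has_ungrouped_scan(plan: &Plan) -> bool {
    match plan {
        Plan::CubeScan { ungrouped } => *ungrouped,
        Plan::Projection(input) => has_ungrouped_scan(input),
        Plan::Join(left, right) => has_ungrouped_scan(left) || has_ungrouped_scan(right),
    }
}

// The stricter rule: a join is disallowed if either side contains an ungrouped scan.
fn check_join(left: &Plan, right: &Plan) -> Result<(), String> {
    if has_ungrouped_scan(left) || has_ungrouped_scan(right) {
        return Err("Can not join Cubes: ungrouped CubeScan inside a join input".to_string());
    }
    Ok(())
}

fn main() {
    // Join(CubeScan, Projection(CubeScan(ungrouped=true))) from the commit message:
    // the ungrouped scan hides behind a Projection but is still detected.
    let left = Plan::CubeScan { ungrouped: false };
    let right = Plan::Projection(Box::new(Plan::CubeScan { ungrouped: true }));
    assert!(check_join(&left, &right).is_err());

    // The whole plan is flagged as containing an ungrouped scan, too.
    assert!(has_ungrouped_scan(&Plan::Join(Box::new(left), Box::new(right))));

    // Joining two grouped scans stays allowed.
    let grouped = Plan::CubeScan { ungrouped: false };
    assert!(check_join(&grouped, &Plan::CubeScan { ungrouped: false }).is_ok());
}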

File tree

3 files changed: +105 -130 lines

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 38 additions & 0 deletions
@@ -553,6 +553,44 @@ macro_rules! generate_sql_for_timestamp {
 }
 
 impl CubeScanWrapperNode {
+    pub fn has_ungrouped_scan(&self) -> bool {
+        Self::has_ungrouped_wrapped_node(self.wrapped_plan.as_ref())
+    }
+
+    fn has_ungrouped_wrapped_node(node: &LogicalPlan) -> bool {
+        match node {
+            LogicalPlan::Extension(Extension { node }) => {
+                if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
+                    cube_scan.request.ungrouped == Some(true)
+                } else if let Some(wrapped_select) =
+                    node.as_any().downcast_ref::<WrappedSelectNode>()
+                {
+                    // Don't really care if push-to-Cube or not, any aggregation should be ok here from execution perspective
+                    if wrapped_select.select_type == WrappedSelectType::Aggregate {
+                        false
+                    } else {
+                        Self::has_ungrouped_wrapped_node(wrapped_select.from.as_ref())
+                            || wrapped_select
+                                .joins
+                                .iter()
+                                .map(|(join, _, _)| join.as_ref())
+                                .any(Self::has_ungrouped_wrapped_node)
+                            || wrapped_select
+                                .subqueries
+                                .iter()
+                                .map(|subq| subq.as_ref())
+                                .any(Self::has_ungrouped_wrapped_node)
+                    }
+                } else {
+                    false
+                }
+            }
+            LogicalPlan::EmptyRelation(_) => false,
+            // Everything else is unexpected actually
+            _ => false,
+        }
+    }
+
     pub async fn generate_sql(
         &self,
         transport: Arc<dyn TransportService>,

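One nuance in the hunk above: a WrappedSelect with an aggregate select type acts as a grouping boundary, so an ungrouped scan underneath it does not mark the wrapper as ungrouped. A minimal sketch of that rule, using hypothetical stand-in types rather than the real WrappedSelectNode:

// Illustrative stand-ins only; not the real cubesql node types.
#[derive(PartialEq)]
enum SelectType {
    Projection,
    Aggregate,
}

enum Node {
    Scan { ungrouped: bool },
    Select { select_type: SelectType, from: Box<Node> },
}

fn has_ungrouped(node: &Node) -> bool {
    match node {
        Node::Scan { ungrouped } => *ungrouped,
        Node::Select { select_type, from } => {
            // Aggregation re-groups the rows, so whatever sits below it
            // no longer counts as ungrouped for the join check.
            if *select_type == SelectType::Aggregate {
                false
            } else {
                has_ungrouped(from)
            }
        }
    }
}

fn main() {
    // An aggregate select over an ungrouped scan is treated as grouped.
    let aggregated = Node::Select {
        select_type: SelectType::Aggregate,
        from: Box::new(Node::Scan { ungrouped: true }),
    };
    assert!(!has_ungrouped(&aggregated));

    // A plain projection does not hide the ungrouped scan.
    let projected = Node::Select {
        select_type: SelectType::Projection,
        from: Box::new(Node::Scan { ungrouped: true }),
    };
    assert!(has_ungrouped(&projected));
}
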
rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 65 additions & 18 deletions
@@ -50,7 +50,7 @@ use datafusion::{
         plan::{Aggregate, Extension, Filter, Join, Projection, Sort, TableUDFs, Window},
         replace_col_to_expr, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Distinct,
         EmptyRelation, Expr, ExprRewritable, ExprRewriter, GroupingSet, Like, Limit, LogicalPlan,
-        LogicalPlanBuilder, TableScan, Union,
+        LogicalPlanBuilder, Repartition, Subquery, TableScan, Union,
     },
     physical_plan::planner::DefaultPhysicalPlanner,
     scalar::ScalarValue,
@@ -1350,10 +1350,18 @@ impl LanguageToLogicalPlanConverter {
             LogicalPlanLanguage::Join(params) => {
                 let left_on = match_data_node!(node_by_id, params[2], JoinLeftOn);
                 let right_on = match_data_node!(node_by_id, params[3], JoinRightOn);
-                let left = self.to_logical_plan(params[0]);
-                let right = self.to_logical_plan(params[1]);
-
-                if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
+                let left = self.to_logical_plan(params[0])?;
+                let right = self.to_logical_plan(params[1])?;
+
+                // It's OK to join two grouped queries: expected row count is not that high, so
+                // SQL API can, potentially, evaluate it completely
+                // We don't really want it, so cost function should make WrappedSelect preferable
+                // but still, we don't want to hard error on that
+                // But if any one of join sides is ungrouped, SQL API does not have much of a choice
+                // but to process every row from ungrouped query, and that's Not Good
+                if Self::have_ungrouped_cube_scan_inside(&left)
+                    || Self::have_ungrouped_cube_scan_inside(&right)
+                {
                     if left_on.iter().any(|c| c.name == "__cubeJoinField")
                         || right_on.iter().any(|c| c.name == "__cubeJoinField")
                     {
@@ -1370,8 +1378,8 @@ impl LanguageToLogicalPlanConverter {
                     }
                 }
 
-                let left = Arc::new(left?);
-                let right = Arc::new(right?);
+                let left = Arc::new(left);
+                let right = Arc::new(right);
 
                 let join_type = match_data_node!(node_by_id, params[4], JoinJoinType);
                 let join_constraint = match_data_node!(node_by_id, params[5], JoinJoinConstraint);
@@ -1394,7 +1402,18 @@ impl LanguageToLogicalPlanConverter {
                 })
             }
             LogicalPlanLanguage::CrossJoin(params) => {
-                if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
+                let left = self.to_logical_plan(params[0])?;
+                let right = self.to_logical_plan(params[1])?;
+
+                // See comment in Join conversion
+                // Note that DF can generate Filter(CrossJoin(...)) for complex join conditions
+                // But, from memory or dataset perspective it's the same: DF would buffer left side completely
+                // And then iterate over right side, evaluating predicate
+                // Regular join would use hash partitioning here, so it would be quicker, and utilize less CPU,
+                // but transfer and buffering will be the same
+                if Self::have_ungrouped_cube_scan_inside(&left)
+                    || Self::have_ungrouped_cube_scan_inside(&right)
+                {
                     return Err(CubeError::internal(
                         "Can not join Cubes. This is most likely due to one of the following reasons:\n\
                         • one of the cubes contains a group by\n\
@@ -1403,8 +1422,8 @@ impl LanguageToLogicalPlanConverter {
                     ));
                 }
 
-                let left = Arc::new(self.to_logical_plan(params[0])?);
-                let right = Arc::new(self.to_logical_plan(params[1])?);
+                let left = Arc::new(left);
+                let right = Arc::new(right);
                 let schema = Arc::new(left.schema().join(right.schema())?);
 
                 LogicalPlan::CrossJoin(CrossJoin {
@@ -2304,16 +2323,44 @@ impl LanguageToLogicalPlanConverter {
         })
     }
 
-    fn is_cube_scan_node(&self, node_id: Id) -> bool {
-        let node_by_id = &self.best_expr;
-        match node_by_id.index(node_id) {
-            LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => {
-                return true
+    fn have_ungrouped_cube_scan_inside(node: &LogicalPlan) -> bool {
+        match node {
+            LogicalPlan::Projection(Projection { input, .. })
+            | LogicalPlan::Filter(Filter { input, .. })
+            | LogicalPlan::Window(Window { input, .. })
+            | LogicalPlan::Aggregate(Aggregate { input, .. })
+            | LogicalPlan::Sort(Sort { input, .. })
+            | LogicalPlan::Repartition(Repartition { input, .. })
+            | LogicalPlan::Limit(Limit { input, .. }) => {
+                Self::have_ungrouped_cube_scan_inside(input)
+            }
+            LogicalPlan::Join(Join { left, right, .. })
+            | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
+                Self::have_ungrouped_cube_scan_inside(left)
+                    || Self::have_ungrouped_cube_scan_inside(right)
+            }
+            LogicalPlan::Union(Union { inputs, .. }) => {
+                inputs.iter().any(Self::have_ungrouped_cube_scan_inside)
+            }
+            LogicalPlan::Subquery(Subquery {
+                input, subqueries, ..
+            }) => {
+                Self::have_ungrouped_cube_scan_inside(input)
+                    || subqueries.iter().any(Self::have_ungrouped_cube_scan_inside)
+            }
+            LogicalPlan::Extension(Extension { node }) => {
+                if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
+                    cube_scan.request.ungrouped == Some(true)
+                } else if let Some(cube_scan_wrapper) =
+                    node.as_any().downcast_ref::<CubeScanWrapperNode>()
+                {
+                    cube_scan_wrapper.has_ungrouped_scan()
+                } else {
+                    false
+                }
             }
-            _ => (),
+            _ => false,
         }
-
-        return false;
     }
 }
 
rust/cubesql/cubesql/src/compile/test/test_cube_join.rs

Lines changed: 2 additions & 112 deletions
@@ -497,8 +497,8 @@ async fn test_join_cubes_on_wrong_field_error() {
     let query = convert_sql_to_cube_query(
         &r#"
         SELECT *
-        FROM KibanaSampleDataEcommerce
-        LEFT JOIN Logs ON (KibanaSampleDataEcommerce.has_subscription = Logs.read)
+        FROM (SELECT customer_gender, has_subscription FROM KibanaSampleDataEcommerce) kibana
+        LEFT JOIN (SELECT read, content FROM Logs) logs ON (kibana.has_subscription = logs.read)
         "#
         .to_string(),
         meta.clone(),
@@ -566,113 +566,3 @@ async fn test_join_cubes_with_aggr_error() {
         . Please check logs for additional information.".to_string()
     )
 }
-
-#[tokio::test]
-async fn test_join_cubes_with_postprocessing() {
-    if !Rewriter::sql_push_down_enabled() {
-        return;
-    }
-    init_testing_logger();
-
-    let logical_plan = convert_select_to_query_plan(
-        r#"
-        SELECT *
-        FROM (SELECT count(count), __cubeJoinField, extract(MONTH from order_date) FROM KibanaSampleDataEcommerce group by 2, 3) KibanaSampleDataEcommerce
-        LEFT JOIN (SELECT read, __cubeJoinField FROM Logs) Logs ON (KibanaSampleDataEcommerce.__cubeJoinField = Logs.__cubeJoinField)
-        "#
-        .to_string(),
-        DatabaseProtocol::PostgreSQL,
-    )
-    .await
-    .as_logical_plan();
-
-    let cube_scans = logical_plan
-        .find_cube_scans()
-        .iter()
-        .map(|cube| cube.request.clone())
-        .collect::<Vec<V1LoadRequestQuery>>();
-
-    assert_eq!(
-        cube_scans.contains(&V1LoadRequestQuery {
-            measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
-            dimensions: Some(vec![]),
-            segments: Some(vec![]),
-            time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
-                dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
-                granularity: Some("month".to_string()),
-                date_range: None,
-            }]),
-            order: Some(vec![]),
-            ..Default::default()
-        }),
-        true
-    );
-
-    assert_eq!(
-        cube_scans.contains(&V1LoadRequestQuery {
-            measures: Some(vec![]),
-            dimensions: Some(vec!["Logs.read".to_string()]),
-            segments: Some(vec![]),
-            order: Some(vec![]),
-            ungrouped: Some(true),
-            ..Default::default()
-        }),
-        true
-    )
-}
-
-#[tokio::test]
-async fn test_join_cubes_with_postprocessing_and_no_cubejoinfield() {
-    if !Rewriter::sql_push_down_enabled() {
-        return;
-    }
-    init_testing_logger();
-
-    let logical_plan = convert_select_to_query_plan(
-        r#"
-        SELECT *
-        FROM (SELECT count(count), extract(MONTH from order_date), taxful_total_price FROM KibanaSampleDataEcommerce group by 2, 3) KibanaSampleDataEcommerce
-        LEFT JOIN (SELECT id, read FROM Logs) Logs ON (KibanaSampleDataEcommerce.taxful_total_price = Logs.id)
-        "#
-        .to_string(),
-        DatabaseProtocol::PostgreSQL,
-    )
-    .await
-    .as_logical_plan();
-
-    let cube_scans = logical_plan
-        .find_cube_scans()
-        .iter()
-        .map(|cube| cube.request.clone())
-        .collect::<Vec<V1LoadRequestQuery>>();
-
-    assert_eq!(
-        cube_scans.contains(&V1LoadRequestQuery {
-            measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
-            dimensions: Some(vec![
-                "KibanaSampleDataEcommerce.taxful_total_price".to_string()
-            ]),
-            segments: Some(vec![]),
-            time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
-                dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
-                granularity: Some("month".to_string()),
-                date_range: None,
-            }]),
-            order: Some(vec![]),
-            ..Default::default()
-        }),
-        true
-    );
-
-    assert_eq!(
-        cube_scans.contains(&V1LoadRequestQuery {
-            measures: Some(vec![]),
-            dimensions: Some(vec!["Logs.id".to_string(), "Logs.read".to_string(),]),
-            segments: Some(vec![]),
-            order: Some(vec![]),
-            ungrouped: Some(true),
-            ..Default::default()
-        }),
-        true
-    )
-}
