Skip to content

Commit 96574fb

Browse files
committed
[WIP] fix(cubesql): Make cube join check stricter
Now it should disallow any plans with ungrouped CubeScan inside, like Join(CubeScan, Projection(CubeScan(ungrouped=true)))
1 parent 84f90c0 commit 96574fb

File tree

3 files changed

+109
-20
lines changed

3 files changed

+109
-20
lines changed

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,44 @@ macro_rules! generate_sql_for_timestamp {
556556
}
557557

558558
impl CubeScanWrapperNode {
559+
pub fn has_ungrouped_scan(&self) -> bool {
560+
Self::has_ungrouped_wrapped_node(self.wrapped_plan.as_ref())
561+
}
562+
563+
fn has_ungrouped_wrapped_node(node: &LogicalPlan) -> bool {
564+
match node {
565+
LogicalPlan::Extension(Extension { node }) => {
566+
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
567+
cube_scan.request.ungrouped == Some(true)
568+
} else if let Some(wrapped_select) =
569+
node.as_any().downcast_ref::<WrappedSelectNode>()
570+
{
571+
// Don't really care if push-to-Cube or not, any aggregation should be ok here from execution perspective
572+
if wrapped_select.select_type == WrappedSelectType::Aggregate {
573+
false
574+
} else {
575+
Self::has_ungrouped_wrapped_node(wrapped_select.from.as_ref())
576+
|| wrapped_select
577+
.joins
578+
.iter()
579+
.map(|(join, _, _)| join.as_ref())
580+
.any(Self::has_ungrouped_wrapped_node)
581+
|| wrapped_select
582+
.subqueries
583+
.iter()
584+
.map(|subq| subq.as_ref())
585+
.any(Self::has_ungrouped_wrapped_node)
586+
}
587+
} else {
588+
false
589+
}
590+
}
591+
LogicalPlan::EmptyRelation(_) => false,
592+
// Everything else is unexpected actually
593+
_ => false,
594+
}
595+
}
596+
559597
pub async fn generate_sql(
560598
&self,
561599
transport: Arc<dyn TransportService>,

rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use datafusion::{
5050
plan::{Aggregate, Extension, Filter, Join, Projection, Sort, TableUDFs, Window},
5151
replace_col_to_expr, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Distinct,
5252
EmptyRelation, Expr, ExprRewritable, ExprRewriter, GroupingSet, Like, Limit, LogicalPlan,
53-
LogicalPlanBuilder, TableScan, Union,
53+
LogicalPlanBuilder, Repartition, Subquery, TableScan, Union,
5454
},
5555
physical_plan::planner::DefaultPhysicalPlanner,
5656
scalar::ScalarValue,
@@ -1351,10 +1351,18 @@ impl LanguageToLogicalPlanConverter {
13511351
LogicalPlanLanguage::Join(params) => {
13521352
let left_on = match_data_node!(node_by_id, params[2], JoinLeftOn);
13531353
let right_on = match_data_node!(node_by_id, params[3], JoinRightOn);
1354-
let left = self.to_logical_plan(params[0]);
1355-
let right = self.to_logical_plan(params[1]);
1356-
1357-
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
1354+
let left = self.to_logical_plan(params[0])?;
1355+
let right = self.to_logical_plan(params[1])?;
1356+
1357+
// It's OK to join two grouped queries: expected row count is not that high, so
1358+
// SQL API can, potentially, evaluate it completely
1359+
// We don't really want it, so cost function should make WrappedSelect preferable
1360+
// but still, we don't want to hard error on that
1361+
// But if any one of join sides is ungrouped, SQL API does not have much of a choice
1362+
// but to process every row from ungrouped query, and that's Not Good
1363+
if Self::have_ungrouped_cube_scan_inside(&left)
1364+
|| Self::have_ungrouped_cube_scan_inside(&right)
1365+
{
13581366
if left_on.iter().any(|c| c.name == "__cubeJoinField")
13591367
|| right_on.iter().any(|c| c.name == "__cubeJoinField")
13601368
{
@@ -1371,8 +1379,8 @@ impl LanguageToLogicalPlanConverter {
13711379
}
13721380
}
13731381

1374-
let left = Arc::new(left?);
1375-
let right = Arc::new(right?);
1382+
let left = Arc::new(left);
1383+
let right = Arc::new(right);
13761384

13771385
let join_type = match_data_node!(node_by_id, params[4], JoinJoinType);
13781386
let join_constraint = match_data_node!(node_by_id, params[5], JoinJoinConstraint);
@@ -1395,7 +1403,18 @@ impl LanguageToLogicalPlanConverter {
13951403
})
13961404
}
13971405
LogicalPlanLanguage::CrossJoin(params) => {
1398-
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
1406+
let left = self.to_logical_plan(params[0])?;
1407+
let right = self.to_logical_plan(params[1])?;
1408+
1409+
// See comment in Join conversion
1410+
// Note that DF can generate Filter(CrossJoin(...)) for complex join conditions
1411+
// But, from memory or dataset perspective it's the same: DF would buffer left side completely
1412+
// And then iterate over right side, evaluating predicate
1413+
// Regular join would use hash partitioning here, so it would be quicker, and utilize less CPU,
1414+
// but transfer and buffering will be the same
1415+
if Self::have_ungrouped_cube_scan_inside(&left)
1416+
|| Self::have_ungrouped_cube_scan_inside(&right)
1417+
{
13991418
return Err(CubeError::internal(
14001419
"Can not join Cubes. This is most likely due to one of the following reasons:\n\
14011420
• one of the cubes contains a group by\n\
@@ -1404,8 +1423,8 @@ impl LanguageToLogicalPlanConverter {
14041423
));
14051424
}
14061425

1407-
let left = Arc::new(self.to_logical_plan(params[0])?);
1408-
let right = Arc::new(self.to_logical_plan(params[1])?);
1426+
let left = Arc::new(left);
1427+
let right = Arc::new(right);
14091428
let schema = Arc::new(left.schema().join(right.schema())?);
14101429

14111430
LogicalPlan::CrossJoin(CrossJoin {
@@ -2305,16 +2324,44 @@ impl LanguageToLogicalPlanConverter {
23052324
})
23062325
}
23072326

2308-
fn is_cube_scan_node(&self, node_id: Id) -> bool {
2309-
let node_by_id = &self.best_expr;
2310-
match node_by_id.index(node_id) {
2311-
LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => {
2312-
return true
2327+
fn have_ungrouped_cube_scan_inside(node: &LogicalPlan) -> bool {
2328+
match node {
2329+
LogicalPlan::Projection(Projection { input, .. })
2330+
| LogicalPlan::Filter(Filter { input, .. })
2331+
| LogicalPlan::Window(Window { input, .. })
2332+
| LogicalPlan::Aggregate(Aggregate { input, .. })
2333+
| LogicalPlan::Sort(Sort { input, .. })
2334+
| LogicalPlan::Repartition(Repartition { input, .. })
2335+
| LogicalPlan::Limit(Limit { input, .. }) => {
2336+
Self::have_ungrouped_cube_scan_inside(input)
2337+
}
2338+
LogicalPlan::Join(Join { left, right, .. })
2339+
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
2340+
Self::have_ungrouped_cube_scan_inside(left)
2341+
|| Self::have_ungrouped_cube_scan_inside(right)
2342+
}
2343+
LogicalPlan::Union(Union { inputs, .. }) => {
2344+
inputs.iter().any(Self::have_ungrouped_cube_scan_inside)
2345+
}
2346+
LogicalPlan::Subquery(Subquery {
2347+
input, subqueries, ..
2348+
}) => {
2349+
Self::have_ungrouped_cube_scan_inside(input)
2350+
|| subqueries.iter().any(Self::have_ungrouped_cube_scan_inside)
2351+
}
2352+
LogicalPlan::Extension(Extension { node }) => {
2353+
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
2354+
cube_scan.request.ungrouped == Some(true)
2355+
} else if let Some(cube_scan_wrapper) =
2356+
node.as_any().downcast_ref::<CubeScanWrapperNode>()
2357+
{
2358+
cube_scan_wrapper.has_ungrouped_scan()
2359+
} else {
2360+
false
2361+
}
23132362
}
2314-
_ => (),
2363+
_ => false,
23152364
}
2316-
2317-
return false;
23182365
}
23192366
}
23202367

rust/cubesql/cubesql/src/compile/test/test_cube_join.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,8 @@ async fn test_join_cubes_on_wrong_field_error() {
497497
let query = convert_sql_to_cube_query(
498498
&r#"
499499
SELECT *
500-
FROM KibanaSampleDataEcommerce
501-
LEFT JOIN Logs ON (KibanaSampleDataEcommerce.has_subscription = Logs.read)
500+
FROM (SELECT customer_gender, has_subscription FROM KibanaSampleDataEcommerce) kibana
501+
LEFT JOIN (SELECT read, content FROM Logs) logs ON (kibana.has_subscription = logs.read)
502502
"#
503503
.to_string(),
504504
meta.clone(),
@@ -567,7 +567,9 @@ async fn test_join_cubes_with_aggr_error() {
567567
)
568568
}
569569

570+
// TODO it seems this query should not execute: it has join of grouped CubeScan with ungrouped CubeScan by __cubeJoinField
570571
#[tokio::test]
572+
#[ignore]
571573
async fn test_join_cubes_with_postprocessing() {
572574
if !Rewriter::sql_push_down_enabled() {
573575
return;
@@ -621,7 +623,9 @@ async fn test_join_cubes_with_postprocessing() {
621623
)
622624
}
623625

626+
// TODO it seems this test is not necessary: this case is covered by ungrouped-grouped join, and we explicitly forbid executing joins with ungrouped scans in DF
624627
#[tokio::test]
628+
#[ignore]
625629
async fn test_join_cubes_with_postprocessing_and_no_cubejoinfield() {
626630
if !Rewriter::sql_push_down_enabled() {
627631
return;

0 commit comments

Comments
 (0)