Skip to content

Commit 23dc3be

Browse files
committed
[WIP] fix(cubesql): Make cube join check stricter
Now it should disallow any plans with ungrouped CubeScan inside, like Join(CubeScan, Projection(CubeScan(ungrouped=true)))
1 parent 84f90c0 commit 23dc3be

File tree

3 files changed

+110
-19
lines changed

3 files changed

+110
-19
lines changed

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,44 @@ macro_rules! generate_sql_for_timestamp {
556556
}
557557

558558
impl CubeScanWrapperNode {
559+
pub fn has_ungrouped_scan(&self) -> bool {
560+
Self::has_ungrouped_wrapped_node(self.wrapped_plan.as_ref())
561+
}
562+
563+
fn has_ungrouped_wrapped_node(node: &LogicalPlan) -> bool {
564+
match node {
565+
LogicalPlan::Extension(Extension { node }) => {
566+
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
567+
cube_scan.request.ungrouped == Some(true)
568+
} else if let Some(wrapped_select) =
569+
node.as_any().downcast_ref::<WrappedSelectNode>()
570+
{
571+
// Don't really care if push-to-Cube or not, any aggregation should be ok here from execution perspective
572+
if wrapped_select.select_type == WrappedSelectType::Aggregate {
573+
false
574+
} else {
575+
Self::has_ungrouped_wrapped_node(wrapped_select.from.as_ref())
576+
|| wrapped_select
577+
.joins
578+
.iter()
579+
.map(|(join, _, _)| join.as_ref())
580+
.any(Self::has_ungrouped_wrapped_node)
581+
|| wrapped_select
582+
.subqueries
583+
.iter()
584+
.map(|subq| subq.as_ref())
585+
.any(Self::has_ungrouped_wrapped_node)
586+
}
587+
} else {
588+
false
589+
}
590+
}
591+
LogicalPlan::EmptyRelation(_) => false,
592+
// Everything else is unexpected actually
593+
_ => false,
594+
}
595+
}
596+
559597
pub async fn generate_sql(
560598
&self,
561599
transport: Arc<dyn TransportService>,

rust/cubesql/cubesql/src/compile/rewrite/converter.rs

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use datafusion::{
4545
catalog::TableReference,
4646
error::DataFusionError,
4747
logical_plan::{
48+
Repartition, Subquery,
4849
build_join_schema, build_table_udf_schema, exprlist_to_fields,
4950
exprlist_to_fields_from_schema, normalize_cols,
5051
plan::{Aggregate, Extension, Filter, Join, Projection, Sort, TableUDFs, Window},
@@ -1351,10 +1352,18 @@ impl LanguageToLogicalPlanConverter {
13511352
LogicalPlanLanguage::Join(params) => {
13521353
let left_on = match_data_node!(node_by_id, params[2], JoinLeftOn);
13531354
let right_on = match_data_node!(node_by_id, params[3], JoinRightOn);
1354-
let left = self.to_logical_plan(params[0]);
1355-
let right = self.to_logical_plan(params[1]);
1356-
1357-
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
1355+
let left = self.to_logical_plan(params[0])?;
1356+
let right = self.to_logical_plan(params[1])?;
1357+
1358+
// It's OK to join two grouped queries: expected row count is not that high, so
1359+
// SQL API can, potentially, evaluate it completely
1360+
// We don't really want it, so cost function should make WrappedSelect preferable
1361+
// but still, we don't want to hard error on that
1362+
// But if any one of join sides is ungroued, SQL API does not have much of a choice
1363+
// but to process every row from ungrouped query, and that's Not Good
1364+
if Self::have_ungrouped_cube_scan_inside(&left)
1365+
|| Self::have_ungrouped_cube_scan_inside(&right)
1366+
{
13581367
if left_on.iter().any(|c| c.name == "__cubeJoinField")
13591368
|| right_on.iter().any(|c| c.name == "__cubeJoinField")
13601369
{
@@ -1371,8 +1380,8 @@ impl LanguageToLogicalPlanConverter {
13711380
}
13721381
}
13731382

1374-
let left = Arc::new(left?);
1375-
let right = Arc::new(right?);
1383+
let left = Arc::new(left);
1384+
let right = Arc::new(right);
13761385

13771386
let join_type = match_data_node!(node_by_id, params[4], JoinJoinType);
13781387
let join_constraint = match_data_node!(node_by_id, params[5], JoinJoinConstraint);
@@ -1395,7 +1404,18 @@ impl LanguageToLogicalPlanConverter {
13951404
})
13961405
}
13971406
LogicalPlanLanguage::CrossJoin(params) => {
1398-
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
1407+
let left = self.to_logical_plan(params[0])?;
1408+
let right = self.to_logical_plan(params[1])?;
1409+
1410+
// See comment in Join conversion
1411+
// Note that DF can generate Filter(CrossJoin(...)) for complex join conditions
1412+
// But, from memory or dataset perspective it's the same: DF would buffer left side completely
1413+
// And then iterate over right side, evaluting predicate
1414+
// Regular join would use hash partitioning here, so it would be quicker, and utilize less CPU,
1415+
// but transfer and buffering will be the same
1416+
if Self::have_ungrouped_cube_scan_inside(&left)
1417+
|| Self::have_ungrouped_cube_scan_inside(&right)
1418+
{
13991419
return Err(CubeError::internal(
14001420
"Can not join Cubes. This is most likely due to one of the following reasons:\n\
14011421
• one of the cubes contains a group by\n\
@@ -1404,8 +1424,8 @@ impl LanguageToLogicalPlanConverter {
14041424
));
14051425
}
14061426

1407-
let left = Arc::new(self.to_logical_plan(params[0])?);
1408-
let right = Arc::new(self.to_logical_plan(params[1])?);
1427+
let left = Arc::new(left);
1428+
let right = Arc::new(right);
14091429
let schema = Arc::new(left.schema().join(right.schema())?);
14101430

14111431
LogicalPlan::CrossJoin(CrossJoin {
@@ -2305,16 +2325,44 @@ impl LanguageToLogicalPlanConverter {
23052325
})
23062326
}
23072327

2308-
fn is_cube_scan_node(&self, node_id: Id) -> bool {
2309-
let node_by_id = &self.best_expr;
2310-
match node_by_id.index(node_id) {
2311-
LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => {
2312-
return true
2328+
fn have_ungrouped_cube_scan_inside(node: &LogicalPlan) -> bool {
2329+
match node {
2330+
LogicalPlan::Projection(Projection { input, .. })
2331+
| LogicalPlan::Filter(Filter { input, .. })
2332+
| LogicalPlan::Window(Window { input, .. })
2333+
| LogicalPlan::Aggregate(Aggregate { input, .. })
2334+
| LogicalPlan::Sort(Sort { input, .. })
2335+
| LogicalPlan::Repartition(Repartition { input, .. })
2336+
| LogicalPlan::Limit(Limit { input, .. }) => {
2337+
Self::have_ungrouped_cube_scan_inside(input)
2338+
}
2339+
LogicalPlan::Join(Join { left, right, .. })
2340+
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
2341+
Self::have_ungrouped_cube_scan_inside(left)
2342+
|| Self::have_ungrouped_cube_scan_inside(right)
2343+
}
2344+
LogicalPlan::Union(Union { inputs, .. }) => {
2345+
inputs.iter().any(Self::have_ungrouped_cube_scan_inside)
2346+
}
2347+
LogicalPlan::Subquery(Subquery {
2348+
input, subqueries, ..
2349+
}) => {
2350+
Self::have_ungrouped_cube_scan_inside(input)
2351+
|| subqueries.iter().any(Self::have_ungrouped_cube_scan_inside)
2352+
}
2353+
LogicalPlan::Extension(Extension { node }) => {
2354+
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
2355+
cube_scan.request.ungrouped == Some(true)
2356+
} else if let Some(cube_scan_wrapper) =
2357+
node.as_any().downcast_ref::<CubeScanWrapperNode>()
2358+
{
2359+
cube_scan_wrapper.has_ungrouped_scan()
2360+
} else {
2361+
false
2362+
}
23132363
}
2314-
_ => (),
2364+
_ => false,
23152365
}
2316-
2317-
return false;
23182366
}
23192367
}
23202368

rust/cubesql/cubesql/src/compile/test/test_cube_join.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use cubeclient::models::{
22
V1LoadRequestQuery, V1LoadRequestQueryFilterItem, V1LoadRequestQueryTimeDimension,
33
};
4+
use datafusion::physical_plan::displayable;
45
use pretty_assertions::assert_eq;
56
use serde_json::json;
67

@@ -497,8 +498,8 @@ async fn test_join_cubes_on_wrong_field_error() {
497498
let query = convert_sql_to_cube_query(
498499
&r#"
499500
SELECT *
500-
FROM KibanaSampleDataEcommerce
501-
LEFT JOIN Logs ON (KibanaSampleDataEcommerce.has_subscription = Logs.read)
501+
FROM (SELECT customer_gender, has_subscription FROM KibanaSampleDataEcommerce) kibana
502+
LEFT JOIN (SELECT read, content FROM Logs) logs ON (kibana.has_subscription = logs.read)
502503
"#
503504
.to_string(),
504505
meta.clone(),
@@ -567,7 +568,9 @@ async fn test_join_cubes_with_aggr_error() {
567568
)
568569
}
569570

571+
// TODO it seems this query should not execute: it has join of grouped CubeScan with ungrouped CubeScan by __cubeJoinField
570572
#[tokio::test]
573+
#[ignore]
571574
async fn test_join_cubes_with_postprocessing() {
572575
if !Rewriter::sql_push_down_enabled() {
573576
return;
@@ -621,7 +624,9 @@ async fn test_join_cubes_with_postprocessing() {
621624
)
622625
}
623626

627+
// TODO it seems this test is not necessary:: this case is covered by ungrouped-grouped join, and we explicitly forbid executing joins with ungrouped scans in DF
624628
#[tokio::test]
629+
#[ignore]
625630
async fn test_join_cubes_with_postprocessing_and_no_cubejoinfield() {
626631
if !Rewriter::sql_push_down_enabled() {
627632
return;

0 commit comments

Comments
 (0)