Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,44 @@
}

impl CubeScanWrapperNode {
pub fn has_ungrouped_scan(&self) -> bool {
Self::has_ungrouped_wrapped_node(self.wrapped_plan.as_ref())
}

Check warning on line 558 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L556-L558

Added lines #L556 - L558 were not covered by tests

fn has_ungrouped_wrapped_node(node: &LogicalPlan) -> bool {
match node {
LogicalPlan::Extension(Extension { node }) => {
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
cube_scan.request.ungrouped == Some(true)
} else if let Some(wrapped_select) =
node.as_any().downcast_ref::<WrappedSelectNode>()

Check warning on line 566 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L560-L566

Added lines #L560 - L566 were not covered by tests
{
// Don't really care if push-to-Cube or not, any aggregation should be ok here from execution perspective
if wrapped_select.select_type == WrappedSelectType::Aggregate {
false

Check warning on line 570 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L569-L570

Added lines #L569 - L570 were not covered by tests
} else {
Self::has_ungrouped_wrapped_node(wrapped_select.from.as_ref())
|| wrapped_select
.joins
.iter()
.map(|(join, _, _)| join.as_ref())
.any(Self::has_ungrouped_wrapped_node)
|| wrapped_select
.subqueries
.iter()
.map(|subq| subq.as_ref())
.any(Self::has_ungrouped_wrapped_node)

Check warning on line 582 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L572-L582

Added lines #L572 - L582 were not covered by tests
}
} else {
false

Check warning on line 585 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L585

Added line #L585 was not covered by tests
}
}
LogicalPlan::EmptyRelation(_) => false,

Check warning on line 588 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L588

Added line #L588 was not covered by tests
// Everything else is unexpected actually
_ => false,

Check warning on line 590 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L590

Added line #L590 was not covered by tests
}
}

Check warning on line 592 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L592

Added line #L592 was not covered by tests

pub async fn generate_sql(
&self,
transport: Arc<dyn TransportService>,
Expand Down
83 changes: 65 additions & 18 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
plan::{Aggregate, Extension, Filter, Join, Projection, Sort, TableUDFs, Window},
replace_col_to_expr, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Distinct,
EmptyRelation, Expr, ExprRewritable, ExprRewriter, GroupingSet, Like, Limit, LogicalPlan,
LogicalPlanBuilder, TableScan, Union,
LogicalPlanBuilder, Repartition, Subquery, TableScan, Union,
},
physical_plan::planner::DefaultPhysicalPlanner,
scalar::ScalarValue,
Expand Down Expand Up @@ -1350,10 +1350,18 @@
LogicalPlanLanguage::Join(params) => {
let left_on = match_data_node!(node_by_id, params[2], JoinLeftOn);
let right_on = match_data_node!(node_by_id, params[3], JoinRightOn);
let left = self.to_logical_plan(params[0]);
let right = self.to_logical_plan(params[1]);

if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
let left = self.to_logical_plan(params[0])?;
let right = self.to_logical_plan(params[1])?;

// It's OK to join two grouped queries: expected row count is not that high, so
// SQL API can, potentially, evaluate it completely
// We don't really want it, so cost function should make WrappedSelect preferable
// but still, we don't want to hard error on that
// But if any one of join sides is ungrouped, SQL API does not have much of a choice
// but to process every row from ungrouped query, and that's Not Good
if Self::have_ungrouped_cube_scan_inside(&left)
|| Self::have_ungrouped_cube_scan_inside(&right)
{
if left_on.iter().any(|c| c.name == "__cubeJoinField")
|| right_on.iter().any(|c| c.name == "__cubeJoinField")
{
Expand All @@ -1370,8 +1378,8 @@
}
}

let left = Arc::new(left?);
let right = Arc::new(right?);
let left = Arc::new(left);
let right = Arc::new(right);

let join_type = match_data_node!(node_by_id, params[4], JoinJoinType);
let join_constraint = match_data_node!(node_by_id, params[5], JoinJoinConstraint);
Expand All @@ -1394,7 +1402,18 @@
})
}
LogicalPlanLanguage::CrossJoin(params) => {
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
let left = self.to_logical_plan(params[0])?;
let right = self.to_logical_plan(params[1])?;

// See comment in Join conversion
// Note that DF can generate Filter(CrossJoin(...)) for complex join conditions
// But, from memory or dataset perspective it's the same: DF would buffer left side completely
// And then iterate over right side, evaluating predicate
// Regular join would use hash partitioning here, so it would be quicker, and utilize less CPU,
// but transfer and buffering will be the same
if Self::have_ungrouped_cube_scan_inside(&left)
|| Self::have_ungrouped_cube_scan_inside(&right)
{
return Err(CubeError::internal(
"Can not join Cubes. This is most likely due to one of the following reasons:\n\
• one of the cubes contains a group by\n\
Expand All @@ -1403,8 +1422,8 @@
));
}

let left = Arc::new(self.to_logical_plan(params[0])?);
let right = Arc::new(self.to_logical_plan(params[1])?);
let left = Arc::new(left);
let right = Arc::new(right);
let schema = Arc::new(left.schema().join(right.schema())?);

LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -2304,16 +2323,44 @@
})
}

fn is_cube_scan_node(&self, node_id: Id) -> bool {
let node_by_id = &self.best_expr;
match node_by_id.index(node_id) {
LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => {
return true
fn have_ungrouped_cube_scan_inside(node: &LogicalPlan) -> bool {
match node {
LogicalPlan::Projection(Projection { input, .. })
| LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Window(Window { input, .. })

Check warning on line 2330 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2330

Added line #L2330 was not covered by tests
| LogicalPlan::Aggregate(Aggregate { input, .. })
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Limit(Limit { input, .. }) => {

Check warning on line 2334 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2332-L2334

Added lines #L2332 - L2334 were not covered by tests
Self::have_ungrouped_cube_scan_inside(input)
}
LogicalPlan::Join(Join { left, right, .. })
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
Self::have_ungrouped_cube_scan_inside(left)
|| Self::have_ungrouped_cube_scan_inside(right)
}
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().any(Self::have_ungrouped_cube_scan_inside)
}
LogicalPlan::Subquery(Subquery {
input, subqueries, ..
}) => {
Self::have_ungrouped_cube_scan_inside(input)
|| subqueries.iter().any(Self::have_ungrouped_cube_scan_inside)

Check warning on line 2349 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2346-L2349

Added lines #L2346 - L2349 were not covered by tests
}
LogicalPlan::Extension(Extension { node }) => {
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
cube_scan.request.ungrouped == Some(true)
} else if let Some(cube_scan_wrapper) =
node.as_any().downcast_ref::<CubeScanWrapperNode>()

Check warning on line 2355 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2354-L2355

Added lines #L2354 - L2355 were not covered by tests
{
cube_scan_wrapper.has_ungrouped_scan()

Check warning on line 2357 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2357

Added line #L2357 was not covered by tests
} else {
false

Check warning on line 2359 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2359

Added line #L2359 was not covered by tests
}
}
_ => (),
_ => false,
}

return false;
}
}

Expand Down
114 changes: 2 additions & 112 deletions rust/cubesql/cubesql/src/compile/test/test_cube_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ async fn test_join_cubes_on_wrong_field_error() {
let query = convert_sql_to_cube_query(
&r#"
SELECT *
FROM KibanaSampleDataEcommerce
LEFT JOIN Logs ON (KibanaSampleDataEcommerce.has_subscription = Logs.read)
FROM (SELECT customer_gender, has_subscription FROM KibanaSampleDataEcommerce) kibana
LEFT JOIN (SELECT read, content FROM Logs) logs ON (kibana.has_subscription = logs.read)
"#
.to_string(),
meta.clone(),
Expand Down Expand Up @@ -566,113 +566,3 @@ async fn test_join_cubes_with_aggr_error() {
. Please check logs for additional information.".to_string()
)
}

/// Plans a LEFT JOIN on `__cubeJoinField` between a grouped `KibanaSampleDataEcommerce`
/// subquery and a plain `Logs` subquery, then checks that the resulting plan contains
/// both expected CubeScan requests: the grouped one (count by month) and the ungrouped
/// `Logs.read` one.
///
/// The test is a no-op unless SQL push down is enabled.
#[tokio::test]
async fn test_join_cubes_with_postprocessing() {
    if !Rewriter::sql_push_down_enabled() {
        return;
    }
    init_testing_logger();

    let logical_plan = convert_select_to_query_plan(
        r#"
SELECT *
FROM (SELECT count(count), __cubeJoinField, extract(MONTH from order_date) FROM KibanaSampleDataEcommerce group by 2, 3) KibanaSampleDataEcommerce
LEFT JOIN (SELECT read, __cubeJoinField FROM Logs) Logs ON (KibanaSampleDataEcommerce.__cubeJoinField = Logs.__cubeJoinField)
"#
        .to_string(),
        DatabaseProtocol::PostgreSQL,
    )
    .await
    .as_logical_plan();

    let cube_scans: Vec<V1LoadRequestQuery> = logical_plan
        .find_cube_scans()
        .iter()
        .map(|cube| cube.request.clone())
        .collect();

    // Grouped side: count measure with month granularity on order_date
    let expected_grouped = V1LoadRequestQuery {
        measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
        dimensions: Some(vec![]),
        segments: Some(vec![]),
        time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
            dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
            granularity: Some("month".to_string()),
            date_range: None,
        }]),
        order: Some(vec![]),
        ..Default::default()
    };
    assert!(
        cube_scans.contains(&expected_grouped),
        "grouped KibanaSampleDataEcommerce scan was not found in the plan"
    );

    // Other side: plain dimension select, sent to Cube as an ungrouped request
    let expected_ungrouped = V1LoadRequestQuery {
        measures: Some(vec![]),
        dimensions: Some(vec!["Logs.read".to_string()]),
        segments: Some(vec![]),
        order: Some(vec![]),
        ungrouped: Some(true),
        ..Default::default()
    };
    assert!(
        cube_scans.contains(&expected_ungrouped),
        "ungrouped Logs scan was not found in the plan"
    );
}

/// Plans a LEFT JOIN on regular columns (`taxful_total_price = id`, no `__cubeJoinField`)
/// between a grouped `KibanaSampleDataEcommerce` subquery and a plain `Logs` subquery,
/// then checks that the resulting plan contains both expected CubeScan requests.
///
/// The test is a no-op unless SQL push down is enabled.
#[tokio::test]
async fn test_join_cubes_with_postprocessing_and_no_cubejoinfield() {
    if !Rewriter::sql_push_down_enabled() {
        return;
    }
    init_testing_logger();

    let logical_plan = convert_select_to_query_plan(
        r#"
SELECT *
FROM (SELECT count(count), extract(MONTH from order_date), taxful_total_price FROM KibanaSampleDataEcommerce group by 2, 3) KibanaSampleDataEcommerce
LEFT JOIN (SELECT id, read FROM Logs) Logs ON (KibanaSampleDataEcommerce.taxful_total_price = Logs.id)
"#
        .to_string(),
        DatabaseProtocol::PostgreSQL,
    )
    .await
    .as_logical_plan();

    let cube_scans: Vec<V1LoadRequestQuery> = logical_plan
        .find_cube_scans()
        .iter()
        .map(|cube| cube.request.clone())
        .collect();

    // Grouped side: count by taxful_total_price with month granularity on order_date
    let expected_grouped = V1LoadRequestQuery {
        measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
        dimensions: Some(vec![
            "KibanaSampleDataEcommerce.taxful_total_price".to_string()
        ]),
        segments: Some(vec![]),
        time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
            dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
            granularity: Some("month".to_string()),
            date_range: None,
        }]),
        order: Some(vec![]),
        ..Default::default()
    };
    assert!(
        cube_scans.contains(&expected_grouped),
        "grouped KibanaSampleDataEcommerce scan was not found in the plan"
    );

    // Other side: plain dimension select, sent to Cube as an ungrouped request
    let expected_ungrouped = V1LoadRequestQuery {
        measures: Some(vec![]),
        dimensions: Some(vec!["Logs.id".to_string(), "Logs.read".to_string()]),
        segments: Some(vec![]),
        order: Some(vec![]),
        ungrouped: Some(true),
        ..Default::default()
    };
    assert!(
        cube_scans.contains(&expected_ungrouped),
        "ungrouped Logs scan was not found in the plan"
    );
}
Loading