Skip to content

Commit bbb830f

Browse files
committed
feat(cubesql): Handle grouped subqueries in SQL generation for push to Cube
1 parent efdd413 commit bbb830f

File tree

3 files changed

+234
-19
lines changed

3 files changed

+234
-19
lines changed

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 224 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use crate::{
1919
CubeError,
2020
};
2121
use chrono::{Days, NaiveDate, SecondsFormat, TimeZone, Utc};
22-
use cubeclient::models::V1LoadRequestQuery;
22+
use cubeclient::models::{V1LoadRequestQuery, V1LoadRequestQueryJoinSubquery};
2323
use datafusion::{
2424
error::{DataFusionError, Result},
2525
logical_plan::{
26-
plan::Extension, replace_col, Column, DFSchema, DFSchemaRef, Expr, GroupingSet,
26+
plan::Extension, replace_col, Column, DFSchema, DFSchemaRef, Expr, GroupingSet, JoinType,
2727
LogicalPlan, UserDefinedLogicalNode,
2828
},
2929
physical_plan::{aggregates::AggregateFunction, functions::BuiltinScalarFunction},
@@ -35,7 +35,7 @@ use serde::{Deserialize, Serialize};
3535
use std::{
3636
any::Any,
3737
cmp::min,
38-
collections::HashMap,
38+
collections::{HashMap, HashSet},
3939
convert::TryInto,
4040
fmt,
4141
future::Future,
@@ -45,8 +45,18 @@ use std::{
4545
sync::{Arc, LazyLock},
4646
};
4747

48+
pub struct JoinSubquery {
49+
alias: String,
50+
sql: String,
51+
condition: Expr,
52+
join_type: JoinType,
53+
}
54+
4855
pub struct PushToCubeContext<'l> {
4956
ungrouped_scan_node: &'l CubeScanNode,
57+
// Known join subquery qualifiers, to generate proper column expressions
58+
known_join_subqueries: HashSet<String>,
59+
join_subqueries: Vec<JoinSubquery>,
5060
}
5161

5262
#[derive(Debug, Clone, Deserialize)]
@@ -560,7 +570,7 @@ impl CubeScanWrapperNode {
560570
aggr_expr,
561571
window_expr,
562572
from,
563-
joins: _joins,
573+
joins,
564574
filter_expr,
565575
having_expr: _having_expr,
566576
limit,
@@ -571,8 +581,8 @@ impl CubeScanWrapperNode {
571581
push_to_cube,
572582
}) = wrapped_select_node
573583
{
574-
// TODO support joins
575-
let push_to_cube_context = if push_to_cube {
584+
// TODO support ungrouped joins
585+
let ungrouped_scan_node = if push_to_cube {
576586
if let LogicalPlan::Extension(Extension { node }) = from.as_ref() {
577587
if let Some(cube_scan_node) =
578588
node.as_any().downcast_ref::<CubeScanNode>()
@@ -582,9 +592,7 @@ impl CubeScanWrapperNode {
582592
"Expected ungrouped CubeScan node but found: {cube_scan_node:?}"
583593
)));
584594
}
585-
Some(PushToCubeContext {
586-
ungrouped_scan_node: cube_scan_node,
587-
})
595+
Some(cube_scan_node)
588596
} else {
589597
return Err(CubeError::internal(format!(
590598
"Expected CubeScan node but found: {:?}",
@@ -601,16 +609,13 @@ impl CubeScanWrapperNode {
601609
None
602610
};
603611

604-
let push_to_cube_context = push_to_cube_context.as_ref();
605-
606612
let SqlGenerationResult {
607613
data_source,
608614
from_alias,
609-
column_remapping,
615+
mut column_remapping,
610616
mut sql,
611617
request,
612-
} = if let Some(push_to_cube_context) = push_to_cube_context {
613-
let ungrouped_scan_node = push_to_cube_context.ungrouped_scan_node;
618+
} = if let Some(ungrouped_scan_node) = &ungrouped_scan_node {
614619
let data_sources = ungrouped_scan_node
615620
.used_cubes
616621
.iter()
@@ -676,12 +681,123 @@ impl CubeScanWrapperNode {
676681

677682
let (sql_string, new_values) = subquery_sql.unpack();
678683
sql.extend_values(&new_values);
684+
// TODO why is only field 0 used as a key?
679685
let field = subquery.schema().field(0);
680686
subqueries_sql.insert(field.qualified_name(), sql_string);
681687
}
682688
let subqueries_sql = Arc::new(subqueries_sql);
683689
let mut next_remapping = HashMap::new();
684690
let alias = alias.or(from_alias.clone());
691+
692+
let push_to_cube_context = if let Some(ungrouped_scan_node) =
693+
ungrouped_scan_node
694+
{
695+
let mut join_subqueries = vec![];
696+
let mut known_join_subqueries = HashSet::new();
697+
for (lp, cond, join_type) in joins {
698+
match lp.as_ref() {
699+
LogicalPlan::Extension(Extension { node }) => {
700+
if let Some(join_cube_scan) =
701+
node.as_any().downcast_ref::<CubeScanNode>()
702+
{
703+
if join_cube_scan.request.ungrouped == Some(true) {
704+
return Err(CubeError::internal(format!(
705+
"Unsupported ungrouped CubeScan as join subquery: {join_cube_scan:?}"
706+
)));
707+
}
708+
} else {
709+
// TODO support more grouped cases here
710+
return Err(CubeError::internal(format!(
711+
"Unsupported unknown extension as join subquery: {node:?}"
712+
)));
713+
}
714+
}
715+
_ => {
716+
// TODO support more grouped cases here
717+
return Err(CubeError::internal(format!(
718+
"Unsupported logical plan node as join subquery: {lp:?}"
719+
)));
720+
}
721+
}
722+
723+
match join_type {
724+
JoinType::Inner | JoinType::Left => {
725+
// Do nothing
726+
}
727+
_ => {
728+
return Err(CubeError::internal(format!(
729+
"Unsupported join type for join subquery: {join_type:?}"
730+
)));
731+
}
732+
}
733+
734+
// TODO avoid using direct alias from schema, implement remapping for qualifiers instead
735+
let alias = lp
736+
.schema()
737+
.fields()
738+
.iter()
739+
.filter_map(|f| f.qualifier())
740+
.next()
741+
.ok_or_else(|| {
742+
CubeError::internal(format!(
743+
"Alias not found for join subquery {lp:?}"
744+
))
745+
})?;
746+
747+
let subq_sql = Self::generate_sql_for_node(
748+
plan.clone(),
749+
transport.clone(),
750+
load_request_meta.clone(),
751+
lp.clone(),
752+
true,
753+
sql.values.clone(),
754+
data_source.clone(),
755+
)
756+
.await?;
757+
let (subq_sql_string, new_values) = subq_sql.sql.unpack();
758+
sql.extend_values(&new_values);
759+
let subq_alias = subq_sql.from_alias;
760+
// Expect that subq_sql.column_remapping already incorporates subq_alias/
761+
// TODO does it?
762+
763+
// TODO expect returned from_alias to be fine, but still need to remap it from original alias somewhere in generate_sql_for_node
764+
765+
// grouped join subquery can have its columns remapped, and expressions in the current node can reference the original columns
766+
column_remapping = {
767+
match (column_remapping, subq_sql.column_remapping) {
768+
(None, None) => None,
769+
(None, Some(remapping)) | (Some(remapping), None) => {
770+
Some(remapping)
771+
}
772+
(Some(mut left), Some(right)) => {
773+
left.extend(right);
774+
Some(left)
775+
}
776+
}
777+
};
778+
779+
join_subqueries.push(JoinSubquery {
780+
// TODO what alias to actually use here? There are two more-or-less valid options: the one returned from generate_sql_for_node, or one re-aliased from `alias`. Plain `alias` is incorrect here
781+
alias: subq_alias.unwrap_or_else(|| alias.clone()),
782+
sql: subq_sql_string,
783+
condition: cond.clone(),
784+
join_type: join_type.clone(),
785+
});
786+
known_join_subqueries.insert(alias.clone());
787+
}
788+
789+
Some(PushToCubeContext {
790+
ungrouped_scan_node,
791+
join_subqueries,
792+
known_join_subqueries,
793+
})
794+
} else {
795+
None
796+
};
797+
// Drop mut
798+
let column_remapping = column_remapping;
799+
// Turn to ref
800+
let push_to_cube_context = push_to_cube_context.as_ref();
685801
if let Some(data_source) = data_source {
686802
let generator = plan
687803
.meta
@@ -783,8 +899,76 @@ impl CubeScanWrapperNode {
783899
subqueries_sql.clone(),
784900
)
785901
.await?;
786-
if let Some(push_to_cube_context) = push_to_cube_context {
787-
let ungrouped_scan_node = push_to_cube_context.ungrouped_scan_node;
902+
if let Some(PushToCubeContext {
903+
ungrouped_scan_node,
904+
join_subqueries,
905+
known_join_subqueries: _,
906+
}) = push_to_cube_context
907+
{
908+
let mut prepared_join_subqueries = vec![];
909+
for JoinSubquery {
910+
alias: subq_alias,
911+
sql: subq_sql,
912+
condition,
913+
join_type,
914+
} in join_subqueries
915+
{
916+
// Need to call generate_column_expr to apply column_remapping
917+
let (join_condition, new_sql) = Self::generate_column_expr(
918+
plan.clone(),
919+
schema.clone(),
920+
[condition.clone()],
921+
sql,
922+
generator.clone(),
923+
&column_remapping,
924+
&mut next_remapping,
925+
// from_alias for join condition does not make much sense
926+
None,
927+
true,
928+
push_to_cube_context,
929+
subqueries_sql.clone(),
930+
)
931+
.await?;
932+
let join_condition = join_condition[0].expr.clone();
933+
sql = new_sql;
934+
935+
let join_sql_expression = {
936+
// TODO this is NOT a proper way to generate member expr here
937+
// TODO Do we even want a full-blown member expression here? or arguments + expr will be enough?
938+
let res = Self::make_member_def(
939+
&AliasedColumn {
940+
expr: join_condition,
941+
alias: "__join__alias__unused".to_string(),
942+
},
943+
&ungrouped_scan_node.used_cubes,
944+
)?;
945+
serde_json::json!(res).to_string()
946+
};
947+
948+
let join_type = match join_type {
949+
JoinType::Left => generator
950+
.get_sql_templates()
951+
.left_join()?,
952+
JoinType::Inner => generator
953+
.get_sql_templates()
954+
.inner_join()?,
955+
_ => {
956+
return Err(CubeError::internal(format!(
957+
"Unsupported join type for join subquery: {join_type:?}"
958+
)))
959+
}
960+
};
961+
962+
// for simple ungrouped-grouped joins everything should already be present in from
963+
// so we can just attach this join to the end, no need to look for a proper spot
964+
prepared_join_subqueries.push(V1LoadRequestQueryJoinSubquery {
965+
sql: subq_sql.clone(),
966+
on: join_sql_expression,
967+
join_type,
968+
alias: subq_alias.clone(),
969+
});
970+
}
971+
788972
let load_request = &ungrouped_scan_node.request;
789973

790974
let load_request = V1LoadRequestQuery {
@@ -917,7 +1101,8 @@ impl CubeScanWrapperNode {
9171101
filters: ungrouped_scan_node.request.filters.clone(),
9181102

9191103
time_dimensions: load_request.time_dimensions.clone(),
920-
subquery_joins: None,
1104+
subquery_joins: (!prepared_join_subqueries.is_empty())
1105+
.then_some(prepared_join_subqueries),
9211106
};
9221107

9231108
// TODO time dimensions, filters, segments
@@ -1029,7 +1214,7 @@ impl CubeScanWrapperNode {
10291214
async fn generate_column_expr(
10301215
plan: Arc<Self>,
10311216
schema: DFSchemaRef,
1032-
exprs: Vec<Expr>,
1217+
exprs: impl IntoIterator<Item = Expr>,
10331218
mut sql: SqlQuery,
10341219
generator: Arc<dyn SqlGenerator>,
10351220
column_remapping: &Option<HashMap<Column, Column>>,
@@ -1239,7 +1424,7 @@ impl CubeScanWrapperNode {
12391424
Ok((expr, sql_query))
12401425
}
12411426
// Expr::OuterColumn(_, _) => {}
1242-
Expr::Column(c) => {
1427+
Expr::Column(ref c) => {
12431428
if let Some(subquery) = subqueries.get(&c.flat_name()) {
12441429
Ok((
12451430
sql_generator
@@ -1255,8 +1440,28 @@ impl CubeScanWrapperNode {
12551440
))
12561441
} else if let Some(PushToCubeContext {
12571442
ungrouped_scan_node,
1443+
join_subqueries: _,
1444+
known_join_subqueries,
12581445
}) = push_to_cube_context
12591446
{
1447+
if let Some(relation) = c.relation.as_ref() {
1448+
if known_join_subqueries.contains(relation) {
1449+
// SQL API passes fixed aliases to Cube.js for join subqueries
1450+
// It means we don't need to use member expressions here, and can just use that fixed alias
1451+
// So we can generate that as if it were regular column expression
1452+
1453+
return Self::generate_sql_for_expr(
1454+
plan.clone(),
1455+
sql_query,
1456+
sql_generator.clone(),
1457+
expr,
1458+
None,
1459+
subqueries.clone(),
1460+
)
1461+
.await;
1462+
}
1463+
}
1464+
12601465
let field_index = ungrouped_scan_node
12611466
.schema
12621467
.fields()

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,8 @@ OFFSET {{ offset }}{% endif %}"#.to_string(),
583583
("expressions/like".to_string(), "{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}".to_string()),
584584
("expressions/ilike".to_string(), "{{ expr }} {% if negated %}NOT {% endif %}ILIKE {{ pattern }}".to_string()),
585585
("expressions/like_escape".to_string(), "{{ like_expr }} ESCAPE {{ escape_char }}".to_string()),
586+
("join_types/inner".to_string(), "INNER".to_string()),
587+
("join_types/left".to_string(), "LEFT".to_string()),
586588
("quotes/identifiers".to_string(), "\"".to_string()),
587589
("quotes/escape".to_string(), "\"\"".to_string()),
588590
("params/param".to_string(), "${{ param_index + 1 }}".to_string()),

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,4 +906,12 @@ impl SqlTemplates {
906906
};
907907
self.render_template(&format!("types/{}", data_type), context! {})
908908
}
909+
910+
pub fn left_join(&self) -> Result<String, CubeError> {
911+
self.render_template("join_types/left", context! {})
912+
}
913+
914+
pub fn inner_join(&self) -> Result<String, CubeError> {
915+
self.render_template("join_types/inner", context! {})
916+
}
909917
}

0 commit comments

Comments
 (0)