Skip to content

Commit eb3cc80

Browse files
committed
feat(cubesql): Handle grouped subqueries in SQL generation for push to Cube
1 parent aca5383 commit eb3cc80

File tree

3 files changed

+235
-20
lines changed

3 files changed

+235
-20
lines changed

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 225 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use crate::{
1919
CubeError,
2020
};
2121
use chrono::{Days, NaiveDate, SecondsFormat, TimeZone, Utc};
22-
use cubeclient::models::V1LoadRequestQuery;
22+
use cubeclient::models::{V1LoadRequestQuery, V1LoadRequestQueryJoinSubquery};
2323
use datafusion::{
2424
error::{DataFusionError, Result},
2525
logical_plan::{
26-
plan::Extension, replace_col, Column, DFSchema, DFSchemaRef, Expr, GroupingSet,
26+
plan::Extension, replace_col, Column, DFSchema, DFSchemaRef, Expr, GroupingSet, JoinType,
2727
LogicalPlan, UserDefinedLogicalNode,
2828
},
2929
physical_plan::{aggregates::AggregateFunction, functions::BuiltinScalarFunction},
@@ -45,8 +45,18 @@ use std::{
4545
sync::{Arc, LazyLock},
4646
};
4747

48+
pub struct JoinSubquery {
49+
alias: String,
50+
sql: String,
51+
condition: Expr,
52+
join_type: JoinType,
53+
}
54+
4855
pub struct PushToCubeContext<'l> {
4956
ungrouped_scan_node: &'l CubeScanNode,
57+
// Known join subquery qualifiers, to generate proper column expressions
58+
known_join_subqueries: HashSet<String>,
59+
join_subqueries: Vec<JoinSubquery>,
5060
}
5161

5262
#[derive(Debug, Clone, Deserialize)]
@@ -342,6 +352,10 @@ impl ColumnRemapping {
342352
)
343353
.map_err(|_| CubeError::internal(format!("Can't rename columns for expr: {expr:?}",)))
344354
}
355+
356+
pub fn extend(&mut self, other: ColumnRemapping) {
357+
self.column_remapping.extend(other.column_remapping);
358+
}
345359
}
346360

347361
/// Builds new column mapping
@@ -845,7 +859,7 @@ impl CubeScanWrapperNode {
845859
aggr_expr,
846860
window_expr,
847861
from,
848-
joins: _joins,
862+
joins,
849863
filter_expr,
850864
having_expr: _having_expr,
851865
limit,
@@ -856,8 +870,8 @@ impl CubeScanWrapperNode {
856870
push_to_cube,
857871
}) = wrapped_select_node
858872
{
859-
// TODO support joins
860-
let push_to_cube_context = if push_to_cube {
873+
// TODO support ungrouped joins
874+
let ungrouped_scan_node = if push_to_cube {
861875
if let LogicalPlan::Extension(Extension { node }) = from.as_ref() {
862876
if let Some(cube_scan_node) =
863877
node.as_any().downcast_ref::<CubeScanNode>()
@@ -867,9 +881,7 @@ impl CubeScanWrapperNode {
867881
"Expected ungrouped CubeScan node but found: {cube_scan_node:?}"
868882
)));
869883
}
870-
Some(PushToCubeContext {
871-
ungrouped_scan_node: cube_scan_node,
872-
})
884+
Some(cube_scan_node)
873885
} else {
874886
return Err(CubeError::internal(format!(
875887
"Expected CubeScan node but found: {:?}",
@@ -886,16 +898,13 @@ impl CubeScanWrapperNode {
886898
None
887899
};
888900

889-
let push_to_cube_context = push_to_cube_context.as_ref();
890-
891901
let SqlGenerationResult {
892902
data_source,
893903
from_alias,
894-
column_remapping,
904+
mut column_remapping,
895905
mut sql,
896906
request,
897-
} = if let Some(push_to_cube_context) = push_to_cube_context {
898-
let ungrouped_scan_node = push_to_cube_context.ungrouped_scan_node;
907+
} = if let Some(ungrouped_scan_node) = &ungrouped_scan_node {
899908
let data_sources = ungrouped_scan_node
900909
.used_cubes
901910
.iter()
@@ -940,8 +949,6 @@ impl CubeScanWrapperNode {
940949
.await?
941950
};
942951

943-
let column_remapping = column_remapping.as_ref();
944-
945952
let mut subqueries_sql = HashMap::new();
946953
for subquery in subqueries.iter() {
947954
let SqlGenerationResult {
@@ -963,12 +970,123 @@ impl CubeScanWrapperNode {
963970

964971
let (sql_string, new_values) = subquery_sql.unpack();
965972
sql.extend_values(&new_values);
973+
// TODO why only field 0 is a key?
966974
let field = subquery.schema().field(0);
967975
subqueries_sql.insert(field.qualified_name(), sql_string);
968976
}
969977
let subqueries_sql = Arc::new(subqueries_sql);
970978
let alias = alias.or(from_alias.clone());
971979
let mut next_remapper = Remapper::new(alias.clone(), can_rename_columns);
980+
981+
let push_to_cube_context = if let Some(ungrouped_scan_node) =
982+
ungrouped_scan_node
983+
{
984+
let mut join_subqueries = vec![];
985+
let mut known_join_subqueries = HashSet::new();
986+
for (lp, cond, join_type) in joins {
987+
match lp.as_ref() {
988+
LogicalPlan::Extension(Extension { node }) => {
989+
if let Some(join_cube_scan) =
990+
node.as_any().downcast_ref::<CubeScanNode>()
991+
{
992+
if join_cube_scan.request.ungrouped == Some(true) {
993+
return Err(CubeError::internal(format!(
994+
"Unsupported ungrouped CubeScan as join subquery: {join_cube_scan:?}"
995+
)));
996+
}
997+
} else {
998+
// TODO support more grouped cases here
999+
return Err(CubeError::internal(format!(
1000+
"Unsupported unknown extension as join subquery: {node:?}"
1001+
)));
1002+
}
1003+
}
1004+
_ => {
1005+
// TODO support more grouped cases here
1006+
return Err(CubeError::internal(format!(
1007+
"Unsupported logical plan node as join subquery: {lp:?}"
1008+
)));
1009+
}
1010+
}
1011+
1012+
match join_type {
1013+
JoinType::Inner | JoinType::Left => {
1014+
// Do nothing
1015+
}
1016+
_ => {
1017+
return Err(CubeError::internal(format!(
1018+
"Unsupported join type for join subquery: {join_type:?}"
1019+
)));
1020+
}
1021+
}
1022+
1023+
// TODO avoid using direct alias from schema, implement remapping for qualifiers instead
1024+
let alias = lp
1025+
.schema()
1026+
.fields()
1027+
.iter()
1028+
.filter_map(|f| f.qualifier())
1029+
.next()
1030+
.ok_or_else(|| {
1031+
CubeError::internal(format!(
1032+
"Alias not found for join subquery {lp:?}"
1033+
))
1034+
})?;
1035+
1036+
let subq_sql = Self::generate_sql_for_node(
1037+
plan.clone(),
1038+
transport.clone(),
1039+
load_request_meta.clone(),
1040+
lp.clone(),
1041+
true,
1042+
sql.values.clone(),
1043+
data_source.clone(),
1044+
)
1045+
.await?;
1046+
let (subq_sql_string, new_values) = subq_sql.sql.unpack();
1047+
sql.extend_values(&new_values);
1048+
let subq_alias = subq_sql.from_alias;
1049+
// Expect that subq_sql.column_remapping already incorporates subq_alias/
1050+
// TODO does it?
1051+
1052+
// TODO expect returned from_alias to be fine, but still need to remap it from original alias somewhere in generate_sql_for_node
1053+
1054+
// grouped join subquery can have its columns remapped, and expressions current node can reference original columns
1055+
column_remapping = {
1056+
match (column_remapping, subq_sql.column_remapping) {
1057+
(None, None) => None,
1058+
(None, Some(remapping)) | (Some(remapping), None) => {
1059+
Some(remapping)
1060+
}
1061+
(Some(mut left), Some(right)) => {
1062+
left.extend(right);
1063+
Some(left)
1064+
}
1065+
}
1066+
};
1067+
1068+
join_subqueries.push(JoinSubquery {
1069+
// TODO what alias to actually use here? two more-or-less valid options: returned from generate_sql_for_node ot realiased from `alias`. Plain `alias` is incorrect here
1070+
alias: subq_alias.unwrap_or_else(|| alias.clone()),
1071+
sql: subq_sql_string,
1072+
condition: cond.clone(),
1073+
join_type: join_type.clone(),
1074+
});
1075+
known_join_subqueries.insert(alias.clone());
1076+
}
1077+
1078+
Some(PushToCubeContext {
1079+
ungrouped_scan_node,
1080+
join_subqueries,
1081+
known_join_subqueries,
1082+
})
1083+
} else {
1084+
None
1085+
};
1086+
// Drop mut, turn to ref
1087+
let column_remapping = column_remapping.as_ref();
1088+
// Turn to ref
1089+
let push_to_cube_context = push_to_cube_context.as_ref();
9721090
if let Some(data_source) = data_source {
9731091
let generator = plan
9741092
.meta
@@ -1064,8 +1182,74 @@ impl CubeScanWrapperNode {
10641182
subqueries_sql.clone(),
10651183
)
10661184
.await?;
1067-
if let Some(push_to_cube_context) = push_to_cube_context {
1068-
let ungrouped_scan_node = push_to_cube_context.ungrouped_scan_node;
1185+
if let Some(PushToCubeContext {
1186+
ungrouped_scan_node,
1187+
join_subqueries,
1188+
known_join_subqueries: _,
1189+
}) = push_to_cube_context
1190+
{
1191+
let mut prepared_join_subqueries = vec![];
1192+
for JoinSubquery {
1193+
alias: subq_alias,
1194+
sql: subq_sql,
1195+
condition,
1196+
join_type,
1197+
} in join_subqueries
1198+
{
1199+
// Need to call generate_column_expr to apply column_remapping
1200+
let (join_condition, new_sql) = Self::generate_column_expr(
1201+
plan.clone(),
1202+
schema.clone(),
1203+
[condition.clone()],
1204+
sql,
1205+
generator.clone(),
1206+
column_remapping,
1207+
&mut next_remapper,
1208+
true,
1209+
push_to_cube_context,
1210+
subqueries_sql.clone(),
1211+
)
1212+
.await?;
1213+
let join_condition = join_condition[0].expr.clone();
1214+
sql = new_sql;
1215+
1216+
let join_sql_expression = {
1217+
// TODO this is NOT a proper way to generate member expr here
1218+
// TODO Do we even want a full-blown member expression here? or arguments + expr will be enough?
1219+
let res = Self::make_member_def(
1220+
&AliasedColumn {
1221+
expr: join_condition,
1222+
alias: "__join__alias__unused".to_string(),
1223+
},
1224+
&ungrouped_scan_node.used_cubes,
1225+
)?;
1226+
serde_json::json!(res).to_string()
1227+
};
1228+
1229+
let join_type = match join_type {
1230+
JoinType::Left => generator
1231+
.get_sql_templates()
1232+
.left_join()?,
1233+
JoinType::Inner => generator
1234+
.get_sql_templates()
1235+
.inner_join()?,
1236+
_ => {
1237+
return Err(CubeError::internal(format!(
1238+
"Unsupported join type for join subquery: {join_type:?}"
1239+
)))
1240+
}
1241+
};
1242+
1243+
// for simple ungrouped-grouped joins everything should already be present in from
1244+
// so we can just attach this join to the end, no need to look for a proper spot
1245+
prepared_join_subqueries.push(V1LoadRequestQueryJoinSubquery {
1246+
sql: subq_sql.clone(),
1247+
on: join_sql_expression,
1248+
join_type,
1249+
alias: subq_alias.clone(),
1250+
});
1251+
}
1252+
10691253
let load_request = &ungrouped_scan_node.request;
10701254

10711255
let load_request = V1LoadRequestQuery {
@@ -1198,7 +1382,8 @@ impl CubeScanWrapperNode {
11981382
filters: ungrouped_scan_node.request.filters.clone(),
11991383

12001384
time_dimensions: load_request.time_dimensions.clone(),
1201-
subquery_joins: None,
1385+
subquery_joins: (!prepared_join_subqueries.is_empty())
1386+
.then_some(prepared_join_subqueries),
12021387
};
12031388

12041389
// TODO time dimensions, filters, segments
@@ -1302,7 +1487,7 @@ impl CubeScanWrapperNode {
13021487
async fn generate_column_expr(
13031488
plan: Arc<Self>,
13041489
schema: DFSchemaRef,
1305-
exprs: Vec<Expr>,
1490+
exprs: impl IntoIterator<Item = Expr>,
13061491
mut sql: SqlQuery,
13071492
generator: Arc<dyn SqlGenerator>,
13081493
column_remapping: Option<&ColumnRemapping>,
@@ -1431,7 +1616,7 @@ impl CubeScanWrapperNode {
14311616
Ok((expr, sql_query))
14321617
}
14331618
// Expr::OuterColumn(_, _) => {}
1434-
Expr::Column(c) => {
1619+
Expr::Column(ref c) => {
14351620
if let Some(subquery) = subqueries.get(&c.flat_name()) {
14361621
Ok((
14371622
sql_generator
@@ -1447,8 +1632,28 @@ impl CubeScanWrapperNode {
14471632
))
14481633
} else if let Some(PushToCubeContext {
14491634
ungrouped_scan_node,
1635+
join_subqueries: _,
1636+
known_join_subqueries,
14501637
}) = push_to_cube_context
14511638
{
1639+
if let Some(relation) = c.relation.as_ref() {
1640+
if known_join_subqueries.contains(relation) {
1641+
// SQL API passes fixed aliases to Cube.js for join subqueries
1642+
// It means we don't need to use member expressions here, and can just use that fixed alias
1643+
// So we can generate that as if it were regular column expression
1644+
1645+
return Self::generate_sql_for_expr(
1646+
plan.clone(),
1647+
sql_query,
1648+
sql_generator.clone(),
1649+
expr,
1650+
None,
1651+
subqueries.clone(),
1652+
)
1653+
.await;
1654+
}
1655+
}
1656+
14521657
let field_index = ungrouped_scan_node
14531658
.schema
14541659
.fields()

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,8 @@ OFFSET {{ offset }}{% endif %}"#.to_string(),
583583
("expressions/like".to_string(), "{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}".to_string()),
584584
("expressions/ilike".to_string(), "{{ expr }} {% if negated %}NOT {% endif %}ILIKE {{ pattern }}".to_string()),
585585
("expressions/like_escape".to_string(), "{{ like_expr }} ESCAPE {{ escape_char }}".to_string()),
586+
("join_types/inner".to_string(), "INNER".to_string()),
587+
("join_types/left".to_string(), "LEFT".to_string()),
586588
("quotes/identifiers".to_string(), "\"".to_string()),
587589
("quotes/escape".to_string(), "\"\"".to_string()),
588590
("params/param".to_string(), "${{ param_index + 1 }}".to_string()),

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,4 +906,12 @@ impl SqlTemplates {
906906
};
907907
self.render_template(&format!("types/{}", data_type), context! {})
908908
}
909+
910+
pub fn left_join(&self) -> Result<String, CubeError> {
911+
self.render_template("join_types/left", context! {})
912+
}
913+
914+
pub fn inner_join(&self) -> Result<String, CubeError> {
915+
self.render_template("join_types/inner", context! {})
916+
}
909917
}

0 commit comments

Comments
 (0)