@@ -19,11 +19,11 @@ use crate::{
1919 CubeError ,
2020} ;
2121use chrono:: { Days , NaiveDate , SecondsFormat , TimeZone , Utc } ;
22- use cubeclient:: models:: V1LoadRequestQuery ;
22+ use cubeclient:: models:: { V1LoadRequestQuery , V1LoadRequestQueryJoinSubquery } ;
2323use datafusion:: {
2424 error:: { DataFusionError , Result } ,
2525 logical_plan:: {
26- plan:: Extension , replace_col, Column , DFSchema , DFSchemaRef , Expr , GroupingSet ,
26+ plan:: Extension , replace_col, Column , DFSchema , DFSchemaRef , Expr , GroupingSet , JoinType ,
2727 LogicalPlan , UserDefinedLogicalNode ,
2828 } ,
2929 physical_plan:: { aggregates:: AggregateFunction , functions:: BuiltinScalarFunction } ,
@@ -45,8 +45,18 @@ use std::{
4545 sync:: { Arc , LazyLock } ,
4646} ;
4747
48+ pub struct JoinSubquery {
49+ alias : String ,
50+ sql : String ,
51+ condition : Expr ,
52+ join_type : JoinType ,
53+ }
54+
4855pub struct PushToCubeContext < ' l > {
4956 ungrouped_scan_node : & ' l CubeScanNode ,
57+ // Known join subquery qualifiers, to generate proper column expressions
58+ known_join_subqueries : HashSet < String > ,
59+ join_subqueries : Vec < JoinSubquery > ,
5060}
5161
5262#[ derive( Debug , Clone , Deserialize ) ]
@@ -342,6 +352,10 @@ impl ColumnRemapping {
342352 )
343353 . map_err ( |_| CubeError :: internal ( format ! ( "Can't rename columns for expr: {expr:?}" , ) ) )
344354 }
355+
356+ pub fn extend ( & mut self , other : ColumnRemapping ) {
357+ self . column_remapping . extend ( other. column_remapping ) ;
358+ }
345359}
346360
347361/// Builds new column mapping
@@ -845,7 +859,7 @@ impl CubeScanWrapperNode {
845859 aggr_expr,
846860 window_expr,
847861 from,
848- joins : _joins ,
862+ joins,
849863 filter_expr,
850864 having_expr : _having_expr,
851865 limit,
@@ -856,8 +870,8 @@ impl CubeScanWrapperNode {
856870 push_to_cube,
857871 } ) = wrapped_select_node
858872 {
859- // TODO support joins
860- let push_to_cube_context = if push_to_cube {
873+ // TODO support ungrouped joins
874+ let ungrouped_scan_node = if push_to_cube {
861875 if let LogicalPlan :: Extension ( Extension { node } ) = from. as_ref ( ) {
862876 if let Some ( cube_scan_node) =
863877 node. as_any ( ) . downcast_ref :: < CubeScanNode > ( )
@@ -867,9 +881,7 @@ impl CubeScanWrapperNode {
867881 "Expected ungrouped CubeScan node but found: {cube_scan_node:?}"
868882 ) ) ) ;
869883 }
870- Some ( PushToCubeContext {
871- ungrouped_scan_node : cube_scan_node,
872- } )
884+ Some ( cube_scan_node)
873885 } else {
874886 return Err ( CubeError :: internal ( format ! (
875887 "Expected CubeScan node but found: {:?}" ,
@@ -886,16 +898,13 @@ impl CubeScanWrapperNode {
886898 None
887899 } ;
888900
889- let push_to_cube_context = push_to_cube_context. as_ref ( ) ;
890-
891901 let SqlGenerationResult {
892902 data_source,
893903 from_alias,
894- column_remapping,
904+ mut column_remapping,
895905 mut sql,
896906 request,
897- } = if let Some ( push_to_cube_context) = push_to_cube_context {
898- let ungrouped_scan_node = push_to_cube_context. ungrouped_scan_node ;
907+ } = if let Some ( ungrouped_scan_node) = & ungrouped_scan_node {
899908 let data_sources = ungrouped_scan_node
900909 . used_cubes
901910 . iter ( )
@@ -940,8 +949,6 @@ impl CubeScanWrapperNode {
940949 . await ?
941950 } ;
942951
943- let column_remapping = column_remapping. as_ref ( ) ;
944-
945952 let mut subqueries_sql = HashMap :: new ( ) ;
946953 for subquery in subqueries. iter ( ) {
947954 let SqlGenerationResult {
@@ -963,12 +970,123 @@ impl CubeScanWrapperNode {
963970
964971 let ( sql_string, new_values) = subquery_sql. unpack ( ) ;
965972 sql. extend_values ( & new_values) ;
973+ // TODO why only field 0 is a key?
966974 let field = subquery. schema ( ) . field ( 0 ) ;
967975 subqueries_sql. insert ( field. qualified_name ( ) , sql_string) ;
968976 }
969977 let subqueries_sql = Arc :: new ( subqueries_sql) ;
970978 let alias = alias. or ( from_alias. clone ( ) ) ;
971979 let mut next_remapper = Remapper :: new ( alias. clone ( ) , can_rename_columns) ;
980+
981+ let push_to_cube_context = if let Some ( ungrouped_scan_node) =
982+ ungrouped_scan_node
983+ {
984+ let mut join_subqueries = vec ! [ ] ;
985+ let mut known_join_subqueries = HashSet :: new ( ) ;
986+ for ( lp, cond, join_type) in joins {
987+ match lp. as_ref ( ) {
988+ LogicalPlan :: Extension ( Extension { node } ) => {
989+ if let Some ( join_cube_scan) =
990+ node. as_any ( ) . downcast_ref :: < CubeScanNode > ( )
991+ {
992+ if join_cube_scan. request . ungrouped == Some ( true ) {
993+ return Err ( CubeError :: internal ( format ! (
994+ "Unsupported ungrouped CubeScan as join subquery: {join_cube_scan:?}"
995+ ) ) ) ;
996+ }
997+ } else {
998+ // TODO support more grouped cases here
999+ return Err ( CubeError :: internal ( format ! (
1000+ "Unsupported unknown extension as join subquery: {node:?}"
1001+ ) ) ) ;
1002+ }
1003+ }
1004+ _ => {
1005+ // TODO support more grouped cases here
1006+ return Err ( CubeError :: internal ( format ! (
1007+ "Unsupported logical plan node as join subquery: {lp:?}"
1008+ ) ) ) ;
1009+ }
1010+ }
1011+
1012+ match join_type {
1013+ JoinType :: Inner | JoinType :: Left => {
1014+ // Do nothing
1015+ }
1016+ _ => {
1017+ return Err ( CubeError :: internal ( format ! (
1018+ "Unsupported join type for join subquery: {join_type:?}"
1019+ ) ) ) ;
1020+ }
1021+ }
1022+
1023+ // TODO avoid using direct alias from schema, implement remapping for qualifiers instead
1024+ let alias = lp
1025+ . schema ( )
1026+ . fields ( )
1027+ . iter ( )
1028+ . filter_map ( |f| f. qualifier ( ) )
1029+ . next ( )
1030+ . ok_or_else ( || {
1031+ CubeError :: internal ( format ! (
1032+ "Alias not found for join subquery {lp:?}"
1033+ ) )
1034+ } ) ?;
1035+
1036+ let subq_sql = Self :: generate_sql_for_node (
1037+ plan. clone ( ) ,
1038+ transport. clone ( ) ,
1039+ load_request_meta. clone ( ) ,
1040+ lp. clone ( ) ,
1041+ true ,
1042+ sql. values . clone ( ) ,
1043+ data_source. clone ( ) ,
1044+ )
1045+ . await ?;
1046+ let ( subq_sql_string, new_values) = subq_sql. sql . unpack ( ) ;
1047+ sql. extend_values ( & new_values) ;
1048+ let subq_alias = subq_sql. from_alias ;
1049+ // Expect that subq_sql.column_remapping already incorporates subq_alias/
1050+ // TODO does it?
1051+
1052+ // TODO expect returned from_alias to be fine, but still need to remap it from original alias somewhere in generate_sql_for_node
1053+
1054+ // grouped join subquery can have its columns remapped, and expressions current node can reference original columns
1055+ column_remapping = {
1056+ match ( column_remapping, subq_sql. column_remapping ) {
1057+ ( None , None ) => None ,
1058+ ( None , Some ( remapping) ) | ( Some ( remapping) , None ) => {
1059+ Some ( remapping)
1060+ }
1061+ ( Some ( mut left) , Some ( right) ) => {
1062+ left. extend ( right) ;
1063+ Some ( left)
1064+ }
1065+ }
1066+ } ;
1067+
1068+ join_subqueries. push ( JoinSubquery {
1069+ // TODO what alias to actually use here? two more-or-less valid options: returned from generate_sql_for_node ot realiased from `alias`. Plain `alias` is incorrect here
1070+ alias : subq_alias. unwrap_or_else ( || alias. clone ( ) ) ,
1071+ sql : subq_sql_string,
1072+ condition : cond. clone ( ) ,
1073+ join_type : join_type. clone ( ) ,
1074+ } ) ;
1075+ known_join_subqueries. insert ( alias. clone ( ) ) ;
1076+ }
1077+
1078+ Some ( PushToCubeContext {
1079+ ungrouped_scan_node,
1080+ join_subqueries,
1081+ known_join_subqueries,
1082+ } )
1083+ } else {
1084+ None
1085+ } ;
1086+ // Drop mut, turn to ref
1087+ let column_remapping = column_remapping. as_ref ( ) ;
1088+ // Turn to ref
1089+ let push_to_cube_context = push_to_cube_context. as_ref ( ) ;
9721090 if let Some ( data_source) = data_source {
9731091 let generator = plan
9741092 . meta
@@ -1064,8 +1182,74 @@ impl CubeScanWrapperNode {
10641182 subqueries_sql. clone ( ) ,
10651183 )
10661184 . await ?;
1067- if let Some ( push_to_cube_context) = push_to_cube_context {
1068- let ungrouped_scan_node = push_to_cube_context. ungrouped_scan_node ;
1185+ if let Some ( PushToCubeContext {
1186+ ungrouped_scan_node,
1187+ join_subqueries,
1188+ known_join_subqueries : _,
1189+ } ) = push_to_cube_context
1190+ {
1191+ let mut prepared_join_subqueries = vec ! [ ] ;
1192+ for JoinSubquery {
1193+ alias : subq_alias,
1194+ sql : subq_sql,
1195+ condition,
1196+ join_type,
1197+ } in join_subqueries
1198+ {
1199+ // Need to call generate_column_expr to apply column_remapping
1200+ let ( join_condition, new_sql) = Self :: generate_column_expr (
1201+ plan. clone ( ) ,
1202+ schema. clone ( ) ,
1203+ [ condition. clone ( ) ] ,
1204+ sql,
1205+ generator. clone ( ) ,
1206+ column_remapping,
1207+ & mut next_remapper,
1208+ true ,
1209+ push_to_cube_context,
1210+ subqueries_sql. clone ( ) ,
1211+ )
1212+ . await ?;
1213+ let join_condition = join_condition[ 0 ] . expr . clone ( ) ;
1214+ sql = new_sql;
1215+
1216+ let join_sql_expression = {
1217+ // TODO this is NOT a proper way to generate member expr here
1218+ // TODO Do we even want a full-blown member expression here? or arguments + expr will be enough?
1219+ let res = Self :: make_member_def (
1220+ & AliasedColumn {
1221+ expr : join_condition,
1222+ alias : "__join__alias__unused" . to_string ( ) ,
1223+ } ,
1224+ & ungrouped_scan_node. used_cubes ,
1225+ ) ?;
1226+ serde_json:: json!( res) . to_string ( )
1227+ } ;
1228+
1229+ let join_type = match join_type {
1230+ JoinType :: Left => generator
1231+ . get_sql_templates ( )
1232+ . left_join ( ) ?,
1233+ JoinType :: Inner => generator
1234+ . get_sql_templates ( )
1235+ . inner_join ( ) ?,
1236+ _ => {
1237+ return Err ( CubeError :: internal ( format ! (
1238+ "Unsupported join type for join subquery: {join_type:?}"
1239+ ) ) )
1240+ }
1241+ } ;
1242+
1243+ // for simple ungrouped-grouped joins everything should already be present in from
1244+ // so we can just attach this join to the end, no need to look for a proper spot
1245+ prepared_join_subqueries. push ( V1LoadRequestQueryJoinSubquery {
1246+ sql : subq_sql. clone ( ) ,
1247+ on : join_sql_expression,
1248+ join_type,
1249+ alias : subq_alias. clone ( ) ,
1250+ } ) ;
1251+ }
1252+
10691253 let load_request = & ungrouped_scan_node. request ;
10701254
10711255 let load_request = V1LoadRequestQuery {
@@ -1198,7 +1382,8 @@ impl CubeScanWrapperNode {
11981382 filters : ungrouped_scan_node. request . filters . clone ( ) ,
11991383
12001384 time_dimensions : load_request. time_dimensions . clone ( ) ,
1201- subquery_joins : None ,
1385+ subquery_joins : ( !prepared_join_subqueries. is_empty ( ) )
1386+ . then_some ( prepared_join_subqueries) ,
12021387 } ;
12031388
12041389 // TODO time dimensions, filters, segments
@@ -1302,7 +1487,7 @@ impl CubeScanWrapperNode {
13021487 async fn generate_column_expr (
13031488 plan : Arc < Self > ,
13041489 schema : DFSchemaRef ,
1305- exprs : Vec < Expr > ,
1490+ exprs : impl IntoIterator < Item = Expr > ,
13061491 mut sql : SqlQuery ,
13071492 generator : Arc < dyn SqlGenerator > ,
13081493 column_remapping : Option < & ColumnRemapping > ,
@@ -1431,7 +1616,7 @@ impl CubeScanWrapperNode {
14311616 Ok ( ( expr, sql_query) )
14321617 }
14331618 // Expr::OuterColumn(_, _) => {}
1434- Expr :: Column ( c) => {
1619+ Expr :: Column ( ref c) => {
14351620 if let Some ( subquery) = subqueries. get ( & c. flat_name ( ) ) {
14361621 Ok ( (
14371622 sql_generator
@@ -1447,8 +1632,28 @@ impl CubeScanWrapperNode {
14471632 ) )
14481633 } else if let Some ( PushToCubeContext {
14491634 ungrouped_scan_node,
1635+ join_subqueries : _,
1636+ known_join_subqueries,
14501637 } ) = push_to_cube_context
14511638 {
1639+ if let Some ( relation) = c. relation . as_ref ( ) {
1640+ if known_join_subqueries. contains ( relation) {
1641+ // SQL API passes fixed aliases to Cube.js for join subqueries
1642+ // It means we don't need to use member expressions here, and can just use that fixed alias
1643+ // So we can generate that as if it were regular column expression
1644+
1645+ return Self :: generate_sql_for_expr (
1646+ plan. clone ( ) ,
1647+ sql_query,
1648+ sql_generator. clone ( ) ,
1649+ expr,
1650+ None ,
1651+ subqueries. clone ( ) ,
1652+ )
1653+ . await ;
1654+ }
1655+ }
1656+
14521657 let field_index = ungrouped_scan_node
14531658 . schema
14541659 . fields ( )
0 commit comments