@@ -260,6 +260,44 @@ fn expr_name(e: &Expr, schema: &DFSchema) -> Result<String> {
260260 }
261261}
262262
263+ /// Holds column remapping for generated SQL
264+ /// Can be used to remap expression in logical plans on top,
265+ /// and to generate mapping between schema and Cube load query in wrapper
266+ pub struct ColumnRemapping {
267+ column_remapping : HashMap < Column , Column > ,
268+ }
269+
270+ impl ColumnRemapping {
271+ /// Generate member_fields for CubeScanExecutionPlan, which contains SQL with this remapping.
272+ /// Cube will respond with aliases after remapping, which we must use to read response.
273+ /// Schema in DF will stay the same as before remapping.
274+ /// So result would have all aliases after remapping in order derived from `schema`.
275+ pub fn member_fields ( & self , schema : & DFSchema ) -> Vec < MemberField > {
276+ schema
277+ . fields ( )
278+ . iter ( )
279+ . map ( |f| {
280+ MemberField :: Member (
281+ self . column_remapping
282+ . get ( & Column :: from_name ( f. name ( ) . to_string ( ) ) )
283+ . map ( |x| x. name . to_string ( ) )
284+ . unwrap_or ( f. name ( ) . to_string ( ) ) ,
285+ )
286+ } )
287+ . collect ( )
288+ }
289+
290+ /// Replace every column expression in `expr` according to this remapping. Column expressions
291+ /// not present in `self` will stay the same.
292+ pub fn remap ( & self , expr : & Expr ) -> result:: Result < Expr , CubeError > {
293+ replace_col (
294+ expr. clone ( ) ,
295+ & self . column_remapping . iter ( ) . map ( |( k, v) | ( k, v) ) . collect ( ) ,
296+ )
297+ . map_err ( |_| CubeError :: internal ( format ! ( "Can't rename columns for expr: {expr:?}" , ) ) )
298+ }
299+ }
300+
263301/// Builds new column mapping
264302/// One remapper for one context: all unqualified columns with same name are assumed the same column
265303struct Remapper {
@@ -365,9 +403,11 @@ impl Remapper {
365403 Ok ( alias)
366404 }
367405
368- pub fn into_remapping ( self ) -> Option < HashMap < Column , Column > > {
406+ pub fn into_remapping ( self ) -> Option < ColumnRemapping > {
369407 if self . remapping . len ( ) > 0 {
370- Some ( self . remapping )
408+ Some ( ColumnRemapping {
409+ column_remapping : self . remapping ,
410+ } )
371411 } else {
372412 None
373413 }
@@ -377,7 +417,7 @@ impl Remapper {
377417pub struct SqlGenerationResult {
378418 pub data_source : Option < String > ,
379419 pub from_alias : Option < String > ,
380- pub column_remapping : Option < HashMap < Column , Column > > ,
420+ pub column_remapping : Option < ColumnRemapping > ,
381421 pub sql : SqlQuery ,
382422 pub request : TransportLoadRequestQuery ,
383423}
@@ -446,11 +486,7 @@ impl CubeScanWrapperNode {
446486 . await
447487 . and_then ( |SqlGenerationResult { data_source, mut sql, request, column_remapping, .. } | -> result:: Result < _ , CubeError > {
448488 let member_fields = if let Some ( column_remapping) = column_remapping {
449- schema
450- . fields ( )
451- . iter ( )
452- . map ( |f| MemberField :: Member ( column_remapping. get ( & Column :: from_name ( f. name ( ) . to_string ( ) ) ) . map ( |x| x. name . to_string ( ) ) . unwrap_or ( f. name ( ) . to_string ( ) ) ) )
453- . collect ( )
489+ column_remapping. member_fields ( schema)
454490 } else {
455491 schema
456492 . fields ( )
@@ -714,6 +750,8 @@ impl CubeScanWrapperNode {
714750 . await ?
715751 } ;
716752
753+ let column_remapping = column_remapping. as_ref ( ) ;
754+
717755 let mut subqueries_sql = HashMap :: new ( ) ;
718756 for subquery in subqueries. iter ( ) {
719757 let SqlGenerationResult {
@@ -759,7 +797,7 @@ impl CubeScanWrapperNode {
759797 projection_expr. clone ( ) ,
760798 sql,
761799 generator. clone ( ) ,
762- & column_remapping,
800+ column_remapping,
763801 & mut next_remapper,
764802 can_rename_columns,
765803 ungrouped_scan_node. clone ( ) ,
@@ -773,7 +811,7 @@ impl CubeScanWrapperNode {
773811 flat_group_expr. clone ( ) ,
774812 sql,
775813 generator. clone ( ) ,
776- & column_remapping,
814+ column_remapping,
777815 & mut next_remapper,
778816 can_rename_columns,
779817 ungrouped_scan_node. clone ( ) ,
@@ -787,7 +825,7 @@ impl CubeScanWrapperNode {
787825 aggr_expr. clone ( ) ,
788826 sql,
789827 generator. clone ( ) ,
790- & column_remapping,
828+ column_remapping,
791829 & mut next_remapper,
792830 can_rename_columns,
793831 ungrouped_scan_node. clone ( ) ,
@@ -801,7 +839,7 @@ impl CubeScanWrapperNode {
801839 filter_expr. clone ( ) ,
802840 sql,
803841 generator. clone ( ) ,
804- & column_remapping,
842+ column_remapping,
805843 & mut next_remapper,
806844 can_rename_columns,
807845 ungrouped_scan_node. clone ( ) ,
@@ -815,7 +853,7 @@ impl CubeScanWrapperNode {
815853 window_expr. clone ( ) ,
816854 sql,
817855 generator. clone ( ) ,
818- & column_remapping,
856+ column_remapping,
819857 & mut next_remapper,
820858 can_rename_columns,
821859 ungrouped_scan_node. clone ( ) ,
@@ -829,7 +867,7 @@ impl CubeScanWrapperNode {
829867 order_expr. clone ( ) ,
830868 sql,
831869 generator. clone ( ) ,
832- & column_remapping,
870+ column_remapping,
833871 & mut next_remapper,
834872 can_rename_columns,
835873 ungrouped_scan_node. clone ( ) ,
@@ -1060,25 +1098,16 @@ impl CubeScanWrapperNode {
10601098 exprs : Vec < Expr > ,
10611099 mut sql : SqlQuery ,
10621100 generator : Arc < dyn SqlGenerator > ,
1063- column_remapping : & Option < HashMap < Column , Column > > ,
1101+ column_remapping : Option < & ColumnRemapping > ,
10641102 next_remapper : & mut Remapper ,
10651103 can_rename_columns : bool ,
10661104 ungrouped_scan_node : Option < Arc < CubeScanNode > > ,
10671105 subqueries : Arc < HashMap < String , String > > ,
10681106 ) -> result:: Result < ( Vec < AliasedColumn > , SqlQuery ) , CubeError > {
10691107 let mut aliased_columns = Vec :: new ( ) ;
10701108 for original_expr in exprs {
1071- let expr = if let Some ( column_remapping) = column_remapping. as_ref ( ) {
1072- let mut expr = replace_col (
1073- original_expr. clone ( ) ,
1074- & column_remapping. iter ( ) . map ( |( k, v) | ( k, v) ) . collect ( ) ,
1075- )
1076- . map_err ( |_| {
1077- CubeError :: internal ( format ! (
1078- "Can't rename columns for expr: {:?}" ,
1079- original_expr
1080- ) )
1081- } ) ?;
1109+ let expr = if let Some ( column_remapping) = column_remapping {
1110+ let mut expr = column_remapping. remap ( & original_expr) ?;
10821111 if !can_rename_columns {
10831112 let original_alias = expr_name ( & original_expr, & schema) ?;
10841113 if original_alias != expr_name ( & expr, & schema) ? {
0 commit comments