@@ -3,8 +3,9 @@ use crate::{
33 cube_scan, cube_scan_wrapper, rewrite,
44 rewriter:: { CubeEGraph , CubeRewrite } ,
55 rules:: wrapper:: WrapperRules ,
6- transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube , CubeScanUngrouped ,
7- LogicalPlanLanguage , WrapperPullupReplacerAliasToCube , WrapperPullupReplacerUngrouped ,
6+ transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube , CubeScanLimit ,
7+ CubeScanOffset , CubeScanUngrouped , LogicalPlanLanguage , WrapperPullupReplacerAliasToCube ,
8+ WrapperPullupReplacerUngrouped ,
89 } ,
910 var, var_iter,
1011} ;
@@ -51,6 +52,8 @@ impl WrapperRules {
5152 self . transform_wrap_cube_scan(
5253 "?members" ,
5354 "?alias_to_cube" ,
55+ "?limit" ,
56+ "?offset" ,
5457 "?ungrouped" ,
5558 "?alias_to_cube_out" ,
5659 "?ungrouped_out" ,
@@ -77,27 +80,44 @@ impl WrapperRules {
7780 & self ,
7881 members_var : & ' static str ,
7982 alias_to_cube_var : & ' static str ,
83+ limit_var : & ' static str ,
84+ offset_var : & ' static str ,
8085 ungrouped_cube_var : & ' static str ,
8186 alias_to_cube_var_out : & ' static str ,
8287 ungrouped_cube_var_out : & ' static str ,
8388 ) -> impl Fn ( & mut CubeEGraph , & mut Subst ) -> bool {
8489 let members_var = var ! ( members_var) ;
8590 let alias_to_cube_var = var ! ( alias_to_cube_var) ;
91+ let limit_var = var ! ( limit_var) ;
92+ let offset_var = var ! ( offset_var) ;
8693 let ungrouped_cube_var = var ! ( ungrouped_cube_var) ;
8794 let alias_to_cube_var_out = var ! ( alias_to_cube_var_out) ;
8895 let ungrouped_cube_var_out = var ! ( ungrouped_cube_var_out) ;
8996 move |egraph, subst| {
97+ let mut has_no_limit_or_offset = true ;
98+ for limit in var_iter ! ( egraph[ subst[ limit_var] ] , CubeScanLimit ) . cloned ( ) {
99+ has_no_limit_or_offset &= limit. is_none ( ) ;
100+ }
101+ for offset in var_iter ! ( egraph[ subst[ offset_var] ] , CubeScanOffset ) . cloned ( ) {
102+ has_no_limit_or_offset &= offset. is_none ( ) ;
103+ }
104+
90105 if let Some ( _) = egraph[ subst[ members_var] ] . data . member_name_to_expr {
91106 for alias_to_cube in
92107 var_iter ! ( egraph[ subst[ alias_to_cube_var] ] , CubeScanAliasToCube ) . cloned ( )
93108 {
94109 for ungrouped in
95110 var_iter ! ( egraph[ subst[ ungrouped_cube_var] ] , CubeScanUngrouped ) . cloned ( )
96111 {
112+ // When CubeScan already has limit or offset, it's unsafe to allow to push
113+ // anything on top to Cube.
114+ // Especially aggregation: aggregate does not commute with limit,
115+ // so it would be incorrect to join them to single CubeScan
116+ let ungrouped_out = ungrouped && has_no_limit_or_offset;
97117 subst. insert (
98118 ungrouped_cube_var_out,
99119 egraph. add ( LogicalPlanLanguage :: WrapperPullupReplacerUngrouped (
100- WrapperPullupReplacerUngrouped ( ungrouped ) ,
120+ WrapperPullupReplacerUngrouped ( ungrouped_out ) ,
101121 ) ) ,
102122 ) ;
103123 subst. insert (
0 commit comments