|
| 1 | +use std::{ |
| 2 | + collections::HashMap, |
| 3 | + ops::{Index, IndexMut}, |
| 4 | + sync::Arc, |
| 5 | +}; |
| 6 | + |
| 7 | +use egg::{Id, Subst}; |
| 8 | + |
1 | 9 | use crate::{ |
2 | | - compile::rewrite::{ |
3 | | - analysis::OriginalExpr, |
4 | | - column_name_to_member_vec, cube_scan, cube_scan_order, cube_scan_order_empty_tail, |
5 | | - expr_column_name, order, order_replacer, referenced_columns, rewrite, |
6 | | - rewriter::{CubeEGraph, CubeRewrite, RewriteRules}, |
7 | | - sort, sort_exp, sort_exp_empty_tail, sort_expr, transforming_rewrite, LogicalPlanLanguage, |
8 | | - OrderAsc, OrderMember, OrderReplacerColumnNameToMember, SortExprAsc, |
| 10 | + compile::{ |
| 11 | + datafusion::logical_plan::{replace_col_to_expr, Column, Expr}, |
| 12 | + rewrite::{ |
| 13 | + analysis::OriginalExpr, |
| 14 | + column_name_to_member_vec, |
| 15 | + converter::LogicalPlanToLanguageConverter, |
| 16 | + cube_scan, cube_scan_order, cube_scan_order_empty_tail, expr_column_name, limit, order, |
| 17 | + order_replacer, projection, referenced_columns, rewrite, |
| 18 | + rewriter::{CubeEGraph, CubeRewrite, RewriteRules}, |
| 19 | + sort, sort_exp, sort_exp_empty_tail, sort_expr, transforming_rewrite, |
| 20 | + LogicalPlanLanguage, OrderAsc, OrderMember, OrderReplacerColumnNameToMember, |
| 21 | + ProjectionAlias, SortExprAsc, |
| 22 | + }, |
9 | 23 | }, |
10 | | - var, var_iter, |
| 24 | + config::ConfigObj, |
| 25 | + var, var_iter, CubeError, |
11 | 26 | }; |
12 | | -use egg::Subst; |
13 | | -use std::ops::{Index, IndexMut}; |
14 | 27 |
|
15 | | -pub struct OrderRules {} |
| 28 | +pub struct OrderRules { |
| 29 | + config_obj: Arc<dyn ConfigObj>, |
| 30 | +} |
16 | 31 |
|
17 | 32 | impl RewriteRules for OrderRules { |
18 | 33 | fn rewrite_rules(&self) -> Vec<CubeRewrite> { |
@@ -70,13 +85,42 @@ impl RewriteRules for OrderRules { |
70 | 85 | order_replacer(sort_exp_empty_tail(), "?aliases"), |
71 | 86 | cube_scan_order_empty_tail(), |
72 | 87 | ), |
| 88 | + transforming_rewrite( |
| 89 | + "push-down-limit-sort-projection", |
| 90 | + limit( |
| 91 | + "?skip", |
| 92 | + "?fetch", |
| 93 | + sort( |
| 94 | + "?sort_expr", |
| 95 | + projection( |
| 96 | + "?projection_expr", |
| 97 | + "?input", |
| 98 | + "?projection_alias", |
| 99 | + "?projection_split", |
| 100 | + ), |
| 101 | + ), |
| 102 | + ), |
| 103 | + projection( |
| 104 | + "?projection_expr", |
| 105 | + limit("?skip", "?fetch", sort("?new_sort_expr", "?input")), |
| 106 | + "?projection_alias", |
| 107 | + "?projection_split", |
| 108 | + ), |
| 109 | + self.push_down_limit_sort_projection( |
| 110 | + "?input", |
| 111 | + "?projection_expr", |
| 112 | + "?projection_alias", |
| 113 | + "?sort_expr", |
| 114 | + "?new_sort_expr", |
| 115 | + ), |
| 116 | + ), |
73 | 117 | ] |
74 | 118 | } |
75 | 119 | } |
76 | 120 |
|
77 | 121 | impl OrderRules { |
78 | | - pub fn new() -> Self { |
79 | | - Self {} |
| 122 | + pub fn new(config_obj: Arc<dyn ConfigObj>) -> Self { |
| 123 | + Self { config_obj } |
80 | 124 | } |
81 | 125 |
|
82 | 126 | fn push_down_sort( |
@@ -173,4 +217,147 @@ impl OrderRules { |
173 | 217 | false |
174 | 218 | } |
175 | 219 | } |
| 220 | + |
| 221 | + fn push_down_limit_sort_projection( |
| 222 | + &self, |
| 223 | + input_var: &'static str, |
| 224 | + projection_expr_var: &'static str, |
| 225 | + projection_alias_var: &'static str, |
| 226 | + sort_expr_var: &'static str, |
| 227 | + new_sort_expr_var: &'static str, |
| 228 | + ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { |
| 229 | + let input_var = var!(input_var); |
| 230 | + let projection_expr_var = var!(projection_expr_var); |
| 231 | + let projection_alias_var = var!(projection_alias_var); |
| 232 | + let sort_expr_var = var!(sort_expr_var); |
| 233 | + let new_sort_expr_var = var!(new_sort_expr_var); |
| 234 | + let flat_list = self.config_obj.push_down_pull_up_split(); |
| 235 | + move |egraph, subst| { |
| 236 | + let input_is_sort_or_limit = egraph[subst[input_var]].nodes.iter().any(|node| { |
| 237 | + matches!( |
| 238 | + node, |
| 239 | + LogicalPlanLanguage::Sort(_) | LogicalPlanLanguage::Limit(_) |
| 240 | + ) |
| 241 | + }); |
| 242 | + if input_is_sort_or_limit { |
| 243 | + return false; |
| 244 | + } |
| 245 | + |
| 246 | + let Some(expr_to_alias) = egraph[subst[projection_expr_var]] |
| 247 | + .data |
| 248 | + .expr_to_alias |
| 249 | + .as_deref() |
| 250 | + else { |
| 251 | + return false; |
| 252 | + }; |
| 253 | + |
| 254 | + for projection_alias in var_iter!(egraph[subst[projection_alias_var]], ProjectionAlias) |
| 255 | + { |
| 256 | + let mut column_with_expr = vec![]; |
| 257 | + for (expr, alias, _) in expr_to_alias { |
| 258 | + let column = Column::from_name(alias); |
| 259 | + column_with_expr.push((column, expr)); |
| 260 | + if let Some(projection_alias) = projection_alias.as_deref() { |
| 261 | + let column = Column { |
| 262 | + relation: Some(projection_alias.to_string()), |
| 263 | + name: alias.to_string(), |
| 264 | + }; |
| 265 | + column_with_expr.push((column, expr)); |
| 266 | + } |
| 267 | + } |
| 268 | + |
| 269 | + let column_to_expr = column_with_expr |
| 270 | + .iter() |
| 271 | + .map(|(column, expr)| (column, *expr)) |
| 272 | + .collect::<HashMap<_, _>>(); |
| 273 | + |
| 274 | + let Ok(new_sort_expr) = |
| 275 | + rewrite_sort_expr(egraph, subst[sort_expr_var], &column_to_expr) |
| 276 | + else { |
| 277 | + continue; |
| 278 | + }; |
| 279 | + let Ok(new_sort_expr_id) = insert_sort_expr(new_sort_expr, egraph, flat_list) |
| 280 | + else { |
| 281 | + // Insertion failure should never happen as it can be partial, |
| 282 | + // so fail right away. |
| 283 | + return false; |
| 284 | + }; |
| 285 | + |
| 286 | + subst.insert(new_sort_expr_var, new_sort_expr_id); |
| 287 | + return true; |
| 288 | + } |
| 289 | + false |
| 290 | + } |
| 291 | + } |
| 292 | +} |
| 293 | + |
| 294 | +#[derive(Debug, Clone)] |
| 295 | +enum SortExprNode { |
| 296 | + List(Vec<SortExprNode>), |
| 297 | + Expr(Expr, Id, Id), |
| 298 | +} |
| 299 | + |
| 300 | +fn rewrite_sort_expr( |
| 301 | + egraph: &CubeEGraph, |
| 302 | + id: Id, |
| 303 | + column_to_expr: &HashMap<&Column, &Expr>, |
| 304 | +) -> Result<SortExprNode, CubeError> { |
| 305 | + for node in &egraph[id].nodes { |
| 306 | + match node { |
| 307 | + LogicalPlanLanguage::SortExp(sort_exp) => { |
| 308 | + if let Ok(exprs) = sort_exp |
| 309 | + .iter() |
| 310 | + .map(|id| rewrite_sort_expr(egraph, *id, column_to_expr)) |
| 311 | + .collect::<Result<Vec<_>, _>>() |
| 312 | + { |
| 313 | + return Ok(SortExprNode::List(exprs)); |
| 314 | + } |
| 315 | + } |
| 316 | + LogicalPlanLanguage::SortExpr(sort_expr) => { |
| 317 | + let Some(OriginalExpr::Expr(original_expr)) = |
| 318 | + &egraph[sort_expr[0]].data.original_expr |
| 319 | + else { |
| 320 | + continue; |
| 321 | + }; |
| 322 | + let Ok(rewritten_expr) = replace_col_to_expr(original_expr.clone(), column_to_expr) |
| 323 | + else { |
| 324 | + continue; |
| 325 | + }; |
| 326 | + return Ok(SortExprNode::Expr( |
| 327 | + rewritten_expr, |
| 328 | + sort_expr[1], |
| 329 | + sort_expr[2], |
| 330 | + )); |
| 331 | + } |
| 332 | + _ => continue, |
| 333 | + } |
| 334 | + } |
| 335 | + Err(CubeError::internal( |
| 336 | + "Unable to find replacable SortExp or SortExpr while pushing sort down the projection" |
| 337 | + .to_string(), |
| 338 | + )) |
| 339 | +} |
| 340 | + |
| 341 | +fn insert_sort_expr( |
| 342 | + sort_expr: SortExprNode, |
| 343 | + egraph: &mut CubeEGraph, |
| 344 | + flat_list: bool, |
| 345 | +) -> Result<Id, CubeError> { |
| 346 | + match sort_expr { |
| 347 | + SortExprNode::List(list) => { |
| 348 | + let ids = list |
| 349 | + .into_iter() |
| 350 | + .map(|sort_node| insert_sort_expr(sort_node, egraph, flat_list)) |
| 351 | + .collect::<Result<Vec<_>, _>>()?; |
| 352 | + Ok(egraph.add(LogicalPlanLanguage::SortExp(ids))) |
| 353 | + } |
| 354 | + SortExprNode::Expr(expr, asc_id, nulls_first_id) => { |
| 355 | + let expr_id = LogicalPlanToLanguageConverter::add_expr(egraph, &expr, flat_list)?; |
| 356 | + Ok(egraph.add(LogicalPlanLanguage::SortExpr([ |
| 357 | + expr_id, |
| 358 | + asc_id, |
| 359 | + nulls_first_id, |
| 360 | + ]))) |
| 361 | + } |
| 362 | + } |
176 | 363 | } |
0 commit comments