Skip to content

Commit 77b89fe

Browse files
committed
dev
1 parent 232c626 commit 77b89fe

File tree

2 files changed

+150
-20
lines changed

2 files changed

+150
-20
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8001,6 +8001,18 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
80018001
.exec_query("CREATE TABLE foo.pushdown_where_group2 (a int, b int, c int) index ind1 (a, b, c) index ind2 (c, b)")
80028002
.await
80038003
.unwrap();
8004+
service
8005+
.exec_query("CREATE TABLE foo.pushdown_where_group2_with_alias (a_alias int, b_alias int, c_alias int) index ind1 (a_alias, b_alias, c_alias) index ind2 (c_alias, b_alias)")
8006+
.await
8007+
.unwrap();
8008+
service
8009+
.exec_query("CREATE TABLE foo.pushdown_where_group3_with_alias (a_alias int, b_alias int, c_alias int) index ind1 (c_alias, b_alias)")
8010+
.await
8011+
.unwrap();
8012+
service
8013+
.exec_query("CREATE TABLE foo.pushdown_where_group4_with_alias (a_alias_2 int, b_alias_2 int, c_alias_2 int) index ind1 (c_alias_2, b_alias_2)")
8014+
.await
8015+
.unwrap();
80048016
service
80058017
.exec_query(
80068018
"INSERT INTO foo.pushdown_where_group1
@@ -8279,6 +8291,80 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
82798291
]),
82808292
]
82818293
);
8294+
8295+
// ====================================
8296+
assert_limit_pushdown(
8297+
&service,
8298+
"SELECT a, b, c FROM (
8299+
SELECT a, b, c FROM foo.pushdown_where_group1
8300+
UNION ALL
8301+
SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group2_with_alias
8302+
) as `tb`
8303+
ORDER BY 3 DESC
8304+
LIMIT 10",
8305+
Some("ind2"),
8306+
true,
8307+
true,
8308+
)
8309+
.await
8310+
.unwrap();
8311+
8312+
// ====================================
8313+
assert_limit_pushdown(
8314+
&service,
8315+
"SELECT a, b, c FROM (
8316+
SELECT a, b, c FROM foo.pushdown_where_group1
8317+
UNION ALL
8318+
SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group2_with_alias
8319+
) as `tb`
8320+
WHERE b = 20
8321+
ORDER BY 1 DESC, 3 DESC
8322+
LIMIT 3",
8323+
Some("ind1"),
8324+
true,
8325+
true,
8326+
)
8327+
.await
8328+
.unwrap();
8329+
8330+
// ====================================
8331+
// TODO: theses cases still don't use an optimal index
8332+
// Filters outside the index are a priority right now.
8333+
// The second problem is that ORDER BY does not affect the score when selecting an index
8334+
// assert_limit_pushdown(
8335+
// &service,
8336+
// "SELECT a, b, c FROM (
8337+
// SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group3_with_alias
8338+
// UNION ALL
8339+
// SELECT a_alias_2 a, b_alias_2 b, c_alias_2 c FROM foo.pushdown_where_group4_with_alias
8340+
// ) as `tb`
8341+
// WHERE a = 20
8342+
// ORDER BY 3 DESC
8343+
// LIMIT 3",
8344+
// Some("ind1"),
8345+
// true,
8346+
// true,
8347+
// )
8348+
// .await
8349+
// .unwrap();
8350+
8351+
// ====================================
8352+
// assert_limit_pushdown(
8353+
// &service,
8354+
// "SELECT a, b, c FROM (
8355+
// SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group3_with_alias
8356+
// UNION ALL
8357+
// SELECT a_alias_2 a, b_alias_2 b, c_alias_2 c FROM foo.pushdown_where_group4_with_alias
8358+
// ) as `tb`
8359+
// WHERE a > 20
8360+
// ORDER BY 3 DESC
8361+
// LIMIT 3",
8362+
// Some("ind1"),
8363+
// true,
8364+
// true,
8365+
// )
8366+
// .await
8367+
// .unwrap();
82828368
}
82838369
async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
82848370
service.exec_query("CREATE SCHEMA foo").await.unwrap();

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,28 @@ impl PlanRewriter for CollectConstraints {
519519
order_col_names: current_context.order_col_names.clone(),
520520
})
521521
}
522+
LogicalPlan::Projection { expr, .. } => {
523+
let alias_to_column = get_alias_to_column(expr);
524+
525+
if let Some(order_col_names) = &current_context.order_col_names {
526+
let names: Vec<String> = order_col_names
527+
.iter()
528+
.map(|k| {
529+
alias_to_column
530+
.get(k)
531+
.map_or_else(|| k.clone(), |v| v.name.clone())
532+
})
533+
.collect();
534+
535+
if !names.is_empty() {
536+
return Some(current_context.update_order_col_names(names));
537+
} else {
538+
return None;
539+
}
540+
}
541+
542+
None
543+
}
522544
LogicalPlan::Sort { expr, input, .. } => {
523545
let (names, _) = sort_to_column_names(expr, input);
524546

@@ -606,26 +628,15 @@ fn extract_column_name(expr: &Expr) -> Option<String> {
606628
}
607629
}
608630

609-
///Try to get original column namse from if underlined projection or aggregates contains columns aliases
610-
fn get_original_name(may_be_alias: &String, input: &LogicalPlan) -> String {
611-
fn get_name(exprs: &Vec<Expr>, may_be_alias: &String) -> String {
612-
let expr = exprs.iter().find(|&expr| match expr {
613-
Expr::Alias(_, name) => name == may_be_alias,
614-
_ => false,
615-
});
616-
if let Some(expr) = expr {
617-
if let Some(original_name) = extract_column_name(expr) {
618-
return original_name;
619-
}
631+
fn get_alias_to_column(expr: &Vec<Expr>) -> HashMap<String, logical_plan::Column> {
632+
let mut alias_to_column = HashMap::new();
633+
expr.iter().for_each(|e| {
634+
if let Expr::Alias(box Expr::Column(c), alias) = e {
635+
alias_to_column.insert(alias.clone(), c.clone());
620636
}
621-
may_be_alias.clone()
622-
}
623-
match input {
624-
LogicalPlan::Projection { expr, .. } => get_name(expr, may_be_alias),
625-
LogicalPlan::Filter { input, .. } => get_original_name(may_be_alias, input),
626-
LogicalPlan::Aggregate { group_expr, .. } => get_name(group_expr, may_be_alias),
627-
_ => may_be_alias.clone(),
628-
}
637+
});
638+
639+
alias_to_column
629640
}
630641

631642
fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<String>, bool) {
@@ -642,7 +653,7 @@ fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<Str
642653
}
643654
match expr.as_ref() {
644655
Expr::Column(c) => {
645-
res.push(get_original_name(&c.name, input));
656+
res.push(c.name.clone());
646657
}
647658
_ => {
648659
return (Vec::new(), true);
@@ -755,6 +766,39 @@ impl PlanRewriter for ChooseIndex<'_> {
755766

756767
fn enter_node(&mut self, n: &LogicalPlan, context: &Self::Context) -> Option<Self::Context> {
757768
match n {
769+
LogicalPlan::Projection { expr, .. } => {
770+
let alias_to_column = get_alias_to_column(expr);
771+
772+
let new_single_value_filtered_cols = context
773+
.single_value_filtered_cols
774+
.iter()
775+
.map(|name| {
776+
alias_to_column
777+
.get(name)
778+
.map_or_else(|| name.clone(), |col| col.name.clone())
779+
})
780+
.collect();
781+
782+
let mut new_context =
783+
context.update_single_value_filtered_cols(new_single_value_filtered_cols);
784+
785+
if let Some(sort) = &new_context.sort {
786+
let names: Vec<String> = sort
787+
.iter()
788+
.map(|k| {
789+
alias_to_column
790+
.get(k)
791+
.map_or_else(|| k.clone(), |col| col.name.clone())
792+
})
793+
.collect();
794+
795+
if !names.is_empty() {
796+
new_context = new_context.update_sort(names, context.sort_is_asc);
797+
}
798+
}
799+
800+
Some(new_context)
801+
}
758802
LogicalPlan::Limit { n, .. } => Some(context.update_limit(Some(*n))),
759803
LogicalPlan::Skip { n, .. } => {
760804
if let Some(limit) = context.limit {

0 commit comments

Comments
 (0)