20 changes: 20 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
@@ -8001,6 +8001,10 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
.exec_query("CREATE TABLE foo.pushdown_where_group2 (a int, b int, c int) index ind1 (a, b, c) index ind2 (c, b)")
.await
.unwrap();
service
.exec_query("CREATE TABLE foo.pushdown_where_group2_with_alias (a_alias int, b_alias int, c_alias int) index ind1 (a_alias, b_alias, c_alias) index ind2 (c_alias, b_alias)")
.await
.unwrap();
service
.exec_query(
"INSERT INTO foo.pushdown_where_group1
@@ -8279,6 +8283,22 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
]),
]
);

// ====================================
let res = assert_limit_pushdown(
&service,
"SELECT a, b, c FROM (
SELECT a, b, c FROM foo.pushdown_where_group1
union all
SELECT a_alias a, b_alias b, c_alias c FROM foo.pushdown_where_group2_with_alias
) as `tb`
ORDER BY 3 DESC LIMIT 3",
Some("ind2"),
true,
true,
)
.await
.unwrap();
}
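The added case exercises alias resolution end to end: ORDER BY 3 is an ordinal reference into the UNION ALL output, and the second branch only exposes that output column as an alias over c_alias. Below is a minimal sketch in plain Rust (no cubestore types; the names are illustrative) of the two resolution steps that let the planner match ind2:

fn main() {
    // ORDER BY 3 refers to the third select item of the union's output.
    let select_items = ["a", "b", "c"];
    let output_name = select_items[3 - 1];

    // The second UNION ALL branch projects `c_alias AS c`, so the output
    // name must be mapped back to the source column before index matching.
    let aliases = [("a", "a_alias"), ("b", "b_alias"), ("c", "c_alias")];
    let source_name = aliases
        .iter()
        .find(|(alias, _)| *alias == output_name)
        .map(|(_, col)| *col)
        .unwrap_or(output_name);

    // `c_alias` is the leading column of `ind2 (c_alias, b_alias)`, so the
    // ORDER BY ... DESC LIMIT can be served by that index without a resort.
    assert_eq!(source_name, "c_alias");
}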
async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA foo").await.unwrap();
73 changes: 53 additions & 20 deletions rust/cubestore/cubestore/src/queryplanner/planning.rs
@@ -519,6 +519,28 @@ impl PlanRewriter for CollectConstraints {
order_col_names: current_context.order_col_names.clone(),
})
}
LogicalPlan::Projection { expr, .. } => {
let alias_to_column = get_alias_to_column(expr);

if let Some(order_col_names) = &current_context.order_col_names {
let names: Vec<String> = order_col_names
.iter()
.map(|k| {
alias_to_column
.get(k)
.map_or_else(|| k.clone(), |v| v.name.clone())
})
.collect();

if !names.is_empty() {
return Some(current_context.update_order_col_names(names));
} else {
return None;
}
}

None
}
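// Illustrative note, not part of the original diff: for a plan like
// `SELECT c_alias AS c ... ORDER BY c`, the arm above enters the Projection
// with order_col_names == ["c"], builds the map {"c" -> c_alias}, and the
// updated context carries ["c_alias"] down toward the table scan; names
// without an alias entry fall through unchanged via map_or_else.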
LogicalPlan::Sort { expr, input, .. } => {
let (names, _) = sort_to_column_names(expr, input);

@@ -606,26 +628,15 @@ fn extract_column_name(expr: &Expr) -> Option<String> {
}
}

///Try to get the original column name if the underlying projection or aggregate contains column aliases
fn get_original_name(may_be_alias: &String, input: &LogicalPlan) -> String {
    fn get_name(exprs: &Vec<Expr>, may_be_alias: &String) -> String {
        let expr = exprs.iter().find(|&expr| match expr {
            Expr::Alias(_, name) => name == may_be_alias,
            _ => false,
        });
        if let Some(expr) = expr {
            if let Some(original_name) = extract_column_name(expr) {
                return original_name;
            }
        }
        may_be_alias.clone()
    }
    match input {
        LogicalPlan::Projection { expr, .. } => get_name(expr, may_be_alias),
        LogicalPlan::Filter { input, .. } => get_original_name(may_be_alias, input),
        LogicalPlan::Aggregate { group_expr, .. } => get_name(group_expr, may_be_alias),
        _ => may_be_alias.clone(),
    }
}

fn get_alias_to_column(expr: &Vec<Expr>) -> HashMap<String, logical_plan::Column> {
    let mut alias_to_column = HashMap::new();
    expr.iter().for_each(|e| {
        if let Expr::Alias(box Expr::Column(c), alias) = e {
            alias_to_column.insert(alias.clone(), c.clone());
        }
    });

    alias_to_column
}
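The recursive get_original_name walk above is the code this change removes; the new get_alias_to_column instead collects a flat alias map from a projection's expressions, which the Projection arms in CollectConstraints and ChooseIndex then apply. A self-contained sketch of the collection rule, using a local stand-in enum rather than the fork's DataFusion types (all names here are illustrative):

use std::collections::HashMap;

enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    Literal(i64),
}

fn alias_to_column(exprs: &[Expr]) -> HashMap<String, String> {
    let mut map = HashMap::new();
    for e in exprs {
        // Only direct `column AS alias` pairs are recorded; computed
        // expressions and bare columns are skipped.
        if let Expr::Alias(inner, alias) = e {
            if let Expr::Column(c) = inner.as_ref() {
                map.insert(alias.clone(), c.clone());
            }
        }
    }
    map
}

fn main() {
    let projection = vec![
        Expr::Alias(Box::new(Expr::Column("c_alias".into())), "c".into()),
        Expr::Column("b".into()),
        Expr::Alias(Box::new(Expr::Literal(1)), "one".into()),
    ];
    let map = alias_to_column(&projection);
    assert_eq!(map.get("c").map(String::as_str), Some("c_alias"));
    assert!(!map.contains_key("b") && !map.contains_key("one"));
}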

fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<String>, bool) {
@@ -642,7 +653,7 @@ fn sort_to_column_names(sort_exprs: &Vec<Expr>, input: &LogicalPlan) -> (Vec<String>, bool) {
}
match expr.as_ref() {
Expr::Column(c) => {
res.push(get_original_name(&c.name, input));
res.push(c.name.clone());
}
_ => {
return (Vec::new(), true);
@@ -755,6 +766,28 @@ impl PlanRewriter for ChooseIndex<'_> {

fn enter_node(&mut self, n: &LogicalPlan, context: &Self::Context) -> Option<Self::Context> {
match n {
LogicalPlan::Projection { expr, .. } => {
let alias_to_column = get_alias_to_column(expr);

if let Some(sort) = &context.sort {
let names: Vec<String> = sort
.clone()
.iter()
.map(|k| {
alias_to_column
.get(k)
.map_or_else(|| k.clone(), |v| v.name.clone())
})
.collect();

if !names.is_empty() {
return Some(context.update_sort(names, context.sort_is_asc));
} else {
return None;
}
}
None
}
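// Illustrative note, not part of the original diff: this arm mirrors the
// CollectConstraints Projection handling, but rewrites the sort context used
// for index selection, so a sort on an aliased output column is compared
// against candidate index columns by its underlying source name.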
LogicalPlan::Limit { n, .. } => Some(context.update_limit(Some(*n))),
LogicalPlan::Skip { n, .. } => {
if let Some(limit) = context.limit {
33 changes: 31 additions & 2 deletions rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs
@@ -470,16 +470,45 @@ impl KafkaPostProcessPlanner {
}

fn get_source_unique_column(&self, expr: &Expr) -> Result<Column, CubeError> {
fn find_column_name(expr: &Expr) -> Result<Option<String>, CubeError> {
match expr {
Expr::Column(c) => Ok(Some(c.name.clone())),
Expr::Alias(e, _) => find_column_name(&**e),
Expr::ScalarUDF { args, .. } => {
let mut column_name: Option<String> = None;
for arg in args {
if let Some(name) = find_column_name(arg)? {
if let Some(existing_name) = &column_name {
if existing_name != &name {
return Err(CubeError::user(
format!("Scalar function can only use a single column, expression: {:?}", expr),
));
}
} else {
column_name = Some(name);
}
}
}
Ok(column_name)
}
_ => Ok(None),
}
}

let source_name = match expr {
Expr::Column(c) => Ok(c.name.clone()),
Expr::Alias(e, _) => match &**e {
Expr::Column(c) => Ok(c.name.clone()),
Expr::ScalarUDF { .. } => find_column_name(expr)?.ok_or_else(|| {
CubeError::user(format!("Scalar function must contain at least one column, expression: {:?}", expr))
}),
_ => Err(CubeError::user(format!(
"Unique key can't be an expression in kafka streaming queries"
"Unique key can't be an expression in kafka streaming queries, expression: {:?}",
expr
))),
},
_ => Err(CubeError::user(
"All expressions must have aliases in kafka streaming queries".to_string(),
format!("All expressions must have aliases in kafka streaming queries, expression: {:?}", expr),
)),
}?;

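The nested find_column_name above enforces that a scalar UDF used for the unique key ultimately reads exactly one source column. A self-contained sketch of that rule with a local stand-in enum (not the fork's DataFusion types; all names are illustrative):

#[derive(Debug)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    ScalarUdf(Vec<Expr>),
}

fn find_column_name(expr: &Expr) -> Result<Option<String>, String> {
    match expr {
        Expr::Column(name) => Ok(Some(name.clone())),
        Expr::Alias(inner, _) => find_column_name(inner),
        Expr::ScalarUdf(args) => {
            // Accumulate the single column the function is allowed to use;
            // a second, different column is an error.
            let mut column_name: Option<String> = None;
            for arg in args {
                if let Some(name) = find_column_name(arg)? {
                    if let Some(existing) = &column_name {
                        if existing != &name {
                            return Err(format!(
                                "Scalar function can only use a single column, expression: {:?}",
                                expr
                            ));
                        }
                    } else {
                        column_name = Some(name);
                    }
                }
            }
            Ok(column_name)
        }
    }
}

fn main() {
    // A function over one column under an alias is accepted.
    let ok = Expr::Alias(
        Box::new(Expr::ScalarUdf(vec![Expr::Column("ts".into())])),
        "ts_key".into(),
    );
    assert_eq!(find_column_name(&ok).unwrap(), Some("ts".to_string()));

    // Two distinct columns inside one function are rejected.
    let bad = Expr::ScalarUdf(vec![Expr::Column("a".into()), Expr::Column("b".into())]);
    assert!(find_column_name(&bad).is_err());
}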