Skip to content

Commit 2d03f7a

Browse files
committed
dev
1 parent daf7088 commit 2d03f7a

File tree

2 files changed

+55
-9
lines changed

2 files changed

+55
-9
lines changed

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,28 @@ impl PlanRewriter for CollectConstraints {
438438
c: &Self::Context,
439439
) -> Result<LogicalPlan, DataFusionError> {
440440
match &n {
441+
LogicalPlan::Projection {
442+
expr,
443+
input,
444+
schema,
445+
} => {
446+
let mut alias_to_column = HashMap::new();
447+
expr.iter().for_each(|e| {
448+
if let Expr::Alias(box Expr::Column(c), alias) = e {
449+
alias_to_column.insert(alias.clone(), c.clone());
450+
}
451+
});
452+
453+
self.constraints.iter_mut().for_each(|c| {
454+
c.sort_on.iter_mut().for_each(|sort_columns| {
455+
sort_columns.sort_on.iter_mut().for_each(|sort_column| {
456+
if let Some(column) = alias_to_column.get(sort_column) {
457+
*sort_column = column.name.clone();
458+
}
459+
});
460+
});
461+
});
462+
}
441463
LogicalPlan::TableScan {
442464
projection,
443465
filters,
@@ -755,6 +777,32 @@ impl PlanRewriter for ChooseIndex<'_> {
755777

756778
fn enter_node(&mut self, n: &LogicalPlan, context: &Self::Context) -> Option<Self::Context> {
757779
match n {
780+
LogicalPlan::Projection { expr, .. } => {
781+
let mut alias_to_column = HashMap::new();
782+
expr.iter().for_each(|e| {
783+
if let Expr::Alias(box Expr::Column(c), alias) = e {
784+
alias_to_column.insert(alias.clone(), c.clone());
785+
}
786+
});
787+
788+
let names: Vec<String> = context
789+
.sort
790+
.clone()
791+
.unwrap_or_default()
792+
.iter()
793+
.map(|k| {
794+
alias_to_column
795+
.get(k)
796+
.map_or_else(|| k.clone(), |v| v.name.clone())
797+
})
798+
.collect();
799+
800+
if !names.is_empty() {
801+
Some(context.update_sort(names, context.sort_is_asc))
802+
} else {
803+
None
804+
}
805+
}
758806
LogicalPlan::Limit { n, .. } => Some(context.update_limit(Some(*n))),
759807
LogicalPlan::Skip { n, .. } => {
760808
if let Some(limit) = context.limit {

rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ impl KafkaPostProcessPlanner {
481481
if let Some(existing_name) = &column_name {
482482
if existing_name != &name {
483483
return Err(CubeError::user(
484-
"Scalar function can only use a single column".to_string(),
484+
format!("Scalar function can only use a single column, expression: {:?}", expr),
485485
));
486486
}
487487
} else {
@@ -500,17 +500,15 @@ impl KafkaPostProcessPlanner {
500500
Expr::Alias(e, _) => match &**e {
501501
Expr::Column(c) => Ok(c.name.clone()),
502502
Expr::ScalarUDF { .. } => find_column_name(expr)?.ok_or_else(|| {
503-
CubeError::user("Scalar function must contain at least one column".to_string())
503+
CubeError::user(format!("Scalar function must contain at least one column, expression: {:?}", expr))
504504
}),
505-
_ => {
506-
println!("unknown expr: {:?}", e);
507-
Err(CubeError::user(format!(
508-
"Unique key can't be an expression in kafka streaming queries"
509-
)))
510-
}
505+
_ => Err(CubeError::user(format!(
506+
"Unique key can't be an expression in kafka streaming queries, expression: {:?}",
507+
expr
508+
))),
511509
},
512510
_ => Err(CubeError::user(
513-
"All expressions must have aliases in kafka streaming queries".to_string(),
511+
format!("All expressions must have aliases in kafka streaming queries, expression: {:?}", expr),
514512
)),
515513
}?;
516514

0 commit comments

Comments
 (0)