Skip to content

Commit 7803f7e

Browse files
committed
fix: Fix adding missing columns for ORDER BY clause
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent 637003a commit 7803f7e

File tree

3 files changed

+58
-28
lines changed

3 files changed

+58
-28
lines changed

datafusion/core/src/logical_plan/builder.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::logical_plan::{
5151
CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition,
5252
SubqueryType, Values,
5353
};
54-
use crate::sql::utils::group_window_expr_by_sort_keys;
54+
use crate::sql::utils::{group_window_expr_by_sort_keys, resolve_exprs_to_aliases};
5555

5656
/// Default table name for unnamed table
5757
pub const UNNAMED_TABLE: &str = "?table?";
@@ -549,23 +549,36 @@ impl LogicalPlanBuilder {
549549
&self,
550550
curr_plan: LogicalPlan,
551551
missing_cols: &[Column],
552+
alias_map: &mut HashMap<String, String>,
552553
) -> Result<LogicalPlan> {
553554
match curr_plan {
554555
LogicalPlan::Projection(Projection {
555556
input,
556557
mut expr,
557-
schema: _,
558+
schema,
558559
alias,
559560
}) if missing_cols
560561
.iter()
561562
.all(|c| input.schema().field_from_column(c).is_ok()) =>
562563
{
563564
let input_schema = input.schema();
564565

565-
let missing_exprs = missing_cols
566-
.iter()
567-
.map(|c| normalize_col(Expr::Column(c.clone()), &input))
568-
.collect::<Result<Vec<_>>>()?;
566+
let mut missing_exprs = Vec::with_capacity(missing_cols.len());
567+
for missing_col in missing_cols {
568+
let mut normalized_col =
569+
normalize_col(Expr::Column(missing_col.clone()), &input)?;
570+
if let Ok(old_field) =
571+
schema.field_with_unqualified_name(&missing_col.name)
572+
{
573+
if old_field.qualifier().is_none() {
574+
let expr_name = normalized_col.name(input_schema)?;
575+
let alias = missing_col.flat_name();
576+
normalized_col = normalized_col.alias(&alias);
577+
alias_map.insert(expr_name, alias);
578+
}
579+
}
580+
missing_exprs.push(normalized_col);
581+
}
569582

570583
expr.extend(missing_exprs);
571584

@@ -586,7 +599,11 @@ impl LogicalPlanBuilder {
586599
.inputs()
587600
.into_iter()
588601
.map(|input_plan| {
589-
self.add_missing_columns((*input_plan).clone(), missing_cols)
602+
self.add_missing_columns(
603+
(*input_plan).clone(),
604+
missing_cols,
605+
alias_map,
606+
)
590607
})
591608
.collect::<Result<Vec<_>>>()?;
592609

@@ -607,21 +624,18 @@ impl LogicalPlanBuilder {
607624

608625
// Collect sort columns that are missing in the input plan's schema
609626
let mut missing_cols: Vec<Column> = vec![];
627+
let mut columns: HashSet<Column> = HashSet::new();
610628
exprs
611629
.clone()
612630
.into_iter()
613631
.try_for_each::<_, Result<()>>(|expr| {
614-
let mut columns: HashSet<Column> = HashSet::new();
615-
utils::expr_to_columns(&expr, &mut columns)?;
616-
617-
columns.into_iter().for_each(|c| {
618-
if schema.field_from_column(&c).is_err() {
619-
missing_cols.push(c);
620-
}
621-
});
622-
623-
Ok(())
632+
utils::expr_to_columns(&expr, &mut columns)
624633
})?;
634+
columns.into_iter().for_each(|c| {
635+
if schema.field_from_column(&c).is_err() {
636+
missing_cols.push(c);
637+
}
638+
});
625639

626640
if missing_cols.is_empty() {
627641
return Ok(Self::from(LogicalPlan::Sort(Sort {
@@ -630,7 +644,18 @@ impl LogicalPlanBuilder {
630644
})));
631645
}
632646

633-
let plan = self.add_missing_columns(self.plan.clone(), &missing_cols)?;
647+
let mut alias_map = HashMap::new();
648+
let plan =
649+
self.add_missing_columns(self.plan.clone(), &missing_cols, &mut alias_map)?;
650+
let exprs = if alias_map.is_empty() {
651+
exprs
652+
} else {
653+
exprs
654+
.into_iter()
655+
.map(|expr| resolve_exprs_to_aliases(&expr, &alias_map, plan.schema()))
656+
.collect::<Result<Vec<_>>>()?
657+
};
658+
634659
let sort_plan = LogicalPlan::Sort(Sort {
635660
expr: normalize_cols(exprs, &plan)?,
636661
input: Arc::new(plan.clone()),

datafusion/core/src/logical_plan/expr_rewriter.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -399,19 +399,16 @@ fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
399399
LogicalPlan::Projection(Projection {
400400
input,
401401
expr: projection_expr,
402+
alias,
402403
..
403404
}) => {
404-
let alias_map =
405-
extract_aliased_expr_names(projection_expr, input.schema());
405+
let alias_map = extract_aliased_expr_names(
406+
projection_expr,
407+
input.schema(),
408+
alias.is_some(),
409+
);
406410
let res = resolve_exprs_to_aliases(&expr, &alias_map, input.schema())?;
407-
let res = normalize_col(
408-
unnormalize_col(rebase_expr(
409-
&res,
410-
projection_expr.as_slice(),
411-
input,
412-
)?),
413-
plan,
414-
)?;
411+
let res = rebase_expr(&res, projection_expr.as_slice(), input)?;
415412

416413
Ok(if let LogicalPlan::Aggregate(_) = **input {
417414
rewrite_sort_col(res, input)?

datafusion/core/src/sql/utils.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ pub(crate) fn extract_aliases(exprs: &[Expr]) -> HashMap<String, Expr> {
573573
pub(crate) fn extract_aliased_expr_names(
574574
exprs: &[Expr],
575575
input_schema: &DFSchema,
576+
aliased_projection: bool,
576577
) -> HashMap<String, String> {
577578
exprs
578579
.iter()
@@ -584,6 +585,13 @@ pub(crate) fn extract_aliased_expr_names(
584585
None
585586
}
586587
}
588+
Expr::Column(column) if aliased_projection => {
589+
if let Ok(expr_name) = expr.name(input_schema) {
590+
Some((expr_name, column.name.clone()))
591+
} else {
592+
None
593+
}
594+
}
587595
_ => None,
588596
})
589597
.collect::<HashMap<String, String>>()

0 commit comments

Comments
 (0)