Skip to content

Commit 9c3b201

Browse files
committed
Address renaming for columns that are not in the top level as well
1 parent b1ccf2c commit 9c3b201

File tree

1 file changed

+31
-21
lines changed

1 file changed

+31
-21
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6262
use arrow::compute::SortOptions;
6363
use arrow::datatypes::{Schema, SchemaRef};
6464
use datafusion_common::display::ToStringifiedPlan;
65-
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
65+
use datafusion_common::tree_node::{
66+
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
67+
};
6668
use datafusion_common::{
6769
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6870
ScalarValue,
@@ -2069,29 +2071,37 @@ fn maybe_fix_physical_column_name(
20692071
expr: Result<Arc<dyn PhysicalExpr>>,
20702072
input_physical_schema: &SchemaRef,
20712073
) -> Result<Arc<dyn PhysicalExpr>> {
2072-
if let Ok(e) = &expr {
2073-
if let Some(column) = e.as_any().downcast_ref::<Column>() {
2074-
let physical_field = input_physical_schema.field(column.index());
2075-
let expr_col_name = column.name();
2076-
let physical_name = physical_field.name();
2077-
2078-
if physical_name != expr_col_name {
2079-
// handle edge cases where the physical_name contains ':'.
2080-
let colon_count = physical_name.matches(':').count();
2081-
let mut splits = expr_col_name.match_indices(':');
2082-
let split_pos = splits.nth(colon_count);
2083-
2084-
if let Some((idx, _)) = split_pos {
2085-
let base_name = &expr_col_name[..idx];
2086-
if base_name == physical_name {
2087-
let updated_column = Column::new(physical_name, column.index());
2088-
return Ok(Arc::new(updated_column));
2074+
expr.and_then(|e| {
2075+
e.transform_down(|node| {
2076+
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2077+
let idx = column.index();
2078+
let physical_field = input_physical_schema.field(idx);
2079+
let expr_col_name = column.name();
2080+
let physical_name = physical_field.name();
2081+
2082+
if expr_col_name != physical_name {
2083+
// handle edge cases where the physical_name contains ':'.
2084+
let colon_count = physical_name.matches(':').count();
2085+
let mut splits = expr_col_name.match_indices(':');
2086+
let split_pos = splits.nth(colon_count);
2087+
2088+
if let Some((i, _)) = split_pos {
2089+
let base_name = &expr_col_name[..i];
2090+
if base_name == physical_name {
2091+
let updated_column = Column::new(physical_name, idx);
2092+
return Ok(Transformed::yes(Arc::new(updated_column)));
2093+
}
20892094
}
20902095
}
2096+
2097+
// If names already match or fix is not possible, just leave it as it is
2098+
Ok(Transformed::no(node))
2099+
} else {
2100+
Ok(Transformed::no(node))
20912101
}
2092-
}
2093-
}
2094-
expr
2102+
})
2103+
.data()
2104+
})
20952105
}
20962106

20972107
struct OptimizationInvariantChecker<'a> {

0 commit comments

Comments
 (0)