Skip to content

Commit a951fc9

Browse files
authored
Use a struct for ProjectionExpr (#17398)
1 parent 24e2f7f commit a951fc9

File tree

18 files changed

+476
-245
lines changed

18 files changed

+476
-245
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ use crate::physical_plan::joins::{
4545
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
4646
};
4747
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
48-
use crate::physical_plan::projection::ProjectionExec;
48+
use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr};
4949
use crate::physical_plan::repartition::RepartitionExec;
5050
use crate::physical_plan::sorts::sort::SortExec;
5151
use crate::physical_plan::union::UnionExec;
@@ -2185,17 +2185,25 @@ impl DefaultPhysicalPlanner {
21852185
PlannedExprResult::ExprWithName(physical_exprs),
21862186
input_physical_schema.as_ref(),
21872187
)? {
2188-
PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => Ok(
2189-
Arc::new(ProjectionExec::try_new(physical_exprs, input_exec)?),
2190-
),
2188+
PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
2189+
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
2190+
.into_iter()
2191+
.map(|(expr, alias)| ProjectionExpr { expr, alias })
2192+
.collect();
2193+
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
2194+
}
21912195
PlanAsyncExpr::Async(
21922196
async_map,
21932197
PlannedExprResult::ExprWithName(physical_exprs),
21942198
) => {
21952199
let async_exec =
21962200
AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?;
2201+
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
2202+
.into_iter()
2203+
.map(|(expr, alias)| ProjectionExpr { expr, alias })
2204+
.collect();
21972205
let new_proj_exec =
2198-
ProjectionExec::try_new(physical_exprs, Arc::new(async_exec))?;
2206+
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
21992207
Ok(Arc::new(new_proj_exec))
22002208
}
22012209
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
@@ -2700,7 +2708,7 @@ mod tests {
27002708
let execution_plan = plan(&logical_plan).await?;
27012709
// verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated.
27022710

2703-
let expected = "expr: [(BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }";
2711+
let expected = "expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }";
27042712

27052713
let actual = format!("{execution_plan:?}");
27062714
assert!(actual.contains(expected), "{}", actual);

datafusion/core/tests/execution/coop.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ use datafusion_physical_plan::coop::make_cooperative;
4646
use datafusion_physical_plan::filter::FilterExec;
4747
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
4848
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
49-
use datafusion_physical_plan::projection::ProjectionExec;
49+
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
5050
use datafusion_physical_plan::repartition::RepartitionExec;
5151
use datafusion_physical_plan::sorts::sort::SortExec;
5252
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -651,7 +651,7 @@ async fn join_agg_yields(
651651
// Project only one column (“value” from the left side) because we just want to sum that
652652
let input_schema = join.schema();
653653

654-
let proj_expr = vec![(
654+
let proj_expr = vec![ProjectionExpr::new(
655655
Arc::new(Column::new_with_schema("value", &input_schema)?) as _,
656656
"value".to_string(),
657657
)];

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,7 @@ use datafusion_physical_plan::expressions::col;
6262
use datafusion_physical_plan::filter::FilterExec;
6363
use datafusion_physical_plan::joins::utils::JoinOn;
6464
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
65-
use datafusion_physical_plan::projection::ProjectionExec;
65+
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
6666
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
6767
use datafusion_physical_plan::union::UnionExec;
6868
use datafusion_physical_plan::{
@@ -243,7 +243,10 @@ fn projection_exec_with_alias(
243243
) -> Arc<dyn ExecutionPlan> {
244244
let mut exprs = vec![];
245245
for (column, alias) in alias_pairs.iter() {
246-
exprs.push((col(column, &input.schema()).unwrap(), alias.to_string()));
246+
exprs.push(ProjectionExpr {
247+
expr: col(column, &input.schema()).unwrap(),
248+
alias: alias.to_string(),
249+
});
247250
}
248251
Arc::new(ProjectionExec::try_new(exprs, input).unwrap())
249252
}
@@ -2207,14 +2210,14 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
22072210
#[test]
22082211
fn repartition_transitively_with_projection() -> Result<()> {
22092212
let schema = schema();
2210-
let proj_exprs = vec![(
2211-
Arc::new(BinaryExpr::new(
2213+
let proj_exprs = vec![ProjectionExpr {
2214+
expr: Arc::new(BinaryExpr::new(
22122215
col("a", &schema)?,
22132216
Operator::Plus,
22142217
col("b", &schema)?,
22152218
)) as _,
2216-
"sum".to_string(),
2217-
)];
2219+
alias: "sum".to_string(),
2220+
}];
22182221
// non sorted input
22192222
let proj = Arc::new(ProjectionExec::try_new(proj_exprs, parquet_exec())?);
22202223
let sort_key = [PhysicalSortExpr {

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -235,12 +235,12 @@ async fn test_join_with_swap() {
235235
.expect("A proj is required to swap columns back to their original order");
236236

237237
assert_eq!(swapping_projection.expr().len(), 2);
238-
let (col, name) = &swapping_projection.expr()[0];
239-
assert_eq!(name, "big_col");
240-
assert_col_expr(col, "big_col", 1);
241-
let (col, name) = &swapping_projection.expr()[1];
242-
assert_eq!(name, "small_col");
243-
assert_col_expr(col, "small_col", 0);
238+
let proj_expr = &swapping_projection.expr()[0];
239+
assert_eq!(proj_expr.alias, "big_col");
240+
assert_col_expr(&proj_expr.expr, "big_col", 1);
241+
let proj_expr = &swapping_projection.expr()[1];
242+
assert_eq!(proj_expr.alias, "small_col");
243+
assert_col_expr(&proj_expr.expr, "small_col", 0);
244244

245245
let swapped_join = swapping_projection
246246
.input()
@@ -526,12 +526,12 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
526526
.expect("A proj is required to swap columns back to their original order");
527527

528528
assert_eq!(swapping_projection.expr().len(), 2);
529-
let (col, name) = &swapping_projection.expr()[0];
530-
assert_eq!(name, "big_col");
531-
assert_col_expr(col, "big_col", 1);
532-
let (col, name) = &swapping_projection.expr()[1];
533-
assert_eq!(name, "small_col");
534-
assert_col_expr(col, "small_col", 0);
529+
let proj_expr = &swapping_projection.expr()[0];
530+
assert_eq!(proj_expr.alias, "big_col");
531+
assert_col_expr(&proj_expr.expr, "big_col", 1);
532+
let proj_expr = &swapping_projection.expr()[1];
533+
assert_eq!(proj_expr.alias, "small_col");
534+
assert_col_expr(&proj_expr.expr, "small_col", 0);
535535

536536
let swapped_join = swapping_projection
537537
.input()

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ mod test {
4747
use datafusion_physical_plan::joins::CrossJoinExec;
4848
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4949
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
50-
use datafusion_physical_plan::projection::ProjectionExec;
50+
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
5151
use datafusion_physical_plan::repartition::RepartitionExec;
5252
use datafusion_physical_plan::sorts::sort::SortExec;
5353
use datafusion_physical_plan::union::UnionExec;
@@ -235,8 +235,10 @@ mod test {
235235
async fn test_statistics_by_partition_of_projection() -> Result<()> {
236236
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
237237
// Add projection execution plan
238-
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
239-
vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
238+
let exprs = vec![ProjectionExpr {
239+
expr: Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
240+
alias: "id".to_string(),
241+
}];
240242
let projection: Arc<dyn ExecutionPlan> =
241243
Arc::new(ProjectionExec::try_new(exprs, scan)?);
242244
let statistics = (0..projection.output_partitioning().partition_count())

0 commit comments

Comments
 (0)