Skip to content

Commit 05ec0ae

Browse files
committed
Reduce cloning in LogicalPlanBuilder
- Migrate function arguments from `LogicalPlan` to `impl Into<Arc<LogicalPlan>>` - Update usages (mostly in tests)
1 parent 3aa0ab7 commit 05ec0ae

File tree

9 files changed

+203
-200
lines changed

9 files changed

+203
-200
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,17 +1054,15 @@ impl DefaultPhysicalPlanner {
10541054
let (left, left_col_keys, left_projected) =
10551055
wrap_projection_for_join_if_necessary(
10561056
&left_keys,
1057-
original_left.as_ref().clone(),
1057+
Arc::clone(original_left),
10581058
)?;
10591059
let (right, right_col_keys, right_projected) =
10601060
wrap_projection_for_join_if_necessary(
10611061
&right_keys,
1062-
original_right.as_ref().clone(),
1062+
Arc::clone(original_right),
10631063
)?;
1064-
let column_on = (left_col_keys, right_col_keys);
10651064

1066-
let left = Arc::new(left);
1067-
let right = Arc::new(right);
1065+
let column_on = (left_col_keys, right_col_keys);
10681066
let (new_join, requalified) = Join::try_new_with_project_input(
10691067
node,
10701068
Arc::clone(&left),

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ pub use order_by::rewrite_sort_cols_by_aggs;
4141

4242
/// Trait for rewriting [`Expr`]s into function calls.
4343
///
44-
/// This trait is used with `FunctionRegistry::register_function_rewrite` to
44+
/// This trait is used with `FunctionRegistry::register_function_rewrite`
4545
/// to evaluating `Expr`s using functions that may not be built in to DataFusion
4646
///
4747
/// For example, concatenating arrays `a || b` is represented as
4848
/// `Operator::ArrowAt`, but can be implemented by calling a function
4949
/// `array_concat` from the `functions-nested` crate.
5050
// This is not used in datafusion internally, but it is still helpful for downstream project so don't remove it.
5151
pub trait FunctionRewrite: Debug {
52-
/// Return a human readable name for this rewrite
52+
/// Return a human-readable name for this rewrite
5353
fn name(&self) -> &str;
5454

5555
/// Potentially rewrite `expr` to some other expression
@@ -219,26 +219,29 @@ pub fn strip_outer_reference(expr: Expr) -> Expr {
219219
/// Returns plan with expressions coerced to types compatible with
220220
/// schema types
221221
pub fn coerce_plan_expr_for_schema(
222-
plan: LogicalPlan,
222+
plan: Arc<LogicalPlan>,
223223
schema: &DFSchema,
224-
) -> Result<LogicalPlan> {
225-
match plan {
224+
) -> Result<Arc<LogicalPlan>> {
225+
if matches!(plan.as_ref(), LogicalPlan::Projection(_)) {
226226
// special case Projection to avoid adding multiple projections
227-
LogicalPlan::Projection(Projection { expr, input, .. }) => {
228-
let new_exprs = coerce_exprs_for_schema(expr, input.schema(), schema)?;
229-
let projection = Projection::try_new(new_exprs, input)?;
230-
Ok(LogicalPlan::Projection(projection))
231-
}
232-
_ => {
233-
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
234-
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
235-
let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none());
236-
if add_project {
237-
let projection = Projection::try_new(new_exprs, Arc::new(plan))?;
238-
Ok(LogicalPlan::Projection(projection))
239-
} else {
240-
Ok(plan)
241-
}
227+
let LogicalPlan::Projection(Projection { expr, input, .. }) =
228+
Arc::unwrap_or_clone(plan)
229+
else {
230+
unreachable!()
231+
};
232+
233+
let new_exprs = coerce_exprs_for_schema(expr, input.schema(), schema)?;
234+
let projection = Projection::try_new(new_exprs, input)?;
235+
Ok(Arc::new(LogicalPlan::Projection(projection)))
236+
} else {
237+
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
238+
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
239+
let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none());
240+
if add_project {
241+
let projection = Projection::try_new(new_exprs, plan)?;
242+
Ok(Arc::new(LogicalPlan::Projection(projection)))
243+
} else {
244+
Ok(plan)
242245
}
243246
}
244247
}
@@ -427,7 +430,7 @@ mod test {
427430
fn normalize_cols() {
428431
let expr = col("a") + col("b") + col("c");
429432

430-
// Schemas with some matching and some non matching cols
433+
// Schemas with some matching and some non-matching cols
431434
let schema_a = make_schema_with_empty_metadata(
432435
vec![Some("tableA".into()), Some("tableA".into())],
433436
vec!["a", "aa"],

0 commit comments

Comments
 (0)