Skip to content

Commit 8eec60f

Browse files
committed
Reduce cloning in LogicalPlanBuilder
- Migrate function arguments from `LogicalPlan` to `impl Into<Arc<LogicalPlan>>` - Update usages (mostly in tests)
1 parent 155b56e commit 8eec60f

File tree

10 files changed

+223
-222
lines changed

10 files changed

+223
-222
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,17 +1018,15 @@ impl DefaultPhysicalPlanner {
10181018
let (left, left_col_keys, left_projected) =
10191019
wrap_projection_for_join_if_necessary(
10201020
&left_keys,
1021-
original_left.as_ref().clone(),
1021+
Arc::clone(original_left),
10221022
)?;
10231023
let (right, right_col_keys, right_projected) =
10241024
wrap_projection_for_join_if_necessary(
10251025
&right_keys,
1026-
original_right.as_ref().clone(),
1026+
Arc::clone(original_right),
10271027
)?;
1028-
let column_on = (left_col_keys, right_col_keys);
10291028

1030-
let left = Arc::new(left);
1031-
let right = Arc::new(right);
1029+
let column_on = (left_col_keys, right_col_keys);
10321030
let (new_join, requalified) = Join::try_new_with_project_input(
10331031
node,
10341032
Arc::clone(&left),

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ pub use order_by::rewrite_sort_cols_by_aggs;
3636

3737
/// Trait for rewriting [`Expr`]s into function calls.
3838
///
39-
/// This trait is used with `FunctionRegistry::register_function_rewrite` to
39+
/// This trait is used with `FunctionRegistry::register_function_rewrite`
4040
/// to evaluating `Expr`s using functions that may not be built in to DataFusion
4141
///
4242
/// For example, concatenating arrays `a || b` is represented as
4343
/// `Operator::ArrowAt`, but can be implemented by calling a function
4444
/// `array_concat` from the `functions-nested` crate.
4545
// This is not used in datafusion internally, but it is still helpful for downstream project so don't remove it.
4646
pub trait FunctionRewrite: Debug {
47-
/// Return a human readable name for this rewrite
47+
/// Return a human-readable name for this rewrite
4848
fn name(&self) -> &str;
4949

5050
/// Potentially rewrite `expr` to some other expression
@@ -214,26 +214,29 @@ pub fn strip_outer_reference(expr: Expr) -> Expr {
214214
/// Returns plan with expressions coerced to types compatible with
215215
/// schema types
216216
pub fn coerce_plan_expr_for_schema(
217-
plan: LogicalPlan,
217+
plan: Arc<LogicalPlan>,
218218
schema: &DFSchema,
219-
) -> Result<LogicalPlan> {
220-
match plan {
219+
) -> Result<Arc<LogicalPlan>> {
220+
if matches!(plan.as_ref(), LogicalPlan::Projection(_)) {
221221
// special case Projection to avoid adding multiple projections
222-
LogicalPlan::Projection(Projection { expr, input, .. }) => {
223-
let new_exprs = coerce_exprs_for_schema(expr, input.schema(), schema)?;
224-
let projection = Projection::try_new(new_exprs, input)?;
225-
Ok(LogicalPlan::Projection(projection))
226-
}
227-
_ => {
228-
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
229-
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
230-
let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none());
231-
if add_project {
232-
let projection = Projection::try_new(new_exprs, Arc::new(plan))?;
233-
Ok(LogicalPlan::Projection(projection))
234-
} else {
235-
Ok(plan)
236-
}
222+
let LogicalPlan::Projection(Projection { expr, input, .. }) =
223+
Arc::unwrap_or_clone(plan)
224+
else {
225+
unreachable!()
226+
};
227+
228+
let new_exprs = coerce_exprs_for_schema(expr, input.schema(), schema)?;
229+
let projection = Projection::try_new(new_exprs, input)?;
230+
Ok(Arc::new(LogicalPlan::Projection(projection)))
231+
} else {
232+
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
233+
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
234+
let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none());
235+
if add_project {
236+
let projection = Projection::try_new(new_exprs, plan)?;
237+
Ok(Arc::new(LogicalPlan::Projection(projection)))
238+
} else {
239+
Ok(plan)
237240
}
238241
}
239242
}
@@ -418,7 +421,7 @@ mod test {
418421
fn normalize_cols() {
419422
let expr = col("a") + col("b") + col("c");
420423

421-
// Schemas with some matching and some non matching cols
424+
// Schemas with some matching and some non-matching cols
422425
let schema_a = make_schema_with_empty_metadata(
423426
vec![Some("tableA".into()), Some("tableA".into())],
424427
vec!["a", "aa"],

0 commit comments

Comments
 (0)