Skip to content

Commit ffa7a73

Browse files
committed
Initial implementation of projection pushdown for nested expressions
1 parent bb4e0ec commit ffa7a73

File tree

11 files changed

+93
-25
lines changed

11 files changed

+93
-25
lines changed

datafusion/common/src/column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::fmt;
3030
pub struct Column {
3131
/// relation/table reference.
3232
pub relation: Option<TableReference>,
33-
/// field/column name.
33+
/// Field/column name.
3434
pub name: String,
3535
/// Original source code location, if known
3636
pub spans: Spans,

datafusion/expr/src/expr.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1883,6 +1883,28 @@ impl Expr {
18831883
}
18841884
}
18851885

1886+
/// Returns true if this expression is trivial (cheap to evaluate).
1887+
///
1888+
/// Trivial expressions include column references, literals, and nested
1889+
/// field access via `get_field`.
1890+
///
1891+
/// # Example
1892+
/// ```
1893+
/// # use datafusion_expr::col;
1894+
/// let expr = col("foo");
1895+
/// assert!(expr.is_trivial());
1896+
/// ```
1897+
pub fn is_trivial(&self) -> bool {
1898+
match self {
1899+
Expr::Column(_) | Expr::Literal(_, _) => true,
1900+
Expr::ScalarFunction(func) => {
1901+
func.func.is_trivial()
1902+
&& func.args.first().is_some_and(|arg| arg.is_trivial())
1903+
}
1904+
_ => false,
1905+
}
1906+
}
1907+
18861908
/// Return all references to columns in this expression.
18871909
///
18881910
/// # Example

datafusion/expr/src/udf.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ impl ScalarUDF {
122122
Self { inner: fun }
123123
}
124124

125+
/// Returns true if this function is trivial (cheap to evaluate).
126+
pub fn is_trivial(&self) -> bool {
127+
self.inner.is_trivial()
128+
}
129+
125130
/// Return the underlying [`ScalarUDFImpl`] trait object for this function
126131
pub fn inner(&self) -> &Arc<dyn ScalarUDFImpl> {
127132
&self.inner
@@ -846,6 +851,18 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
846851
fn documentation(&self) -> Option<&Documentation> {
847852
None
848853
}
854+
855+
/// Returns true if this function is trivial (cheap to evaluate).
856+
///
857+
/// Trivial functions are lightweight accessor functions like `get_field`
858+
/// (struct field access) that simply access nested data within a column
859+
/// without significant computation.
860+
///
861+
/// This is used to identify expressions that are cheap to duplicate or
862+
/// don't benefit from caching/partitioning optimizations.
863+
fn is_trivial(&self) -> bool {
864+
false
865+
}
849866
}
850867

851868
/// ScalarUDF that adds an alias to the underlying function. It is better to

datafusion/functions/src/core/getfield.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,10 @@ impl ScalarUDFImpl for GetFieldFunc {
499499
fn documentation(&self) -> Option<&Documentation> {
500500
self.doc()
501501
}
502+
503+
fn is_trivial(&self) -> bool {
504+
true
505+
}
502506
}
503507

504508
#[cfg(test)]

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
593593

594594
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
595595
fn is_expr_trivial(expr: &Expr) -> bool {
596-
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
596+
expr.is_trivial()
597597
}
598598

599599
/// Rewrites a projection expression using the projection before it (i.e. its input)

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,20 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
430430
fn is_volatile_node(&self) -> bool {
431431
false
432432
}
433+
434+
/// Returns true if this expression is trivial (cheap to evaluate).
435+
///
436+
/// Trivial expressions include:
437+
/// - Column references
438+
/// - Literal values
439+
/// - Struct field access via `get_field`
440+
/// - Nested combinations of field accessors (e.g., `col['a']['b']`)
441+
///
442+
/// This is used to identify expressions that are cheap to duplicate or
443+
/// don't benefit from caching/partitioning optimizations.
444+
fn is_trivial(&self) -> bool {
445+
false
446+
}
433447
}
434448

435449
#[deprecated(

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ impl PhysicalExpr for Column {
146146
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147147
write!(f, "{}", self.name)
148148
}
149+
150+
fn is_trivial(&self) -> bool {
151+
true
152+
}
149153
}
150154

151155
impl Column {

datafusion/physical-expr/src/expressions/literal.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ impl PhysicalExpr for Literal {
134134
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135135
std::fmt::Display::fmt(self, f)
136136
}
137+
138+
fn is_trivial(&self) -> bool {
139+
true
140+
}
137141
}
138142

139143
/// Create a literal expression

datafusion/physical-expr/src/scalar_function.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,13 @@ impl PhysicalExpr for ScalarFunctionExpr {
366366
fn is_volatile_node(&self) -> bool {
367367
self.fun.signature().volatility == Volatility::Volatile
368368
}
369+
370+
fn is_trivial(&self) -> bool {
371+
if !self.fun.is_trivial() {
372+
return false;
373+
}
374+
self.args.iter().all(|arg| arg.is_trivial())
375+
}
369376
}
370377

371378
#[cfg(test)]

datafusion/physical-plan/src/projection.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
2121
//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
2222
23-
use super::expressions::{Column, Literal};
23+
use super::expressions::Column;
2424
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2525
use super::{
2626
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
@@ -253,18 +253,16 @@ impl ExecutionPlan for ProjectionExec {
253253
}
254254

255255
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
256-
let all_simple_exprs =
257-
self.projector
258-
.projection()
259-
.as_ref()
260-
.iter()
261-
.all(|proj_expr| {
262-
proj_expr.expr.as_any().is::<Column>()
263-
|| proj_expr.expr.as_any().is::<Literal>()
264-
});
265-
// If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
266-
// and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
267-
vec![!all_simple_exprs]
256+
let all_trivial_exprs = self
257+
.projector
258+
.projection()
259+
.as_ref()
260+
.iter()
261+
.all(|proj_expr| proj_expr.expr.is_trivial());
262+
// If expressions are all trivial (columns, literals, or field accessors),
263+
// then this projection only reorders, renames, or cheaply accesses existing data,
264+
// and projection would not benefit from the repartition.
265+
vec![!all_trivial_exprs]
268266
}
269267

270268
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -630,11 +628,10 @@ pub fn make_with_child(
630628
.map(|e| Arc::new(e) as _)
631629
}
632630

633-
/// Returns `true` if all the expressions in the argument are `Column`s.
634-
pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
635-
exprs
636-
.iter()
637-
.all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
631+
/// Returns `true` if all the expressions in the argument are trivial
632+
/// (columns, literals, or field accessors).
633+
pub fn all_trivial(exprs: &[ProjectionExpr]) -> bool {
634+
exprs.iter().all(|proj_expr| proj_expr.expr.is_trivial())
638635
}
639636

640637
/// Updates the given lexicographic ordering according to given projected
@@ -990,10 +987,9 @@ fn new_columns_for_join_on(
990987
}
991988

992989
/// Checks if the given expression is trivial.
993-
/// An expression is considered trivial if it is either a `Column` or a `Literal`.
990+
/// An expression is considered trivial if it is a `Column`, `Literal`, or field accessor.
994991
fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
995-
expr.as_any().downcast_ref::<Column>().is_some()
996-
|| expr.as_any().downcast_ref::<Literal>().is_some()
992+
expr.is_trivial()
997993
}
998994

999995
#[cfg(test)]

0 commit comments

Comments
 (0)