Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 177 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
Expand Down Expand Up @@ -455,25 +455,8 @@ impl DefaultPhysicalPlanner {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec_node: Arc<dyn ExecutionPlan> = match node {
// Leaves (no children)
LogicalPlan::TableScan(TableScan {
source,
projection,
filters,
fetch,
..
}) => {
let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let filters_vec = filters.into_iter().collect::<Vec<_>>();
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
LogicalPlan::TableScan(scan) => {
self.plan_table_scan(scan, session_state).await?
}
LogicalPlan::Values(Values { values, schema }) => {
let exprs = values
Expand Down Expand Up @@ -1725,6 +1708,180 @@ impl DefaultPhysicalPlanner {
))
}
}

/// Plan a TableScan node, wrapping with ProjectionExec as needed.
///
/// This method handles projection pushdown by:
/// 1. Computing which columns the scan needs to produce
/// 2. Creating the scan with minimal required columns
/// 3. Applying any remainder projection (for complex expressions)
/// 4. Attempting to push non-async expressions into the scan via `try_swapping_with_projection`
async fn plan_table_scan(
&self,
scan: &TableScan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let provider = source_as_provider(&scan.source)?;
let source_schema = scan.source.schema();

// Remove qualifiers from filters
let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());

// Compute required column indices and remainder projection
let (remainder_projection, scan_indices) =
self.compute_scan_projection(&scan.projection, &source_schema)?;

// Create the scan
let scan_args = ScanArgs::default()
.with_projection(scan_indices.as_ref().map(|v| v.as_slice()))
.with_filters(if filters.is_empty() {
None
} else {
Some(&filters)
})
.with_limit(scan.fetch);

let scan_result = provider.scan_with_args(session_state, scan_args).await?;
let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());

// Wrap with ProjectionExec if remainder projection is needed
if let Some(ref proj_exprs) = remainder_projection {
let scan_df_schema = DFSchema::try_from(plan.schema().as_ref().clone())?;
let unnormalized_proj_exprs: Vec<Expr> =
unnormalize_cols(proj_exprs.iter().cloned());
plan = self.create_projection_exec(
&unnormalized_proj_exprs,
plan,
&scan_df_schema,
session_state,
)?;
}

Ok(plan)
}

/// Compute the column indices needed for the scan based on projection expressions.
///
/// Returns a tuple of:
/// - `Option<Vec<Expr>>`: Remainder projection to apply on top of the scan output.
/// `None` if the projection is all simple column references (reordering, dropping, etc.)
/// - `Vec<usize>`: Column indices to scan from the source.
fn compute_scan_projection(
&self,
projection: &Option<Vec<Expr>>,
source_schema: &Schema,
) -> Result<(Option<Vec<Expr>>, Option<Vec<usize>>)> {
let Some(exprs) = projection else {
// None means scan all columns, no remainder needed
return Ok((None, None));
};

if exprs.is_empty() {
return Ok((None, Some(vec![])));
}

let mut has_complex_expr = false;
let mut all_required_columns = BTreeSet::new();
let mut remainder_exprs = vec![];

for expr in exprs {
// Collect all column references from this expression
let mut is_complex_expr = false;
expr.apply(|e| {
if let Expr::Column(col) = e {
if let Ok(index) = source_schema.index_of(col.name()) {
// If we made it this far this must be the first level and the whole expression is a simple column reference
// But we don't know if subsequent expressions might have more complex expressions necessitating `remainder_exprs`
// to be populated, so we push to `remainder_exprs` just in case they are needed later.
// It is simpler to do this now than to try to backtrack later since we already matched into Expr::Column
// and thus can simply clone `expr` here.
// If `is_complex_expr` is true then we will append the complex expression itself to `remainder_exprs` instead
// later once we've fully traversed this expression.
if !is_complex_expr {
remainder_exprs.push(expr.clone());
}
all_required_columns.insert(index);
}
} else {
// Nothing to do here except note that we will have to append the full expression later
is_complex_expr = true;
}
Ok(TreeNodeRecursion::Continue)
})?;
if is_complex_expr {
// If any expression in the projection is not a simple column reference we will need to apply a remainder projection
has_complex_expr = true;
// Append the full expression itself to the remainder expressions
// So given a projection like `[a, a + c, d]` we would have:
// all_required_columns = {0, 2, 3}
// original schema: [a: Int, b: Int, c: Int, d: Int]
// projected schema: [a: Int, c: Int, d: Int]
// remainder_exprs = [col(a), col(a) + col(c), col(d)]
remainder_exprs.push(expr.clone());
}
}

// Always return explicit indices to ensure compatibility with all providers.
// Some providers (e.g., FFI) cannot distinguish between None (scan all) and
// empty vec (scan nothing), so we always provide explicit column indices.
Ok((
has_complex_expr.then_some(remainder_exprs),
Some(all_required_columns.into_iter().collect()),
))
}

/// Creates a ProjectionExec from logical expressions, handling async UDF expressions.
///
/// If the expressions contain async UDFs, wraps them with `AsyncFuncExec`.
fn create_projection_exec(
&self,
exprs: &[Expr],
input: Arc<dyn ExecutionPlan>,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
.iter()
.map(|e| {
let physical =
self.create_physical_expr(e, input_dfschema, session_state)?;
let name = e.schema_name().to_string();
Ok((physical, name))
})
.collect::<Result<Vec<_>>>()?;

let num_input_columns = input.schema().fields().len();
let input_schema = input.schema();

match self.try_plan_async_exprs(
num_input_columns,
PlannedExprResult::ExprWithName(physical_exprs),
input_schema.as_ref(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
}
PlanAsyncExpr::Async(
async_map,
PlannedExprResult::ExprWithName(physical_exprs),
) => {
let async_exec = AsyncFuncExec::try_new(async_map.async_exprs, input)?;
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(
proj_exprs,
Arc::new(async_exec),
)?))
}
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
}
}
}

/// Expand and align a GROUPING SET expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ impl TableProvider for CustomProvider {
filters: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let empty = Vec::new();
let projection = projection.unwrap_or(&empty);
// None means "all columns", Some(empty) means "no columns"
let select_all_columns = projection.is_none() || !projection.unwrap().is_empty();
match &filters[0] {
Expr::BinaryExpr(BinaryExpr { right, .. }) => {
let int_value = match &**right {
Expand Down Expand Up @@ -215,9 +215,10 @@ impl TableProvider for CustomProvider {
};

Ok(Arc::new(CustomPlan::new(
match projection.is_empty() {
true => Arc::new(Schema::empty()),
false => self.zero_batch.schema(),
if select_all_columns {
self.zero_batch.schema()
} else {
Arc::new(Schema::empty())
},
match int_value {
0 => vec![self.zero_batch.clone()],
Expand All @@ -227,9 +228,10 @@ impl TableProvider for CustomProvider {
)))
}
_ => Ok(Arc::new(CustomPlan::new(
match projection.is_empty() {
true => Arc::new(Schema::empty()),
false => self.zero_batch.schema(),
if select_all_columns {
self.zero_batch.schema()
} else {
Arc::new(Schema::empty())
},
vec![],
))),
Expand Down
9 changes: 2 additions & 7 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,8 @@ impl LogicalPlanBuilder {
{
let sub_plan = p.into_owned();

if let Some(proj) = table_scan.projection {
let projection_exprs = proj
.into_iter()
.map(|i| {
Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
})
.collect::<Vec<_>>();
if let Some(projection_exprs) = table_scan.projection {
// projection is now Vec<Expr>, use directly
return Self::new(sub_plan)
.project(projection_exprs)?
.alias(table_scan.table_name);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub use plan::{
EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema,
SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, Unnest, Values,
Window, projection_schema,
};
pub use statement::{
Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement,
Expand Down
Loading
Loading