diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9fb2dd2dce29c..4867807337558 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -23,7 +23,7 @@ use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema, + Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; @@ -38,7 +38,10 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_expr::{ + Expr, ProjectionExprs, TableProviderFilterPushDown, TableType, + projection_exprs_from_schema_and_indices, +}; use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -384,8 +387,11 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> datafusion_common::Result> { + let projection_exprs = projection + .map(|p| projection_exprs_from_schema_and_indices(&self.schema(), p)) + .transpose()?; let options = ScanArgs::default() - .with_projection(projection.map(|p| p.as_slice())) + .with_projection(projection_exprs.as_deref()) .with_filters(Some(filters)) .with_limit(limit); Ok(self.scan_with_args(state, options).await?.into_inner()) @@ -434,8 +440,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; - return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); + return Ok(ScanResult::new(Arc::new(EmptyExec::new(self.schema())))); } let output_ordering = self.try_create_output_ordering(state.execution_props())?; @@ -478,6 +483,12 @@ impl TableProvider for ListingTable { let file_source = self.create_file_source(); + // Convert projection expressions to indices for FileScanConfigBuilder + let projection_indices = projection + .as_ref() + .map(|p| p.projection_column_indices(&self.schema())) + .transpose()?; + // create the execution plan let plan = self .options @@ -488,7 +499,7 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection_indices(projection)? + .with_projection_indices(projection_indices)? .with_limit(limit) .with_output_ordering(output_ordering) .with_expr_adapter(self.expr_adapter_factory.clone()) diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index 9565dcc60141e..5f47f8e93338d 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -23,10 +23,13 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_common::error::Result; -use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType}; +use datafusion_common::{Column, error::Result}; +use datafusion_expr::{ + Expr, LogicalPlan, ProjectionExprs, TableProviderFilterPushDown, TableType, +}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::work_table::WorkTableExec; +use itertools::Itertools; use crate::{ScanArgs, ScanResult, Session, TableProvider}; @@ -88,8 +91,17 @@ impl TableProvider for CteWorkTable { filters: &[Expr], limit: Option, ) -> Result> { + let schema = self.schema(); + let projection_exprs = projection.as_ref().map(|p| { + p.as_slice() + .iter() + .map(|i| { + Expr::Column(Column::from_name(schema.field(*i).name().to_string())) + }) + .collect_vec() + }); let options = ScanArgs::default() - .with_projection(projection.map(|p| p.as_slice())) + .with_projection(projection_exprs.as_ref().map(|p| p.as_slice())) .with_filters(Some(filters)) .with_limit(limit); Ok(self.scan_with_args(state, options).await?.into_inner()) @@ -100,10 +112,13 @@ impl TableProvider for CteWorkTable { _state: &dyn Session, args: ScanArgs<'a>, ) -> Result { + let schema = self.schema(); Ok(ScanResult::new(Arc::new(WorkTableExec::new( self.name.clone(), Arc::clone(&self.table_schema), - args.projection().map(|p| p.to_vec()), + args.projection() + .map(|p| p.projection_column_indices(&schema)) + .transpose()?, )?))) } diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 1f223852c2b9d..f5fe9d9035751 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{Constraints, Statistics, not_impl_err}; -use datafusion_expr::Expr; +use datafusion_expr::{Expr, ProjectionExprs}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ @@ -194,7 +194,10 @@ pub trait TableProvider: Debug + Sync + Send { args: ScanArgs<'a>, ) -> Result { let filters = args.filters().unwrap_or(&[]); - let projection = args.projection().map(|p| p.to_vec()); + let projection = args + .projection() + .map(|p| p.projection_column_indices(&self.schema())) + .transpose()?; let limit = args.limit(); let plan = self .scan(state, projection.as_ref(), filters, limit) @@ -359,19 +362,19 @@ pub trait TableProvider: Debug + Sync + Send { #[derive(Debug, Clone, Default)] pub struct ScanArgs<'a> { filters: Option<&'a [Expr]>, - projection: Option<&'a [usize]>, + projection: Option<&'a [Expr]>, limit: Option, } impl<'a> ScanArgs<'a> { /// Set the column projection for the scan. /// - /// The projection is a list of column indices from [`TableProvider::schema`] - /// that should be included in the scan results. If `None`, all columns are included. + /// The projection is a list of expressions applied to + /// [`TableProvider::schema`] which may include columns and more complex expressions. /// /// # Arguments - /// * `projection` - Optional slice of column indices to project - pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self { + /// * `projection` - Optional slice of projection expressions + pub fn with_projection(mut self, projection: Option<&'a [Expr]>) -> Self { self.projection = projection; self } @@ -380,7 +383,7 @@ impl<'a> ScanArgs<'a> { /// /// Returns a reference to the projection column indices, or `None` if /// no projection was specified (meaning all columns should be included). - pub fn projection(&self) -> Option<&'a [usize]> { + pub fn projection(&self) -> Option<&'a [Expr]> { self.projection } @@ -429,6 +432,14 @@ impl<'a> ScanArgs<'a> { pub struct ScanResult { /// The ExecutionPlan to run. plan: Arc, + /// Filters that we re not completely handled by the scan + /// either via statistics or other plan-time optimizations, + /// or by binding them to the returned plan. + unhandled_filters: Vec, + /// Projections that were not completely handled by the scan + /// either via statistics or other plan-time optimizations, + /// or by binding them to the returned plan. + unhandled_projections: Vec, } impl ScanResult { @@ -437,7 +448,11 @@ impl ScanResult { /// # Arguments /// * `plan` - The execution plan that will perform the table scan pub fn new(plan: Arc) -> Self { - Self { plan } + Self { + plan, + unhandled_filters: vec![], + unhandled_projections: vec![], + } } /// Get a reference to the execution plan for this scan result. @@ -455,6 +470,22 @@ impl ScanResult { pub fn into_inner(self) -> Arc { self.plan } + + /// Record filters that were not handled by the scan. + /// If called multiple times this method appends the provided + /// filters to the existing unhandled filters in the [`ScanResult`]. + pub fn with_unhandled_filters(mut self, filters: Vec) -> Self { + self.unhandled_filters.extend(filters); + self + } + + /// Record projections that were not handled by the scan. + /// If called multiple times this method appends the provided + /// projections to the existing unhandled projections in the [`ScanResult`]. + pub fn with_unhandled_projections(mut self, projections: Vec) -> Self { + self.unhandled_projections.extend(projections); + self + } } impl From> for ScanResult { diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 466ee38a426fd..c6d2a406345fd 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -67,7 +67,7 @@ use crate::execution::RecordBatchStream; pub fn scan_empty( name: Option<&str>, table_schema: &Schema, - projection: Option>, + projection: Option>, ) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); @@ -79,7 +79,7 @@ pub fn scan_empty( pub fn scan_empty_with_partitions( name: Option<&str>, table_schema: &Schema, - projection: Option>, + projection: Option>, partitions: usize, ) -> Result { let table_schema = Arc::new(table_schema.clone()); diff --git a/datafusion/core/tests/execution/logical_plan.rs b/datafusion/core/tests/execution/logical_plan.rs index 3eaa3fb2ed5e6..300973a03cc9c 100644 --- a/datafusion/core/tests/execution/logical_plan.rs +++ b/datafusion/core/tests/execution/logical_plan.rs @@ -27,7 +27,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams}; use datafusion_expr::logical_plan::{LogicalPlan, Values}; use datafusion_expr::{ - Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE, + Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE, col, }; use datafusion_functions_aggregate::count::Count; use datafusion_physical_plan::collect; @@ -110,7 +110,7 @@ fn inline_scan_projection_test() -> Result<()> { Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), ]); - let projection = vec![schema.index_of(column)?]; + let projection = vec![col(column)]; let provider = ViewTable::new( LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6f654428e41a1..9727de8ce0418 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -408,7 +408,7 @@ impl LogicalPlanBuilder { pub fn scan( table_name: impl Into, table_source: Arc, - projection: Option>, + projection: Option>, ) -> Result { Self::scan_with_filters(table_name, table_source, projection, vec![]) } @@ -481,7 +481,7 @@ impl LogicalPlanBuilder { pub fn scan_with_filters( table_name: impl Into, table_source: Arc, - projection: Option>, + projection: Option>, filters: Vec, ) -> Result { Self::scan_with_filters_inner(table_name, table_source, projection, filters, None) @@ -491,7 +491,7 @@ impl LogicalPlanBuilder { pub fn scan_with_filters_fetch( table_name: impl Into, table_source: Arc, - projection: Option>, + projection: Option>, filters: Vec, fetch: Option, ) -> Result { @@ -507,38 +507,19 @@ impl LogicalPlanBuilder { fn scan_with_filters_inner( table_name: impl Into, table_source: Arc, - projection: Option>, + projection: Option>, filters: Vec, fetch: Option, ) -> Result { - let table_scan = - TableScan::try_new(table_name, table_source, projection, filters, fetch)?; - - // Inline TableScan - if table_scan.filters.is_empty() - && let Some(p) = table_scan.source.get_logical_plan() - { - let sub_plan = p.into_owned(); - - if let Some(proj) = table_scan.projection { - let projection_exprs = proj - .into_iter() - .map(|i| { - Expr::Column(Column::from(sub_plan.schema().qualified_field(i))) - }) - .collect::>(); - return Self::new(sub_plan) - .project(projection_exprs)? - .alias(table_scan.table_name); - } - - // Ensures that the reference to the inlined table remains the - // same, meaning we don't have to change any of the parent nodes - // that reference this table. - return Self::new(sub_plan).alias(table_scan.table_name); + let mut builder = TableScanBuilder::new(table_source).with_name(table_name); + if let Some(projection) = projection { + builder = builder.with_projection(projection); } - - Ok(Self::new(LogicalPlan::TableScan(table_scan))) + builder = builder.with_filters(filters); + if let Some(fetch) = fetch { + builder = builder.with_fetch(fetch); + } + builder.build() } /// Wrap a plan in a window @@ -1996,12 +1977,82 @@ pub fn subquery_alias( SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias) } +pub struct TableScanBuilder { + name: TableReference, + table_source: Arc, + projection: Option>, + filters: Vec, + fetch: Option, +} + +impl TableScanBuilder { + pub fn new(table_source: Arc) -> Self { + Self { + name: TableReference::bare(UNNAMED_TABLE), + table_source, + projection: None, + filters: vec![], + fetch: None, + } + } + + pub fn with_projection(mut self, projection: Vec) -> Self { + self.projection = Some(projection); + self + } + + pub fn with_filters(mut self, filters: Vec) -> Self { + self.filters = filters; + self + } + + pub fn with_fetch(mut self, fetch: usize) -> Self { + self.fetch = Some(fetch); + self + } + + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + pub fn build(self) -> Result { + let table_scan = TableScan::try_new( + self.name, + self.table_source, + self.projection, + self.filters, + self.fetch, + )?; + + // Inline TableScan + if table_scan.filters.is_empty() + && let Some(p) = table_scan.source.get_logical_plan() + { + let sub_plan = p.into_owned(); + + if let Some(projection_exprs) = table_scan.projection { + return LogicalPlanBuilder::new(sub_plan) + .project(projection_exprs)? + .alias(table_scan.table_name); + } + + // Ensures that the reference to the inlined table remains the + // same, meaning we don't have to change any of the parent nodes + // that reference this table. + return LogicalPlanBuilder::new(sub_plan).alias(table_scan.table_name); + } + + Ok(LogicalPlanBuilder::new(LogicalPlan::TableScan(table_scan))) + } +} + /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. /// This is mostly used for testing and documentation. pub fn table_scan( name: Option>, table_schema: &Schema, - projection: Option>, + projection: Option>, ) -> Result { table_scan_with_filters(name, table_schema, projection, vec![]) } @@ -2012,7 +2063,7 @@ pub fn table_scan( pub fn table_scan_with_filters( name: Option>, table_schema: &Schema, - projection: Option>, + projection: Option>, filters: Vec, ) -> Result { let table_source = table_source(table_schema); @@ -2028,7 +2079,7 @@ pub fn table_scan_with_filters( pub fn table_scan_with_filter_and_fetch( name: Option>, table_schema: &Schema, - projection: Option>, + projection: Option>, filters: Vec, fetch: Option, ) -> Result { @@ -2246,11 +2297,14 @@ mod tests { #[test] fn plan_builder_simple() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? - .filter(col("state").eq(lit("CO")))? - .project(vec![col("id")])? - .build()?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("id"), col("state")]), + )? + .filter(col("state").eq(lit("CO")))? + .project(vec![col("id")])? + .build()?; assert_snapshot!(plan, @r#" Projection: employee_csv.id @@ -2293,13 +2347,16 @@ mod tests { #[test] fn plan_builder_sort() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? - .sort(vec![ - expr::Sort::new(col("state"), true, true), - expr::Sort::new(col("salary"), false, false), - ])? - .build()?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state"), col("salary")]), + )? + .sort(vec![ + expr::Sort::new(col("state"), true, true), + expr::Sort::new(col("salary"), false, false), + ])? + .build()?; assert_snapshot!(plan, @r" Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST @@ -2311,8 +2368,11 @@ mod tests { #[test] fn plan_builder_union() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state"), col("salary")]), + )?; let plan = plan .clone() @@ -2336,8 +2396,11 @@ mod tests { #[test] fn plan_builder_union_distinct() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state"), col("salary")]), + )?; let plan = plan .clone() @@ -2364,12 +2427,15 @@ mod tests { #[test] fn plan_builder_simple_distinct() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? - .filter(col("state").eq(lit("CO")))? - .project(vec![col("id")])? - .distinct()? - .build()?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("id"), col("state")]), + )? + .filter(col("state").eq(lit("CO")))? + .project(vec![col("id")])? + .distinct()? + .build()?; assert_snapshot!(plan, @r#" Distinct: @@ -2470,8 +2536,8 @@ mod tests { let plan = table_scan( Some("employee_csv"), &employee_schema(), - // project id and first_name by column index - Some(vec![0, 1]), + // project id and first_name by column expression + Some(vec![col("id"), col("first_name")]), )? // two columns with the same name => error .project(vec![col("id"), col("first_name").alias("id")]); @@ -2553,10 +2619,16 @@ mod tests { #[test] fn plan_builder_intersect_different_num_columns_error() -> Result<()> { - let plan1 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; - let plan2 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; + let plan1 = table_scan( + TableReference::none(), + &employee_schema(), + Some(vec![col("state")]), + )?; + let plan2 = table_scan( + TableReference::none(), + &employee_schema(), + Some(vec![col("state"), col("salary")]), + )?; let err_msg1 = LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true) @@ -2773,13 +2845,16 @@ mod tests { #[test] fn plan_builder_from_logical_plan() -> Result<()> { - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? - .sort(vec![ - expr::Sort::new(col("state"), true, true), - expr::Sort::new(col("salary"), false, false), - ])? - .build()?; + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state"), col("salary")]), + )? + .sort(vec![ + expr::Sort::new(col("state"), true, true), + expr::Sort::new(col("salary"), false, false), + ])? + .build()?; let plan_expected = format!("{plan}"); let plan_builder: LogicalPlanBuilder = Arc::new(plan).into(); @@ -2794,10 +2869,13 @@ mod tests { Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]); let table_source = table_source_with_constraints(&employee_schema(), constraints); - let plan = - LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))? - .aggregate(vec![col("id")], vec![sum(col("salary"))])? - .build()?; + let plan = LogicalPlanBuilder::scan( + "employee_csv", + table_source, + Some(vec![col("id"), col("state"), col("salary")]), + )? + .aggregate(vec![col("id")], vec![sum(col("salary"))])? + .build()?; assert_snapshot!(plan, @r" Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]] @@ -2815,11 +2893,14 @@ mod tests { let options = LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true); - let plan = - LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))? - .with_options(options) - .aggregate(vec![col("id")], vec![sum(col("salary"))])? - .build()?; + let plan = LogicalPlanBuilder::scan( + "employee_csv", + table_source, + Some(vec![col("id"), col("state"), col("salary")]), + )? + .with_options(options) + .aggregate(vec![col("id")], vec![sum(col("salary"))])? + .build()?; assert_snapshot!(plan, @r" Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]] diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index c2b01868c97f3..7c9c9f048d5d4 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -41,9 +41,9 @@ pub use plan::{ Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, - RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, - projection_schema, + ProjectionExprs, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, + Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, + projection_exprs_from_schema_and_indices, projection_schema, }; pub use statement::{ Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 4219c24bfc9c9..dc31acf48da41 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1811,12 +1811,9 @@ impl LogicalPlan { .. }) => { let projected_fields = match projection { - Some(indices) => { - let schema = source.schema(); - let names: Vec<&str> = indices - .iter() - .map(|i| schema.field(*i).name().as_str()) - .collect(); + Some(exprs) => { + let names: Vec = + exprs.iter().map(|e| e.to_string()).collect(); format!(" projection=[{}]", names.join(", ")) } _ => "".to_string(), @@ -2195,6 +2192,52 @@ impl PartialOrd for Values { } } +pub fn projection_exprs_from_schema_and_indices( + schema: &Schema, + indices: &[usize], +) -> Result> { + let fields = schema.fields(); + indices + .iter() + .map(|i| { + if i >= &fields.len() { + return plan_err!( + "Index {i} out of bounds for schema with {} fields", + fields.len() + ); + } + let field = schema.field(*i); + Ok(Expr::Column(Column::new_unqualified(field.name()))) + }) + .collect::>>() +} + +pub trait ProjectionExprs { + fn projection_columns(&self) -> Vec; + fn projection_column_indices(&self, schema: &Schema) -> Result>; +} + +impl ProjectionExprs for [Expr] { + fn projection_columns(&self) -> Vec { + self.iter() + .filter_map(|e| match e { + Expr::Column(c) => Some(c.clone()), + _ => None, + }) + .collect() + } + + fn projection_column_indices(&self, schema: &Schema) -> Result> { + self.iter() + .filter_map(|e| match e { + Expr::Column(c) => Some(c.name()), + _ => None, + }) + .map(|name| Ok(schema.index_of(name)?)) + .collect() + } +} + /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. #[derive(Clone, PartialEq, Eq, Hash, Debug)] @@ -2245,7 +2288,7 @@ impl Projection { ); } Ok(Self { - expr, + expr: expr.into(), input, schema, }) @@ -2255,7 +2298,7 @@ impl Projection { pub fn new_from_schema(input: Arc, schema: DFSchemaRef) -> Self { let expr: Vec = schema.columns().into_iter().map(Expr::Column).collect(); Self { - expr, + expr: expr.into(), input, schema, } @@ -2675,8 +2718,10 @@ pub struct TableScan { pub table_name: TableReference, /// The source of the table pub source: Arc, - /// Optional column indices to use as a projection - pub projection: Option>, + /// Optional projection expressions to select which columns are returned. + /// Each expression is typically a column reference, but can also be + /// more complex expressions. + pub projection: Option>, /// The schema description of the output pub projected_schema: DFSchemaRef, /// Optional expressions to be used as filters by the table provider @@ -2718,8 +2763,8 @@ impl PartialOrd for TableScan { struct ComparableTableScan<'a> { /// The name of the table pub table_name: &'a TableReference, - /// Optional column indices to use as a projection - pub projection: &'a Option>, + /// Optional projection expressions + pub projection: &'a Option>, /// Optional expressions to be used as filters by the table provider pub filters: &'a Vec, /// Optional number of rows to read @@ -2760,7 +2805,7 @@ impl TableScan { pub fn try_new( table_name: impl Into, table_source: Arc, - projection: Option>, + projection: Option>, filters: Vec, fetch: Option, ) -> Result { @@ -2777,11 +2822,15 @@ impl TableScan { let projected_schema = projection .as_ref() .map(|p| { - let projected_func_dependencies = - func_dependencies.project_functional_dependencies(p, p.len()); + // Convert projection expressions to column indices + let indices = p.projection_column_indices(&schema)?; + + let projected_func_dependencies = func_dependencies + .project_functional_dependencies(&indices, indices.len()); let df_schema = DFSchema::new_with_metadata( - p.iter() + indices + .iter() .map(|i| { (Some(table_name.clone()), Arc::clone(&schema.fields()[*i])) }) @@ -4328,13 +4377,21 @@ mod tests { } fn display_plan() -> Result { - let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))? - .build()?; - - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? - .filter(in_subquery(col("state"), Arc::new(plan1)))? - .project(vec![col("id")])? - .build() + let plan1 = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state")]), + )? + .build()?; + + table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("id"), col("state")]), + )? + .filter(in_subquery(col("state"), Arc::new(plan1)))? + .project(vec![col("id")])? + .build() } #[test] @@ -4367,14 +4424,21 @@ mod tests { #[test] fn test_display_subquery_alias() -> Result<()> { - let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))? - .build()?; + let plan1 = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("state")]), + )? + .build()?; let plan1 = Arc::new(plan1); - let plan = - table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? - .project(vec![col("id"), exists(plan1).alias("exists")])? - .build(); + let plan = table_scan( + Some("employee_csv"), + &employee_schema(), + Some(vec![col("id"), col("state")]), + )? + .project(vec![col("id"), exists(plan1).alias("exists")])? + .build(); assert_snapshot!(plan?.display_indent(), @r" Projection: employee_csv.id, EXISTS () AS exists @@ -4797,14 +4861,18 @@ mod tests { Field::new("state", DataType::Utf8, false), ]); - table_scan(TableReference::none(), &schema, Some(vec![0, 1])) - .unwrap() - .filter(col("state").eq(lit("CO"))) - .unwrap() - .project(vec![col("id")]) - .unwrap() - .build() - .unwrap() + table_scan( + TableReference::none(), + &schema, + Some(vec![col("id"), col("state")]), + ) + .unwrap() + .filter(col("state").eq(lit("CO"))) + .unwrap() + .project(vec![col("id")]) + .unwrap() + .build() + .unwrap() } #[test] @@ -5144,17 +5212,20 @@ mod tests { let subquery_schema = Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]); - let subquery_plan = - table_scan(TableReference::none(), &subquery_schema, Some(vec![0])) - .unwrap() - .filter(col("sub_id").eq(lit(0))) - .unwrap() - .build() - .unwrap(); + let subquery_plan = table_scan( + TableReference::none(), + &subquery_schema, + Some(vec![col("sub_id")]), + ) + .unwrap() + .filter(col("sub_id").eq(lit(0))) + .unwrap() + .build() + .unwrap(); let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); - let plan = table_scan(TableReference::none(), &schema, Some(vec![0])) + let plan = table_scan(TableReference::none(), &schema, Some(vec![col("id")])) .unwrap() .filter(col("id").eq(lit(0))) .unwrap() diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 548eadffa242e..91fc7afcc58e0 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -31,7 +31,7 @@ use datafusion_common::{ use datafusion_expr::expr::Alias; use datafusion_expr::{ Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window, - logical_plan::LogicalPlan, + logical_plan::LogicalPlan, projection_exprs_from_schema_and_indices, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -262,16 +262,31 @@ fn optimize_projections( projected_schema: _, } = table_scan; - // Get indices referred to in the original (schema with all fields) - // given projected indices. - let projection = match &projection { - Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), - None => indices.into_inner(), + // Get the projection expressions based on required indices. + // If there's an existing projection, select the expressions at the required indices. + // If there's no projection, create expressions from the indices using the source schema. + let new_projection = match &projection { + Some(proj_exprs) => { + // Select the expressions at the required indices + let required_indices = indices.into_inner(); + required_indices + .into_iter() + .map(|idx| proj_exprs[idx].clone()) + .collect::>() + } + None => { + // Convert indices to column expressions + let required_indices = indices.into_inner(); + projection_exprs_from_schema_and_indices( + &source.schema(), + &required_indices, + )? + } }; return TableScan::try_new( table_name, source, - Some(projection), + Some(new_projection), filters, fetch, ) @@ -836,6 +851,63 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); + // Special case: when the input is a TableScan and all projection expressions + // are simple column references, push the projection directly into the scan. + // This preserves the desired column order and eliminates the Projection node. + if let LogicalPlan::TableScan(table_scan) = input.as_ref() { + if exprs_used.iter().all(|e| matches!(e, Expr::Column(_))) { + // All expressions are column references - push directly into scan + let TableScan { + table_name, + source, + projection, + filters, + fetch, + projected_schema: _, + } = table_scan.clone(); + + // Build the new projection expressions by mapping through the existing + // scan projection (if any) or directly from the source schema + let new_projection_exprs: Vec = exprs_used + .iter() + .filter_map(|e| { + if let Expr::Column(col) = e { + // Find the index of this column in the current scan's schema + let idx = + table_scan.projected_schema.maybe_index_of_column(col)?; + // Get the corresponding expression from the existing projection + // or create a column reference from the source schema + match &projection { + Some(proj_exprs) => Some(proj_exprs[idx].clone()), + None => { + // No existing projection - create unqualified column + // to match the behavior of projection_exprs_from_schema_and_indices + let (_, field) = + table_scan.projected_schema.qualified_field(idx); + Some(Expr::Column(Column::new_unqualified(field.name()))) + } + } + } else { + None + } + }) + .collect(); + + // Only proceed if we successfully mapped all columns + if new_projection_exprs.len() == exprs_used.len() { + return TableScan::try_new( + table_name, + source, + Some(new_projection_exprs), + filters, + fetch, + ) + .map(LogicalPlan::TableScan) + .map(Transformed::yes); + } + } + } + let required_indices = RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter()); @@ -1752,16 +1824,15 @@ mod tests { fn redundant_project() -> Result<()> { let table_scan = test_table_scan()?; + // Two projections that are just column reordering get merged and + // pushed directly into the scan let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("a"), col("b"), col("c")])? .project(vec![col("a"), col("c"), col("b")])? .build()?; assert_optimized_plan_equal!( plan, - @r" - Projection: test.a, test.c, test.b - TableScan: test projection=[a, b, c] - " + @"TableScan: test projection=[a, c, b]" ) } @@ -1769,7 +1840,12 @@ mod tests { fn reorder_scan() -> Result<()> { let schema = Schema::new(test_table_scan_fields()); - let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?; + let plan = table_scan( + Some("test"), + &schema, + Some(vec![col("b"), col("a"), col("c")]), + )? + .build()?; assert_optimized_plan_equal!( plan, @"TableScan: test projection=[b, a, c]" @@ -1780,15 +1856,18 @@ mod tests { fn reorder_scan_projection() -> Result<()> { let schema = Schema::new(test_table_scan_fields()); - let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))? - .project(vec![col("a"), col("b")])? - .build()?; + // Projection over TableScan with column reordering should push + // the projection directly into the scan, eliminating the Projection node + let plan = table_scan( + Some("test"), + &schema, + Some(vec![col("b"), col("a"), col("c")]), + )? + .project(vec![col("a"), col("b")])? + .build()?; assert_optimized_plan_equal!( plan, - @r" - Projection: test.a, test.b - TableScan: test projection=[b, a] - " + @"TableScan: test projection=[a, b]" ) } @@ -1796,15 +1875,13 @@ mod tests { fn reorder_projection() -> Result<()> { let table_scan = test_table_scan()?; + // Projection that just reorders columns gets pushed into the scan let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("b"), col("a")])? .build()?; assert_optimized_plan_equal!( plan, - @r" - Projection: test.c, test.b, test.a - TableScan: test projection=[a, b, c] - " + @"TableScan: test projection=[c, b, a]" ) } @@ -1812,6 +1889,7 @@ mod tests { fn noncontinuous_redundant_projection() -> Result<()> { let table_scan = test_table_scan()?; + // The innermost projection-over-scan gets pushed into the scan let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("b"), col("a")])? .filter(col("c").gt(lit(1)))? @@ -1828,8 +1906,7 @@ mod tests { Filter: test.b > Int32(1) Projection: test.c, test.a, test.b Filter: test.c > Int32(1) - Projection: test.c, test.b, test.a - TableScan: test projection=[a, b, c] + TableScan: test projection=[c, b, a] " ) } @@ -2057,6 +2134,7 @@ mod tests { assert_eq!(3, table_scan.schema().fields().len()); assert_fields_eq(&table_scan, vec!["a", "b", "c"]); + // Projection with column reordering gets pushed into the scan let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("a")])? .limit(0, Some(5))? @@ -2068,8 +2146,7 @@ mod tests { plan, @r" Limit: skip=0, fetch=5 - Projection: test.c, test.a - TableScan: test projection=[a, c] + TableScan: test projection=[c, a] " ) } @@ -2108,6 +2185,7 @@ mod tests { assert_fields_eq(&table_scan, vec!["a", "b", "c"]); // we never use "b" in the first projection => remove it + // the projection over scan gets pushed into the scan let plan = LogicalPlanBuilder::from(table_scan) .project(vec![col("c"), col("a"), col("b")])? .filter(col("c").gt(lit(1)))? @@ -2122,8 +2200,7 @@ mod tests { @r" Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]] Filter: test.c > Int32(1) - Projection: test.c, test.a - TableScan: test projection=[a, c] + TableScan: test projection=[c, a] " ) } diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f2..20691beb58eb6 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -193,15 +193,6 @@ impl RequiredIndices { self } - /// Apply the given function `f` to each index in this instance, returning - /// the mapped indices - pub fn into_mapped_indices(self, f: F) -> Vec - where - F: Fn(usize) -> usize, - { - self.map_indices(f).into_inner() - } - /// Returns the `Expr`s from `exprs` that are at the indices in this instance pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec { self.indices.iter().map(|&idx| exprs[idx].clone()).collect() diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 755ffdbafc869..2bbb0e48ab139 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3108,7 +3108,7 @@ mod tests { fn table_scan_with_pushdown_provider_builder( filter_support: TableProviderFilterPushDown, filters: Vec, - projection: Option>, + projection: Option>, ) -> Result { let test_provider = PushDownProvider { filter_support }; @@ -3196,7 +3196,7 @@ mod tests { let plan = table_scan_with_pushdown_provider_builder( TableProviderFilterPushDown::Inexact, vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))], - Some(vec![0]), + Some(vec![col("a")]), )? .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? .project(vec![col("a"), col("b")])? @@ -3217,7 +3217,7 @@ mod tests { let plan = table_scan_with_pushdown_provider_builder( TableProviderFilterPushDown::Exact, vec![], - Some(vec![0]), + Some(vec![col("a")]), )? .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? .project(vec![col("a"), col("b")])? diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 1b25c5ce8a632..12448688d2576 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -210,7 +210,7 @@ mod tests { let table_scan = table_scan_with_filters( Some("test"), &schema, - Some(vec![0]), + Some(vec![col("a")]), vec![col("b").is_not_null()], )? .build()?; diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index a45983950496d..b28987eededf6 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -21,7 +21,7 @@ use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, assert_contains}; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan}; +use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan}; use std::sync::Arc; pub mod user_defined; @@ -49,7 +49,7 @@ pub fn test_table_scan() -> Result { pub fn scan_empty( name: Option<&str>, table_schema: &Schema, - projection: Option>, + projection: Option>, ) -> Result { table_scan(name, table_schema, projection) } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 218c2e4e47d04..6bcaea99bc91b 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -63,6 +63,7 @@ use datafusion_expr::{ Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, builder::project, }, + projection_exprs_from_schema_and_indices, }; use self::to_proto::{serialize_expr, serialize_exprs}; @@ -480,15 +481,19 @@ impl AsLogicalPlan for LogicalPlanNode { let table_name = from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?; - let mut projection = None; - if let Some(columns) = &scan.projection { - let column_indices = columns + let projection = if let Some(columns) = &scan.projection { + let column_indices: Vec = columns .columns .iter() .map(|name| provider.schema().index_of(name)) .collect::, _>>()?; - projection = Some(column_indices); - } + Some(projection_exprs_from_schema_and_indices( + &provider.schema(), + &column_indices, + )?) + } else { + None + }; LogicalPlanBuilder::scan_with_filters( table_name, @@ -501,15 +506,19 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::CustomScan(scan) => { let schema: Schema = convert_required!(scan.schema)?; let schema = Arc::new(schema); - let mut projection = None; - if let Some(columns) = &scan.projection { - let column_indices = columns + let projection = if let Some(columns) = &scan.projection { + let column_indices: Vec = columns .columns .iter() .map(|name| schema.index_of(name)) .collect::, _>>()?; - projection = Some(column_indices); - } + Some(projection_exprs_from_schema_and_indices( + &schema, + &column_indices, + )?) + } else { + None + }; let filters = from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?; @@ -838,15 +847,19 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::ViewScan(scan) => { let schema: Schema = convert_required!(scan.schema)?; - let mut projection = None; - if let Some(columns) = &scan.projection { - let column_indices = columns + let projection = if let Some(columns) = &scan.projection { + let column_indices: Vec = columns .columns .iter() .map(|name| schema.index_of(name)) .collect::, _>>()?; - projection = Some(column_indices); - } + Some(projection_exprs_from_schema_and_indices( + &schema, + &column_indices, + )?) + } else { + None + }; let input: LogicalPlan = into_logical_plan!(scan.input, ctx, extension_codec)?; @@ -1023,10 +1036,18 @@ impl AsLogicalPlan for LogicalPlanNode { let projection = match projection { None => None, - Some(columns) => { - let column_names = columns + Some(exprs) => { + // Extract column names from projection expressions + let column_names: Vec = exprs .iter() - .map(|i| schema.field(*i).name().to_owned()) + .filter_map(|expr| { + if let Expr::Column(col) = expr { + Some(col.name().to_owned()) + } else { + // For non-column expressions, use their string representation + Some(expr.to_string()) + } + }) .collect(); Some(protobuf::ProjectionColumns { columns: column_names, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 77676fc2fd2d9..3c75b0e69d5d2 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -82,7 +82,7 @@ use datafusion_expr::{ Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, ExprSchemable, LimitEffect, Literal, LogicalPlan, LogicalPlanBuilder, Operator, PartitionEvaluator, ScalarUDF, Signature, TryCast, Volatility, WindowFrame, WindowFrameBound, - WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, + WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, col, }; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::expr_fn::{ @@ -2813,8 +2813,8 @@ async fn roundtrip_custom_listing_tables_schema_table_scan_projection() -> Resul let projection = ["part", "value"] .iter() - .map(|field_name| listing_table.schema().index_of(field_name)) - .collect::, _>>()?; + .map(|field_name| col(*field_name)) + .collect::>(); let plan = LogicalPlanBuilder::scan( "hive_style", diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 56bf887dbde43..c05d5b01b5618 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -1092,23 +1092,26 @@ impl Unparser<'_> { if project_vec.is_empty() { builder = builder.project(self.empty_projection_fallback())?; } else { - let project_columns = project_vec - .iter() - .cloned() - .map(|i| { - let schema = table_scan.source.schema(); - let field = schema.field(i); - if alias.is_some() { - Column::new(alias.clone(), field.name().clone()) - } else { - Column::new( - Some(table_scan.table_name.clone()), - field.name().clone(), - ) - } - }) - .collect::>(); - builder = builder.project(project_columns)?; + // project_vec is already Vec, use directly + // If there's an alias, we may need to re-qualify the columns + let project_exprs: Vec = if alias.is_some() { + project_vec + .iter() + .map(|expr| { + if let Expr::Column(col) = expr { + Expr::Column(Column::new( + alias.clone(), + col.name(), + )) + } else { + expr.clone() + } + }) + .collect() + } else { + project_vec.clone() + }; + builder = builder.project(project_exprs)?; }; } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 46a42ae534af0..1c6cad7e3e176 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -994,7 +994,8 @@ fn test_aggregation_without_projection() -> Result<()> { ]); let plan = LogicalPlanBuilder::from( - table_scan(Some("users"), &schema, Some(vec![0, 1]))?.build()?, + table_scan(Some("users"), &schema, Some(vec![col("name"), col("age")]))? + .build()?, ) .aggregate(vec![col("name")], vec![sum(col("age"))])? .build()?; @@ -1315,7 +1316,7 @@ fn test_table_scan_with_empty_projection_and_filter_default_dialect() { fn table_scan_with_empty_projection_and_none_projection_helper( table_name: &str, table_schema: Schema, - projection: Option>, + projection: Option>, ) -> LogicalPlan { table_scan(Some(table_name), &table_schema, projection) .unwrap() @@ -1497,7 +1498,7 @@ fn test_table_scan_alias() -> Result<()> { let table_scan_with_pushdown_all = table_scan_with_filter_and_fetch( Some("t1"), &schema, - Some(vec![0, 1]), + Some(vec![col("id"), col("age")]), vec![col("id").gt(lit(1))], Some(10), )? @@ -1519,14 +1520,15 @@ fn test_table_scan_pushdown() -> Result<()> { Field::new("age", DataType::Utf8, false), ]); let scan_with_projection = - table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; + table_scan(Some("t1"), &schema, Some(vec![col("id"), col("age")]))?.build()?; let scan_with_projection = plan_to_sql(&scan_with_projection)?; assert_snapshot!( scan_with_projection, @"SELECT t1.id, t1.age FROM t1" ); - let scan_with_projection = table_scan(Some("t1"), &schema, Some(vec![1]))?.build()?; + let scan_with_projection = + table_scan(Some("t1"), &schema, Some(vec![col("age")]))?.build()?; let scan_with_projection = plan_to_sql(&scan_with_projection)?; assert_snapshot!( scan_with_projection, @@ -1541,7 +1543,7 @@ fn test_table_scan_pushdown() -> Result<()> { ); let table_scan_with_projection_alias = - table_scan(Some("t1"), &schema, Some(vec![0, 1]))? + table_scan(Some("t1"), &schema, Some(vec![col("id"), col("age")]))? .alias("ta")? .build()?; let table_scan_with_projection_alias = @@ -1552,7 +1554,7 @@ fn test_table_scan_pushdown() -> Result<()> { ); let table_scan_with_projection_alias = - table_scan(Some("t1"), &schema, Some(vec![1]))? + table_scan(Some("t1"), &schema, Some(vec![col("age")]))? .alias("ta")? .build()?; let table_scan_with_projection_alias = @@ -1573,7 +1575,7 @@ fn test_table_scan_pushdown() -> Result<()> { ); let query_from_table_scan_with_projection = LogicalPlanBuilder::from( - table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?, + table_scan(Some("t1"), &schema, Some(vec![col("id"), col("age")]))?.build()?, ) .project(vec![col("id"), col("age")])? .build()?; @@ -1585,7 +1587,7 @@ fn test_table_scan_pushdown() -> Result<()> { ); let query_from_table_scan_with_two_projections = LogicalPlanBuilder::from( - table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?, + table_scan(Some("t1"), &schema, Some(vec![col("id"), col("age")]))?.build()?, ) .project(vec![col("id"), col("age")])? .project(vec![wildcard()])? @@ -1640,7 +1642,7 @@ fn test_table_scan_pushdown() -> Result<()> { let table_scan_with_projection_and_filter = table_scan_with_filters( Some("t1"), &schema, - Some(vec![0, 1]), + Some(vec![col("id"), col("age")]), vec![col("id").gt(col("age"))], )? .build()?; @@ -1654,7 +1656,7 @@ fn test_table_scan_pushdown() -> Result<()> { let table_scan_with_projection_and_filter = table_scan_with_filters( Some("t1"), &schema, - Some(vec![1]), + Some(vec![col("age")]), vec![col("id").gt(col("age"))], )? .build()?; @@ -1677,7 +1679,7 @@ fn test_table_scan_pushdown() -> Result<()> { let table_scan_with_projection_and_inline_fetch = table_scan_with_filter_and_fetch( Some("t1"), &schema, - Some(vec![0, 1]), + Some(vec![col("id"), col("age")]), vec![], Some(10), )? @@ -1692,7 +1694,7 @@ fn test_table_scan_pushdown() -> Result<()> { let table_scan_with_all = table_scan_with_filter_and_fetch( Some("t1"), &schema, - Some(vec![0, 1]), + Some(vec![col("id"), col("age")]), vec![col("id").gt(col("age"))], Some(10), )? @@ -2306,7 +2308,12 @@ fn test_unparse_subquery_alias_with_table_pushdown() -> Result<()> { Field::new("c_name", DataType::Utf8, false), ]); - let table_scan = table_scan(Some("customer"), &schema, Some(vec![0, 1]))?.build()?; + let table_scan = table_scan( + Some("customer"), + &schema, + Some(vec![col("c_custkey"), col("c_name")]), + )? + .build()?; let plan = LogicalPlanBuilder::from(table_scan) .alias("customer")? @@ -2348,8 +2355,9 @@ fn test_unparse_left_anti_join() -> Result<()> { // SubqueryAlias: __correlated_sq_1 // TableScan: t2 projection=[c] - let table_scan1 = table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; - let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![0]))?.build()?; + let table_scan1 = + table_scan(Some("t1"), &schema, Some(vec![col("c"), col("d")]))?.build()?; + let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![col("c")]))?.build()?; let subquery = subquery_alias(table_scan2, "__correlated_sq_1")?; let plan = LogicalPlanBuilder::from(table_scan1) .project(vec![col("t1.d")])? @@ -2382,8 +2390,9 @@ fn test_unparse_left_semi_join() -> Result<()> { // SubqueryAlias: __correlated_sq_1 // TableScan: t2 projection=[c] - let table_scan1 = table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; - let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![0]))?.build()?; + let table_scan1 = + table_scan(Some("t1"), &schema, Some(vec![col("c"), col("d")]))?.build()?; + let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![col("c")]))?.build()?; let subquery = subquery_alias(table_scan2, "__correlated_sq_1")?; let plan = LogicalPlanBuilder::from(table_scan1) .project(vec![col("t1.d")])? @@ -2416,8 +2425,9 @@ fn test_unparse_left_mark_join() -> Result<()> { // TableScan: t1 projection=[c, d] // SubqueryAlias: __correlated_sq_1 // TableScan: t2 projection=[c] - let table_scan1 = table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; - let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![0]))?.build()?; + let table_scan1 = + table_scan(Some("t1"), &schema, Some(vec![col("c"), col("d")]))?.build()?; + let table_scan2 = table_scan(Some("t2"), &schema, Some(vec![col("c")]))?.build()?; let subquery = subquery_alias(table_scan2, "__correlated_sq_1")?; let plan = LogicalPlanBuilder::from(table_scan1) .join_on( @@ -2450,8 +2460,10 @@ fn test_unparse_right_semi_join() -> Result<()> { // TableScan: t1 projection=[c, d] // Projection: t2.c, t2.d // TableScan: t2 projection=[c, d] - let left = table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; - let right_table_scan = table_scan(Some("t2"), &schema, Some(vec![0, 1]))?.build()?; + let left = + table_scan(Some("t1"), &schema, Some(vec![col("c"), col("d")]))?.build()?; + let right_table_scan = + table_scan(Some("t2"), &schema, Some(vec![col("c"), col("d")]))?.build()?; let right = LogicalPlanBuilder::from(right_table_scan) .project(vec![col("c"), col("d")])? .build()?; @@ -2488,8 +2500,10 @@ fn test_unparse_right_anti_join() -> Result<()> { // TableScan: t1 projection=[c, d] // Projection: t2.c, t2.d // TableScan: t2 projection=[c, d] - let left = table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?; - let right_table_scan = table_scan(Some("t2"), &schema, Some(vec![0, 1]))?.build()?; + let left = + table_scan(Some("t1"), &schema, Some(vec![col("c"), col("d")]))?.build()?; + let right_table_scan = + table_scan(Some("t2"), &schema, Some(vec![col("c"), col("d")]))?.build()?; let right = LogicalPlanBuilder::from(right_table_scan) .project(vec![col("c"), col("d")])? .build()?; @@ -2525,8 +2539,8 @@ fn test_unparse_cross_join_with_table_scan_projection() -> Result<()> { // TableScan: test projection=[v] // SubqueryAlias: t2 // TableScan: test projection=[v] - let table_scan1 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; - let table_scan2 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; + let table_scan1 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; + let table_scan2 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; let plan = LogicalPlanBuilder::from(subquery_alias(table_scan1, "t1")?) .cross_join(subquery_alias(table_scan2, "t2")?)? .build()?; @@ -2550,8 +2564,8 @@ fn test_unparse_inner_join_with_table_scan_projection() -> Result<()> { // TableScan: test projection=[v] // SubqueryAlias: t2 // TableScan: test projection=[v] - let table_scan1 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; - let table_scan2 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; + let table_scan1 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; + let table_scan2 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; let plan = LogicalPlanBuilder::from(subquery_alias(table_scan1, "t1")?) .join_on( subquery_alias(table_scan2, "t2")?, @@ -2579,8 +2593,8 @@ fn test_unparse_left_semi_join_with_table_scan_projection() -> Result<()> { // TableScan: test projection=[v] // SubqueryAlias: t2 // TableScan: test projection=[v] - let table_scan1 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; - let table_scan2 = table_scan(Some("test"), &schema, Some(vec![1]))?.build()?; + let table_scan1 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; + let table_scan2 = table_scan(Some("test"), &schema, Some(vec![col("v")]))?.build()?; let plan = LogicalPlanBuilder::from(subquery_alias(table_scan1, "t1")?) .join_on( subquery_alias(table_scan2, "t2")?, @@ -2621,7 +2635,8 @@ fn test_unparse_window() -> Result<()> { filter: None, }, })); - let table = table_scan(Some("test"), &schema, Some(vec![0, 1]))?.build()?; + let table = + table_scan(Some("test"), &schema, Some(vec![col("k"), col("v")]))?.build()?; let plan = LogicalPlanBuilder::window_plan(table, vec![window_expr.clone()])?; let name = plan.schema().fields().last().unwrap().name().clone(); @@ -2659,7 +2674,8 @@ fn test_unparse_window() -> Result<()> { ); // without table qualifier - let table = table_scan(Some("test"), &schema, Some(vec![0, 1]))?.build()?; + let table = + table_scan(Some("test"), &schema, Some(vec![col("k"), col("v")]))?.build()?; let table = LogicalPlanBuilder::from(table) .project(vec![col("k").alias("k"), col("v").alias("v")])? .build()?; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 38037ede21db2..d9ba4bb2516c1 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2314,8 +2314,7 @@ logical_plan 03)--SubqueryAlias: __correlated_sq_1 04)----Projection: join_t2.t2_id 05)------Aggregate: groupBy=[[join_t2.t2_int, join_t2.t2_id]], aggr=[[]] -06)--------Projection: join_t2.t2_int, join_t2.t2_id -07)----------TableScan: join_t2 projection=[t2_id, t2_int] +06)--------TableScan: join_t2 projection=[t2_int, t2_id] statement ok set datafusion.optimizer.repartition_joins = false; diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index 5a559bdb94835..f95c0876ab8a2 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -132,8 +132,7 @@ ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS logical_plan 01)Projection: test_table.string_col 02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST -03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col, test_table.nulls_first_col, test_table.nulls_last_col -04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col] +03)----TableScan: test_table projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col] physical_plan 01)ProjectionExec: expr=[string_col@0 as string_col] 02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST] diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 490df4b72d17b..e9c4d335c87b1 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1538,8 +1538,7 @@ logical_plan 02)--Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(Int64(1))]] 03)----Projection: aggregate_test_100.c2 04)------Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST, fetch=4 -05)--------Projection: aggregate_test_100.c2, aggregate_test_100.c1 -06)----------TableScan: aggregate_test_100 projection=[c1, c2] +05)--------TableScan: aggregate_test_100 projection=[c2, c1] physical_plan 01)ProjectionExec: expr=[c2@0 as c2, count(Int64(1))@1 as count(*)] 02)--AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[count(Int64(1))] diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index da0bfc89d5848..db4b8db5db0ba 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -364,8 +364,7 @@ logical_plan 02)--LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey 03)----TableScan: orders projection=[o_orderkey, o_orderstatus] 04)----SubqueryAlias: __correlated_sq_1 -05)------Projection: lineitem.l_linestatus, lineitem.l_orderkey -06)--------TableScan: lineitem projection=[l_orderkey, l_linestatus] +05)------TableScan: lineitem projection=[l_linestatus, l_orderkey] query I rowsort select o_orderkey from orders diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs index 832110e11131c..65cc661cc3815 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs @@ -27,6 +27,7 @@ use datafusion::datasource::provider_as_source; use datafusion::logical_expr::utils::split_conjunction_owned; use datafusion::logical_expr::{ EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder, Values, + projection_exprs_from_schema_and_indices, }; use std::sync::Arc; use substrait::proto::expression::MaskExpression; @@ -329,7 +330,12 @@ fn apply_projection( fields, df_schema.metadata().clone(), )?); - scan.projection = Some(column_indices); + // Convert column indices to projection expressions + let projection_exprs = projection_exprs_from_schema_and_indices( + scan.source.schema().as_ref(), + &column_indices, + )?; + scan.projection = Some(projection_exprs); Ok(LogicalPlan::TableScan(scan)) } diff --git a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs index 8dfbb36d3767d..83d3782f73311 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs @@ -20,7 +20,7 @@ use crate::logical_plan::producer::{ }; use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err}; use datafusion::logical_expr::utils::conjunction; -use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values}; +use datafusion::logical_expr::{EmptyRelation, Expr, ProjectionExprs, TableScan, Values}; use datafusion::scalar::ScalarValue; use std::sync::Arc; use substrait::proto::expression::MaskExpression; @@ -97,14 +97,23 @@ pub fn from_table_scan( producer: &mut impl SubstraitProducer, scan: &TableScan, ) -> datafusion::common::Result> { - let projection = scan.projection.as_ref().map(|p| { - p.iter() - .map(|i| StructItem { - field: *i as i32, - child: None, - }) - .collect() - }); + // Convert projection expressions to column indices for Substrait + let projection = scan + .projection + .as_ref() + .map(|proj_exprs| { + let indices = proj_exprs.projection_column_indices(&scan.source.schema())?; + Ok::<_, datafusion::common::DataFusionError>( + indices + .into_iter() + .map(|i| StructItem { + field: i as i32, + child: None, + }) + .collect::>(), + ) + }) + .transpose()?; let projection = projection.map(|struct_items| MaskExpression { select: Some(StructSelect { struct_items }),