From 53bc15b2fe38420768589831ffcaa84e0d8c24a6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 4 Sep 2025 12:44:43 -0500 Subject: [PATCH 1/7] Refactor TableProvider::scan into TableProvider::scan_with_args --- datafusion/catalog/src/table.rs | 150 +++++++++++++++++- .../core/src/datasource/listing/table.rs | 36 ++++- datafusion/core/src/physical_planner.rs | 10 +- 3 files changed, 185 insertions(+), 11 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ac2e1884ba92..1282a6d9ab0d 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; -use datafusion_expr::Expr; +use datafusion_expr::{Expr, SortExpr}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ @@ -171,6 +171,41 @@ pub trait TableProvider: Debug + Sync + Send { limit: Option, ) -> Result>; + /// Create an [`ExecutionPlan`] for scanning the table using structured arguments. + /// + /// This method uses [`ScanArgs`] to pass scan parameters in a structured way + /// and returns a [`ScanResult`] containing the execution plan. This approach + /// allows for extensible parameter passing and result handling. + /// + /// Table providers can override this method to take advantage of additional + /// parameters like `preferred_ordering` that may not be available through + /// other scan methods. + /// + /// # Arguments + /// * `state` - The session state containing configuration and context + /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences + /// + /// # Returns + /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table + /// + /// See [`Self::scan`] for detailed documentation about projection, filters, and limits. + async fn scan_with_args( + &self, + state: &dyn Session, + args: ScanArgs, + ) -> Result { + let ScanArgs { + filters, + projection, + limit, + } = args; + let filters = filters.unwrap_or_default(); + let plan = self + .scan(state, projection.as_ref(), &filters, limit) + .await?; + Ok(ScanResult::new(plan)) + } + /// Specify if DataFusion should provide filter expressions to the /// TableProvider to apply *during* the scan. /// @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option>, + projection: Option>, + limit: Option, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option> { + self.projection.clone() + } + + /// Set the filter expressions for the scan. + /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic. + /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional list of filter expressions + pub fn with_filters(mut self, filters: Option>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. + /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + /// Set the maximum number of rows to return from the scan. + /// + /// If specified, the scan should return at most this many rows. This is typically + /// used to optimize queries with `LIMIT` clauses. + /// + /// # Arguments + /// * `limit` - Optional maximum number of rows to return + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + /// Get the maximum number of rows to return from the scan. + /// + /// Returns the row limit, or `None` if no limit was specified. + pub fn limit(&self) -> Option { + self.limit + } +} + +/// Result of a table scan operation from [`TableProvider::scan_with_args`]. +/// +/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan, +/// providing a typed return value instead of returning the plan directly. +/// This allows for future extensibility of scan results without breaking +/// the API. +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc, +} + +impl ScanResult { + /// Create a new `ScanResult` with the given execution plan. + /// + /// # Arguments + /// * `plan` - The execution plan that will perform the table scan + pub fn new(plan: Arc) -> Self { + Self { plan } + } + + /// Get the execution plan for this scan result. + /// + /// Returns a cloned reference to the [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn plan(&self) -> Arc { + Arc::clone(&self.plan) + } +} + /// A factory which creates [`TableProvider`]s at runtime given a URL. /// /// For example, this can be used to create a table "on the fly" diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9858b109125f..36eb99a80d42 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -29,7 +29,7 @@ use crate::{ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; -use datafusion_catalog::{Session, TableProvider}; +use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt, @@ -1169,6 +1169,22 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { + let options = ScanArgs::default() + .with_projection(projection.cloned()) + .with_filters(Some(filters.to_vec())) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.plan()) + } + + async fn scan_with_args( + &self, + state: &dyn Session, + args: ScanArgs, + ) -> Result { + let projection = args.projection(); + let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = args.limit(); + // extract types of partition columns let table_partition_cols = self .options @@ -1181,6 +1197,7 @@ impl TableProvider for ListingTable { .iter() .map(|field| field.name().as_str()) .collect::>(); + // If the filters can be resolved using only partition cols, there is no need to // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = @@ -1198,8 +1215,8 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection)?; - return Ok(Arc::new(EmptyExec::new(projected_schema))); + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } let output_ordering = self.try_create_output_ordering()?; @@ -1233,13 +1250,16 @@ impl TableProvider for ListingTable { let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { - return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); + return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new( + Schema::empty(), + ))))); }; let file_source = self.create_file_source_with_schema_adapter()?; // create the execution plan - self.options + let plan = self + .options .format .create_physical_plan( state, @@ -1251,14 +1271,16 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection.cloned()) + .with_projection(projection) .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) .with_expr_adapter(self.expr_adapter_factory.clone()) .build(), ) - .await + .await?; + + Ok(ScanResult::new(plan)) } fn supports_filters_pushdown( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6618d9495d78..200bac0df843 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; +use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, @@ -459,9 +460,12 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) - .await? + let opts = ScanArgs::default() + .with_projection(projection.clone()) + .with_filters(Some(filters)) + .with_limit(*fetch); + let res = source.scan_with_args(session_state, opts).await?; + res.plan() } LogicalPlan::Values(Values { values, schema }) => { let exec_schema = schema.as_ref().to_owned().into(); From 527834926c24398e5fcddded9440f683e04a3181 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:07:44 -0500 Subject: [PATCH 2/7] lint --- datafusion/catalog/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 1282a6d9ab0d..7b304d334e7c 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; -use datafusion_expr::{Expr, SortExpr}; +use datafusion_expr::Expr; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ From 1d5bad19f42d45033c81dd771e4ae2dcfb77aa96 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 12 Sep 2025 11:16:52 -0500 Subject: [PATCH 3/7] fix --- datafusion/catalog/src/table.rs | 84 +++++++++---------- .../core/src/datasource/listing/table.rs | 14 ++-- datafusion/core/src/physical_planner.rs | 7 +- 3 files changed, 50 insertions(+), 55 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 7b304d334e7c..7362e9556b8f 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -189,21 +189,18 @@ pub trait TableProvider: Debug + Sync + Send { /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table /// /// See [`Self::scan`] for detailed documentation about projection, filters, and limits. - async fn scan_with_args( + async fn scan_with_args<'a>( &self, state: &dyn Session, - args: ScanArgs, + args: ScanArgs<'a>, ) -> Result { - let ScanArgs { - filters, - projection, - limit, - } = args; - let filters = filters.unwrap_or_default(); + let filters = args.filters().unwrap_or(&[]); + let projection = args.projection().map(|p| p.to_vec()); + let limit = args.limit(); let plan = self - .scan(state, projection.as_ref(), &filters, limit) + .scan(state, projection.as_ref(), filters, limit) .await?; - Ok(ScanResult::new(plan)) + Ok(plan.into()) } /// Specify if DataFusion should provide filter expressions to the @@ -335,46 +332,32 @@ pub trait TableProvider: Debug + Sync + Send { } /// Arguments for scanning a table with [`TableProvider::scan_with_args`]. -/// -/// `ScanArgs` provides a structured way to pass scan parameters to table providers, -/// replacing the multiple individual parameters used by [`TableProvider::scan`]. -/// This struct uses the builder pattern for convenient construction. -/// -/// # Examples -/// -/// ``` -/// # use datafusion_catalog::ScanArgs; -/// # use datafusion_expr::Expr; -/// let args = ScanArgs::default() -/// .with_projection(Some(vec![0, 2, 4])) -/// .with_limit(Some(1000)); -/// ``` #[derive(Debug, Clone, Default)] -pub struct ScanArgs { - filters: Option>, - projection: Option>, +pub struct ScanArgs<'a> { + filters: Option<&'a [Expr]>, + projection: Option<&'a [usize]>, limit: Option, } -impl ScanArgs { +impl<'a> ScanArgs<'a> { /// Set the column projection for the scan. /// /// The projection is a list of column indices from [`TableProvider::schema`] /// that should be included in the scan results. If `None`, all columns are included. /// /// # Arguments - /// * `projection` - Optional list of column indices to project - pub fn with_projection(mut self, projection: Option>) -> Self { + /// * `projection` - Optional slice of column indices to project + pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self { self.projection = projection; self } /// Get the column projection for the scan. /// - /// Returns a cloned copy of the projection column indices, or `None` if + /// Returns a reference to the projection column indices, or `None` if /// no projection was specified (meaning all columns should be included). - pub fn projection(&self) -> Option> { - self.projection.clone() + pub fn projection(&self) -> Option<&'a [usize]> { + self.projection } /// Set the filter expressions for the scan. @@ -384,8 +367,8 @@ impl ScanArgs { /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. /// /// # Arguments - /// * `filters` - Optional list of filter expressions - pub fn with_filters(mut self, filters: Option>) -> Self { + /// * `filters` - Optional slice of filter expressions + pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self { self.filters = filters; self } @@ -393,8 +376,8 @@ impl ScanArgs { /// Get the filter expressions for the scan. /// /// Returns a reference to the filter expressions, or `None` if no filters were specified. - pub fn filters(&self) -> Option<&[Expr]> { - self.filters.as_deref() + pub fn filters(&self) -> Option<&'a [Expr]> { + self.filters } /// Set the maximum number of rows to return from the scan. @@ -418,11 +401,6 @@ impl ScanArgs { } /// Result of a table scan operation from [`TableProvider::scan_with_args`]. -/// -/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan, -/// providing a typed return value instead of returning the plan directly. -/// This allows for future extensibility of scan results without breaking -/// the API. #[derive(Debug, Clone)] pub struct ScanResult { /// The ExecutionPlan to run. @@ -438,12 +416,26 @@ impl ScanResult { Self { plan } } - /// Get the execution plan for this scan result. + /// Get a reference to the execution plan for this scan result. /// - /// Returns a cloned reference to the [`ExecutionPlan`] that will perform + /// Returns a reference to the [`ExecutionPlan`] that will perform /// the actual table scanning and data retrieval. - pub fn plan(&self) -> Arc { - Arc::clone(&self.plan) + pub fn plan(&self) -> &Arc { + &self.plan + } + + /// Consume this ScanResult and return the execution plan. + /// + /// Returns the owned [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn into_inner(self) -> Arc { + self.plan + } +} + +impl From> for ScanResult { + fn from(plan: Arc) -> Self { + Self::new(plan) } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 36eb99a80d42..e9c331770d87 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1170,18 +1170,20 @@ impl TableProvider for ListingTable { limit: Option, ) -> Result> { let options = ScanArgs::default() - .with_projection(projection.cloned()) - .with_filters(Some(filters.to_vec())) + .with_projection(projection.map(|p| p.as_slice())) + .with_filters(Some(filters)) .with_limit(limit); - Ok(self.scan_with_args(state, options).await?.plan()) + Ok(Arc::clone( + self.scan_with_args(state, options).await?.plan(), + )) } - async fn scan_with_args( + async fn scan_with_args<'a>( &self, state: &dyn Session, - args: ScanArgs, + args: ScanArgs<'a>, ) -> Result { - let projection = args.projection(); + let projection = args.projection().map(|p| p.to_vec()); let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); let limit = args.limit(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 200bac0df843..d7f30609a459 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -460,12 +460,13 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); + let filters_vec = filters.into_iter().collect::>(); let opts = ScanArgs::default() - .with_projection(projection.clone()) - .with_filters(Some(filters)) + .with_projection(projection.as_deref()) + .with_filters(Some(&filters_vec)) .with_limit(*fetch); let res = source.scan_with_args(session_state, opts).await?; - res.plan() + Arc::clone(res.plan()) } LogicalPlan::Values(Values { values, schema }) => { let exec_schema = schema.as_ref().to_owned().into(); From b7d00a3c5811cfd0e71e43db97fb762f3530382c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 12 Sep 2025 15:20:39 -0500 Subject: [PATCH 4/7] Update datafusion/catalog/src/table.rs Co-authored-by: Andrew Lamb --- datafusion/catalog/src/table.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 7362e9556b8f..f34daf49a46b 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -174,8 +174,7 @@ pub trait TableProvider: Debug + Sync + Send { /// Create an [`ExecutionPlan`] for scanning the table using structured arguments. /// /// This method uses [`ScanArgs`] to pass scan parameters in a structured way - /// and returns a [`ScanResult`] containing the execution plan. This approach - /// allows for extensible parameter passing and result handling. + /// and returns a [`ScanResult`] containing the execution plan. /// /// Table providers can override this method to take advantage of additional /// parameters like `preferred_ordering` that may not be available through From 550cf26fd74e134c7d58b822d97458af63085e8a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 12 Sep 2025 15:20:53 -0500 Subject: [PATCH 5/7] Update datafusion/core/src/datasource/listing/table.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/datasource/listing/table.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e9c331770d87..18d84c4ba0c2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1173,9 +1173,7 @@ impl TableProvider for ListingTable { .with_projection(projection.map(|p| p.as_slice())) .with_filters(Some(filters)) .with_limit(limit); - Ok(Arc::clone( - self.scan_with_args(state, options).await?.plan(), - )) + Ok(self.scan_with_args(state, options).await?.into_inner()) } async fn scan_with_args<'a>( From 46967c783849ca8cee0358e6aad86348d69e19bd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 12 Sep 2025 15:21:01 -0500 Subject: [PATCH 6/7] Update datafusion/catalog/src/table.rs Co-authored-by: Andrew Lamb --- datafusion/catalog/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index f34daf49a46b..b581394ae3b9 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -177,7 +177,7 @@ pub trait TableProvider: Debug + Sync + Send { /// and returns a [`ScanResult`] containing the execution plan. /// /// Table providers can override this method to take advantage of additional - /// parameters like `preferred_ordering` that may not be available through + /// parameters like the upcoming `preferred_ordering` that may not be available through /// other scan methods. /// /// # Arguments From 184f15ab65903fff002e9e0d7f3d876d7e1ede6d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 12 Sep 2025 19:03:13 -0500 Subject: [PATCH 7/7] fmt --- datafusion/catalog/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index b581394ae3b9..11c9af01a7a5 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -174,7 +174,7 @@ pub trait TableProvider: Debug + Sync + Send { /// Create an [`ExecutionPlan`] for scanning the table using structured arguments. /// /// This method uses [`ScanArgs`] to pass scan parameters in a structured way - /// and returns a [`ScanResult`] containing the execution plan. + /// and returns a [`ScanResult`] containing the execution plan. /// /// Table providers can override this method to take advantage of additional /// parameters like the upcoming `preferred_ordering` that may not be available through