From b12945c789bba3268092f64a13ce267f4dcf7b18 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 9 Oct 2025 01:13:36 +0900 Subject: [PATCH 1/3] refactor : remove ParquetSource filter merge into predicate --- datafusion/datasource-parquet/src/source.rs | 7 ++++--- datafusion/proto/src/physical_plan/mod.rs | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2469962ca0bc..ea54fc01de6b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -336,6 +336,7 @@ impl ParquetSource { } /// Optional predicate. + #[deprecated(since = "50.2.0", note = "use `filter` instead")] pub fn predicate(&self) -> Option<&Arc> { self.predicate.as_ref() } @@ -631,7 +632,7 @@ impl FileSource for ParquetSource { // (bloom filters use `pruning_predicate` too). // Because filter pushdown may happen dynamically as long as there is a predicate // if we have *any* predicate applied, we can't guarantee the statistics are exact. - if self.predicate().is_some() { + if self.filter().is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -646,7 +647,7 @@ impl FileSource for ParquetSource { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let predicate_string = self - .predicate() + .filter() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); @@ -686,7 +687,7 @@ impl FileSource for ParquetSource { Ok(()) } DisplayFormatType::TreeRender => { - if let Some(predicate) = self.predicate() { + if let Some(predicate) = self.filter() { writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; } Ok(()) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e7d8479c1405..18dde6a4fe09 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -53,7 +53,7 @@ use datafusion::datasource::physical_plan::AvroSource; #[cfg(feature = "parquet")] use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::physical_plan::{ - CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource, + CsvSource, FileScanConfig, FileScanConfigBuilder, FileSource, JsonSource, }; use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::{DataSource, DataSourceExec}; @@ -2591,8 +2591,8 @@ impl protobuf::PhysicalPlanNode { data_source_exec.downcast_to_file_source::() { let predicate = conf - .predicate() - .map(|pred| serialize_physical_expr(pred, extension_codec)) + .filter() + .map(|pred| serialize_physical_expr(&pred, extension_codec)) .transpose()?; return Ok(Some(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( From 99ed0418b6c4c2b6a839021ab9e0f14079b3ee66 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 9 Oct 2025 21:55:41 +0900 Subject: [PATCH 2/3] refactor : unit test --- datafusion/datasource-parquet/src/source.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index ea54fc01de6b..bfc05b1da242 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -781,3 +781,19 @@ impl FileSource for ParquetSource { self.schema_adapter_factory.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_physical_expr::expressions::lit; + + #[test] + #[allow(deprecated)] + fn test_parquet_source_predicate_same_as_filter() { + let predicate = lit(true); + + let parquet_source = ParquetSource::default().with_predicate(predicate); + // same value. but filter() call Arc::clone internally + assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref()); + } +} \ No newline at end of file From 5c41eb199e41872010a1552b310a20fdc048e427 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 9 Oct 2025 22:47:19 +0900 Subject: [PATCH 3/3] cargo fmt --- datafusion/datasource-parquet/src/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index bfc05b1da242..dd10363079f9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -796,4 +796,4 @@ mod tests { // same value. but filter() call Arc::clone internally assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref()); } -} \ No newline at end of file +}