Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ impl ParquetSource {
}

/// Optional predicate.
#[deprecated(since = "50.2.0", note = "use `filter` instead")]
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.predicate.as_ref()
}
Expand Down Expand Up @@ -631,7 +632,7 @@ impl FileSource for ParquetSource {
// (bloom filters use `pruning_predicate` too).
// Because filter pushdown may happen dynamically as long as there is a predicate
// if we have *any* predicate applied, we can't guarantee the statistics are exact.
if self.predicate().is_some() {
if self.filter().is_some() {
Ok(statistics.to_inexact())
} else {
Ok(statistics)
Expand All @@ -646,7 +647,7 @@ impl FileSource for ParquetSource {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate()
.filter()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();

Expand Down Expand Up @@ -686,7 +687,7 @@ impl FileSource for ParquetSource {
Ok(())
}
DisplayFormatType::TreeRender => {
if let Some(predicate) = self.predicate() {
if let Some(predicate) = self.filter() {
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
}
Ok(())
Expand Down Expand Up @@ -780,3 +781,19 @@ impl FileSource for ParquetSource {
self.schema_adapter_factory.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion_physical_expr::expressions::lit;

#[test]
#[allow(deprecated)]
fn test_parquet_source_predicate_same_as_filter() {
let predicate = lit(true);

let parquet_source = ParquetSource::default().with_predicate(predicate);
// same value. but filter() call Arc::clone internally
assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
}
}
6 changes: 3 additions & 3 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use datafusion::datasource::physical_plan::AvroSource;
#[cfg(feature = "parquet")]
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{
CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource,
CsvSource, FileScanConfig, FileScanConfigBuilder, FileSource, JsonSource,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::{DataSource, DataSourceExec};
Expand Down Expand Up @@ -2591,8 +2591,8 @@ impl protobuf::PhysicalPlanNode {
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
let predicate = conf
.predicate()
.map(|pred| serialize_physical_expr(pred, extension_codec))
.filter()
.map(|pred| serialize_physical_expr(&pred, extension_codec))
.transpose()?;
return Ok(Some(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
Expand Down