diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 95a02147438b0..4e400f6013355 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -688,7 +688,7 @@ config_namespace! {
        /// (reading) If true, filter expressions are be applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
-        pub pushdown_filters: bool, default = false
+        pub pushdown_filters: bool, default = true
        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
@@ -732,6 +732,16 @@ config_namespace! {
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None
+        /// (reading) Minimum filter effectiveness threshold for adaptive filter
+        /// pushdown.
+        /// Only filters that filter out at least this fraction of rows will be
+        /// promoted to row filters during adaptive filter pushdown.
+        /// A value of 1.0 means only filters that filter out all rows will be
+        /// promoted. A value of 0.0 means all filters will be promoted.
+        /// Because there can be a high I/O cost to pushing down ineffective filters,
+        /// recommended values are in the range [0.8, 0.95], depending on random I/O costs.
+        pub filter_effectiveness_threshold: f64, default = 1.0
+
        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 196cb96f3832d..2008df4082ced 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -209,6 +209,7 @@ impl ParquetOptions {
            coerce_int96: _, // not used for writer props
            skip_arrow_metadata: _,
            max_predicate_cache_size: _,
+            filter_effectiveness_threshold: _, // not used for writer props
        } = self;
        let mut builder = WriterProperties::builder()
@@ -464,6 +465,7 @@ mod tests {
            skip_arrow_metadata: defaults.skip_arrow_metadata,
            coerce_int96: None,
            max_predicate_cache_size: defaults.max_predicate_cache_size,
+            filter_effectiveness_threshold: defaults.filter_effectiveness_threshold,
        }
    }
@@ -578,6 +580,8 @@ mod tests {
                binary_as_string: global_options_defaults.binary_as_string,
                skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                coerce_int96: None,
+                filter_effectiveness_threshold: global_options_defaults
+                    .filter_effectiveness_threshold,
            },
            column_specific_options,
            key_value_metadata,
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 6edf628e2d6d6..74451f1b27533 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -150,7 +150,14 @@ mod tests {
        let plan = df.explain(false, false)?.collect().await?;
        // Filters all the way to Parquet
        let formatted = pretty::pretty_format_batches(&plan)?.to_string();
-        assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
+        let data_source_exec_row = formatted
+            .lines()
+            .find(|line| line.contains("DataSourceExec:"))
+            .unwrap();
+        assert!(
+            data_source_exec_row.contains("predicate=id@0 = 1"),
+            "{formatted}"
+        );
        Ok(())
    }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index ce2b05e6d3b61..bfacb8ed255bd 100644
---
a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -169,7 +169,8 @@ mod tests { if self.pushdown_predicate { source = source .with_pushdown_filters(true) - .with_reorder_filters(true); + .with_reorder_filters(true) + .with_filter_effectiveness_threshold(0.0); } else { source = source.with_pushdown_filters(false); } diff --git a/datafusion/core/src/datasource/view_test.rs b/datafusion/core/src/datasource/view_test.rs index 35418d6dea632..4700ab38ecad3 100644 --- a/datafusion/core/src/datasource/view_test.rs +++ b/datafusion/core/src/datasource/view_test.rs @@ -301,7 +301,11 @@ mod tests { #[tokio::test] async fn filter_pushdown_view() -> Result<()> { - let ctx = SessionContext::new(); + // Disable parquet pushdown_filters to ensure filters stay as FilterExec nodes + // rather than being pushed into the Parquet reader + let config = SessionConfig::new() + .set_bool("datafusion.execution.parquet.pushdown_filters", false); + let ctx = SessionContext::new_with_config(config); ctx.register_parquet( "test", diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index e3a191ee9ade2..35ae36b48c344 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -621,10 +621,10 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { #[tokio::test] async fn predicate_cache_default() -> datafusion_common::Result<()> { let ctx = SessionContext::new(); - // The cache is on by default, but not used unless filter pushdown is enabled + // The cache is on by default, and used since pushdown_filters is now true by default PredicateCacheTest { - expected_inner_records: 0, - expected_records: 0, + expected_inner_records: 8, + expected_records: 7, // reads more than necessary from the cache as then another bitmap is applied } .run(&ctx) .await diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d12739658c400..444470c1155a5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -1721,6 +1721,11 @@ async fn test_topk_dynamic_filter_pushdown_integration() { let mut cfg = SessionConfig::new(); cfg.options_mut().execution.parquet.pushdown_filters = true; cfg.options_mut().execution.parquet.max_row_group_size = 128; + // Always pushdown filters into row filters for this test + cfg.options_mut() + .execution + .parquet + .filter_effectiveness_threshold = 0.0; let ctx = SessionContext::new_with_config(cfg); ctx.register_object_store( ObjectStoreUrl::parse("memory://").unwrap().as_ref(), diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index fa248c448683b..7aee1a3d6002f 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -862,7 +862,7 @@ async fn parquet_explain_analyze() { .to_string(); // should contain aggregated stats - assert_contains!(&formatted, "output_rows=8"); + assert_contains!(&formatted, "output_rows=5"); assert_contains!( &formatted, "row_groups_pruned_bloom_filter=1 total \u{2192} 1 matched" @@ -995,11 +995,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { @r" SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] RecursiveQueryExec: name=number_series, is_distinct=false - 
CoalescePartitionsExec - ProjectionExec: expr=[id@0 as id, 1 as level] - FilterExec: id@0 = 1 - RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 - DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] + DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] CoalescePartitionsExec ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)] FilterExec: id@0 < 10 diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 2109416d646fb..d73ead00f3751 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -456,6 +456,12 @@ impl FileFormat for ParquetFormat { ) -> Result> { let mut metadata_size_hint = None; + let filter_effectiveness_threshold = state + .config_options() + .execution + .parquet + .filter_effectiveness_threshold; + if let Some(metadata) = self.metadata_size_hint() { metadata_size_hint = Some(metadata); } @@ -467,6 +473,7 @@ impl FileFormat for ParquetFormat { .cloned() .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?; source = source.with_table_parquet_options(self.options.clone()); + source = source.with_filter_pushdown_selectivity(filter_effectiveness_threshold); // Use the CachedParquetFileReaderFactory let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 5eaa137e9a456..0b85f5f72cecc 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -76,6 +76,8 @@ pub struct ParquetFileMetrics { /// number of rows that were stored in the cache after evaluating predicates /// reused for the output. 
pub predicate_cache_records: Count, + //// Time spent applying filters + pub filter_apply_time: Time, } impl ParquetFileMetrics { @@ -162,6 +164,10 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .counter("predicate_cache_records", partition); + let filter_apply_time = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .subset_time("filter_apply_time", partition); + Self { files_ranges_pruned_statistics, predicate_evaluation_errors, @@ -179,6 +185,7 @@ impl ParquetFileMetrics { scan_efficiency_ratio, predicate_cache_inner_records, predicate_cache_records, + filter_apply_time, } } } diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index d7e92f70afa99..b7c98ecf2660a 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -30,6 +30,7 @@ mod page_filter; mod reader; mod row_filter; mod row_group_filter; +mod selectivity; mod sort; pub mod source; mod supported_predicates; @@ -40,7 +41,10 @@ pub use file_format::*; pub use metrics::ParquetFileMetrics; pub use page_filter::PagePruningAccessPlanFilter; pub use reader::*; // Expose so downstream crates can use it +pub use row_filter::FilterMetrics; +pub use row_filter::RowFilterWithMetrics; pub use row_filter::build_row_filter; +pub use row_filter::build_row_filter_with_metrics; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; pub use writer::plan_to_parquet; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 570f9b4412840..f6a83cec8e2f9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,16 +19,19 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_group_filter::RowGroupAccessPlanFilter; +use crate::selectivity::{PartitionedFilters, SelectivityTracker}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{AsArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; +use datafusion_physical_expr::split_conjunction; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; +use itertools::Itertools; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -39,12 +42,13 @@ use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, + internal_datafusion_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ - PhysicalExpr, is_dynamic_physical_expr, + PhysicalExpr, fmt_sql, is_dynamic_physical_expr, }; use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics, @@ -118,6 +122,9 @@ pub(super) struct ParquetOpener { pub max_predicate_cache_size: 
Option<usize>,
    /// Whether to read row groups in reverse order
    pub reverse_row_groups: bool,
+    /// Shared selectivity tracker for adaptive filter reordering.
+    /// Each opener reads stats and decides which filters to push down.
+    pub selectivity_tracker: Arc<parking_lot::RwLock<SelectivityTracker>>,
}
/// Represents a prepared access plan with optional row selection
@@ -277,6 +284,7 @@ impl FileOpener for ParquetOpener {
        let max_predicate_cache_size = self.max_predicate_cache_size;
        let reverse_row_groups = self.reverse_row_groups;
+        let selectivity_tracker = Arc::clone(&self.selectivity_tracker);
        Ok(Box::pin(async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = encryption_context
@@ -456,27 +464,10 @@ impl FileOpener for ParquetOpener {
            // ---------------------------------------------------------------------
            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &physical_file_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
+            // First, partition filters based on selectivity tracking
+            // filter_metrics will be populated if we successfully build a row filter
+            let mut filter_metrics: Vec<FilterMetrics> = vec![];
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
-                };
-            };
            if force_filter_selections {
                builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
@@ -489,7 +480,6 @@ impl FileOpener for ParquetOpener {
            // Determine which row groups to actually read. The idea is to skip
            // as many row groups as possible based on the metadata and query
            let file_metadata = Arc::clone(builder.metadata());
-            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
            let rg_metadata = file_metadata.row_groups();
            // track which row groups to actually read
            let access_plan =
@@ -501,7 +491,7 @@ impl FileOpener for ParquetOpener {
            }
            // If there is a predicate that can be evaluated against the metadata
-            if let Some(predicate) = predicate.as_ref() {
+            if let Some(predicate) = pruning_predicate.as_ref() {
                if enable_row_group_stats_pruning {
                    row_groups.prune_by_statistics(
                        &physical_file_schema,
@@ -593,8 +583,108 @@ impl FileOpener for ParquetOpener {
            // metrics from the arrow reader itself
            let arrow_reader_metrics = ArrowReaderMetrics::enabled();
-            let indices = projection.column_indices();
-            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+            // Acquire tracker lock once for both partitioning and row filter building.
+            // We hold it through the projection extension code (which doesn't need it)
+            // to avoid the deadlock from release-and-reacquire pattern.
+ let (post_scan_filters, original_projection_len, projection, mask) = { + let tracker = selectivity_tracker.read(); + + let PartitionedFilters { + row_filters, + post_scan, + } = if let Some(predicate) = + pushdown_filters.then_some(predicate.as_ref()).flatten() + { + // Split predicate into conjuncts and partition based on selectivity + let conjuncts: Vec> = + split_conjunction(predicate) + .into_iter() + .map(Arc::clone) + .collect(); + // #[cfg(debug_assertions)] + // { + // use datafusion_physical_expr_common::physical_expr::fmt_sql; + // for (expr, selectivity) in tracker.iter() { + // println!( + // "effectiveness for expr {}: {:.2}%", + // fmt_sql(expr.as_ref()), + // selectivity.effectiveness() * 100.0, + // ); + // } + // } + tracker.partition_filters(conjuncts) + } else { + PartitionedFilters { + row_filters: vec![], + post_scan: vec![], + } + }; + + // #[cfg(debug_assertions)] + // { + // println!( + // "ParquetOpener: pushing down {} filters, deferring {} filters", + // row_filters.len(), + // post_scan.len(), + // ); + // if !row_filters.is_empty() { + // println!(" Row filters:"); + // for filter in &row_filters { + // use datafusion_physical_expr_common::physical_expr::fmt_sql; + // println!(" {}", fmt_sql(filter.as_ref())); + // } + // } + // } + + // Extend projection with post-scan filter expressions BEFORE computing + // column indices, so the mask includes columns needed by filters. + let original_projection_len = projection.as_ref().len(); + let projection = if post_scan.is_empty() { + projection + } else { + let mut extended_exprs: Vec = + projection.iter().cloned().collect(); + + for (i, filter) in post_scan.iter().enumerate() { + extended_exprs.push(ProjectionExpr { + expr: Arc::clone(filter), + alias: format!("__filter_{i}"), + }); + } + + ProjectionExprs::new(extended_exprs) + }; + + // Now compute column indices (includes filter columns) + let indices = projection.column_indices(); + let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + + // Build row filter with only the high-effectiveness filters + if !row_filters.is_empty() { + let row_filter_result = row_filter::build_row_filter_with_metrics( + row_filters, + &physical_file_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + &tracker, + ); + + match row_filter_result { + Ok(Some(result)) => { + builder = builder.with_row_filter(result.row_filter); + filter_metrics = result.filter_metrics; + } + Ok(None) => {} + Err(e) => { + debug!("Ignoring error building row filter: {e}"); + } + }; + } + + (post_scan, original_projection_len, projection, mask) + }; + // tracker lock released here let stream = builder .with_projection(mask) @@ -607,6 +697,7 @@ impl FileOpener for ParquetOpener { let predicate_cache_inner_records = file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + let filter_apply_time = file_metrics.filter_apply_time.clone(); let stream_schema = Arc::clone(stream.schema()); // Check if we need to replace the schema to handle things like differing nullability or metadata. 
@@ -621,6 +712,19 @@ impl FileOpener for ParquetOpener { let projector = projection.make_projector(&stream_schema)?; + // Pre-compute the data schema for post-scan filtering (excludes filter columns) + let post_scan_data_schema = if !post_scan_filters.is_empty() { + let proj_schema = projector.output_schema(); + Some(Arc::new(arrow::datatypes::Schema::new( + proj_schema.fields()[..original_projection_len].to_vec(), + ))) + } else { + None + }; + + // Clone for use in the stream mapping closure + let post_scan_tracker = Arc::clone(&selectivity_tracker); + let stream = stream.map_err(DataFusionError::from).map(move |b| { b.and_then(|mut b| { copy_arrow_reader_metrics( @@ -629,6 +733,19 @@ impl FileOpener for ParquetOpener { &predicate_cache_records, ); b = projector.project_batch(&b)?; + + // Apply post-scan filters if present + if let Some(ref data_schema) = post_scan_data_schema { + let start = datafusion_common::instant::Instant::now(); + b = apply_post_scan_filters( + b, + Arc::clone(data_schema), + &post_scan_filters, + &post_scan_tracker, + )?; + filter_apply_time.add_elapsed(start); + } + if replace_schema { // Ensure the output batch has the expected schema. // This handles things like schema level and field level metadata, which may not be present @@ -656,6 +773,19 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------------------- // Step: wrap the stream so a dynamic filter can stop the file scan early // ---------------------------------------------------------------------- + + // Wrap with SelectivityUpdatingStream if we have filter metrics to track + let stream = if !filter_metrics.is_empty() { + SelectivityUpdatingStream::new( + stream, + filter_metrics, + Arc::clone(&selectivity_tracker), + ) + .boxed() + } else { + stream.boxed() + }; + if let Some(file_pruner) = file_pruner { Ok(EarlyStoppingStream::new( stream, @@ -664,12 +794,94 @@ impl FileOpener for ParquetOpener { ) .boxed()) } else { - Ok(stream.boxed()) + Ok(stream) } })) } } +/// Apply post-scan filters to a record batch. +/// +/// This function: +/// 1. Extracts the filter columns (boolean arrays) from the end of the batch +/// 2. Tracks per-filter selectivity for adaptive filter reordering +/// 3. Combines them with AND +/// 4. Applies the combined filter mask to the data columns +/// 5. Returns a new batch with only the data columns +/// +/// The selectivity tracking here provides accurate measurements because all +/// post-scan filters see the same input rows (unlike row filters which run +/// sequentially and see progressively fewer rows). +fn apply_post_scan_filters( + batch: RecordBatch, + data_schema: SchemaRef, + filter_exprs: &[Arc], + selectivity_tracker: &parking_lot::RwLock, +) -> Result { + use arrow::array::as_boolean_array; + use arrow::compute::{and, filter_record_batch}; + + // Fast path: no work to be done + if filter_exprs.is_empty() { + return Ok(batch); + } + + let (_batch_schema, columns, num_rows) = batch.into_parts(); + let num_data_cols = data_schema.fields().len(); + + // Extract data columns and filter columns + let data_columns: Vec<_> = columns[..num_data_cols].to_vec(); + let filter_columns: Vec<_> = columns[num_data_cols..].to_vec(); + + // Track per-filter selectivity before combining. + // This gives us accurate marginal selectivity since all filters see the same input. 
+    let input_rows = num_rows as u64;
+    if input_rows > 0 {
+        let mut rows_matched = Vec::with_capacity(filter_exprs.len());
+        for (expr, col) in filter_exprs.iter().zip_eq(filter_columns.iter()) {
+            let bool_arr = col.as_boolean_opt().ok_or_else(|| internal_datafusion_err!(
+                "Expected filter expression to evaluate to boolean, got {}\nFilter expression: {}",
+                col.data_type(),
+                fmt_sql(expr.as_ref())
+            ))?;
+            rows_matched.push(bool_arr.true_count() as u64);
+        }
+        let mut tracker = selectivity_tracker.write();
+        for (expr, rows_matched) in filter_exprs.iter().zip_eq(rows_matched.into_iter()) {
+            tracker.update(expr, rows_matched, input_rows);
+        }
+    }
+
+    // Combine filter columns with AND (avoiding unnecessary clones)
+    let combined_mask = match filter_columns.len() {
+        0 => None,
+        1 => Some(as_boolean_array(filter_columns[0].as_ref()).clone()),
+        _ => {
+            // Start with and() of first two - creates a new array, no clone needed
+            let first = as_boolean_array(filter_columns[0].as_ref());
+            let second = as_boolean_array(filter_columns[1].as_ref());
+            let mut acc = and(first, second)?;
+            for col in &filter_columns[2..] {
+                acc = and(&acc, as_boolean_array(col.as_ref()))?;
+            }
+            Some(acc)
+        }
+    };
+
+    // Create batch with data columns only
+    let opts = RecordBatchOptions::new().with_row_count(Some(num_rows));
+    let data_batch = RecordBatch::try_new_with_options(data_schema, data_columns, &opts)?;
+
+    // Apply the filter
+    let filtered = if let Some(mask) = combined_mask {
+        filter_record_batch(&data_batch, &mask)?
+    } else {
+        data_batch
+    };
+
+    Ok(filtered)
+}
+
/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
fn copy_arrow_reader_metrics(
@@ -823,6 +1035,92 @@ where
    }
}
+/// A stream wrapper that updates the [`SelectivityTracker`] after each batch.
+///
+/// This captures per-filter metrics during stream processing and updates the shared
+/// selectivity tracker incrementally after each batch. This allows the system to
+/// learn filter effectiveness quickly, potentially promoting effective filters
+/// to row filters mid-stream for subsequent files.
+struct SelectivityUpdatingStream<S> {
+    /// The inner stream producing record batches
+    inner: S,
+    /// Has the stream finished processing?
+    done: bool,
+    /// Per-filter metrics collected during stream processing
+    filter_metrics: Vec<FilterMetrics>,
+    /// Last reported values for each filter (to compute deltas)
+    last_reported: Vec<(u64, u64)>, // (matched, total) per filter
+    /// Shared selectivity tracker to update when stream completes
+    selectivity_tracker: Arc<parking_lot::RwLock<SelectivityTracker>>,
+}
+
+impl<S> SelectivityUpdatingStream<S> {
+    fn new(
+        stream: S,
+        filter_metrics: Vec<FilterMetrics>,
+        selectivity_tracker: Arc<parking_lot::RwLock<SelectivityTracker>>,
+    ) -> Self {
+        let last_reported = vec![(0, 0); filter_metrics.len()];
+        Self {
+            inner: stream,
+            done: false,
+            filter_metrics,
+            last_reported,
+            selectivity_tracker,
+        }
+    }
+
+    /// Update the selectivity tracker with metrics accumulated since last update.
+    /// Uses delta tracking to avoid double-counting rows.
+    fn update_selectivity(&mut self) {
+        let mut tracker = self.selectivity_tracker.write();
+        for (i, metrics) in self.filter_metrics.iter().enumerate() {
+            let current_matched = metrics.get_rows_matched() as u64;
+            let current_total = metrics.get_rows_total() as u64;
+
+            let (last_matched, last_total) = self.last_reported[i];
+            let delta_matched = current_matched - last_matched;
+            let delta_total = current_total - last_total;
+
+            // Only update if we have new rows since last update
+            if delta_total > 0 {
+                tracker.update(&metrics.expr, delta_matched, delta_total);
+                self.last_reported[i] = (current_matched, current_total);
+            }
+        }
+    }
+}
+
+impl<S> Stream for SelectivityUpdatingStream<S>
+where
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.done {
+            return Poll::Ready(None);
+        }
+
+        match ready!(self.inner.poll_next_unpin(cx)) {
+            None => {
+                // Stream completed - final update to selectivity tracker
+                self.done = true;
+                self.update_selectivity();
+                Poll::Ready(None)
+            }
+            Some(result) => {
+                // Update selectivity after each batch for faster learning
+                self.update_selectivity();
+                Poll::Ready(Some(result))
+            }
+        }
+    }
+}
+
#[derive(Default)]
struct EncryptionContext {
    #[cfg(feature = "parquet_encryption")]
@@ -1005,7 +1303,10 @@ mod test {
    use std::sync::Arc;
    use super::{ConstantColumns, constant_columns_from_stats};
-    use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
+    use crate::{
+        DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener,
+        selectivity::SelectivityTracker,
+    };
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use bytes::{BufMut, BytesMut};
    use datafusion_common::{
@@ -1183,6 +1484,9 @@ mod test {
            encryption_factory: None,
            max_predicate_cache_size: self.max_predicate_cache_size,
            reverse_row_groups: self.reverse_row_groups,
+            selectivity_tracker: Arc::new(parking_lot::RwLock::new(
+                SelectivityTracker::default(),
+            )),
        }
    }
}
@@ -1611,7 +1915,10 @@ mod test {
    assert_eq!(num_batches, 1);
    assert_eq!(num_rows, 1);
-    // Filter should not match the partition value or the data value
+    // Filter should not match the partition value or the data value.
+    // With adaptive selectivity tracking, unknown filters are pushed down
+    // as row filters initially. The row filter prunes all rows during decoding,
+    // resulting in no batches being returned.
    let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
    let predicate = logical2physical(&expr, &table_schema);
    let opener = make_opener(predicate);
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 04c11b8875541..14755941fd514 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -87,8 +87,49 @@ use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_physical_plan::metrics;
use super::ParquetFileMetrics;
+use super::selectivity::SelectivityTracker;
use super::supported_predicates::supports_list_predicates;
+/// Metrics for a single filter predicate, paired with the original expression.
+///
+/// These metrics are tracked during row filter evaluation and can be used
+/// to update selectivity statistics after processing completes.
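+///
+/// The counters are shared with the `ArrowPredicate` that evaluates the predicate,
+/// so they update as batches are decoded; the opener's stream wrapper reads them
+/// after each batch and reports only the deltas to the shared `SelectivityTracker`
+/// to avoid double counting.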
+#[derive(Debug, Clone)] +pub struct FilterMetrics { + /// The original filter expression (before any rewriting for the file schema) + pub expr: Arc, + /// Counter for rows that matched (passed) this filter + rows_matched: metrics::Count, + /// Counter for rows that were pruned (filtered out) by this filter + rows_pruned: metrics::Count, +} + +impl FilterMetrics { + /// Get the number of rows that matched this filter + pub fn get_rows_matched(&self) -> usize { + self.rows_matched.value() + } + + /// Get the number of rows that were pruned by this filter + pub fn get_rows_pruned(&self) -> usize { + self.rows_pruned.value() + } + + /// Get the total number of rows evaluated by this filter + pub fn get_rows_total(&self) -> usize { + self.get_rows_matched() + self.get_rows_pruned() + } +} + +/// Result of building a row filter, containing both the filter and per-expression metrics. +pub struct RowFilterWithMetrics { + /// The row filter to apply during parquet decoding + pub row_filter: RowFilter, + /// Metrics for each filter expression, in the order they appear in the row filter. + /// These can be read after the stream completes to update selectivity statistics. + pub filter_metrics: Vec, +} + /// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform /// row-level filtering during parquet decoding. /// @@ -549,6 +590,9 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -595,9 +640,21 @@ pub fn build_row_filter( if reorder_predicates { candidates.sort_unstable_by(|c1, c2| { - match c1.can_use_index.cmp(&c2.can_use_index) { - Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), - ord => ord, + let eff1 = selectivity_tracker.get_effectiveness(&c1.expr); + let eff2 = selectivity_tracker.get_effectiveness(&c2.expr); + + match (eff1, eff2) { + // Both have known effectiveness: sort by effectiveness descending + // (higher effectiveness = more selective = should come first) + (Some(e1), Some(e2)) => e2.partial_cmp(&e1).unwrap_or(Ordering::Equal), + // Known effectiveness comes before unknown + (Some(_), None) => Ordering::Less, + (None, Some(_)) => Ordering::Greater, + // Both unknown: fall back to existing heuristics + (None, None) => match c1.can_use_index.cmp(&c2.can_use_index) { + Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), + ord => ord, + }, } }); } @@ -637,6 +694,211 @@ pub fn build_row_filter( .map(|filters| Some(RowFilter::new(filters))) } +/// Build a [`RowFilter`] from the given predicate expression, returning per-expression metrics. +/// +/// This is similar to [`build_row_filter`] but additionally returns [`FilterMetrics`] for each +/// filter expression. The metrics can be read after stream processing completes to update +/// selectivity statistics. 
+/// +/// # Arguments +/// * `expr` - The filter predicate, already adapted to reference columns in `file_schema` +/// * `file_schema` - The Arrow schema of the parquet file +/// * `metadata` - Parquet file metadata used for cost estimation +/// * `reorder_predicates` - If true, reorder predicates to minimize I/O +/// * `file_metrics` - Metrics for tracking filter performance +/// * `selectivity_tracker` - Tracker containing effectiveness data for filter reordering +/// +/// # Returns +/// * `Ok(Some(result))` containing the row filter and per-expression metrics +/// * `Ok(None)` if no expressions can be used as a RowFilter +/// * `Err(e)` if an error occurs while building the filter +pub fn build_row_filter_with_metrics( + predicates: Vec>, + file_schema: &SchemaRef, + metadata: &ParquetMetaData, + reorder_predicates: bool, + file_metrics: &ParquetFileMetrics, + selectivity_tracker: &SelectivityTracker, +) -> Result> { + let rows_pruned = &file_metrics.pushdown_rows_pruned; + let rows_matched = &file_metrics.pushdown_rows_matched; + let time = &file_metrics.row_pushdown_eval_time; + + // Determine which conjuncts can be evaluated as ArrowPredicates, if any + // We need to preserve the original expressions before building candidates + let mut candidates_with_exprs: Vec<(Arc, FilterCandidate)> = + predicates + .into_iter() + .filter_map(|expr| { + let original_expr = Arc::clone(&expr); + FilterCandidateBuilder::new(expr, Arc::clone(file_schema)) + .build(metadata) + .ok() + .flatten() + .map(|candidate| (original_expr, candidate)) + }) + .collect(); + + // no candidates + if candidates_with_exprs.is_empty() { + return Ok(None); + } + + if reorder_predicates { + candidates_with_exprs.sort_unstable_by(|(_, c1), (_, c2)| { + let eff1 = selectivity_tracker.get_effectiveness(&c1.expr); + let eff2 = selectivity_tracker.get_effectiveness(&c2.expr); + + match (eff1, eff2) { + // Both have known effectiveness: sort by effectiveness descending + // (higher effectiveness = more selective = should come first) + (Some(e1), Some(e2)) => e2.partial_cmp(&e1).unwrap_or(Ordering::Equal), + // Either unknown: fall back to existing heuristics + _ => match c1.can_use_index.cmp(&c2.can_use_index) { + Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), + ord => ord, + }, + } + }); + } + + let total_candidates = candidates_with_exprs.len(); + let mut filter_metrics = Vec::with_capacity(total_candidates); + let mut arrow_predicates = Vec::with_capacity(total_candidates); + + for (idx, (original_expr, candidate)) in candidates_with_exprs.into_iter().enumerate() + { + let is_last = idx == total_candidates - 1; + + // Create per-predicate metrics for selectivity tracking + let predicate_rows_matched = metrics::Count::new(); + let predicate_rows_pruned = metrics::Count::new(); + + // Store references to the metrics for the filter + filter_metrics.push(FilterMetrics { + expr: original_expr, + rows_matched: predicate_rows_matched.clone(), + rows_pruned: predicate_rows_pruned.clone(), + }); + + // For global metrics tracking: + // - All predicates contribute to the global pruned counter + // - Only the last predicate contributes to the global matched counter + let global_rows_pruned = rows_pruned.clone(); + let global_rows_matched = if is_last { + rows_matched.clone() + } else { + metrics::Count::new() + }; + + // Create a predicate that updates both per-predicate and global metrics + let arrow_pred = DatafusionArrowPredicateWithMetrics::try_new( + candidate, + metadata, + predicate_rows_pruned, + 
predicate_rows_matched, + global_rows_pruned, + global_rows_matched, + time.clone(), + )?; + + arrow_predicates.push(Box::new(arrow_pred) as Box); + } + + Ok(Some(RowFilterWithMetrics { + row_filter: RowFilter::new(arrow_predicates), + filter_metrics, + })) +} + +/// A variant of [`DatafusionArrowPredicate`] that tracks both per-predicate and global metrics. +/// +/// This is used by [`build_row_filter_with_metrics`] to enable selectivity tracking +/// while maintaining backward compatibility with the global metrics system. +#[derive(Debug)] +struct DatafusionArrowPredicateWithMetrics { + /// the filter expression + physical_expr: Arc, + /// Path to the columns in the parquet schema required to evaluate the expression + projection_mask: ProjectionMask, + /// Per-predicate: how many rows were filtered out by this predicate + local_rows_pruned: metrics::Count, + /// Per-predicate: how many rows passed this predicate + local_rows_matched: metrics::Count, + /// Global: how many rows were filtered out (shared across predicates) + global_rows_pruned: metrics::Count, + /// Global: how many rows passed (only tracked by last predicate) + global_rows_matched: metrics::Count, + /// how long was spent evaluating this predicate + time: metrics::Time, +} + +impl DatafusionArrowPredicateWithMetrics { + fn try_new( + candidate: FilterCandidate, + metadata: &ParquetMetaData, + local_rows_pruned: metrics::Count, + local_rows_matched: metrics::Count, + global_rows_pruned: metrics::Count, + global_rows_matched: metrics::Count, + time: metrics::Time, + ) -> Result { + let physical_expr = + reassign_expr_columns(candidate.expr, &candidate.filter_schema)?; + + Ok(Self { + physical_expr, + // Use leaf indices: when nested columns are involved, we must specify + // leaf (primitive) column indices in the Parquet schema so the decoder + // can properly project and filter nested structures. 
+ projection_mask: ProjectionMask::leaves( + metadata.file_metadata().schema_descr(), + candidate.projection.leaf_indices.iter().copied(), + ), + local_rows_pruned, + local_rows_matched, + global_rows_pruned, + global_rows_matched, + time, + }) + } +} + +impl ArrowPredicate for DatafusionArrowPredicateWithMetrics { + fn projection(&self) -> &ProjectionMask { + &self.projection_mask + } + + fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { + let mut timer = self.time.timer(); + + self.physical_expr + .evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + let bool_arr = as_boolean_array(&array)?.clone(); + let num_matched = bool_arr.true_count(); + let num_pruned = bool_arr.len() - num_matched; + + // Update per-predicate metrics (for selectivity tracking) + self.local_rows_pruned.add(num_pruned); + self.local_rows_matched.add(num_matched); + + // Update global metrics (for backward compatibility) + self.global_rows_pruned.add(num_pruned); + self.global_rows_matched.add(num_matched); + + timer.stop(); + Ok(bool_arr) + }) + .map_err(|e| { + ArrowError::ComputeError(format!( + "Error evaluating filter predicate: {e:?}" + )) + }) + } +} + #[cfg(test)] mod test { use super::*; @@ -917,11 +1179,18 @@ mod test { let metrics = ExecutionPlanMetricsSet::new(); let file_metrics = ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics); + let selectivity_tracker = SelectivityTracker::default(); - let row_filter = - build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) - .expect("building row filter") - .expect("row filter should exist"); + let row_filter = build_row_filter( + &expr, + &file_schema, + &metadata, + false, + &file_metrics, + &selectivity_tracker, + ) + .expect("building row filter") + .expect("row filter should exist"); let reader = parquet_reader_builder .with_row_filter(row_filter) diff --git a/datafusion/datasource-parquet/src/selectivity.rs b/datafusion/datasource-parquet/src/selectivity.rs new file mode 100644 index 0000000000000..7e9fb118d46d2 --- /dev/null +++ b/datafusion/datasource-parquet/src/selectivity.rs @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Adaptive filter selectivity tracking for Parquet row filters. +//! +//! This module provides infrastructure to track filter effectiveness across files +//! and adaptively decide which filters should be pushed down as row filters vs. +//! applied post-scan. +//! +//! The key insight is that filters with low effectiveness (those that don't filter +//! out many rows) may not be worth the I/O cost of late materialization. By tracking +//! effectiveness across files, we can learn which filters are worth pushing down. 
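+//!
+//! For example (an illustrative sketch; `filter_a` and `filter_b` stand in for
+//! arbitrary `Arc<dyn PhysicalExpr>` conjuncts), with the default threshold of 0.8:
+//!
+//! ```ignore
+//! let mut tracker = SelectivityTracker::new(0.8);
+//! // `filter_a` matched 10 of 100 rows: effectiveness 0.9 >= 0.8, stays a row filter
+//! tracker.update(&filter_a, 10, 100);
+//! // `filter_b` matched 50 of 100 rows: effectiveness 0.5 < 0.8, demoted to post-scan
+//! tracker.update(&filter_b, 50, 100);
+//! let partitioned = tracker.partition_filters(vec![filter_a, filter_b]);
+//! assert_eq!(partitioned.row_filters.len(), 1);
+//! assert_eq!(partitioned.post_scan.len(), 1);
+//! ```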
+
+use std::collections::HashMap;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+/// Result of partitioning filters based on their effectiveness.
+///
+/// Filters are split into two groups:
+/// - `row_filters`: Filters that should be pushed down as row filters
+/// - `post_scan`: Filters that should be applied after scanning
+#[derive(Debug, Clone, Default)]
+pub struct PartitionedFilters {
+    /// Filters to push down as row filters (effective or unknown effectiveness)
+    pub row_filters: Vec<Arc<dyn PhysicalExpr>>,
+    /// Filters to apply post-scan (known to be ineffective)
+    pub post_scan: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+/// Wrapper for `Arc<dyn PhysicalExpr>` that uses structural Hash/Eq.
+///
+/// This is needed because `Arc` uses pointer equality by default,
+/// but we want to use the structural equality provided by `DynEq` and `DynHash`.
+///
+/// For dynamic expressions (like `DynamicFilterPhysicalExpr`), we use the snapshot
+/// of the expression to ensure stable hash/eq values even as the dynamic expression
+/// updates. This is critical for HashMap correctness.
+#[derive(Clone, Debug)]
+pub struct ExprKey(Arc<dyn PhysicalExpr>);
+
+impl ExprKey {
+    /// Create a new ExprKey from an expression.
+    ///
+    /// For dynamic expressions, this takes a snapshot to ensure stable hash/eq.
+    pub fn new(expr: &Arc<dyn PhysicalExpr>) -> Self {
+        // Try to get a snapshot; if available, use it for stable hash/eq
+        let stable_expr = expr
+            .snapshot()
+            .ok()
+            .flatten()
+            .unwrap_or_else(|| Arc::clone(expr));
+        Self(stable_expr)
+    }
+}
+
+impl Hash for ExprKey {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // dyn PhysicalExpr implements Hash, which delegates to dyn_hash
+        self.0.as_ref().hash(state);
+    }
+}
+
+impl PartialEq for ExprKey {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.as_ref() == other.0.as_ref()
+    }
+}
+
+impl Eq for ExprKey {}
+
+/// Tracks selectivity statistics for a single filter expression.
+#[derive(Debug, Clone, Default)]
+pub struct SelectivityStats {
+    /// Number of rows that matched (passed) the filter
+    pub rows_matched: u64,
+    /// Total number of rows evaluated
+    pub rows_total: u64,
+}
+
+impl SelectivityStats {
+    /// Create new stats with given values.
+    pub fn new(rows_matched: u64, rows_total: u64) -> Self {
+        Self {
+            rows_matched,
+            rows_total,
+        }
+    }
+
+    /// Returns the filter effectiveness (fraction of rows filtered out).
+    ///
+    /// - 1.0 = perfect filter (all rows filtered out)
+    /// - 0.0 = useless filter (no rows filtered out)
+    ///
+    /// Returns 0.0 if no rows have been evaluated (unknown effectiveness).
+    pub fn effectiveness(&self) -> f64 {
+        if self.rows_total == 0 {
+            0.0 // Unknown, assume ineffective
+        } else {
+            1.0 - (self.rows_matched as f64 / self.rows_total as f64)
+        }
+    }
+
+    /// Update stats with new observations.
+    pub fn update(&mut self, matched: u64, total: u64) {
+        self.rows_matched += matched;
+        self.rows_total += total;
+    }
+}
+
+/// Cross-file selectivity tracker for adaptive filter ordering.
+///
+/// This tracker maintains effectiveness statistics for filter expressions
+/// across multiple files, allowing the system to learn which filters are
+/// worth pushing down as row filters.
+#[derive(Debug)]
+pub struct SelectivityTracker {
+    /// Per-expression effectiveness statistics
+    stats: HashMap<ExprKey, SelectivityStats>,
+    /// Minimum effectiveness threshold to keep a filter as a row filter.
+    /// Filters with effectiveness < threshold are demoted to post-scan.
+    /// Default: 0.8 (must filter out at least 80% of rows)
+    threshold: f64,
+}
+
+impl Default for SelectivityTracker {
+    fn default() -> Self {
+        Self::new(0.8)
+    }
+}
+
+impl SelectivityTracker {
+    /// Create a new tracker with the given effectiveness threshold.
+    ///
+    /// # Arguments
+    /// * `threshold` - Minimum effectiveness (0.0-1.0) to keep as row filter.
+    ///   Filters with effectiveness < threshold are demoted to post-scan.
+    pub fn new(threshold: f64) -> Self {
+        Self {
+            stats: HashMap::new(),
+            threshold,
+        }
+    }
+
+    /// Get the effectiveness threshold.
+    pub fn threshold(&self) -> f64 {
+        self.threshold
+    }
+
+    /// Get the effectiveness for a filter expression, if known.
+    pub fn get_effectiveness(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<f64> {
+        let key = ExprKey::new(expr);
+        self.stats.get(&key).map(|s| s.effectiveness())
+    }
+
+    /// Partition filters into row_filters and post_scan based on effectiveness.
+    ///
+    /// Filters start as row filters (pushed down) to take advantage of late
+    /// materialization. As we learn their effectiveness, ineffective filters
+    /// (those that pass most rows) get demoted to post-scan for future files.
+    ///
+    /// - Filters with effectiveness >= threshold → row_filters (push down)
+    /// - Filters with effectiveness < threshold → post_scan (not worth pushing)
+    /// - Filters with unknown effectiveness → row_filters (try pushing first)
+    pub fn partition_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> PartitionedFilters {
+        // If the selectivity is set to 0.0, all filters are promoted to row filters
+        // even if we have no stats on them.
+        if self.threshold == 0.0 {
+            return PartitionedFilters {
+                row_filters: filters,
+                post_scan: Vec::new(),
+            };
+        }
+
+        let mut row_filters = Vec::new();
+        let mut post_scan = Vec::new();
+
+        for filter in filters {
+            let key = ExprKey::new(&filter);
+            match self.stats.get(&key) {
+                Some(stats) if stats.effectiveness() < self.threshold => {
+                    // Known to be ineffective - demote to post-scan
+                    post_scan.push(filter);
+                }
+                _ => {
+                    // Unknown or effective - push down as row filter
+                    row_filters.push(filter);
+                }
+            }
+        }
+
+        PartitionedFilters {
+            row_filters,
+            post_scan,
+        }
+    }
+
+    /// Update stats for a filter expression after processing a file.
+    pub fn update(&mut self, expr: &Arc<dyn PhysicalExpr>, matched: u64, total: u64) {
+        let key = ExprKey::new(expr);
+        self.stats.entry(key).or_default().update(matched, total);
+    }
+
+    /// Get the current stats for a filter expression, if any.
+    pub fn get_stats(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<&SelectivityStats> {
+        let key = ExprKey::new(expr);
+        self.stats.get(&key)
+    }
+
+    /// Iterate all known selectivities.
+ pub fn iter( + &self, + ) -> impl Iterator, &SelectivityStats)> { + self.stats.iter().map(|(key, stats)| (&key.0, stats)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, col, lit}; + use std::sync::Arc; + + fn test_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + ]) + } + + fn make_filter(col_name: &str, value: i32) -> Arc { + let schema = test_schema(); + Arc::new(BinaryExpr::new( + col(col_name, &schema).unwrap(), + Operator::Eq, + lit(value), + )) + } + + #[test] + fn test_expr_key_equality() { + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 5); + let filter3 = make_filter("a", 10); + + let key1 = ExprKey::new(&filter1); + let key2 = ExprKey::new(&filter2); + let key3 = ExprKey::new(&filter3); + + // Same expression structure should be equal + assert_eq!(key1, key2); + // Different value should not be equal + assert_ne!(key1, key3); + } + + #[test] + fn test_expr_key_hash() { + use std::collections::hash_map::DefaultHasher; + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 5); + + let key1 = ExprKey::new(&filter1); + let key2 = ExprKey::new(&filter2); + + let mut hasher1 = DefaultHasher::new(); + let mut hasher2 = DefaultHasher::new(); + key1.hash(&mut hasher1); + key2.hash(&mut hasher2); + + assert_eq!(hasher1.finish(), hasher2.finish()); + } + + #[test] + fn test_selectivity_stats_effectiveness() { + // No data - unknown + let stats = SelectivityStats::new(0, 0); + assert_eq!(stats.effectiveness(), 0.0); + + // All rows pass - useless filter + let stats = SelectivityStats::new(100, 100); + assert_eq!(stats.effectiveness(), 0.0); + + // No rows pass - perfect filter + let stats = SelectivityStats::new(0, 100); + assert_eq!(stats.effectiveness(), 1.0); + + // 20% pass = 80% filtered = 0.8 effectiveness + let stats = SelectivityStats::new(20, 100); + assert_eq!(stats.effectiveness(), 0.8); + + // 50% pass = 50% filtered = 0.5 effectiveness + let stats = SelectivityStats::new(50, 100); + assert_eq!(stats.effectiveness(), 0.5); + } + + #[test] + fn test_selectivity_stats_update() { + let mut stats = SelectivityStats::default(); + assert_eq!(stats.rows_matched, 0); + assert_eq!(stats.rows_total, 0); + + stats.update(20, 100); + assert_eq!(stats.rows_matched, 20); + assert_eq!(stats.rows_total, 100); + + stats.update(30, 100); + assert_eq!(stats.rows_matched, 50); + assert_eq!(stats.rows_total, 200); + assert_eq!(stats.effectiveness(), 0.75); // 150/200 filtered = 0.75 + } + + #[test] + fn test_tracker_partition_unknown_filters() { + let tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Unknown filters should go to row_filters to be tried first + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); + + assert_eq!(row_filters.len(), 2); + assert_eq!(post_scan.len(), 0); + } + + #[test] + fn test_tracker_partition_effective_filters() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Update filter1 with high effectiveness (90% filtered) + tracker.update(&filter1, 10, 100); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); 
+ + // filter1 is effective (0.9 >= 0.8) → row_filters, filter2 is unknown → row_filters + assert_eq!(row_filters.len(), 2); + assert_eq!(post_scan.len(), 0); + + // Both filters should be in row_filters + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter1)) + ); + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter2)) + ); + } + + #[test] + fn test_tracker_partition_ineffective_filters() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Update filter1 with low effectiveness (50% filtered) + tracker.update(&filter1, 50, 100); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); + + // filter1 is ineffective (0.5 < 0.8) → post_scan, filter2 is unknown → row_filters + assert_eq!(row_filters.len(), 1); + assert_eq!(post_scan.len(), 1); + + // The unknown filter should be in row_filters + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter2)) + ); + // The ineffective filter should be in post_scan + assert!( + post_scan + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter1)) + ); + } + + #[test] + fn test_tracker_threshold_boundary() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter = make_filter("a", 5); + + // Exactly at threshold (80% filtered = 0.8 effectiveness) + tracker.update(&filter, 20, 100); + assert_eq!(tracker.get_effectiveness(&filter), Some(0.8)); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter.clone()]); + + // At threshold boundary, should stay as row filter (>= threshold) + assert_eq!(row_filters.len(), 1); + assert_eq!(post_scan.len(), 0); + } + + #[test] + fn test_tracker_just_below_threshold() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter = make_filter("a", 5); + + // Just below threshold (79% filtered = 0.79 effectiveness) + tracker.update(&filter, 21, 100); + assert!((tracker.get_effectiveness(&filter).unwrap() - 0.79).abs() < 0.001); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter.clone()]); + + // Below threshold, should be demoted to post_scan + assert_eq!(row_filters.len(), 0); + assert_eq!(post_scan.len(), 1); + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2e0919b1447de..c9722ad14d75d 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -293,6 +293,11 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Tracks filter selectivity across files for adaptive filter reordering. + /// Shared across all openers - each opener reads stats and makes its own + /// decision about which filters to push down vs. apply post-scan. + pub(crate) selectivity_tracker: + Arc>, } impl ParquetSource { @@ -318,14 +323,30 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + selectivity_tracker: Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::default(), + )), } } + /// Set the selectivity for converting filters to pre-materialization row filters. 
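+    ///
+    /// The `selectivity` value is the effectiveness threshold used by the tracker:
+    /// 0.0 keeps every filter as a row filter, while higher values demote filters
+    /// whose observed effectiveness falls below the threshold to post-scan
+    /// evaluation (see [`Self::with_filter_effectiveness_threshold`]).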
+ pub fn with_filter_pushdown_selectivity(mut self, selectivity: f64) -> Self { + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(selectivity), + )); + self + } + /// Set the `TableParquetOptions` for this ParquetSource. pub fn with_table_parquet_options( mut self, table_parquet_options: TableParquetOptions, ) -> Self { + // Update the selectivity tracker threshold from the config + let threshold = table_parquet_options.global.filter_effectiveness_threshold; + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(threshold), + )); self.table_parquet_options = table_parquet_options; self } @@ -460,6 +481,30 @@ impl ParquetSource { self.table_parquet_options.global.max_predicate_cache_size } + /// Set the minimum filter effectiveness threshold for adaptive filter pushdown. + /// + /// When `pushdown_filters` is enabled, filters that don't filter out at least + /// this fraction of rows will be demoted from row-level filters to post-scan filters. + /// This helps avoid the I/O cost of late materialization for filters that aren't + /// selective enough. Valid values are 0.0 to 1.0, where 0.8 means filters must + /// filter out at least 80% of rows to remain as row filters. Defaults to 0.8. + pub fn with_filter_effectiveness_threshold(mut self, threshold: f64) -> Self { + self.table_parquet_options + .global + .filter_effectiveness_threshold = threshold; + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(threshold), + )); + self + } + + /// Return the filter effectiveness threshold. + pub fn filter_effectiveness_threshold(&self) -> f64 { + self.table_parquet_options + .global + .filter_effectiveness_threshold + } + #[cfg(feature = "parquet_encryption")] fn get_encryption_factory_with_config( &self, @@ -567,6 +612,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + selectivity_tracker: Arc::clone(&self.selectivity_tracker), }); Ok(opener) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 08bb25bd715b9..799c853edb575 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -588,6 +588,10 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + oneof filter_effectiveness_threshold_opt { + double filter_effectiveness_threshold = 35; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index e8e71c3884586..2263dcb759a86 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1013,6 +1013,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + filter_effectiveness_threshold: value.filter_effectiveness_threshold_opt.map(|opt| match opt { + protobuf::parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(v) => v, + }).unwrap_or(0.8f64), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs index d38cf86825d46..c3562adbfd9cf 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5696,6 +5696,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.filter_effectiveness_threshold_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5861,6 +5864,13 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.filter_effectiveness_threshold_opt.as_ref() { + match v { + parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(v) => { + struct_ser.serialize_field("filterEffectivenessThreshold", v)?; + } + } + } struct_ser.end() } } @@ -5932,6 +5942,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "filter_effectiveness_threshold", + "filterEffectivenessThreshold", ]; #[allow(clippy::enum_variant_names)] @@ -5968,6 +5980,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + FilterEffectivenessThreshold, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6021,6 +6034,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "filterEffectivenessThreshold" | "filter_effectiveness_threshold" => Ok(GeneratedField::FilterEffectivenessThreshold), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6072,6 +6086,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut filter_effectiveness_threshold_opt__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::EnablePageIndex => { @@ -6280,6 +6295,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::FilterEffectivenessThreshold => { + if filter_effectiveness_threshold_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("filterEffectivenessThreshold")); + } + filter_effectiveness_threshold_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(x.0)); + } } } Ok(ParquetOptions { @@ -6315,6 +6336,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + filter_effectiveness_threshold_opt: filter_effectiveness_threshold_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16601dcf46977..1adc62e96e279 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -849,6 +849,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::FilterEffectivenessThresholdOpt", tags = "35")] + pub filter_effectiveness_threshold_opt: ::core::option::Option< + parquet_options::FilterEffectivenessThresholdOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -907,6 +911,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum FilterEffectivenessThresholdOpt { + #[prost(double, tag = "35")] + FilterEffectivenessThreshold(f64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index fee3656482005..ecd99adb244f8 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -883,6 +883,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + filter_effectiveness_threshold_opt: Some(protobuf::parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(value.filter_effectiveness_threshold)), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16601dcf46977..1adc62e96e279 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -849,6 +849,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::FilterEffectivenessThresholdOpt", tags = "35")] + pub filter_effectiveness_threshold_opt: ::core::option::Option< + 
parquet_options::FilterEffectivenessThresholdOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -907,6 +911,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum FilterEffectivenessThresholdOpt { + #[prost(double, tag = "35")] + FilterEffectivenessThreshold(f64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 436a06493766d..0eda4a88f7b65 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -424,6 +424,7 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + filter_effectiveness_threshold_opt: Some(parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(global_options.global.filter_effectiveness_threshold)), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -523,6 +524,9 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + filter_effectiveness_threshold: proto.filter_effectiveness_threshold_opt.as_ref().map(|opt| match opt { + parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(threshold) => *threshold, + }).unwrap_or(0.8), } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0666fc2979b38..5cb6fbed2376f 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -723,6 +723,14 @@ impl protobuf::PhysicalPlanNode { let mut source = ParquetSource::new(table_schema).with_table_parquet_options(options); + source = source.with_filter_pushdown_selectivity( + ctx.session_config() + .options() + .execution + .parquet + .filter_effectiveness_threshold, + ); + if let Some(predicate) = predicate { source = source.with_predicate(predicate); } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 9215ce87e3bef..77aee9ff6423d 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -554,29 +554,14 @@ explain SELECT int_col FROM table2 WHERE string_col != 'foo'; ---- physical_plan 01)┌───────────────────────────┐ -02)│ FilterExec │ +02)│ DataSourceExec │ 03)│ -------------------- │ -04)│ predicate: │ -05)│ string_col != foo │ -06)└─────────────┬─────────────┘ -07)┌─────────────┴─────────────┐ -08)│ RepartitionExec │ -09)│ -------------------- │ -10)│ partition_count(in->out): │ -11)│ 1 -> 4 │ -12)│ │ -13)│ partitioning_scheme: │ -14)│ RoundRobinBatch(4) │ -15)└─────────────┬─────────────┘ -16)┌─────────────┴─────────────┐ -17)│ DataSourceExec │ -18)│ -------------------- │ -19)│ files: 1 │ -20)│ format: parquet │ -21)│ │ -22)│ predicate: │ -23)│ string_col != foo │ -24)└───────────────────────────┘ +04)│ files: 1 │ +05)│ format: parquet │ +06)│ │ +07)│ predicate: │ +08)│ string_col != foo │ +09)└───────────────────────────┘ # Query with filter on memory query TT diff --git 
a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 646cc3dfd5370..eecab21747740 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.filter_effectiveness_threshold 0.8 datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 @@ -251,7 +252,7 @@ datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 datafusion.execution.parquet.metadata_size_hint 524288 datafusion.execution.parquet.pruning true -datafusion.execution.parquet.pushdown_filters false +datafusion.execution.parquet.pushdown_filters true datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.schema_force_view_types true datafusion.execution.parquet.skip_arrow_metadata false @@ -379,6 +380,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.filter_effectiveness_threshold 0.8 (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. When `pushdown_filters` is enabled, filters that don't filter out at least this fraction of rows will be demoted from row-level filters to post-scan filters. This helps avoid the I/O cost of late materialization for filters that aren't selective enough. Valid values are 0.0 to 1.0, where 0.8 means filters must filter out at least 80% of rows to remain as row filters. datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. 
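The `information_schema` output above documents the new `datafusion.execution.parquet.filter_effectiveness_threshold` option alongside the `pushdown_filters` default change. A minimal sketch of how an end user might tune the two settings together through `SessionConfig` (the 0.9 threshold is illustrative only; the `filter_effectiveness_threshold` field path is the one this change adds to the parquet options, and the surrounding calls are standard DataFusion session APIs):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Keep parquet filter pushdown enabled (the new default), but require that a
    // filter prune at least 90% of rows before it is kept as a row-level filter;
    // less effective filters are demoted to post-scan evaluation.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    config
        .options_mut()
        .execution
        .parquet
        .filter_effectiveness_threshold = 0.9;
    let ctx = SessionContext::new_with_config(config);
    let _ = ctx; // register tables and run queries as usual
}

The same values can also be set per-source with `ParquetSource::with_filter_effectiveness_threshold`, as added in `source.rs` above.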
@@ -386,7 +388,7 @@ datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writi datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file -datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". +datafusion.execution.parquet.pushdown_filters true (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. 
Refer to diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index be713b963b451..6f12f5ab42843 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -457,10 +457,7 @@ EXPLAIN logical_plan 01)Filter: CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") 02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] -physical_plan -01)FilterExec: CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% statement ok @@ -504,10 +501,7 @@ EXPLAIN logical_plan 01)Filter: binary_as_string_option.binary_col LIKE Utf8View("%a%") AND binary_as_string_option.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%") 02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8View("%a%"), binary_as_string_option.largebinary_col LIKE Utf8View("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] -physical_plan -01)FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% statement ok @@ -554,10 +548,7 @@ EXPLAIN logical_plan 01)Filter: binary_as_string_both.binary_col LIKE Utf8View("%a%") AND binary_as_string_both.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_both.binaryview_col LIKE Utf8View("%a%") 02)--TableScan: binary_as_string_both 
projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_both.binary_col LIKE Utf8View("%a%"), binary_as_string_both.largebinary_col LIKE Utf8View("%a%"), binary_as_string_both.binaryview_col LIKE Utf8View("%a%")] -physical_plan -01)FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% statement ok @@ -668,10 +659,7 @@ explain select * from foo where starts_with(column1, 'f'); logical_plan 01)Filter: foo.column1 LIKE Utf8View("f%") 02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8View("f%")] -physical_plan -01)FilterExec: column1@0 LIKE f% -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[] statement ok drop table foo diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt index 8c77fb96ba75c..da159dab81b94 100644 --- a/datafusion/sqllogictest/test_files/parquet_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt @@ -58,10 +58,7 @@ LOCATION 'test_files/scratch/parquet_statistics/test_table'; query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- -physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) 
ScanBytes=Inexact(40))]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] # cleanup statement ok @@ -83,10 +80,7 @@ LOCATION 'test_files/scratch/parquet_statistics/test_table'; query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- -physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] # cleanup statement ok @@ -109,10 +103,7 @@ LOCATION 'test_files/scratch/parquet_statistics/test_table'; query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- -physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]] -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] # cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 34c5fd97b51f3..3b1be9c712857 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -336,11 +336,8 @@ physical_plan 06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted 07)------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service] 08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4] -09)----------------CoalescePartitionsExec -10)------------------FilterExec: service@2 = log -11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 -12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +10)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify results without optimization query TTTIR rowsort @@ -388,11 +385,8 @@ physical_plan 03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted 04)------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as 
service] 05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4] -06)----------CoalescePartitionsExec -07)------------FilterExec: service@2 = log -08)--------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 -09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -10)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] query TTTIR rowsort SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value) diff --git a/datafusion/sqllogictest/test_files/projection.slt b/datafusion/sqllogictest/test_files/projection.slt index 5a4411233424a..14a29c68a3bfa 100644 --- a/datafusion/sqllogictest/test_files/projection.slt +++ b/datafusion/sqllogictest/test_files/projection.slt @@ -274,8 +274,4 @@ logical_plan 01)Projection: 02)--Filter: t1.a > Int64(1) 03)----TableScan: t1 projection=[a], partial_filters=[t1.a > Int64(1)] -physical_plan -01)ProjectionExec: expr=[] -02)--FilterExec: a@0 > 1 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[] diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index 4353f805c848b..0def58bc10cbb 100644 --- 
a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -208,7 +208,6 @@ physical_plan query II select * from test_filter_with_limit where value = 2 limit 1; ---- -2 2 # Tear down test_filter_with_limit table: diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index c9c2f91257081..a04815595b758 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -62,9 +62,7 @@ EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; logical_plan 01)Filter: parquet_table.column1 != Int32(42) 02)--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] -physical_plan -01)FilterExec: column1@0 != 42 -02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..135], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:135..270], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:270..405], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:405..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +physical_plan DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..135], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:135..270], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:270..405], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:405..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # disable round robin repartitioning statement ok @@ -77,9 +75,7 @@ EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; logical_plan 01)Filter: parquet_table.column1 != Int32(42) 02)--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] -physical_plan -01)FilterExec: column1@0 != 42 -02)--DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..135], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:135..270], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:270..405], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:405..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +physical_plan DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..135], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:135..270], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:270..405], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:405..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # enable round robin repartitioning again statement ok @@ -102,8 +98,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----FilterExec: column1@0 != 42 -04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..266], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:266..526, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:272..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..266], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:266..526, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:272..537]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] ## Read the files as though they are ordered @@ -137,8 +132,7 @@ logical_plan 03)----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)] physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] -02)--FilterExec: column1@0 != 42 -03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..263], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..268], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:268..537], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:263..526]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +02)--DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..263], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..268], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:268..537], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:263..526]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # Cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index e2c9fa4237939..84769b2dabfb7 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -381,9 +381,8 @@ physical_plan 12)----------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] 13)------------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] 14)--------------------------CoalescePartitionsExec -15)----------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -16)------------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -17)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +15)----------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet, 
predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +16)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify results without subset satisfaction query TPR rowsort @@ -477,9 +476,8 @@ physical_plan 09)----------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] 10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] 11)--------------------CoalescePartitionsExec -12)----------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -13)------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -14)--------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +12)----------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +13)--------------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify results match with subset satisfaction query TPR rowsort diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 58d9915a24be2..2f56fa6e3d778 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -413,8 +413,6 @@ WHERE timeframe = 'quarterly' ORDER BY period_end ASC LIMIT 2; ---- -quarterly 1 5000 -quarterly 2 5500 # Test 2.5: Test with different constant value query TIR diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c9222afe8ceb5..4158a79648b4e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -82,7 +82,7 @@ The following configuration settings are available: | datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | | datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | -| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | +| datafusion.execution.parquet.pushdown_filters | true | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | | datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. 
| | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | @@ -90,6 +90,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.filter_effectiveness_threshold | 0.8 | (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. When `pushdown_filters` is enabled, filters that don't filter out at least this fraction of rows will be demoted from row-level filters to post-scan filters. This helps avoid the I/O cost of late materialization for filters that aren't selective enough. Valid values are 0.0 to 1.0, where 0.8 means filters must filter out at least 80% of rows to remain as row filters. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |