
Commit c0b86d7

feat: adaptive filter selectivity tracking for Parquet row filters
1 parent: b9a3b9f

34 files changed: +1265 −137 lines

datafusion/common/src/config.rs

Lines changed: 11 additions & 1 deletion
@@ -688,7 +688,7 @@ config_namespace! {

         /// (reading) If true, filter expressions are applied during the parquet decoding operation to
         /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
-        pub pushdown_filters: bool, default = false
+        pub pushdown_filters: bool, default = true

         /// (reading) If true, filter expressions evaluated during the parquet decoding operation
         /// will be reordered heuristically to minimize the cost of evaluation. If false,
@@ -732,6 +732,16 @@ config_namespace! {
         /// parquet reader setting. 0 means no caching.
         pub max_predicate_cache_size: Option<usize>, default = None

+        /// (reading) Minimum filter effectiveness threshold for adaptive filter
+        /// pushdown.
+        /// Only filters that filter out at least this fraction of rows will be
+        /// promoted to row filters during adaptive filter pushdown.
+        /// A value of 1.0 means only filters that filter out all rows will be
+        /// promoted. A value of 0.0 means all filters will be promoted.
+        /// Because there can be a high I/O cost to pushing down ineffective filters,
+        /// recommended values are in the range [0.8, 0.95], depending on random I/O costs.
+        pub filter_effectiveness_threshold: f64, default = 1.0
+
         // The following options affect writing to parquet files
         // and map to parquet::file::properties::WriterProperties
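
With pushdown_filters now enabled by default, filter_effectiveness_threshold is the main tuning knob for this feature. As a minimal sketch of setting it programmatically (assuming only the standard SessionConfig/SessionContext API plus the config path shown in this diff):

    use datafusion::prelude::*;

    fn main() {
        // Promote a predicate to a Parquet row filter only once it has been
        // observed to filter out at least 90% of rows; 0.0 would promote
        // every filter, while the default 1.0 promotes only filters that
        // eliminate all rows.
        let mut cfg = SessionConfig::new();
        cfg.options_mut()
            .execution
            .parquet
            .filter_effectiveness_threshold = 0.9;
        let _ctx = SessionContext::new_with_config(cfg);
    }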

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
@@ -209,6 +209,7 @@ impl ParquetOptions {
             coerce_int96: _, // not used for writer props
             skip_arrow_metadata: _,
             max_predicate_cache_size: _,
+            filter_effectiveness_threshold: _, // not used for writer props
         } = self;

         let mut builder = WriterProperties::builder()
@@ -464,6 +465,7 @@ mod tests {
             skip_arrow_metadata: defaults.skip_arrow_metadata,
             coerce_int96: None,
             max_predicate_cache_size: defaults.max_predicate_cache_size,
+            filter_effectiveness_threshold: defaults.filter_effectiveness_threshold,
         }
     }
@@ -578,6 +580,8 @@ mod tests {
                 binary_as_string: global_options_defaults.binary_as_string,
                 skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
                 coerce_int96: None,
+                filter_effectiveness_threshold: global_options_defaults
+                    .filter_effectiveness_threshold,
             },
             column_specific_options,
             key_value_metadata,

datafusion/core/src/dataframe/parquet.rs

Lines changed: 8 additions & 1 deletion
@@ -150,7 +150,14 @@ mod tests {
         let plan = df.explain(false, false)?.collect().await?;
         // Filters all the way to Parquet
         let formatted = pretty::pretty_format_batches(&plan)?.to_string();
-        assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
+        let data_source_exec_row = formatted
+            .lines()
+            .find(|line| line.contains("DataSourceExec:"))
+            .unwrap();
+        assert!(
+            data_source_exec_row.contains("predicate=id@0 = 1"),
+            "{formatted}"
+        );

         Ok(())
     }

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 1 deletion
@@ -169,7 +169,8 @@ mod tests {
             if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
-                    .with_reorder_filters(true);
+                    .with_reorder_filters(true)
+                    .with_filter_effectiveness_threshold(0.0);
             } else {
                 source = source.with_pushdown_filters(false);
             }

datafusion/core/src/datasource/view_test.rs

Lines changed: 5 additions & 1 deletion
@@ -301,7 +301,11 @@ mod tests {

     #[tokio::test]
     async fn filter_pushdown_view() -> Result<()> {
-        let ctx = SessionContext::new();
+        // Disable parquet pushdown_filters to ensure filters stay as FilterExec nodes
+        // rather than being pushed into the Parquet reader
+        let config = SessionConfig::new()
+            .set_bool("datafusion.execution.parquet.pushdown_filters", false);
+        let ctx = SessionContext::new_with_config(config);

         ctx.register_parquet(
             "test",

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 3 additions & 3 deletions
@@ -621,10 +621,10 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
 #[tokio::test]
 async fn predicate_cache_default() -> datafusion_common::Result<()> {
     let ctx = SessionContext::new();
-    // The cache is on by default, but not used unless filter pushdown is enabled
+    // The cache is on by default, and used since pushdown_filters is now true by default
     PredicateCacheTest {
-        expected_inner_records: 0,
-        expected_records: 0,
+        expected_inner_records: 8,
+        expected_records: 7, // reads more than necessary from the cache, as another bitmap is applied afterwards
     }
     .run(&ctx)
     .await

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 5 additions & 0 deletions
@@ -1721,6 +1721,11 @@ async fn test_topk_dynamic_filter_pushdown_integration() {
     let mut cfg = SessionConfig::new();
     cfg.options_mut().execution.parquet.pushdown_filters = true;
     cfg.options_mut().execution.parquet.max_row_group_size = 128;
+    // Always push filters down into row filters for this test
+    cfg.options_mut()
+        .execution
+        .parquet
+        .filter_effectiveness_threshold = 0.0;
     let ctx = SessionContext::new_with_config(cfg);
     ctx.register_object_store(
         ObjectStoreUrl::parse("memory://").unwrap().as_ref(),

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 2 additions & 6 deletions
@@ -862,7 +862,7 @@ async fn parquet_explain_analyze() {
         .to_string();

     // should contain aggregated stats
-    assert_contains!(&formatted, "output_rows=8");
+    assert_contains!(&formatted, "output_rows=5");
     assert_contains!(
         &formatted,
         "row_groups_pruned_bloom_filter=1 total \u{2192} 1 matched"
@@ -995,11 +995,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
     @r"
     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
       RecursiveQueryExec: name=number_series, is_distinct=false
-        CoalescePartitionsExec
-          ProjectionExec: expr=[id@0 as id, 1 as level]
-            FilterExec: id@0 = 1
-              RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
+        DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
         CoalescePartitionsExec
           ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)]
             FilterExec: id@0 < 10

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 7 additions & 0 deletions
@@ -456,6 +456,12 @@ impl FileFormat for ParquetFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mut metadata_size_hint = None;

+        let filter_effectiveness_threshold = state
+            .config_options()
+            .execution
+            .parquet
+            .filter_effectiveness_threshold;
+
         if let Some(metadata) = self.metadata_size_hint() {
             metadata_size_hint = Some(metadata);
         }
@@ -467,6 +473,7 @@ impl FileFormat for ParquetFormat {
             .cloned()
             .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
         source = source.with_table_parquet_options(self.options.clone());
+        source = source.with_filter_pushdown_selectivity(filter_effectiveness_threshold);

         // Use the CachedParquetFileReaderFactory
         let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
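
The threshold itself is a plain fraction-of-rows-filtered cutoff on observed selectivity. Below is a minimal sketch of the promotion rule described by the config docs; should_promote, rows_filtered, and rows_seen are illustrative names, not this commit's actual internals:

    /// Illustrative only: promote a predicate to a Parquet row filter once
    /// the fraction of rows it has filtered out meets the threshold.
    fn should_promote(rows_filtered: usize, rows_seen: usize, threshold: f64) -> bool {
        if rows_seen == 0 {
            // Assumption: with no observations yet, treat the filter as promotable.
            return true;
        }
        (rows_filtered as f64 / rows_seen as f64) >= threshold
    }

With threshold = 0.0 every filter passes this check, matching the tests above that force full pushdown; with the default 1.0, only filters that eliminate every row qualify.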

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 7 additions & 0 deletions
@@ -76,6 +76,8 @@ pub struct ParquetFileMetrics {
     /// number of rows that were stored in the cache after evaluating predicates
     /// reused for the output.
     pub predicate_cache_records: Count,
+    /// Time spent applying filters
+    pub filter_apply_time: Time,
 }

 impl ParquetFileMetrics {
@@ -162,6 +164,10 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("predicate_cache_records", partition);

+        let filter_apply_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("filter_apply_time", partition);
+
         Self {
             files_ranges_pruned_statistics,
             predicate_evaluation_errors,
@@ -179,6 +185,7 @@ impl ParquetFileMetrics {
             scan_efficiency_ratio,
             predicate_cache_inner_records,
             predicate_cache_records,
+            filter_apply_time,
         }
     }
 }
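
Like the other per-file metrics, the new timer should surface in EXPLAIN ANALYZE output for the scan. A hedged check in the style of the explain_analyze test above, reusing its formatted output; the exact rendered label is an assumption:

    // Assumption: the timer is rendered under its metric name, alongside
    // counters such as output_rows asserted on earlier in this commit.
    assert_contains!(&formatted, "filter_apply_time");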
