
Commit b4ba1c6

Skip re-pruning based on partition values and file level stats if there are no dynamic filters (#16424)

1 parent 1884175 commit b4ba1c6
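The change in one sentence: the parquet opener used to re-run partition-value and file-statistics pruning for every file it opened, even though a static predicate had already been fully applied at planning time; now that work only happens when it can pay off. A minimal standalone sketch of the new gating rule (the types here are hypothetical stand-ins; the real check in opener.rs below is `is_dynamic_physical_expr(p) | file.has_statistics()`):

```rust
// Standalone sketch of the gating rule this commit introduces.
// `Predicate` and `File` are illustrative stand-ins, not DataFusion types.
struct Predicate {
    is_dynamic: bool,
}

struct File {
    has_statistics: bool,
}

/// Re-pruning at file-open time only pays off when the filter may have
/// tightened since planning (dynamic filters) or file-level stats exist.
fn should_build_file_pruner(predicate: &Predicate, file: &File) -> bool {
    predicate.is_dynamic || file.has_statistics
}

fn main() {
    let file = File { has_statistics: false };
    // Static filter, no stats: planning already did all possible pruning.
    assert!(!should_build_file_pruner(&Predicate { is_dynamic: false }, &file));
    // Dynamic filter (e.g. TopK or join pushdown): worth re-checking.
    assert!(should_build_file_pruner(&Predicate { is_dynamic: true }, &file));
}
```

With a static filter and no file statistics there is nothing new to learn at open time, so the `FilePruner` is simply never built.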

File tree

15 files changed: +543 -115 lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions

Some generated files are not rendered by default.

datafusion/core/tests/parquet/mod.rs

Lines changed: 6 additions & 0 deletions

@@ -152,6 +152,10 @@ impl TestOutput {
         self.metric_value("row_groups_pruned_statistics")
     }
 
+    fn files_ranges_pruned_statistics(&self) -> Option<usize> {
+        self.metric_value("files_ranges_pruned_statistics")
+    }
+
     /// The number of row_groups matched by bloom filter or statistics
     fn row_groups_matched(&self) -> Option<usize> {
         self.row_groups_matched_bloom_filter()
@@ -192,6 +196,8 @@ impl ContextWithParquet {
         unit: Unit,
         mut config: SessionConfig,
     ) -> Self {
+        // Use a single partition for deterministic results no matter how many CPUs the host has
+        config = config.with_target_partitions(1);
         let file = match unit {
             Unit::RowGroup(row_per_group) => {
                 config = config.with_parquet_bloom_filter_pruning(true);
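The new accessor follows the existing `metric_value` helper pattern in `TestOutput`. A self-contained sketch of that pattern, with a `HashMap` standing in for the real execution-plan metrics (illustrative only):

```rust
use std::collections::HashMap;

// Stand-in for the test harness: the real TestOutput reads these values from
// DataFusion's ExecutionPlan metrics rather than a HashMap.
struct TestOutput {
    metrics: HashMap<&'static str, usize>,
}

impl TestOutput {
    // Look a metric up by name; None if it was never recorded.
    fn metric_value(&self, name: &str) -> Option<usize> {
        self.metrics.get(name).copied()
    }

    fn files_ranges_pruned_statistics(&self) -> Option<usize> {
        self.metric_value("files_ranges_pruned_statistics")
    }
}

fn main() {
    let output = TestOutput {
        metrics: HashMap::from([("files_ranges_pruned_statistics", 3)]),
    };
    assert_eq!(output.files_ranges_pruned_statistics(), Some(3));
}
```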

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 100 additions & 0 deletions
Large diffs are not rendered by default.

datafusion/datasource-parquet/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -45,6 +45,7 @@ datafusion-physical-expr = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 datafusion-physical-plan = { workspace = true }
+datafusion-pruning = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 17 additions & 6 deletions

@@ -27,10 +27,21 @@ use datafusion_physical_plan::metrics::{
 /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
 #[derive(Debug, Clone)]
 pub struct ParquetFileMetrics {
-    /// Number of files pruned by partition of file level statistics
-    /// This often happens at planning time but may happen at execution time
+    /// Number of file **ranges** pruned by partition or file level statistics.
+    /// Pruning of files often happens at planning time but may happen at execution time
     /// if dynamic filters (e.g. from a join) result in additional pruning.
-    pub files_pruned_statistics: Count,
+    ///
+    /// This does **not** necessarily equal the number of files pruned:
+    /// files may be scanned in sub-ranges to increase parallelism,
+    /// in which case this will represent the number of sub-ranges pruned, not the number of files.
+    /// The number of files pruned will always be less than or equal to this number.
+    ///
+    /// A single file may have some ranges that are not pruned and some that are pruned.
+    /// For example, with a query like `ORDER BY col LIMIT 10`, the TopK dynamic filter
+    /// pushdown optimization may fill up the TopK heap when reading the first part of a file,
+    /// then skip the second part if file statistics indicate it cannot contain rows
+    /// that would be in the TopK.
+    pub files_ranges_pruned_statistics: Count,
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: Count,
     /// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -126,11 +137,11 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .subset_time("metadata_load_time", partition);
 
-        let files_pruned_statistics =
-            MetricBuilder::new(metrics).counter("files_pruned_statistics", partition);
+        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
+            .counter("files_ranges_pruned_statistics", partition);
 
         Self {
-            files_pruned_statistics,
+            files_ranges_pruned_statistics,
             predicate_evaluation_errors,
             row_groups_matched_bloom_filter,
             row_groups_pruned_bloom_filter,
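The renamed counter counts pruned ranges, not pruned files, and the two can differ. A standalone sketch making that distinction concrete (file names and types are illustrative, not DataFusion's):

```rust
// One file may be split into several scan ranges for parallelism; each
// pruned range increments the counter once, so the metric can exceed the
// number of files skipped entirely.
struct ScanRange {
    file: &'static str,
    pruned: bool,
}

fn main() {
    // a.parquet is scanned as two ranges: only the second is pruned
    // (e.g. a TopK heap filled up while reading the first range).
    let ranges = [
        ScanRange { file: "a.parquet", pruned: false },
        ScanRange { file: "a.parquet", pruned: true },
        ScanRange { file: "b.parquet", pruned: true },
    ];

    // This is what the metric counts: pruned ranges, not pruned files.
    let files_ranges_pruned_statistics = ranges.iter().filter(|r| r.pruned).count();

    // Files skipped entirely: every one of their ranges was pruned.
    let mut files_fully_pruned = 0;
    for file in ["a.parquet", "b.parquet"] {
        if ranges.iter().filter(|r| r.file == file).all(|r| r.pruned) {
            files_fully_pruned += 1;
        }
    }

    assert_eq!(files_ranges_pruned_statistics, 2);
    assert_eq!(files_fully_pruned, 1); // only b.parquet
}
```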

datafusion/datasource-parquet/src/opener.rs

Lines changed: 140 additions & 92 deletions

@@ -29,21 +29,18 @@ use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 
-use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit};
+use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
-use datafusion_common::pruning::{
-    CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
-    PruningStatistics,
-};
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_datasource::PartitionedFile;
 use datafusion_physical_expr::PhysicalExprSchemaRewriter;
-use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_optimizer::pruning::PruningPredicate;
+use datafusion_physical_expr_common::physical_expr::{
+    is_dynamic_physical_expr, PhysicalExpr,
+};
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
 
 use futures::{StreamExt, TryStreamExt};
-use itertools::Itertools;
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
@@ -134,66 +131,40 @@ impl FileOpener for ParquetOpener {
         let enable_page_index = self.enable_page_index;
 
         Ok(Box::pin(async move {
-            // Prune this file using the file level statistics.
+            // Prune this file using the file level statistics and partition values.
             // Since dynamic filters may have been updated since planning it is possible that we are able
             // to prune files now that we couldn't prune at planning time.
-            if let Some(predicate) = &predicate {
-                // Build a pruning schema that combines the file fields and partition fields.
-                // Partition fileds are always at the end.
-                let pruning_schema = Arc::new(
-                    Schema::new(
-                        logical_file_schema
-                            .fields()
-                            .iter()
-                            .cloned()
-                            .chain(partition_fields.iter().cloned())
-                            .collect_vec(),
+            // It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
+            // as it would have been done at planning time.
+            // We'll also check this after every record batch we read,
+            // and if at some point we are able to prove we can prune the file using just the file level statistics
+            // we can end the stream early.
+            let mut file_pruner = predicate
+                .as_ref()
+                .map(|p| {
+                    Ok::<_, DataFusionError>(
+                        (is_dynamic_physical_expr(p) | file.has_statistics()).then_some(
+                            FilePruner::new(
+                                Arc::clone(p),
+                                &logical_file_schema,
+                                partition_fields.clone(),
+                                file.clone(),
+                                predicate_creation_errors.clone(),
+                            )?,
+                        ),
                     )
-                    .with_metadata(logical_file_schema.metadata().clone()),
-                );
-                let pruning_predicate = build_pruning_predicate(
-                    Arc::clone(predicate),
-                    &pruning_schema,
-                    &predicate_creation_errors,
-                );
-                if let Some(pruning_predicate) = pruning_predicate {
-                    // The partition column schema is the schema of the table - the schema of the file
-                    let mut pruning = Box::new(PartitionPruningStatistics::try_new(
-                        vec![file.partition_values.clone()],
-                        partition_fields.clone(),
-                    )?)
-                        as Box<dyn PruningStatistics>;
-                    if let Some(stats) = file.statistics {
-                        let stats_pruning = Box::new(PrunableStatistics::new(
-                            vec![stats],
-                            Arc::clone(&pruning_schema),
-                        ));
-                        pruning = Box::new(CompositePruningStatistics::new(vec![
-                            pruning,
-                            stats_pruning,
-                        ]));
-                    }
-                    match pruning_predicate.prune(pruning.as_ref()) {
-                        Ok(values) => {
-                            assert!(values.len() == 1);
-                            // We expect a single container -> if all containers are false skip this file
-                            if values.into_iter().all(|v| !v) {
-                                // Return an empty stream
-                                file_metrics.files_pruned_statistics.add(1);
-                                return Ok(futures::stream::empty().boxed());
-                            }
-                        }
-                        // Stats filter array could not be built, so we can't prune
-                        Err(e) => {
-                            debug!(
-                                "Ignoring error building pruning predicate for file '{}': {e}",
-                                file_meta.location(),
-                            );
-                            predicate_creation_errors.add(1);
-                        }
-                    }
+                })
+                .transpose()?
+                .flatten();
+
+            if let Some(file_pruner) = &mut file_pruner {
+                if file_pruner.should_prune()? {
+                    // Return an empty stream immediately to skip the work of setting up the actual stream
+                    file_metrics.files_ranges_pruned_statistics.add(1);
+                    return Ok(futures::stream::empty().boxed());
                 }
             }
+
             // Don't load the page index yet. Since it is not stored inline in
             // the footer, loading the page index if it is not needed will do
             // unecessary I/O. We decide later if it is needed to evaluate the
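One detail worth calling out in the new code: the predicate is optional and building a `FilePruner` is fallible, so the construction yields an `Option<Result<Option<_>>>` that is collapsed with `transpose` and `flatten`. A minimal standalone sketch of the same pattern (`build` is a hypothetical stand-in for `FilePruner::new`):

```rust
// Building a pruner can fail, and a pruner may also be skipped on purpose,
// hence Result<Option<_>> from the builder.
fn build(pred: &str) -> Result<Option<String>, String> {
    if pred.is_empty() {
        return Err("cannot build pruner".to_string());
    }
    // Only produce a pruner when it could actually prune something.
    Ok((pred != "always_true").then(|| format!("pruner({pred})")))
}

fn main() -> Result<(), String> {
    let predicate: Option<&str> = Some("a = 1");
    let pruner: Option<String> = predicate
        .map(build)   // Option<Result<Option<String>, String>>
        .transpose()? // Option<Option<String>>, early-return on error
        .flatten();   // Option<String>
    assert_eq!(pruner.as_deref(), Some("pruner(a = 1)"));

    // A predicate that can never prune yields no pruner, but no error either.
    let no_pruner = Some("always_true").map(build).transpose()?.flatten();
    assert!(no_pruner.is_none());
    Ok(())
}
```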
@@ -439,30 +410,6 @@ fn create_initial_plan(
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
 
-/// Build a pruning predicate from an optional predicate expression.
-/// If the predicate is None or the predicate cannot be converted to a pruning
-/// predicate, return None.
-/// If there is an error creating the pruning predicate it is recorded by incrementing
-/// the `predicate_creation_errors` counter.
-pub(crate) fn build_pruning_predicate(
-    predicate: Arc<dyn PhysicalExpr>,
-    file_schema: &SchemaRef,
-    predicate_creation_errors: &Count,
-) -> Option<Arc<PruningPredicate>> {
-    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
-        Ok(pruning_predicate) => {
-            if !pruning_predicate.always_true() {
-                return Some(Arc::new(pruning_predicate));
-            }
-        }
-        Err(e) => {
-            debug!("Could not create pruning predicate for: {e}");
-            predicate_creation_errors.add(1);
-        }
-    }
-    None
-}
-
 /// Build a page pruning predicate from an optional predicate expression.
 /// If the predicate is None or the predicate cannot be converted to a page pruning
 /// predicate, return None.
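Note that `build_pruning_predicate` is removed here but not deleted from the codebase: the new import at the top of this file pulls it, together with `FilePruner` and `PruningPredicate`, from the `datafusion-pruning` crate added as a dependency in the Cargo.toml change above.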
@@ -554,7 +501,9 @@ mod test {
         schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
     };
     use datafusion_expr::{col, lit};
-    use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_expr::{
+        expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
+    };
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
@@ -601,6 +550,13 @@ mod test {
         data_len
     }
 
+    fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
+        Arc::new(DynamicFilterPhysicalExpr::new(
+            expr.children().into_iter().map(Arc::clone).collect(),
+            expr,
+        ))
+    }
+
     #[tokio::test]
     async fn test_prune_on_statistics() {
         let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
@@ -691,7 +647,7 @@ mod test {
691647
}
692648

693649
#[tokio::test]
694-
async fn test_prune_on_partition_statistics() {
650+
async fn test_prune_on_partition_statistics_with_dynamic_expression() {
695651
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
696652

697653
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
@@ -753,7 +709,9 @@ mod test {
753709

754710
// Filter should match the partition value
755711
let expr = col("part").eq(lit(1));
756-
let predicate = logical2physical(&expr, &table_schema);
712+
// Mark the expression as dynamic even if it's not to force partition pruning to happen
713+
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
714+
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
757715
let opener = make_opener(predicate);
758716
let stream = opener
759717
.open(make_meta(), file.clone())
@@ -766,7 +724,9 @@ mod test {
766724

767725
// Filter should not match the partition value
768726
let expr = col("part").eq(lit(2));
769-
let predicate = logical2physical(&expr, &table_schema);
727+
// Mark the expression as dynamic even if it's not to force partition pruning to happen
728+
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
729+
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
770730
let opener = make_opener(predicate);
771731
let stream = opener.open(make_meta(), file).unwrap().await.unwrap();
772732
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1005,4 +965,92 @@
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
+
+    /// Test that if the filter is not a dynamic filter and we have no stats we don't do extra pruning work at the file level.
+    #[tokio::test]
+    async fn test_opener_pruning_skipped_on_static_filters() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
+
+        let file_schema = batch.schema();
+        let mut file = PartitionedFile::new(
+            "part=1/file.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        file.partition_values = vec![ScalarValue::Int32(Some(1))];
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("part", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                logical_file_schema: file_schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                partition_fields: vec![Arc::new(Field::new(
+                    "part",
+                    DataType::Int32,
+                    false,
+                ))],
+                pushdown_filters: false, // note that this is false!
+                reorder_filters: false,
+                enable_page_index: false,
+                enable_bloom_filter: false,
+                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+                enable_row_group_stats_pruning: true,
+                coerce_int96: None,
+            }
+        };
+
+        let make_meta = || FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("part=1/file.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
+        let expr = col("part").eq(lit(2));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // If we make the filter dynamic, it should prune
+        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 0);
+        assert_eq!(num_rows, 0);
+    }
 }
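Finally, a hedged sketch of how the renamed metric surfaces to users: `EXPLAIN ANALYZE` prints per-operator metrics for the parquet scan, which after this commit include `files_ranges_pruned_statistics`. The table name and path below are illustrative, and exact output formatting varies across DataFusion versions:

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Illustrative path: any directory of parquet files.
    ctx.register_parquet("t", "data/", ParquetReadOptions::default())
        .await?;
    // ORDER BY ... LIMIT produces a TopK dynamic filter, which is exactly the
    // case where open-time pruning can now skip whole file ranges.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM t ORDER BY a LIMIT 10")
        .await?;
    df.show().await?;
    Ok(())
}
```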
