Skip to content

Commit 8262a7f — "Add test for page-level limit pruning" (1 parent: c73e9f9)

File tree

4 files changed: +54 additions, −31 deletions

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::sync::Arc;
1919

20+
use crate::parquet::utils::MetricsFinder;
2021
use crate::parquet::Unit::Page;
2122
use crate::parquet::{ContextWithParquet, Scenario};
2223

@@ -45,6 +46,7 @@ async fn get_parquet_exec(
4546
state: &SessionState,
4647
filter: Expr,
4748
pushdown_filters: bool,
49+
limit: Option<usize>,
4850
) -> DataSourceExec {
4951
let object_store_url = ObjectStoreUrl::local_filesystem();
5052
let store = state.runtime_env().object_store(&object_store_url).unwrap();
@@ -88,6 +90,7 @@ async fn get_parquet_exec(
8890
);
8991
let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)
9092
.with_file(partitioned_file)
93+
.with_limit(limit)
9194
.build();
9295

9396
DataSourceExec::new(Arc::new(base_config))
@@ -98,7 +101,7 @@ async fn get_filter_results(
98101
filter: Expr,
99102
pushdown_filters: bool,
100103
) -> Vec<RecordBatch> {
101-
let parquet_exec = get_parquet_exec(state, filter, pushdown_filters).await;
104+
let parquet_exec = get_parquet_exec(state, filter, pushdown_filters, None).await;
102105
let task_ctx = state.task_ctx();
103106
let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
104107
let mut batches = Vec::new();
@@ -108,6 +111,29 @@ async fn get_filter_results(
108111
batches
109112
}
110113

114+
#[tokio::test]
115+
async fn limit_pruning_filter_one_col() {
116+
let session_ctx = SessionContext::new();
117+
let state = session_ctx.state();
118+
let filter = col("month").eq(lit(1_i32));
119+
120+
let parquet_exec = get_parquet_exec(&state, filter, false, Some(10)).await;
121+
let task_ctx = state.task_ctx();
122+
let mut results = parquet_exec.execute(0, task_ctx).unwrap();
123+
let mut limited_rows = 0;
124+
while let Some(Ok(batch)) = results.next().await {
125+
limited_rows += batch.num_rows();
126+
}
127+
128+
assert_eq!(limited_rows, 10);
129+
let metrics = MetricsFinder::find_metrics(&parquet_exec).unwrap();
130+
if let Some(matched_rows) =
131+
cast_count_metric(metrics.sum_by_name("limit_pruning_matched_rows").unwrap())
132+
{
133+
assert_eq!(matched_rows, 10);
134+
}
135+
}
136+
111137
#[tokio::test]
112138
async fn page_index_filter_one_col() {
113139
let session_ctx = SessionContext::new();

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct ParquetFileMetrics {
5050
pub row_groups_pruned_bloom_filter: Count,
5151
/// Number of row groups pruned due to limit pruning.
5252
pub limit_pruned_row_groups: Count,
53+
/// Number of rows matched by limit pruning.
54+
pub limit_pruning_matched_rows: Count,
5355
/// Number of row groups whose statistics were checked and matched (not pruned)
5456
pub row_groups_matched_statistics: Count,
5557
/// Number of row groups pruned by statistics
@@ -99,6 +101,10 @@ impl ParquetFileMetrics {
99101
.with_new_label("filename", filename.to_string())
100102
.counter("limit_pruned_row_groups", partition);
101103

104+
let limit_pruning_matched_rows = MetricBuilder::new(metrics)
105+
.with_new_label("filename", filename.to_string())
106+
.counter("limit_pruning_matched_rows", partition);
107+
102108
let row_groups_matched_statistics = MetricBuilder::new(metrics)
103109
.with_new_label("filename", filename.to_string())
104110
.counter("row_groups_matched_statistics", partition);
@@ -154,6 +160,7 @@ impl ParquetFileMetrics {
154160
row_groups_matched_statistics,
155161
row_groups_pruned_statistics,
156162
limit_pruned_row_groups,
163+
limit_pruning_matched_rows,
157164
bytes_scanned,
158165
pushdown_rows_pruned,
159166
pushdown_rows_matched,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,6 @@ impl FileOpener for ParquetOpener {
395395
);
396396

397397
access_plan = new_access_plan;
398-
399398
// Apply page-level limit pruning if limit is specified
400399
if let Some(limit) = limit {
401400
access_plan = p.prune_by_limit(
@@ -411,7 +410,7 @@ impl FileOpener for ParquetOpener {
411410

412411
let row_group_indexes = access_plan.row_group_indexes();
413412
if let Some(row_selection) =
414-
access_plan.into_overall_row_selection(rg_metadata)?
413+
access_plan.into_overall_row_selection(rg_metadata).unwrap()
415414
{
416415
builder = builder.with_row_selection(row_selection);
417416
}

datafusion/datasource-parquet/src/page_filter.rs

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,6 @@ pub struct PagePruningAccessPlanFilter {
115115
/// single column predicates (e.g. (`col = 5`) extracted from the overall
116116
/// predicate. Must all be true for a row to be included in the result.
117117
predicates: Vec<PruningPredicate>,
118-
/// For each row group, tracks which pages are fully matched (all rows satisfy predicates)
119-
/// Key: row group index, Value: Vec of booleans (one per page)
120-
fully_matched_pages: HashMap<usize, Vec<bool>>,
121-
/// For each row group, stores the row counts per page
122-
page_row_counts: HashMap<usize, Vec<usize>>,
123118
}
124119

125120
impl PagePruningAccessPlanFilter {
@@ -154,11 +149,7 @@ impl PagePruningAccessPlanFilter {
154149
Some(pp)
155150
})
156151
.collect::<Vec<_>>();
157-
Self {
158-
predicates,
159-
fully_matched_pages: HashMap::new(),
160-
page_row_counts: HashMap::new(),
161-
}
152+
Self { predicates }
162153
}
163154

164155
/// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
@@ -337,7 +328,7 @@ impl PagePruningAccessPlanFilter {
337328
rg_metadata: &[RowGroupMetaData],
338329
file_metrics: &ParquetFileMetrics,
339330
) -> ParquetAccessPlan {
340-
if self.fully_matched_pages.is_empty() {
331+
if page_match_infos.is_empty() {
341332
return access_plan;
342333
}
343334

@@ -388,15 +379,16 @@ impl PagePruningAccessPlanFilter {
388379
}
389380

390381
// Build a RowSelection for this row group that includes only the fully matched pages
391-
let page_row_counts = self.page_row_counts.get(&rg_idx).unwrap();
392-
let row_selection = build_row_selection_for_pages(
393-
&page_indices,
394-
&page_row_counts,
395-
limit - rows_selected,
396-
);
397-
398-
new_access_plan.scan_selection(rg_idx, row_selection);
399-
rows_selected += std::cmp::min(row_count, limit - rows_selected);
382+
if let Some(page_match_info) = page_match_infos.get(&rg_idx) {
383+
let row_selection = build_row_selection_for_pages(
384+
&page_indices,
385+
&page_match_info.page_row_counts,
386+
limit - rows_selected,
387+
);
388+
new_access_plan.scan(rg_idx);
389+
new_access_plan.scan_selection(rg_idx, row_selection);
390+
rows_selected += std::cmp::min(row_count, limit - rows_selected);
391+
}
400392
}
401393

402394
let original_row_groups = access_plan.row_group_indexes().len();
@@ -406,7 +398,7 @@ impl PagePruningAccessPlanFilter {
406398
if pruned_row_groups > 0 {
407399
file_metrics.limit_pruned_row_groups.add(pruned_row_groups);
408400
}
409-
401+
file_metrics.limit_pruning_matched_rows.add(rows_selected);
410402
return new_access_plan;
411403
}
412404

@@ -490,7 +482,10 @@ fn build_row_selection_for_pages(
490482
let mut page_idx_iter = page_indices.iter().peekable();
491483

492484
for (page_num, &page_row_count) in page_row_counts.iter().enumerate() {
493-
if let Some(&&next_page_idx) = page_idx_iter.peek() {
485+
if rows_selected >= limit {
486+
// Once we've reached the limit, skip all remaining pages
487+
current_row += page_row_count;
488+
} else if let Some(&&next_page_idx) = page_idx_iter.peek() {
494489
if page_num == next_page_idx {
495490
// This page should be selected
496491
page_idx_iter.next();
@@ -510,10 +505,6 @@ fn build_row_selection_for_pages(
510505
if rows_to_select < page_row_count {
511506
current_row = page_row_count - rows_to_select;
512507
}
513-
514-
if rows_selected >= limit {
515-
break;
516-
}
517508
} else {
518509
// This page should be skipped
519510
current_row += page_row_count;
@@ -876,7 +867,7 @@ mod tests {
876867
build_row_selection_for_pages(&page_indices, &page_row_counts, limit);
877868
let result = row_selection_to_vec(&selection);
878869

879-
assert_eq!(result, vec![(true, 200), (false, 50)]);
870+
assert_eq!(result, vec![(true, 200), (false, 250)]);
880871
}
881872

882873
#[test]
@@ -890,6 +881,6 @@ mod tests {
890881
build_row_selection_for_pages(&page_indices, &page_row_counts, limit);
891882
let result = row_selection_to_vec(&selection);
892883

893-
assert_eq!(result, vec![(false, 50), (true, 150), (false, 30),]);
884+
assert_eq!(result, vec![(false, 50), (true, 150), (false, 210),]);
894885
}
895886
}

Comments (0)