Skip to content

Commit 53ebed7

Browse files
committed
add row_groups_fully_matched_statistics
1 parent 91e9735 commit 53ebed7

File tree

4 files changed

+54
-20
lines changed

4 files changed

+54
-20
lines changed

datafusion/core/tests/parquet/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ impl TestOutput {
150150
self.metric_value("row_groups_matched_statistics")
151151
}
152152

153+
/// The number of row_groups fully matched by statistics
154+
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
155+
self.metric_value("row_groups_fully_matched_statistics")
156+
}
157+
153158
/// The number of row_groups pruned by statistics
154159
fn row_groups_pruned_statistics(&self) -> Option<usize> {
155160
self.metric_value("row_groups_pruned_statistics")

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct RowGroupPruningTest {
3434
query: String,
3535
expected_errors: Option<usize>,
3636
expected_row_group_matched_by_statistics: Option<usize>,
37+
expected_row_group_fully_matched_by_statistics: Option<usize>,
3738
expected_row_group_pruned_by_statistics: Option<usize>,
3839
expected_files_pruned_by_statistics: Option<usize>,
3940
expected_row_group_matched_by_bloom_filter: Option<usize>,
@@ -50,6 +51,7 @@ impl RowGroupPruningTest {
5051
expected_errors: None,
5152
expected_row_group_matched_by_statistics: None,
5253
expected_row_group_pruned_by_statistics: None,
54+
expected_row_group_fully_matched_by_statistics: None,
5355
expected_files_pruned_by_statistics: None,
5456
expected_row_group_matched_by_bloom_filter: None,
5557
expected_row_group_pruned_by_bloom_filter: None,
@@ -82,6 +84,15 @@ impl RowGroupPruningTest {
8284
self
8385
}
8486

87+
// Set the expected fully matched row groups by statistics
88+
fn with_fully_matched_by_stats(
89+
mut self,
90+
fully_matched_by_stats: Option<usize>,
91+
) -> Self {
92+
self.expected_row_group_fully_matched_by_statistics = fully_matched_by_stats;
93+
self
94+
}
95+
8596
// Set the expected pruned row groups by statistics
8697
fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
8798
self.expected_row_group_pruned_by_statistics = pruned_by_stats;
@@ -197,6 +208,11 @@ impl RowGroupPruningTest {
197208
self.expected_row_group_matched_by_statistics,
198209
"mismatched row_groups_matched_statistics",
199210
);
211+
assert_eq!(
212+
output.row_groups_fully_matched_statistics(),
213+
self.expected_row_group_fully_matched_by_statistics,
214+
"mismatched row_groups_fully_matched_statistics",
215+
);
200216
assert_eq!(
201217
output.row_groups_pruned_statistics(),
202218
self.expected_row_group_pruned_by_statistics,
@@ -1719,8 +1735,24 @@ fn make_i32_batch(
17191735
RecordBatch::try_new(schema, vec![array]).map_err(DataFusionError::from)
17201736
}
17211737

1738+
// Helper function to create a batch with two Int32 columns
1739+
fn make_two_col_i32_batch(
1740+
name_a: &str,
1741+
name_b: &str,
1742+
values_a: Vec<i32>,
1743+
values_b: Vec<i32>,
1744+
) -> datafusion_common::error::Result<RecordBatch> {
1745+
let schema = Arc::new(Schema::new(vec![
1746+
Field::new(name_a, DataType::Int32, false),
1747+
Field::new(name_b, DataType::Int32, false),
1748+
]));
1749+
let array_a: ArrayRef = Arc::new(Int32Array::from(values_a));
1750+
let array_b: ArrayRef = Arc::new(Int32Array::from(values_b));
1751+
RecordBatch::try_new(schema, vec![array_a, array_b]).map_err(DataFusionError::from)
1752+
}
1753+
17221754
#[tokio::test]
1723-
async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
1755+
async fn test_limit_pruning_basic() -> datafusion_common::error::Result<()> {
17241756
// Scenario: Simple integer column, multiple row groups
17251757
// Query: SELECT c1 FROM t WHERE c1 = 0 LIMIT 2
17261758
// We expect 2 rows in total.
@@ -1754,6 +1786,7 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17541786
.with_expected_rows(2)
17551787
.with_pruned_files(Some(0))
17561788
.with_matched_by_stats(Some(4))
1789+
.with_fully_matched_by_stats(Some(3))
17571790
.with_pruned_by_stats(Some(1))
17581791
.with_limit_pruned_row_groups(Some(3))
17591792
.test_row_group_prune_with_custom_data(schema, batches, 2)
@@ -1762,22 +1795,6 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17621795
Ok(())
17631796
}
17641797

1765-
// Helper function to create a batch with two Int32 columns
1766-
fn make_two_col_i32_batch(
1767-
name_a: &str,
1768-
name_b: &str,
1769-
values_a: Vec<i32>,
1770-
values_b: Vec<i32>,
1771-
) -> datafusion_common::error::Result<RecordBatch> {
1772-
let schema = Arc::new(Schema::new(vec![
1773-
Field::new(name_a, DataType::Int32, false),
1774-
Field::new(name_b, DataType::Int32, false),
1775-
]));
1776-
let array_a: ArrayRef = Arc::new(Int32Array::from(values_a));
1777-
let array_b: ArrayRef = Arc::new(Int32Array::from(values_b));
1778-
RecordBatch::try_new(schema, vec![array_a, array_b]).map_err(DataFusionError::from)
1779-
}
1780-
17811798
#[tokio::test]
17821799
async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result<()> {
17831800
// Test Case 1: Complex filter with two columns (a = 1 AND b > 1 AND b < 4)
@@ -1815,6 +1832,7 @@ async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result
18151832
.with_expected_rows(5)
18161833
.with_pruned_files(Some(0))
18171834
.with_matched_by_stats(Some(4)) // RG0,1,2,3 are matched
1835+
.with_fully_matched_by_stats(Some(3))
18181836
.with_pruned_by_stats(Some(2)) // RG4,5 are pruned
18191837
.with_limit_pruned_row_groups(Some(2)) // RG0, RG3 is pruned by limit
18201838
.test_row_group_prune_with_custom_data(schema, batches, 3)
@@ -1855,6 +1873,7 @@ async fn test_limit_pruning_multiple_fully_matched(
18551873
.with_expected_rows(8)
18561874
.with_pruned_files(Some(0))
18571875
.with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
1876+
.with_fully_matched_by_stats(Some(4))
18581877
.with_pruned_by_stats(Some(1)) // RG4 pruned
18591878
.with_limit_pruned_row_groups(Some(2)) // RG2,3 pruned by limit
18601879
.test_row_group_prune_with_custom_data(schema, batches, 4)
@@ -1894,6 +1913,7 @@ async fn test_limit_pruning_no_fully_matched() -> datafusion_common::error::Resu
18941913
.with_expected_rows(3)
18951914
.with_pruned_files(Some(0))
18961915
.with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
1916+
.with_fully_matched_by_stats(Some(0))
18971917
.with_pruned_by_stats(Some(1)) // RG4 pruned
18981918
.with_limit_pruned_row_groups(Some(0)) // RG3 pruned by limit
18991919
.test_row_group_prune_with_custom_data(schema, batches, 3)
@@ -1934,6 +1954,7 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error:
19341954
.with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit)
19351955
.with_pruned_files(Some(0))
19361956
.with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
1957+
.with_fully_matched_by_stats(Some(2))
19371958
.with_pruned_by_stats(Some(1)) // RG4 pruned
19381959
.with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs
19391960
.test_row_group_prune_with_custom_data(schema, batches, 4)

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct ParquetFileMetrics {
5050
pub row_groups_pruned_bloom_filter: Count,
5151
/// Number of row groups pruned due to limit pruning.
5252
pub limit_pruned_row_groups: Count,
53+
/// Number of row groups whose statistics were checked and fully matched
54+
pub row_groups_fully_matched_statistics: Count,
5355
/// Number of row groups whose statistics were checked and matched (not pruned)
5456
pub row_groups_matched_statistics: Count,
5557
/// Number of row groups pruned by statistics
@@ -98,6 +100,10 @@ impl ParquetFileMetrics {
98100
let limit_pruned_row_groups = MetricBuilder::new(metrics)
99101
.with_new_label("filename", filename.to_string())
100102
.counter("limit_pruned_row_groups", partition);
103+
104+
let row_groups_fully_matched_statistics = MetricBuilder::new(metrics)
105+
.with_new_label("filename", filename.to_string())
106+
.counter("row_groups_fully_matched_statistics", partition);
101107

102108
let row_groups_matched_statistics = MetricBuilder::new(metrics)
103109
.with_new_label("filename", filename.to_string())
@@ -151,6 +157,7 @@ impl ParquetFileMetrics {
151157
predicate_evaluation_errors,
152158
row_groups_matched_bloom_filter,
153159
row_groups_pruned_bloom_filter,
160+
row_groups_fully_matched_statistics,
154161
row_groups_matched_statistics,
155162
row_groups_pruned_statistics,
156163
limit_pruned_row_groups,

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,6 @@ impl RowGroupAccessPlanFilter {
166166

167167
assert_eq!(groups.len(), self.access_plan.len());
168168
// Indexes of row groups still to scan
169-
let row_group_indexes_to_consider = self.access_plan.row_group_indexes();
170-
// Indexes of row groups still to scan
171169
let row_group_indexes = self.access_plan.row_group_indexes();
172170
let row_group_metadatas = row_group_indexes
173171
.iter()
@@ -190,7 +188,7 @@ impl RowGroupAccessPlanFilter {
190188
values.iter().enumerate()
191189
{
192190
let original_row_group_idx =
193-
row_group_indexes_to_consider[idx_in_pruning_stats_result];
191+
row_group_indexes[idx_in_pruning_stats_result];
194192
if !pruning_result {
195193
new_access_plan.skip(original_row_group_idx);
196194
metrics.row_groups_pruned_statistics.add(1);
@@ -201,6 +199,8 @@ impl RowGroupAccessPlanFilter {
201199
}
202200
}
203201

202+
// Note: this part of code shouldn't be expensive with a limited number of row groups
203+
// If we do find it's expensive, we can consider optimizing it further.
204204
if !fully_contained_candidates_original_idx.is_empty() {
205205
// Use NotExpr to create the inverted predicate
206206
let inverted_expr =
@@ -232,6 +232,7 @@ impl RowGroupAccessPlanFilter {
232232
// it implies that *all* rows in this group satisfy the original predicate.
233233
if !inverted_values[i] {
234234
self.is_fully_matched[original_row_group_idx] = true;
235+
metrics.row_groups_fully_matched_statistics.add(1);
235236
}
236237
}
237238
}

0 commit comments

Comments
 (0)