Skip to content

Commit e54eb42

Browse files
authored
Fix predicate_rows_pruned & predicate_rows_matched metrics (#18980)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 2bfa64d commit e54eb42

File tree

2 files changed

+116
-3
lines changed

2 files changed

+116
-3
lines changed

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,6 +1874,100 @@ mod tests {
18741874
assert_contains!(&explain, "projection=[c1]");
18751875
}
18761876

1877+
#[tokio::test]
1878+
async fn parquet_exec_metrics_with_multiple_predicates() {
1879+
// Test that metrics are correctly calculated when multiple predicates
1880+
// are pushed down (connected with AND). This ensures we don't double-count
1881+
// rows when multiple predicates filter the data sequentially.
1882+
1883+
// Create a batch with two columns: c1 (string) and c2 (int32)
1884+
// Total: 10 rows
1885+
let c1: ArrayRef = Arc::new(StringArray::from(vec![
1886+
Some("foo"), // 0 - passes c1 filter, fails c2 filter (5 <= 10)
1887+
Some("bar"), // 1 - fails c1 filter
1888+
Some("bar"), // 2 - fails c1 filter
1889+
Some("baz"), // 3 - passes both filters (20 > 10)
1890+
Some("foo"), // 4 - passes both filters (12 > 10)
1891+
Some("bar"), // 5 - fails c1 filter
1892+
Some("baz"), // 6 - passes both filters (25 > 10)
1893+
Some("foo"), // 7 - passes c1 filter, fails c2 filter (7 <= 10)
1894+
Some("bar"), // 8 - fails c1 filter
1895+
Some("qux"), // 9 - passes both filters (30 > 10)
1896+
]));
1897+
1898+
let c2: ArrayRef = Arc::new(Int32Array::from(vec![
1899+
Some(5),
1900+
Some(15),
1901+
Some(8),
1902+
Some(20),
1903+
Some(12),
1904+
Some(9),
1905+
Some(25),
1906+
Some(7),
1907+
Some(18),
1908+
Some(30),
1909+
]));
1910+
1911+
let batch = create_batch(vec![("c1", c1), ("c2", c2)]);
1912+
1913+
// Create filter: c1 != 'bar' AND c2 > 10
1914+
//
1915+
// First predicate (c1 != 'bar'):
1916+
// - Rows passing: 0, 3, 4, 6, 7, 9 (6 rows)
1917+
// - Rows pruned: 1, 2, 5, 8 (4 rows)
1918+
//
1919+
// Second predicate (c2 > 10) on remaining 6 rows:
1920+
// - Rows passing: 3, 4, 6, 9 (4 rows with c2 = 20, 12, 25, 30)
1921+
// - Rows pruned: 0, 7 (2 rows with c2 = 5, 7)
1922+
//
1923+
// Expected final metrics:
1924+
// - pushdown_rows_matched: 4 (final result)
1925+
// - pushdown_rows_pruned: 4 + 2 = 6 (cumulative)
1926+
// - Total: 4 + 6 = 10
1927+
1928+
let filter = col("c1").not_eq(lit("bar")).and(col("c2").gt(lit(10)));
1929+
1930+
let rt = RoundTrip::new()
1931+
.with_predicate(filter)
1932+
.with_pushdown_predicate()
1933+
.round_trip(vec![batch])
1934+
.await;
1935+
1936+
let metrics = rt.parquet_exec.metrics().unwrap();
1937+
1938+
// Verify the result rows
1939+
assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
1940+
+-----+----+
1941+
| c1 | c2 |
1942+
+-----+----+
1943+
| baz | 20 |
1944+
| foo | 12 |
1945+
| baz | 25 |
1946+
| qux | 30 |
1947+
+-----+----+
1948+
"###);
1949+
1950+
// Verify metrics - this is the key test
1951+
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
1952+
let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
1953+
1954+
assert_eq!(
1955+
pushdown_rows_matched, 4,
1956+
"Expected 4 rows to pass both predicates"
1957+
);
1958+
assert_eq!(
1959+
pushdown_rows_pruned, 6,
1960+
"Expected 6 rows to be pruned (4 by first predicate + 2 by second predicate)"
1961+
);
1962+
1963+
// The sum should equal the total number of rows
1964+
assert_eq!(
1965+
pushdown_rows_matched + pushdown_rows_pruned,
1966+
10,
1967+
"matched + pruned should equal total rows"
1968+
);
1969+
}
1970+
18771971
#[tokio::test]
18781972
async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
18791973
// batch1: c1(string)

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,14 +470,33 @@ pub fn build_row_filter(
470470
});
471471
}
472472

473+
// To avoid double-counting metrics when multiple predicates are used:
474+
// - All predicates should count rows_pruned (cumulative pruned rows)
475+
// - Only the last predicate should count rows_matched (final result)
476+
// This ensures: rows_matched + rows_pruned = total rows processed
477+
let total_candidates = candidates.len();
478+
473479
candidates
474480
.into_iter()
475-
.map(|candidate| {
481+
.enumerate()
482+
.map(|(idx, candidate)| {
483+
let is_last = idx == total_candidates - 1;
484+
485+
// All predicates share the pruned counter (cumulative)
486+
let predicate_rows_pruned = rows_pruned.clone();
487+
488+
// Only the last predicate tracks matched rows (final result)
489+
let predicate_rows_matched = if is_last {
490+
rows_matched.clone()
491+
} else {
492+
metrics::Count::new()
493+
};
494+
476495
DatafusionArrowPredicate::try_new(
477496
candidate,
478497
metadata,
479-
rows_pruned.clone(),
480-
rows_matched.clone(),
498+
predicate_rows_pruned,
499+
predicate_rows_matched,
481500
time.clone(),
482501
)
483502
.map(|pred| Box::new(pred) as _)

0 commit comments

Comments
 (0)