Skip to content

Commit e8591cb

Browse files
committed
fix not expr
1 parent 8262a7f commit e8591cb

File tree

5 files changed

+344
-110
lines changed

5 files changed

+344
-110
lines changed

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -207,16 +207,6 @@ impl RowGroupPruningTest {
207207
self.expected_files_pruned_by_statistics,
208208
"mismatched files_ranges_pruned_statistics",
209209
);
210-
assert_eq!(
211-
output.row_groups_matched_bloom_filter(),
212-
self.expected_row_group_matched_by_bloom_filter,
213-
"mismatched row_groups_matched_bloom_filter",
214-
);
215-
assert_eq!(
216-
output.row_groups_pruned_bloom_filter(),
217-
self.expected_row_group_pruned_by_bloom_filter,
218-
"mismatched row_groups_pruned_bloom_filter",
219-
);
220210
assert_eq!(
221211
output.limit_pruned_row_groups(),
222212
self.expected_limit_pruned_row_groups,
@@ -1746,12 +1736,13 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17461736
// So 3 row groups are effectively pruned due to limit pruning.
17471737

17481738
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
1749-
let query = "SELECT c1 FROM t WHERE c1 > 0 LIMIT 2";
1739+
let query = "SELECT c1 FROM t WHERE c1 >= 0 LIMIT 2";
17501740

17511741
let batches = vec![
1752-
make_i32_batch("c1", vec![1, 2])?, // RG0: Fully matched, 2 rows
1753-
make_i32_batch("c1", vec![3, 4])?, // RG1: Fully matched, 2 rows
1754-
make_i32_batch("c1", vec![5, 6])?, // RG2: Fully matched, 2 rows
1742+
make_i32_batch("c1", vec![0, -2])?, // RG0: Partially matched (only the 0 satisfies c1 >= 0)
1743+
make_i32_batch("c1", vec![0, 0])?, // RG1: Fully matched, 2 rows
1744+
make_i32_batch("c1", vec![0, 0])?, // RG2: Fully matched, 2 rows
1745+
make_i32_batch("c1", vec![0, 0])?, // RG3: Fully matched, 2 rows
17551746
make_i32_batch("c1", vec![-1, 0])?, // RG4: Partially matched (only the 0 satisfies c1 >= 0)
17561747
];
17571748

@@ -1761,11 +1752,9 @@ async fn test_limit_pruning() -> datafusion_common::error::Result<()> {
17611752
.with_expected_errors(Some(0))
17621753
.with_expected_rows(2)
17631754
.with_pruned_files(Some(0))
1764-
.with_matched_by_bloom_filter(Some(0))
1765-
.with_pruned_by_bloom_filter(Some(0))
1766-
.with_matched_by_stats(Some(3)) // RG0, RG1, RG2 are matched by stats (c1 > 0)
1767-
.with_pruned_by_stats(Some(1)) // RG3 is pruned by stats (c1 = [-1, 0] does not satisfy c1 > 0)
1768-
.with_limit_pruned_row_groups(Some(2)) // RG1, RG2 are pruned by limit. (RG3 is already pruned by stats)
1755+
.with_matched_by_stats(Some(5)) // All 5 row groups are matched by stats (each contains a 0, which satisfies c1 >= 0)
1756+
.with_pruned_by_stats(Some(0)) // No row group is pruned by stats (every row group has max 0, so c1 >= 0 can match)
1757+
.with_limit_pruned_row_groups(Some(4)) // 4 of the 5 stats-matched row groups are pruned by limit (LIMIT 2 needs only 2 rows)
17691758
.test_row_group_prune_with_custom_data(schema, batches, 2)
17701759
.await;
17711760

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion_common::pruning::PruningStatistics;
2525
use datafusion_common::{Column, Result, ScalarValue};
2626
use datafusion_datasource::FileRange;
2727
use datafusion_physical_expr::expressions::NotExpr;
28+
use datafusion_physical_expr::PhysicalExprSimplifier;
2829
use datafusion_pruning::PruningPredicate;
2930
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3031
use parquet::arrow::parquet_column;
@@ -205,6 +206,10 @@ impl RowGroupAccessPlanFilter {
205206
// Use NotExpr to create the inverted predicate
206207
let inverted_expr =
207208
Arc::new(NotExpr::new(predicate.orig_expr().clone()));
209+
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
210+
// before building the pruning predicate
211+
let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
212+
let inverted_expr = simplifier.simplify(inverted_expr).unwrap();
208213
if let Ok(inverted_predicate) = PruningPredicate::try_new(
209214
inverted_expr,
210215
predicate.schema().clone(),

datafusion/physical-expr/src/simplifier/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ use datafusion_common::{
2424
};
2525
use std::sync::Arc;
2626

27-
use crate::PhysicalExpr;
27+
use crate::{simplifier::not::simplify_not_expr_recursive, PhysicalExpr};
2828

29+
pub mod not;
2930
pub mod unwrap_cast;
3031

3132
/// Simplifies physical expressions by applying various optimizations
@@ -56,6 +57,11 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
5657
type Node = Arc<dyn PhysicalExpr>;
5758

5859
fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
60+
// Apply NOT expression simplification first
61+
let not_simplified = simplify_not_expr_recursive(node, self.schema)?;
62+
let node = not_simplified.data;
63+
let transformed = not_simplified.transformed;
64+
5965
// Apply unwrap cast optimization
6066
#[cfg(test)]
6167
let original_type = node.data_type(self.schema).unwrap();
@@ -66,7 +72,12 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
6672
original_type,
6773
"Simplified expression should have the same data type as the original"
6874
);
69-
Ok(unwrapped)
75+
// Combine transformation results
76+
let final_transformed = transformed || unwrapped.transformed;
77+
Ok(Transformed::new_transformed(
78+
unwrapped.data,
79+
final_transformed,
80+
))
7081
}
7182
}
7283

0 commit comments

Comments
 (0)