Skip to content

Commit 4142f3f

Browse files
haohuaijin authored and alamb committed
fix: interval analysis error when two FilterExecs are stacked and the inner filter proves zero selectivity (apache#20743)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#20742 ## Rationale for this change - See apache#20742. ## What changes are included in this PR? In `collect_new_statistics`, when a filter proves that no rows can match, use a typed null (e.g., `ScalarValue::Int32(None)`) instead of the untyped `ScalarValue::Null` for column min/max/sum values. The column's data type is looked up from the schema so that downstream interval analysis can still intersect intervals of the same type. ## Are these changes tested? Yes — one test case was added. ## Are there any user-facing changes? No. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7698fdc commit 4142f3f

File tree

2 files changed

+93
-22
lines changed

2 files changed

+93
-22
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -386,17 +386,17 @@ mod test {
386386
column_statistics: vec![
387387
ColumnStatistics {
388388
null_count: Precision::Exact(0),
389-
max_value: Precision::Exact(ScalarValue::Null),
390-
min_value: Precision::Exact(ScalarValue::Null),
391-
sum_value: Precision::Exact(ScalarValue::Null),
389+
max_value: Precision::Exact(ScalarValue::Int32(None)),
390+
min_value: Precision::Exact(ScalarValue::Int32(None)),
391+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
392392
distinct_count: Precision::Exact(0),
393393
byte_size: Precision::Exact(16),
394394
},
395395
ColumnStatistics {
396396
null_count: Precision::Exact(0),
397-
max_value: Precision::Exact(ScalarValue::Null),
398-
min_value: Precision::Exact(ScalarValue::Null),
399-
sum_value: Precision::Exact(ScalarValue::Null),
397+
max_value: Precision::Exact(ScalarValue::Date32(None)),
398+
min_value: Precision::Exact(ScalarValue::Date32(None)),
399+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
400400
distinct_count: Precision::Exact(0),
401401
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
402402
},
@@ -415,17 +415,17 @@ mod test {
415415
column_statistics: vec![
416416
ColumnStatistics {
417417
null_count: Precision::Exact(0),
418-
max_value: Precision::Exact(ScalarValue::Null),
419-
min_value: Precision::Exact(ScalarValue::Null),
420-
sum_value: Precision::Exact(ScalarValue::Null),
418+
max_value: Precision::Exact(ScalarValue::Int32(None)),
419+
min_value: Precision::Exact(ScalarValue::Int32(None)),
420+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
421421
distinct_count: Precision::Exact(0),
422422
byte_size: Precision::Exact(8),
423423
},
424424
ColumnStatistics {
425425
null_count: Precision::Exact(0),
426-
max_value: Precision::Exact(ScalarValue::Null),
427-
min_value: Precision::Exact(ScalarValue::Null),
428-
sum_value: Precision::Exact(ScalarValue::Null),
426+
max_value: Precision::Exact(ScalarValue::Date32(None)),
427+
min_value: Precision::Exact(ScalarValue::Date32(None)),
428+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
429429
distinct_count: Precision::Exact(0),
430430
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
431431
},

datafusion/physical-plan/src/filter.rs

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ impl FilterExec {
337337
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
338338

339339
let column_statistics = collect_new_statistics(
340+
schema,
340341
&input_stats.column_statistics,
341342
analysis_ctx.boundaries,
342343
);
@@ -757,6 +758,7 @@ impl EmbeddedProjection for FilterExec {
757758
/// is adjusted by using the next/previous value for its data type to convert
758759
/// it into a closed bound.
759760
fn collect_new_statistics(
761+
schema: &SchemaRef,
760762
input_column_stats: &[ColumnStatistics],
761763
analysis_boundaries: Vec<ExprBoundaries>,
762764
) -> Vec<ColumnStatistics> {
@@ -773,12 +775,17 @@ fn collect_new_statistics(
773775
},
774776
)| {
775777
let Some(interval) = interval else {
776-
// If the interval is `None`, we can say that there are no rows:
778+
// If the interval is `None`, we can say that there are no rows.
779+
// Use a typed null to preserve the column's data type, so that
780+
// downstream interval analysis can still intersect intervals
781+
// of the same type.
782+
let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
783+
.unwrap_or(ScalarValue::Null);
777784
return ColumnStatistics {
778785
null_count: Precision::Exact(0),
779-
max_value: Precision::Exact(ScalarValue::Null),
780-
min_value: Precision::Exact(ScalarValue::Null),
781-
sum_value: Precision::Exact(ScalarValue::Null),
786+
max_value: Precision::Exact(typed_null.clone()),
787+
min_value: Precision::Exact(typed_null.clone()),
788+
sum_value: Precision::Exact(typed_null),
782789
distinct_count: Precision::Exact(0),
783790
byte_size: input_column_stats[idx].byte_size,
784791
};
@@ -1471,17 +1478,17 @@ mod tests {
14711478
statistics.column_statistics,
14721479
vec![
14731480
ColumnStatistics {
1474-
min_value: Precision::Exact(ScalarValue::Null),
1475-
max_value: Precision::Exact(ScalarValue::Null),
1476-
sum_value: Precision::Exact(ScalarValue::Null),
1481+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1482+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1483+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
14771484
distinct_count: Precision::Exact(0),
14781485
null_count: Precision::Exact(0),
14791486
byte_size: Precision::Absent,
14801487
},
14811488
ColumnStatistics {
1482-
min_value: Precision::Exact(ScalarValue::Null),
1483-
max_value: Precision::Exact(ScalarValue::Null),
1484-
sum_value: Precision::Exact(ScalarValue::Null),
1489+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1490+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1491+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
14851492
distinct_count: Precision::Exact(0),
14861493
null_count: Precision::Exact(0),
14871494
byte_size: Precision::Absent,
@@ -1492,6 +1499,70 @@ mod tests {
14921499
Ok(())
14931500
}
14941501

1502+
/// Regression test: stacking two FilterExecs where the inner filter
1503+
/// proves zero selectivity should not panic with a type mismatch
1504+
/// during interval intersection.
1505+
///
1506+
/// Previously, when a filter proved no rows could match, the column
1507+
/// statistics used untyped `ScalarValue::Null` (data type `Null`).
1508+
/// If an outer FilterExec then tried to analyze its own predicate
1509+
/// against those statistics, `Interval::intersect` would fail with:
1510+
/// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1511+
#[tokio::test]
1512+
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1513+
// Inner table: a: [1, 100], b: [1, 3]
1514+
let schema = Schema::new(vec![
1515+
Field::new("a", DataType::Int32, false),
1516+
Field::new("b", DataType::Int32, false),
1517+
]);
1518+
let input = Arc::new(StatisticsExec::new(
1519+
Statistics {
1520+
num_rows: Precision::Inexact(1000),
1521+
total_byte_size: Precision::Inexact(4000),
1522+
column_statistics: vec![
1523+
ColumnStatistics {
1524+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1525+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1526+
..Default::default()
1527+
},
1528+
ColumnStatistics {
1529+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1530+
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1531+
..Default::default()
1532+
},
1533+
],
1534+
},
1535+
schema,
1536+
));
1537+
1538+
// Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1539+
let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1540+
Arc::new(Column::new("a", 0)),
1541+
Operator::Gt,
1542+
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1543+
));
1544+
let inner_filter: Arc<dyn ExecutionPlan> =
1545+
Arc::new(FilterExec::try_new(inner_predicate, input)?);
1546+
1547+
// Outer filter: a = 50
1548+
// Before the fix, this would panic because the inner filter's
1549+
// zero-selectivity statistics produced Null-typed intervals for
1550+
// column `a`, which couldn't intersect with the Int32 literal.
1551+
let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1552+
Arc::new(Column::new("a", 0)),
1553+
Operator::Eq,
1554+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1555+
));
1556+
let outer_filter: Arc<dyn ExecutionPlan> =
1557+
Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1558+
1559+
// Should succeed without error
1560+
let statistics = outer_filter.partition_statistics(None)?;
1561+
assert_eq!(statistics.num_rows, Precision::Inexact(0));
1562+
1563+
Ok(())
1564+
}
1565+
14951566
#[tokio::test]
14961567
async fn test_filter_statistics_more_inputs() -> Result<()> {
14971568
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)