Skip to content

Commit 3ebf259

Browse files
committed
Add more tests
1 parent 794f121 commit 3ebf259

File tree

2 files changed

+144
-45
lines changed
  • datafusion
    • core/tests/physical_optimizer/filter_pushdown
    • physical-plan/src/aggregates

2 files changed

+144
-45
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 142 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,73 +1956,55 @@ async fn test_aggregate_filter_pushdown() {
19561956
);
19571957
}
19581958

1959-
#[tokio::test]
1960-
async fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
1961-
// Test that filters on non-grouping columns (like aggregate results) are NOT pushed through
1962-
// Simulates: SELECT a, COUNT(b) as cnt FROM table GROUP BY a HAVING cnt > 5
1963-
// The filter on 'cnt' cannot be pushed down because it's an aggregate result, not a grouping column
1959+
#[test]
1960+
fn test_no_pushdown_aggregate_filter_on_non_grouping_column() {
1961+
// Test that filters on non-first grouping columns are still pushed down
1962+
// SELECT a, b, count(*) as cnt FROM table GROUP BY a, b HAVING b = 'bar'
1963+
// The filter is on 'b' (second grouping column), should push down
1964+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
19641965

1965-
let batches =
1966+
let aggregate_expr =
19661967
vec![
1967-
record_batch!(("a", Utf8, ["x", "y"]), ("b", Utf8, ["foo", "bar"])).unwrap(),
1968+
AggregateExprBuilder::new(count_udaf(), vec![col("c", &schema()).unwrap()])
1969+
.schema(schema())
1970+
.alias("cnt")
1971+
.build()
1972+
.map(Arc::new)
1973+
.unwrap(),
19681974
];
19691975

1970-
let scan = TestScanBuilder::new(schema())
1971-
.with_support(true)
1972-
.with_batches(batches)
1973-
.build();
1974-
1975-
// Create an aggregate: GROUP BY a with COUNT(b)
1976-
let group_by = PhysicalGroupBy::new_single(vec![(
1977-
col("a", &schema()).unwrap(),
1978-
"a".to_string(),
1979-
)]);
1980-
1981-
// Add COUNT aggregate
1982-
let count_expr =
1983-
AggregateExprBuilder::new(count_udaf(), vec![col("b", &schema()).unwrap()])
1984-
.schema(schema())
1985-
.alias("count")
1986-
.build()
1987-
.unwrap();
1976+
let group_by = PhysicalGroupBy::new_single(vec![
1977+
(col("a", &schema()).unwrap(), "a".to_string()),
1978+
(col("b", &schema()).unwrap(), "b".to_string()),
1979+
]);
19881980

19891981
let aggregate = Arc::new(
19901982
AggregateExec::try_new(
1991-
AggregateMode::Partial,
1983+
AggregateMode::Final,
19921984
group_by,
1993-
vec![count_expr.into()],
1985+
aggregate_expr.clone(),
19941986
vec![None],
1995-
Arc::clone(&scan),
1987+
scan,
19961988
schema(),
19971989
)
19981990
.unwrap(),
19991991
);
20001992

2001-
// Add a filter on the aggregate output column
2002-
// This simulates filtering on COUNT result, which should NOT be pushed through
2003-
let agg_schema = aggregate.schema();
2004-
let predicate = Arc::new(BinaryExpr::new(
2005-
Arc::new(Column::new_with_schema("count[count]", &agg_schema).unwrap()),
2006-
Operator::Gt,
2007-
Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
2008-
));
2009-
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
2010-
as Arc<dyn ExecutionPlan>;
1993+
let predicate = col_lit_predicate("b", "bar", &schema());
1994+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
20111995

2012-
// The filter should NOT be pushed through the aggregate since it references a non-grouping column
20131996
insta::assert_snapshot!(
2014-
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
1997+
OptimizationTest::new(plan, FilterPushdown::new(), true),
20151998
@r"
20161999
OptimizationTest:
20172000
input:
2018-
- FilterExec: count[count]@1 > 5
2019-
- AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
2001+
- FilterExec: b@1 = bar
2002+
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
20202003
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
20212004
output:
20222005
Ok:
2023-
- FilterExec: count[count]@1 > 5
2024-
- AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
2025-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2006+
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([1])
2007+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar
20262008
"
20272009
);
20282010
}
@@ -2165,3 +2147,118 @@ fn test_pushdown_grouping_sets_filter_on_common_column() {
21652147
"
21662148
);
21672149
}
2150+
2151+
#[test]
2152+
fn test_pushdown_with_empty_group_by() {
2153+
// Test that filters can be pushed down when GROUP BY is empty (no grouping columns)
2154+
// SELECT count(*) as cnt FROM table WHERE a = 'foo'
2155+
// There are no grouping columns, so the filter should still push down
2156+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2157+
2158+
let aggregate_expr =
2159+
vec![
2160+
AggregateExprBuilder::new(count_udaf(), vec![col("c", &schema()).unwrap()])
2161+
.schema(schema())
2162+
.alias("cnt")
2163+
.build()
2164+
.map(Arc::new)
2165+
.unwrap(),
2166+
];
2167+
2168+
// Empty GROUP BY - no grouping columns
2169+
let group_by = PhysicalGroupBy::new_single(vec![]);
2170+
2171+
let aggregate = Arc::new(
2172+
AggregateExec::try_new(
2173+
AggregateMode::Final,
2174+
group_by,
2175+
aggregate_expr.clone(),
2176+
vec![None],
2177+
scan,
2178+
schema(),
2179+
)
2180+
.unwrap(),
2181+
);
2182+
2183+
// Filter on 'a'
2184+
let predicate = col_lit_predicate("a", "foo", &schema());
2185+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
2186+
2187+
// The filter should be pushed down even with empty GROUP BY
2188+
insta::assert_snapshot!(
2189+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2190+
@r"
2191+
OptimizationTest:
2192+
input:
2193+
- FilterExec: a@0 = foo
2194+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2195+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2196+
output:
2197+
Ok:
2198+
- AggregateExec: mode=Final, gby=[], aggr=[cnt]
2199+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2200+
"
2201+
);
2202+
}
2203+
2204+
#[test]
2205+
fn test_pushdown_with_computed_grouping_key() {
2206+
// Test filter pushdown with computed grouping expression
2207+
// SELECT (c + 1.0) as c_plus_1, count(*) FROM table WHERE c > 5.0 GROUP BY (c + 1.0)
2208+
2209+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
2210+
2211+
let predicate = Arc::new(BinaryExpr::new(
2212+
col("c", &schema()).unwrap(),
2213+
Operator::Gt,
2214+
Arc::new(Literal::new(ScalarValue::Float64(Some(5.0)))),
2215+
)) as Arc<dyn PhysicalExpr>;
2216+
let filter = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
2217+
2218+
let aggregate_expr =
2219+
vec![
2220+
AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema()).unwrap()])
2221+
.schema(schema())
2222+
.alias("cnt")
2223+
.build()
2224+
.map(Arc::new)
2225+
.unwrap(),
2226+
];
2227+
2228+
let c_plus_one = Arc::new(BinaryExpr::new(
2229+
col("c", &schema()).unwrap(),
2230+
Operator::Plus,
2231+
Arc::new(Literal::new(ScalarValue::Float64(Some(1.0)))),
2232+
)) as Arc<dyn PhysicalExpr>;
2233+
2234+
let group_by =
2235+
PhysicalGroupBy::new_single(vec![(c_plus_one, "c_plus_1".to_string())]);
2236+
2237+
let plan = Arc::new(
2238+
AggregateExec::try_new(
2239+
AggregateMode::Final,
2240+
group_by,
2241+
aggregate_expr.clone(),
2242+
vec![None],
2243+
filter,
2244+
schema(),
2245+
)
2246+
.unwrap(),
2247+
);
2248+
2249+
// The filter should be pushed down because 'c' is extracted from the grouping expression (c + 1.0)
2250+
insta::assert_snapshot!(
2251+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2252+
@r"
2253+
OptimizationTest:
2254+
input:
2255+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2256+
- FilterExec: c@2 > 5
2257+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2258+
output:
2259+
Ok:
2260+
- AggregateExec: mode=Final, gby=[c@2 + 1 as c_plus_1], aggr=[cnt]
2261+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=c@2 > 5
2262+
"
2263+
);
2264+
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,8 @@ impl ExecutionPlan for AggregateExec {
10321032
CardinalityEffect::LowerEqual
10331033
}
10341034

1035+
/// Push down parent filters when possible (see implementation comment for details),
1036+
/// but do not introduce any new self filters.
10351037
fn gather_filters_for_pushdown(
10361038
&self,
10371039
_phase: FilterPushdownPhase,

0 commit comments

Comments
 (0)