
[SPARK-55702][SQL] Support filter predicate in window aggregate functions#54501

Open
cloud-fan wants to merge 1 commit into apache:master from cloud-fan:window-agg-filter


@cloud-fan (Contributor) commented Feb 26, 2026

What changes were proposed in this pull request?

This PR adds support for the FILTER (WHERE ...) clause on aggregate functions used within window expressions. Previously, Spark rejected this with an AnalysisException ("Window aggregate function with filter predicate is not supported yet.").

The changes are:

  1. Remove the analysis rejection in Analyzer.scala that blocked FILTER in window aggregates, and extract filter expressions alongside aggregate function children.
  2. Add filter support to AggregateProcessor so that AggregateExpression.filter is honored during window frame evaluation:
    • For DeclarativeAggregate: update expressions are wrapped with If(filter, updateExpr, bufferAttr), so a row that fails the filter leaves the aggregation buffer unchanged.
    • For ImperativeAggregate: the filter predicate is evaluated first, and update() is called only for rows where it is true.
  3. Pass filter expressions from WindowEvaluatorFactoryBase to AggregateProcessor.
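The two filter strategies listed above can be modeled in a few lines of plain Scala. This is a toy sketch, not Spark code: Row, Declarative, Imperative, CountImperative, and the helper functions are hypothetical stand-ins for InternalRow, DeclarativeAggregate, and ImperativeAggregate, and SQL NULL handling of the predicate is ignored.

```scala
// Toy model of the two filter strategies; none of these types are Spark's.
object FilterStrategies {
  // Hypothetical row representation: column name -> value.
  type Row = Map[String, Int]

  // Declarative style: one update function per buffer slot.
  final case class Declarative(
      initial: Vector[Long],
      updates: Vector[(Vector[Long], Row) => Long])

  // With a filter, each slot update behaves like If(filter, updateExpr, bufferAttr):
  // a row that fails the predicate leaves the slot at its current value.
  def declarativeUpdate(
      agg: Declarative,
      filter: Option[Row => Boolean],
      buffer: Vector[Long],
      row: Row): Vector[Long] =
    agg.updates.zipWithIndex.map { case (updateExpr, i) =>
      filter match {
        case Some(p) => if (p(row)) updateExpr(buffer, row) else buffer(i)
        case None    => updateExpr(buffer, row)
      }
    }

  // Imperative style: update() mutates the buffer in place.
  trait Imperative {
    def update(buffer: Array[Long], row: Row): Unit
  }

  // Hypothetical imperative count that keeps its running count in slot 0.
  object CountImperative extends Imperative {
    def update(buffer: Array[Long], row: Row): Unit = buffer(0) += 1
  }

  // Evaluate the predicate first; call update() only if it passes (or there is no filter).
  def imperativeUpdate(
      agg: Imperative,
      filter: Option[Row => Boolean],
      buffer: Array[Long],
      row: Row): Unit =
    if (filter.forall(p => p(row))) agg.update(buffer, row)

  val sampleRows: Seq[Row] = Seq(Map("val" -> 1), Map("val" -> 2), Map("val" -> 3))
  val valGt1: Option[Row => Boolean] = Some((r: Row) => r("val") > 1)

  // Sum and count slots, as an avg-style declarative aggregate would keep them.
  def demoDeclarative(): Vector[Long] = {
    val sumAndCount = Declarative(
      Vector(0L, 0L),
      Vector(
        (b: Vector[Long], r: Row) => b(0) + r("val"),
        (b: Vector[Long], r: Row) => b(1) + 1L))
    sampleRows.foldLeft(sumAndCount.initial)((b, r) => declarativeUpdate(sumAndCount, valGt1, b, r))
  }

  def demoImperative(): Long = {
    val buffer = Array(0L)
    sampleRows.foreach(r => imperativeUpdate(CountImperative, valGt1, buffer, r))
    buffer(0)
  }
}
```

With the filter val > 1, the rows 1, 2, 3 leave sum = 5 and count = 2 in the declarative buffer, and the imperative count ends at 2.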

Why are the changes needed?

The SQL standard allows FILTER on aggregate functions in window contexts. Other databases (PostgreSQL, etc.) support this. Spark already supports FILTER for regular (non-window) aggregates but rejected it in window contexts.

Does this PR introduce any user-facing change?

Yes. Window aggregate expressions with FILTER now execute instead of throwing an AnalysisException. For example:

SELECT val, cate,
  sum(val) FILTER (WHERE val > 1) OVER (PARTITION BY cate ORDER BY val
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum_filtered
FROM testData
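To pin down what a query like this should return, here is a brute-force reference in plain Scala (a sketch, not how WindowExec evaluates frames). The rows are hypothetical, not the actual testData contents; None stands for SQL NULL, which is what sum returns while no row in the growing frame has passed the filter.

```scala
// Reference semantics (sketch) for:
//   sum(val) FILTER (WHERE val > 1) OVER (PARTITION BY cate ORDER BY val
//     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
object RunningFilteredSum {
  // Hypothetical record; `v` stands in for the `val` column (a Scala keyword).
  final case class Rec(v: Int, cate: String)

  /** Returns (row, running_sum_filtered) pairs; None models SQL NULL. */
  def evaluate(rows: Seq[Rec], pred: Rec => Boolean): Seq[(Rec, Option[Long])] =
    rows.groupBy(_.cate).toSeq.sortBy(_._1).flatMap { case (_, partition) =>
      val ordered = partition.sortBy(_.v)
      // scanLeft carries the aggregate as the frame grows by one row at a time;
      // rows failing the filter leave the running value unchanged.
      val sums = ordered.scanLeft(Option.empty[Long]) { (acc, r) =>
        if (pred(r)) Some(acc.getOrElse(0L) + r.v) else acc
      }.tail
      ordered.zip(sums)
    }

  // Hypothetical rows (not the real testData contents).
  val sample: Seq[Rec] = Seq(Rec(1, "a"), Rec(3, "a"), Rec(2, "a"), Rec(5, "b"), Rec(1, "b"))

  def demo(): Seq[Option[Long]] = evaluate(sample, _.v > 1).map(_._2)
}
```

For partition a (values 1, 2, 3) this yields NULL, 2, 5; for partition b (values 1, 5) it yields NULL, 5.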

How was this patch tested?

Added 4 SQL test cases in window.sql covering:

  • Running sum with filter
  • first_value/last_value with filter (verifying no interference with NULL handling)
  • Multiple aggregates with different filters in the same window
  • Entire partition frame with filter

The existing test case (count(val) FILTER (WHERE val > 1) OVER(...)) now produces correct results instead of an error.
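For the entire-partition frame, the filter interacts with empty results differently per function: sum over a partition with no matching rows is NULL, while count is 0. A small reference model in plain Scala (hypothetical names and data, not Spark code):

```scala
// Reference semantics (sketch) for an entire-partition frame:
//   sum(val) / count(val) FILTER (WHERE pred) OVER (PARTITION BY cate)
object WholePartitionFiltered {
  // Hypothetical record; `v` stands in for the `val` column.
  final case class Rec(v: Int, cate: String)

  /** For each input row, in input order: (filtered sum, filtered count) over its partition. */
  def evaluate(rows: Seq[Rec], pred: Rec => Boolean): Seq[(Option[Long], Long)] = {
    val byCate = rows.groupBy(_.cate)
    rows.map { r =>
      // Every row in a partition sees the same frame: the whole partition.
      val matching = byCate(r.cate).filter(pred)
      // sum over zero rows is NULL (None); count over zero rows is 0.
      val total = if (matching.isEmpty) None else Some(matching.map(_.v.toLong).sum)
      (total, matching.size.toLong)
    }
  }

  // Hypothetical rows (not the real testData contents).
  val sample: Seq[Rec] = Seq(Rec(1, "a"), Rec(2, "a"), Rec(3, "b"))

  def demo(): Seq[(Option[Long], Long)] = evaluate(sample, _.cate == "a")
}
```

Rows in partition a get (3, 2); the row in partition b gets (NULL, 0), since no row there satisfies cate = 'a'.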

Was this patch authored or co-authored using generative AI tooling?

Yes.

Made with Cursor

@cloud-fan force-pushed the window-agg-filter branch 2 times, most recently from c1874cf to d4b1eaa (February 26, 2026 03:49)
@cloud-fan (Contributor Author):

cc @peter-toth @viirya

Comment on lines 60 to 61
if (filters.length == functions.length) filters
else Array.fill(functions.length)(None)
Member:

Should we add an assert like assert(filters.isEmpty || filters.length == functions.length)?

Contributor:

I don't see why we need the Array.empty default value for filters, and I don't see how the sizes could differ. Why not just assert(filters.length == functions.length), or change the contract to functionsAndFilters: Seq[(Expression, Option[Expression])]?
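The two options raised in this thread can be compared with a small sketch. Expr is a hypothetical stand-in for Catalyst's Expression, and alignWithAssert/fromPairs are illustrative helpers, not methods of AggregateProcessor.

```scala
// Hypothetical helpers comparing the two contracts; Expr stands in for Expression.
object FilterContract {
  type Expr = String

  // Option A: keep parallel arrays, but fail fast if their lengths differ.
  def alignWithAssert(
      functions: Array[Expr],
      filters: Array[Option[Expr]]): Seq[(Expr, Option[Expr])] = {
    assert(filters.length == functions.length,
      s"expected ${functions.length} filters, got ${filters.length}")
    functions.toSeq.zip(filters.toSeq)
  }

  // Option B: pass pairs, so a length mismatch cannot be expressed at all.
  def fromPairs(functionsAndFilters: Seq[(Expr, Option[Expr])]): (Seq[Expr], Seq[Option[Expr]]) =
    functionsAndFilters.unzip
}
```

Option A keeps the existing two-argument shape and turns a silent fallback into a failure; option B moves the invariant into the parameter type, so callers cannot construct mismatched inputs.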

filterOpt match {
  case Some(filter) =>
    updateExpressions ++= agg.updateExpressions.zip(agg.aggBufferAttributes).map {
      case (updateExpr, attr) => If(filter, updateExpr, attr)
Member:

Does this mean the filter will be evaluated multiple times, once per update expression? Maybe common subexpression elimination would help.

Contributor Author:

It's pretty much the same as the interpreted version of HashAggregateExec: AggregationIterator.
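The cost question in this thread can be made concrete with a toy model (plain Scala, hypothetical names). In a naive interpreted evaluation, wrapping each of N buffer-slot updates in its own If evaluates the predicate N times per row; computing it once per row and reusing the result, which is what common subexpression elimination would achieve, evaluates it once. Both variants produce the same buffer.

```scala
// Toy model: N buffer slots, each summing the input when a shared predicate passes.
object FilterEvalCount {
  /** Returns the final buffer and how many times the predicate was evaluated. */
  def run(
      numSlots: Int,
      rows: Seq[Int],
      pred: Int => Boolean,
      shareFilter: Boolean): (Vector[Long], Int) = {
    var evals = 0
    val counted: Int => Boolean = r => { evals += 1; pred(r) }
    val buffer = Array.fill(numSlots)(0L)
    rows.foreach { r =>
      // Evaluated at most once per row, and only when shareFilter is set.
      lazy val passed = counted(r)
      var i = 0
      while (i < numSlots) {
        // Unshared: every slot re-evaluates the predicate, like one If per update expression.
        val keep = if (shareFilter) passed else counted(r)
        if (keep) buffer(i) += r
        i += 1
      }
    }
    (buffer.toVector, evals)
  }
}
```

With two slots and rows 1, 2, 3 under val > 1, the unshared variant evaluates the predicate 6 times and the shared one 3 times, while both end with Vector(5, 5).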

var i = 0
while (i < numImperatives) {
  imperatives(i).update(buffer, input)
  val shouldUpdate = imperativeFilters(i) match {
Member:

Looks like there is no common expression evaluation here?


Comment on lines +187 to +191
first_value(val) FILTER (WHERE cate = 'a') OVER(ORDER BY val_long
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_a,
last_value(val) FILTER (WHERE cate = 'a') OVER(ORDER BY val_long
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS last_a
FROM testData ORDER BY val_long, cate;
Member:

The tests all use either UNBOUNDED PRECEDING AND CURRENT ROW (growing frame) or no-frame PARTITION BY cate (full partition). There's no test for a true sliding window like:

sum(val) FILTER (WHERE val > 1) OVER (ORDER BY val_long ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
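A brute-force oracle is one way to derive expected output for such a sliding-frame test: recompute each frame from scratch and apply the filter inside it. The sketch below is plain Scala over hypothetical values already sorted by the ORDER BY key; it is not Spark's frame implementation.

```scala
// Reference semantics (sketch) for:
//   sum(v) FILTER (WHERE pred) OVER (ORDER BY key
//     ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING)
object SlidingFilteredSum {
  /** `ordered` holds the values already sorted by the ORDER BY key; None models SQL NULL. */
  def evaluate(
      ordered: IndexedSeq[Int],
      preceding: Int,
      following: Int,
      pred: Int => Boolean): IndexedSeq[Option[Long]] =
    ordered.indices.map { i =>
      // Clamp the frame to the partition boundaries.
      val lo = math.max(0, i - preceding)
      val hi = math.min(ordered.length - 1, i + following)
      val matching = (lo to hi).map(j => ordered(j)).filter(pred)
      if (matching.isEmpty) None else Some(matching.map(_.toLong).sum)
    }
}
```

For example, evaluate(Vector(2, 1, 1, 1, 3), 1, 1, _ > 1) gives 2, 2, NULL, 3, 3: with a sliding frame, a NULL can follow non-NULL results once matching rows leave the frame, which a growing frame (UNBOUNDED PRECEDING AND CURRENT ROW) never produces for sum.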

SELECT val, cate,
  sum(val) FILTER (WHERE cate = 'a') OVER(PARTITION BY cate) AS total_sum_filtered
FROM testData ORDER BY cate, val;

Member:

No test for RANGE frame?

All new tests use ROWS frames. There's no test for:

sum(val) FILTER (WHERE cate = 'a') OVER (ORDER BY val_long RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
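The RANGE case differs because the frame ends at the last peer of the current row (rows with an equal ORDER BY value) rather than at the row itself. A brute-force sketch in plain Scala, with hypothetical names and data, makes the expected values explicit:

```scala
// Reference semantics (sketch) for:
//   sum(v) FILTER (WHERE cate = 'a') OVER (ORDER BY key
//     RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
object RangeFilteredSum {
  // Hypothetical record; `key` stands in for val_long, `v` for val.
  final case class Rec(key: Long, v: Int, cate: String)

  def evaluate(rows: Seq[Rec], pred: Rec => Boolean): Seq[Option[Long]] = {
    val ordered = rows.sortBy(_.key)
    ordered.map { cur =>
      // RANGE frame: every row whose key is <= the current key, so peers are included.
      val matching = ordered.filter(r => r.key <= cur.key && pred(r))
      if (matching.isEmpty) None else Some(matching.map(_.v.toLong).sum)
    }
  }

  // Hypothetical rows: two peers share key 1.
  val sample: Seq[Rec] =
    Seq(Rec(1L, 1, "a"), Rec(1L, 2, "a"), Rec(2L, 3, "b"), Rec(3L, 4, "a"))

  def demo(): Seq[Option[Long]] = evaluate(sample, _.cate == "a")
}
```

Both key-1 rows get 3 here; with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, whichever peer comes first would see only its own value.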

Remove the analysis check that rejected FILTER in window aggregates
and add filter support to AggregateProcessor in WindowExec so that
AggregateExpression.filter is honored during window frame evaluation.

For DeclarativeAggregate, the update expressions are wrapped with
If(filter, updateExpr, bufferAttr) to skip rows that don't match.
For ImperativeAggregate, the filter predicate is evaluated before
calling update().

Made-with: Cursor

Labels: None yet
Projects: None yet


3 participants