Skip to content

Commit d102659

Browse files
authored
ESQL: Introduce per agg filter (#113735)
Add support for aggregation scoped filters that work dynamically on the data in each group. | STATS success = COUNT(*) WHERE 200 <= code AND code < 300, redirect = COUNT(*) WHERE 300 <= code AND code < 400, client_err = COUNT(*) WHERE 400 <= code AND code < 500, server_err = COUNT(*) WHERE 500 <= code AND code < 600, total_count = COUNT(*) Implementation wise, the base AggregateFunction has been extended to allow a filter to be passed on. This is required to incorporate the filter as part of the aggregate equality/identify which would fail with the filter as an external component. As part of the process, the serialization for the existing aggregations had to be fixed so AggregateFunction implementations so that it delegates to their parent first.
1 parent 7ad1a0c commit d102659

File tree

57 files changed

+3181
-2113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+3181
-2113
lines changed

docs/changelog/113735.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
pr: 113735
2+
summary: "ESQL: Introduce per agg filter"
3+
area: ES|QL
4+
type: feature
5+
issues: []
6+
highlight:
7+
title: "ESQL: Introduce per agg filter"
8+
body: |-
9+
Add support for aggregation scoped filters that work dynamically on the
10+
data in each group.
11+
12+
[source,esql]
13+
----
14+
| STATS success = COUNT(*) WHERE 200 <= code AND code < 300,
15+
redirect = COUNT(*) WHERE 300 <= code AND code < 400,
16+
client_err = COUNT(*) WHERE 400 <= code AND code < 500,
17+
server_err = COUNT(*) WHERE 500 <= code AND code < 600,
18+
total_count = COUNT(*)
19+
----
20+
21+
Implementation wise, the base AggregateFunction has been extended to
22+
allow a filter to be passed on. This is required to incorporate the
23+
filter as part of the aggregate equality/identity which would fail with
24+
the filter as an external component.
25+
As part of the process, the serialization for the existing aggregations
26+
had to be fixed so AggregateFunction implementations so that it
27+
delegates to their parent first.
28+
notable: true

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ static TransportVersion def(int id) {
243243
public static final TransportVersion CHUNK_SENTENCE_OVERLAP_SETTING_ADDED = def(8_767_00_0);
244244
public static final TransportVersion OPT_IN_ESQL_CCS_EXECUTION_INFO = def(8_768_00_0);
245245
public static final TransportVersion QUERY_RULE_TEST_API = def(8_769_00_0);
246+
public static final TransportVersion ESQL_PER_AGGREGATE_FILTER = def(8_770_00_0);
246247

247248
/*
248249
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/CollectionUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,19 @@ public static int mapSize(int size) {
7979
}
8080
return (int) (size / 0.75f + 1f);
8181
}
82+
83+
@SafeVarargs
84+
@SuppressWarnings("varargs")
85+
public static <T> List<T> nullSafeList(T... entries) {
86+
if (entries == null || entries.length == 0) {
87+
return emptyList();
88+
}
89+
List<T> list = new ArrayList<>(entries.length);
90+
for (T entry : entries) {
91+
if (entry != null) {
92+
list.add(entry);
93+
}
94+
}
95+
return list;
96+
}
8297
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2290,3 +2290,186 @@ from employees
22902290
m:integer |a:double |x:integer
22912291
74999 |48249.0 |0
22922292
;
2293+
2294+
2295+
statsWithFiltering
2296+
required_capability: per_agg_filtering
2297+
from employees
2298+
| stats max = max(salary), max_f = max(salary) where salary < 50000, max_a = max(salary) where salary > 100,
2299+
min = min(salary), min_f = min(salary) where salary > 50000, min_a = min(salary) where salary > 100
2300+
;
2301+
2302+
max:integer |max_f:integer |max_a:integer | min:integer | min_f:integer | min_a:integer
2303+
74999 |49818 |74999 | 25324 | 50064 | 25324
2304+
;
2305+
2306+
statsWithEverythingFiltered
2307+
required_capability: per_agg_filtering
2308+
from employees
2309+
| stats max = max(salary), max_a = max(salary) where salary < 100,
2310+
min = min(salary), min_a = min(salary) where salary > 99999
2311+
;
2312+
2313+
max:integer |max_a:integer|min:integer | min_a:integer
2314+
74999 |null |25324 | null
2315+
;
2316+
2317+
statsWithNullFilter
2318+
required_capability: per_agg_filtering
2319+
from employees
2320+
| stats max = max(salary), max_a = max(salary) where null,
2321+
min = min(salary), min_a = min(salary) where to_string(null) == "abc"
2322+
;
2323+
2324+
max:integer |max_a:integer|min:integer | min_a:integer
2325+
74999 |null |25324 | null
2326+
;
2327+
2328+
statsWithBasicExpressionFiltered
2329+
required_capability: per_agg_filtering
2330+
from employees
2331+
| stats max = max(salary), max_f = max(salary) where salary < 50000,
2332+
min = min(salary), min_f = min(salary) where salary > 50000,
2333+
exp_p = max(salary) + 10000 where salary < 50000,
2334+
exp_m = min(salary) % 10000 where salary > 50000
2335+
;
2336+
2337+
max:integer |max_f:integer|min:integer | min_f:integer|exp_p:integer | exp_m:integer
2338+
74999 |49818 |25324 | 50064 |59818 | 64
2339+
;
2340+
2341+
statsWithExpressionOverFilters
2342+
required_capability: per_agg_filtering
2343+
from employees
2344+
| stats max = max(salary), max_f = max(salary) where salary < 50000,
2345+
min = min(salary), min_f = min(salary) where salary > 50000,
2346+
exp_gt = max(salary) - min(salary) where salary > 50000,
2347+
exp_lt = max(salary) - min(salary) where salary < 50000
2348+
2349+
;
2350+
2351+
max:integer |max_f:integer | min:integer | min_f:integer |exp_gt:integer | exp_lt:integer
2352+
74999 |49818 | 25324 | 50064 |24935 | 24494
2353+
;
2354+
2355+
2356+
statsWithExpressionOfExpressionsOverFilters
2357+
required_capability: per_agg_filtering
2358+
from employees
2359+
| stats max = max(salary + 1), max_f = max(salary + 2) where salary < 50000,
2360+
min = min(salary - 1), min_f = min(salary - 2) where salary > 50000,
2361+
exp_gt = max(salary + 3) - min(salary - 3) where salary > 50000,
2362+
exp_lt = max(salary + 4) - min(salary - 4) where salary < 50000
2363+
2364+
;
2365+
2366+
max:integer |max_f:integer | min:integer | min_f:integer |exp_gt:integer | exp_lt:integer
2367+
75000 |49820 | 25323 | 50062 |24941 | 24502
2368+
;
2369+
2370+
statsWithSubstitutedExpressionOverFilters
2371+
required_capability: per_agg_filtering
2372+
from employees
2373+
| stats sum = sum(salary), s_l = sum(salary) where salary < 50000, s_u = sum(salary) where salary > 50000,
2374+
count = count(salary), c_l = count(salary) where salary < 50000, c_u = count(salary) where salary > 50000,
2375+
avg = round(avg(salary), 2), a_l = round(avg(salary), 2) where salary < 50000, a_u = round(avg(salary),2) where salary > 50000
2376+
;
2377+
2378+
sum:l |s_l:l | s_u:l | count:l |c_l:l |c_u:l |avg:double |a_l:double | a_u:double
2379+
4824855 |2220951 | 2603904 | 100 |58 |42 |48248.55 |38292.26 | 61997.71
2380+
;
2381+
2382+
2383+
statsWithFilterAndGroupBy
2384+
required_capability: per_agg_filtering
2385+
from employees
2386+
| stats m = max(height),
2387+
m_f = max(height + 1) where gender == "M" OR is_rehired is null
2388+
BY gender, is_rehired
2389+
| sort gender, is_rehired
2390+
;
2391+
2392+
m:d |m_f:d |gender:s|is_rehired:bool
2393+
2.1 |null |F |false
2394+
2.1 |null |F |true
2395+
1.85|2.85 |F |null
2396+
2.1 |3.1 |M |false
2397+
2.1 |3.1 |M |true
2398+
2.01|3.01 |M |null
2399+
2.06|null |null |false
2400+
1.97|null |null |true
2401+
1.99|2.99 |null |null
2402+
;
2403+
2404+
statsWithFilterOnGroupBy
2405+
required_capability: per_agg_filtering
2406+
from employees
2407+
| stats m_f = max(height) where gender == "M" BY gender
2408+
| sort gender
2409+
;
2410+
2411+
m_f:d |gender:s
2412+
null |F
2413+
2.1 |M
2414+
null |null
2415+
;
2416+
2417+
statsWithGroupByLiteral
2418+
required_capability: per_agg_filtering
2419+
from employees
2420+
| stats m = max(languages) by salary = 2
2421+
;
2422+
2423+
m:i |salary:i
2424+
5 |2
2425+
;
2426+
2427+
2428+
statsWithFilterOnSameColumn
2429+
required_capability: per_agg_filtering
2430+
from employees
2431+
| stats m = max(languages), m_f = max(languages) where salary > 50000 by salary = 2
2432+
| sort salary
2433+
;
2434+
2435+
m:i |m_f:i |salary:i
2436+
5 |null |2
2437+
;
2438+
2439+
# the query is reused below in a multi-stats
2440+
statsWithFilteringAndGrouping
2441+
required_capability: per_agg_filtering
2442+
from employees
2443+
| stats c = count(), c_f = count(languages) where l > 1,
2444+
m_f = max(height) where salary > 50000
2445+
by l = languages
2446+
| sort c
2447+
;
2448+
2449+
c:l |c_f:l |m_f:d |l:i
2450+
10 |0 |2.08 |null
2451+
15 |0 |2.06 |1
2452+
17 |17 |2.1 |3
2453+
18 |18 |1.83 |4
2454+
19 |19 |2.03 |2
2455+
21 |21 |2.1 |5
2456+
;
2457+
2458+
multiStatsWithFiltering
2459+
required_capability: per_agg_filtering
2460+
from employees
2461+
| stats c = count(), c_f = count(languages) where l > 1,
2462+
m_f = max(height) where salary > 50000
2463+
by l = languages
2464+
| stats c2 = count(), c2_f = count() where m_f > 2.06 , m2 = max(l), m2_f = max(l) where l > 1 by c
2465+
| sort c
2466+
;
2467+
2468+
c2:l |c2_f:l |m2:i |m2_f:i |c:l
2469+
1 |1 |null |null |10
2470+
1 |0 |1 |null |15
2471+
1 |1 |3 |3 |17
2472+
1 |0 |4 |4 |18
2473+
1 |0 |2 |2 |19
2474+
1 |1 |5 |5 |21
2475+
;

x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ SLASH : '/';
209209
PERCENT : '%';
210210

211211
MATCH : 'match';
212+
NESTED_WHERE : {this.isDevVersion()}? WHERE -> type(WHERE);
212213

213214
NAMED_OR_POSITIONAL_PARAM
214215
: PARAM (LETTER | UNDERSCORE) UNQUOTED_ID_BODY*

x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,15 @@ fields
123123
;
124124

125125
field
126-
: booleanExpression
127-
| qualifiedName ASSIGN booleanExpression
126+
: (qualifiedName ASSIGN)? booleanExpression
128127
;
129128

130129
fromCommand
131130
: FROM indexPattern (COMMA indexPattern)* metadata?
132131
;
133132

134133
indexPattern
135-
: clusterString COLON indexString
136-
| indexString
134+
: (clusterString COLON)? indexString
137135
;
138136

139137
clusterString
@@ -159,15 +157,23 @@ deprecated_metadata
159157
;
160158

161159
metricsCommand
162-
: DEV_METRICS indexPattern (COMMA indexPattern)* aggregates=fields? (BY grouping=fields)?
160+
: DEV_METRICS indexPattern (COMMA indexPattern)* aggregates=aggFields? (BY grouping=fields)?
163161
;
164162

165163
evalCommand
166164
: EVAL fields
167165
;
168166

169167
statsCommand
170-
: STATS stats=fields? (BY grouping=fields)?
168+
: STATS stats=aggFields? (BY grouping=fields)?
169+
;
170+
171+
aggFields
172+
: aggField (COMMA aggField)*
173+
;
174+
175+
aggField
176+
: field {this.isDevVersion()}? (WHERE booleanExpression)?
171177
;
172178

173179
qualifiedName
@@ -316,5 +322,5 @@ lookupCommand
316322
;
317323

318324
inlinestatsCommand
319-
: DEV_INLINESTATS stats=fields (BY grouping=fields)?
325+
: DEV_INLINESTATS stats=aggFields (BY grouping=fields)?
320326
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,12 @@ public enum Cap {
370370
/**
371371
* Fix sorting not allowed on _source and counters.
372372
*/
373-
SORTING_ON_SOURCE_AND_COUNTERS_FORBIDDEN;
373+
SORTING_ON_SOURCE_AND_COUNTERS_FORBIDDEN,
374+
375+
/**
376+
* Allow filter per individual aggregation.
377+
*/
378+
PER_AGG_FILTERING;
374379

375380
private final boolean snapshotOnly;
376381
private final FeatureFlag featureFlag;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
488488
newAggregates.add(agg);
489489
}
490490

491+
// TODO: remove this when Stats interface is removed
491492
stats = changed.get() ? stats.with(stats.child(), groupings, newAggregates) : stats;
492493
}
493494

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.esql.core.util.Holder;
3131
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
3232
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
33+
import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression;
3334
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
3435
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
3536
import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
@@ -308,6 +309,29 @@ private static void checkInvalidNamedExpressionUsage(
308309
Set<Failure> failures,
309310
int level
310311
) {
312+
// unwrap filtered expression
313+
if (e instanceof FilteredExpression fe) {
314+
e = fe.delegate();
315+
// make sure they work on aggregate functions
316+
if (e.anyMatch(AggregateFunction.class::isInstance) == false) {
317+
Expression filter = fe.filter();
318+
failures.add(fail(filter, "WHERE clause allowed only for aggregate functions, none found in [{}]", fe.sourceText()));
319+
}
320+
// but that the filter doesn't use grouping or aggregate functions
321+
fe.filter().forEachDown(c -> {
322+
if (c instanceof AggregateFunction af) {
323+
failures.add(
324+
fail(af, "cannot use aggregate function [{}] in aggregate WHERE clause [{}]", af.sourceText(), fe.sourceText())
325+
);
326+
}
327+
// check the bucketing function against the group
328+
else if (c instanceof GroupingFunction gf) {
329+
if (Expressions.anyMatch(groups, ex -> ex instanceof Alias a && a.child().semanticEquals(gf)) == false) {
330+
failures.add(fail(gf, "can only use grouping function [{}] part of the BY clause", gf.sourceText()));
331+
}
332+
}
333+
});
334+
}
311335
// found an aggregate, constant or a group, bail out
312336
if (e instanceof AggregateFunction af) {
313337
af.field().forEachDown(AggregateFunction.class, f -> {
@@ -319,7 +343,7 @@ private static void checkInvalidNamedExpressionUsage(
319343
} else if (e instanceof GroupingFunction gf) {
320344
// optimizer will later unroll expressions with aggs and non-aggs with a grouping function into an EVAL, but that will no longer
321345
// be verified (by check above in checkAggregate()), so do it explicitly here
322-
if (groups.stream().anyMatch(ex -> ex instanceof Alias a && a.child().semanticEquals(gf)) == false) {
346+
if (Expressions.anyMatch(groups, ex -> ex instanceof Alias a && a.child().semanticEquals(gf)) == false) {
323347
failures.add(fail(gf, "can only use grouping function [{}] part of the BY clause", gf.sourceText()));
324348
} else if (level == 0) {
325349
addFailureOnGroupingUsedNakedInAggs(failures, gf, "function");

0 commit comments

Comments
 (0)