Skip to content

Commit 1ccb58a

Browse files
authored
Support inline filter in time-series aggregation (#134403)
The following query should now work:
```
TS k8s | STATS tx = sum(avg_over_time(network.bytes_in)) WHERE pod == "one" BY cluster, hourly = bucket(@timestamp, 10 minute) | SORT hourly, cluster | LIMIT 10;
```
Note: The inline filter is not pushed down, so performance may be worse than with a pre-filter. Also, unlike a pre-filter, an aggregation with an inline filter may still produce groups whose data does not match the filter (for example, with a `null` aggregate value, as in the `maxRateWithInlineFilter` test).
1 parent bbfa270 commit 1ccb58a

File tree

8 files changed

+179
-4
lines changed

8 files changed

+179
-4
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-avg-over-time.csv-spec

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,22 @@ tx:double | cluster:keyword | time_bucket:datetime
100100
493.5 | staging | 2024-05-10T00:20:00.000Z
101101
;
102102

103+
avg_over_time_with_inline_filtering
104+
required_capability: metrics_command
105+
TS k8s | STATS tx = sum(avg_over_time(network.bytes_in)) WHERE pod == "one" BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10;
106+
107+
tx:double | cluster:keyword | time_bucket:datetime
108+
293.0 | prod | 2024-05-10T00:00:00.000Z
109+
482.6666666666667 | qa | 2024-05-10T00:00:00.000Z
110+
494.1666666666667 | staging | 2024-05-10T00:00:00.000Z
111+
601.5454545454545 | prod | 2024-05-10T00:10:00.000Z
112+
496.14285714285717 | qa | 2024-05-10T00:10:00.000Z
113+
441.6 | staging | 2024-05-10T00:10:00.000Z
114+
633.3333333333334 | prod | 2024-05-10T00:20:00.000Z
115+
440.0 | qa | 2024-05-10T00:20:00.000Z
116+
493.5 | staging | 2024-05-10T00:20:00.000Z
117+
;
118+
103119
avg_over_time_older_than_10d
104120
required_capability: metrics_command
105121
required_capability: avg_over_time

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,24 @@ tx:long | cluster:keyword | time_bucket:datetime
106106
238 | staging | 2024-05-10T00:20:00.000Z
107107
;
108108

109+
implicit_last_over_time_with_inline_filtering
110+
required_capability: metrics_command
111+
required_capability: implicit_last_over_time
112+
TS k8s | STATS tx = sum(network.bytes_in) WHERE pod == "one" BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10;
113+
114+
tx:long | cluster:keyword | time_bucket:datetime
115+
3 | prod | 2024-05-10T00:00:00.000Z
116+
830 | qa | 2024-05-10T00:00:00.000Z
117+
753 | staging | 2024-05-10T00:00:00.000Z
118+
542 | prod | 2024-05-10T00:10:00.000Z
119+
187 | qa | 2024-05-10T00:10:00.000Z
120+
4 | staging | 2024-05-10T00:10:00.000Z
121+
931 | prod | 2024-05-10T00:20:00.000Z
122+
206 | qa | 2024-05-10T00:20:00.000Z
123+
238 | staging | 2024-05-10T00:20:00.000Z
124+
;
125+
126+
109127

110128
last_over_time_older_than_10d
111129
required_capability: metrics_command

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,24 @@ rate_bytes_in:double | cluster:keyword | time_bucket:datetime
7373
2.210158359293873 | prod | 2024-05-10T00:20:00.000Z
7474
0.8955555555555565 | qa | 2024-05-10T00:20:00.000Z
7575
0.595 | staging | 2024-05-10T00:20:00.000Z
76+
;
77+
78+
rate_with_inline_filtering
79+
required_capability: metrics_command
80+
TS k8s
81+
| STATS rate_bytes_in = sum(rate(network.total_bytes_in)) WHERE pod == "one" BY cluster, time_bucket = bucket(@timestamp, 10minute)
82+
| SORT time_bucket, cluster | LIMIT 10;
7683

84+
rate_bytes_in:double | cluster:keyword | time_bucket:datetime
85+
4.0314581958195825 | prod | 2024-05-10T00:00:00.000Z
86+
9.955833333333333 | qa | 2024-05-10T00:00:00.000Z
87+
4.242445473251029 | staging | 2024-05-10T00:00:00.000Z
88+
11.188380281690138 | prod | 2024-05-10T00:10:00.000Z
89+
12.222592592592592 | qa | 2024-05-10T00:10:00.000Z
90+
3.050371490280777 | staging | 2024-05-10T00:10:00.000Z
91+
2.210158359293873 | prod | 2024-05-10T00:20:00.000Z
92+
0.8955555555555565 | qa | 2024-05-10T00:20:00.000Z
93+
0.595 | staging | 2024-05-10T00:20:00.000Z
7794
;
7895

7996
eval_on_rate

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,24 @@ max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:ke
201201
11.562737642585551 | 2024-05-10T00:10:00.000Z | prod
202202
;
203203

204+
maxRateWithInlineFilter
205+
required_capability: metrics_command
206+
TS k8s | STATS max_rate = max(rate(network.total_bytes_in)) WHERE cluster=="prod" BY cluster | SORT cluster;
207+
208+
max_rate:double | cluster:keyword
209+
8.716707021791768 | prod
210+
null | qa
211+
null | staging
212+
;
213+
214+
maxRateWithPreFilter
215+
required_capability: metrics_command
216+
TS k8s | WHERE cluster=="prod" | STATS max_rate = max(rate(network.total_bytes_in)) BY cluster | SORT cluster;
217+
218+
max_rate:double | cluster:keyword
219+
8.716707021791768 | prod
220+
;
221+
204222
notEnoughSamples
205223
required_capability: metrics_command
206224
TS k8s | WHERE @timestamp <= "2024-05-10T00:06:14.000Z" | STATS max(rate(network.total_bytes_in)) BY pod, time_bucket = bucket(@timestamp,1minute) | SORT pod, time_bucket DESC | LIMIT 10;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
1313
import org.elasticsearch.xpack.esql.common.Failures;
14+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1415
import org.elasticsearch.xpack.esql.core.expression.Expression;
16+
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1517
import org.elasticsearch.xpack.esql.core.expression.Literal;
1618
import org.elasticsearch.xpack.esql.core.expression.TypeResolutions;
1719
import org.elasticsearch.xpack.esql.core.expression.function.Function;
@@ -152,6 +154,17 @@ public AggregateFunction withParameters(List<? extends Expression> parameters) {
152154
return (AggregateFunction) replaceChildren(CollectionUtils.combine(asList(field, filter), parameters));
153155
}
154156

157+
/**
158+
* Returns the set of input attributes required by this aggregate function, excluding those referenced by the filter.
* <p>
* When this function has a filter, the references are collected from {@code field} and {@code parameters} only;
* when it has no filter, this simply returns {@link #references()}.
*
* @return the attributes referenced by the aggregated field and the extra parameters
159+
*/
160+
public AttributeSet aggregateInputReferences() {
161+
if (hasFilter()) {
162+
// Only the field and the extra parameters are considered here; the filter expression is left out.
return Expressions.references(CollectionUtils.combine(List.of(field), parameters));
163+
} else {
164+
return references();
165+
}
166+
}
167+
155168
@Override
156169
public int hashCode() {
157170
// NB: the hashcode is currently used for key generation so

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.expression.Expression;
1515
import org.elasticsearch.xpack.esql.core.expression.Expressions;
16+
import org.elasticsearch.xpack.esql.core.expression.Literal;
1617
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
1718
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1819
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -184,7 +185,21 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
184185
for (NamedExpression agg : aggregate.aggregates()) {
185186
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
186187
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
188+
final Expression inlineFilter;
189+
if (af.hasFilter()) {
190+
inlineFilter = af.filter();
191+
af = af.withFilter(Literal.TRUE);
192+
} else {
193+
inlineFilter = null;
194+
}
187195
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
196+
if (inlineFilter != null) {
197+
if (tsAgg.hasFilter() == false) {
198+
throw new IllegalStateException("inline filter isn't propagated to time-series aggregation");
199+
}
200+
} else if (tsAgg.hasFilter()) {
201+
throw new IllegalStateException("unexpected inline filter in time-series aggregation");
202+
}
188203
changed.set(Boolean.TRUE);
189204
if (tsAgg instanceof Rate) {
190205
hasRateAggregates.set(Boolean.TRUE);
@@ -201,14 +216,28 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
201216
secondPassAggs.add(new Alias(alias.source(), alias.name(), outerAgg, agg.id()));
202217
} else {
203218
// TODO: reject over_time_aggregation only
204-
var tsAgg = new LastOverTime(af.source(), af.field(), timestamp.get());
205-
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
219+
final Expression aggField = af.field();
220+
var tsAgg = new LastOverTime(af.source(), aggField, timestamp.get());
221+
final AggregateFunction firstStageFn;
222+
if (inlineFilter != null) {
223+
firstStageFn = tsAgg.perTimeSeriesAggregation().withFilter(inlineFilter);
224+
} else {
225+
firstStageFn = tsAgg.perTimeSeriesAggregation();
226+
}
206227
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
207228
Alias firstStageAlias = new Alias(tsAgg.source(), internalNames.next(tsAgg.functionName()), firstStageFn);
208229
firstPassAggs.add(firstStageAlias);
209230
return firstStageAlias;
210231
});
211-
secondPassAggs.add((Alias) agg.transformUp(f -> f == af.field(), f -> newAgg.toAttribute()));
232+
secondPassAggs.add((Alias) agg.transformUp(f -> f == aggField || f instanceof AggregateFunction, e -> {
233+
if (e == aggField) {
234+
return newAgg.toAttribute();
235+
} else if (e instanceof AggregateFunction f) {
236+
return f.withFilter(Literal.TRUE);
237+
} else {
238+
return e;
239+
}
240+
}));
212241
}
213242
}
214243
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private void aggregatesToFactory(
279279
}
280280
} else {
281281
// extra dependencies like TS ones (that require a timestamp)
282-
for (Expression input : aggregateFunction.references()) {
282+
for (Expression input : aggregateFunction.aggregateInputReferences()) {
283283
Attribute attr = Expressions.attribute(input);
284284
if (attr == null) {
285285
throw new EsqlIllegalArgumentException(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7751,6 +7751,70 @@ public void testTranslateLastOverTime() {
77517751
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
77527752
}
77537753

7754+
/**
 * Checks the optimized plan of a {@code TS} query whose {@code STATS} wraps an explicit {@code last_over_time} and
 * carries an inline {@code WHERE} filter.
 * <p>
 * Expected plan shape (top to bottom): Limit, Aggregate (not a TimeSeriesAggregate), TimeSeriesAggregate, Eval
 * (the time bucket), EsRelation. The outer {@code Sum} must not have a filter, while the {@code LastOverTime} in the
 * TimeSeriesAggregate must carry the inline filter as an {@code Equals} expression.
 */
public void testTranslateWithInlineFilter() {
7755+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
7756+
var query = """
7757+
TS k8s | STATS sum(last_over_time(network.bytes_in)) WHERE cluster == "prod" BY bucket(@timestamp, 1 minute)
7758+
| LIMIT 10
7759+
""";
7760+
var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
7761+
var limit = as(plan, Limit.class);
7762+
Aggregate finalAgg = as(limit.child(), Aggregate.class);
7763+
assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class)));
7764+
TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class);
7765+
assertNotNull(aggsByTsid.timeBucket());
7766+
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(1)));
7767+
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
7768+
assertThat(evalBucket.fields(), hasSize(1));
7769+
EsRelation relation = as(evalBucket.child(), EsRelation.class);
7770+
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
7771+
7772+
// The outer aggregate must no longer carry the inline filter.
var sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
7773+
assertFalse(sum.hasFilter());
7774+
7775+
LastOverTime lastOverTime = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), LastOverTime.class);
7776+
assertThat(Expressions.attribute(lastOverTime.field()).name(), equalTo("network.bytes_in"));
7777+
assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id()));
7778+
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
7779+
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
7780+
// The inline filter is expected on the aggregate inside the TimeSeriesAggregate instead.
assertTrue(lastOverTime.hasFilter());
7781+
assertThat(lastOverTime.filter(), instanceOf(Equals.class));
7782+
}
7784+
/**
 * Checks the optimized plan of a {@code TS} query that aggregates a field directly ({@code avg(network.bytes_in)},
 * with no explicit time-series function) and carries an inline {@code WHERE} filter.
 * <p>
 * Expected plan shape (top to bottom): Project, Eval, Limit, Aggregate (not a TimeSeriesAggregate),
 * TimeSeriesAggregate, Eval (the time bucket), EsRelation. The outer aggregate exposes a {@code Sum} and a
 * {@code Count}, neither with a filter (NOTE(review): presumably {@code avg} is rewritten to sum/count by another
 * rule — confirm). The {@code LastOverTime} in the TimeSeriesAggregate must carry the inline filter as an
 * {@code Equals} expression.
 */
public void testTranslateWithInlineFilterWithImplicitLastOverTime() {
7785+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
7786+
var query = """
7787+
TS k8s | STATS avg(network.bytes_in) WHERE cluster == "prod" BY bucket(@timestamp, 1 minute)
7788+
| LIMIT 10
7789+
""";
7790+
var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
7791+
var project = as(plan, Project.class);
7792+
var eval = as(project.child(), Eval.class);
7793+
var limit = as(eval.child(), Limit.class);
7794+
Aggregate finalAgg = as(limit.child(), Aggregate.class);
7795+
assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class)));
7796+
TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class);
7797+
assertNotNull(aggsByTsid.timeBucket());
7798+
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(1)));
7799+
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
7800+
assertThat(evalBucket.fields(), hasSize(1));
7801+
EsRelation relation = as(evalBucket.child(), EsRelation.class);
7802+
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
7803+
7804+
// Neither of the outer aggregates may carry the inline filter.
var sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
7805+
assertFalse(sum.hasFilter());
7806+
var count = as(Alias.unwrap(finalAgg.aggregates().get(1)), Count.class);
7807+
assertFalse(count.hasFilter());
7808+
7809+
LastOverTime lastOverTime = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), LastOverTime.class);
7810+
assertThat(Expressions.attribute(lastOverTime.field()).name(), equalTo("network.bytes_in"));
7811+
assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id()));
7812+
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
7813+
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
7814+
// The inline filter is expected on the LastOverTime inside the TimeSeriesAggregate instead.
assertTrue(lastOverTime.hasFilter());
7815+
assertThat(lastOverTime.filter(), instanceOf(Equals.class));
7816+
}
7817+
77547818
public void testMvSortInvalidOrder() {
77557819
VerificationException e = expectThrows(VerificationException.class, () -> plan("""
77567820
from test

0 commit comments

Comments
 (0)