Skip to content

Commit f82eec3

Browse files
authored
More validation for time-series aggregations (#134413)
1 parent 9c96b94 commit f82eec3

File tree

4 files changed

+205
-37
lines changed

4 files changed

+205
-37
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
"name": {
1111
"type": "keyword"
1212
},
13+
"host": {
14+
"type": "keyword",
15+
"time_series_dimension": true
16+
},
1317
"network": {
1418
"properties": {
1519
"connections": {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13-
import org.elasticsearch.index.IndexMode;
1413
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1514
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
1615
import org.elasticsearch.xpack.esql.common.Failures;
@@ -29,7 +28,6 @@
2928
import org.elasticsearch.xpack.esql.core.util.Holder;
3029
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
3130
import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression;
32-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
3331
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
3432
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
3533
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
@@ -243,18 +241,18 @@ public void postAnalysisVerification(Failures failures) {
243241
// traverse the tree to find invalid matches
244242
checkInvalidNamedExpressionUsage(exp, groupings, groupRefs, failures, 0);
245243
});
246-
if (anyMatch(l -> l instanceof EsRelation relation && relation.indexMode() == IndexMode.TIME_SERIES)) {
247-
aggregates.forEach(a -> checkRateAggregates(a, 0, failures));
248-
} else {
249-
forEachExpression(
250-
TimeSeriesAggregateFunction.class,
251-
r -> failures.add(fail(r, "time_series aggregate[{}] can only be used with the TS command", r.sourceText()))
252-
);
253-
}
244+
checkTimeSeriesAggregates(failures);
254245
checkCategorizeGrouping(failures);
255246
checkMultipleScoreAggregations(failures);
256247
}
257248

249+
protected void checkTimeSeriesAggregates(Failures failures) {
250+
forEachExpression(
251+
TimeSeriesAggregateFunction.class,
252+
r -> failures.add(fail(r, "time_series aggregate[{}] can only be used with the TS command", r.sourceText()))
253+
);
254+
}
255+
258256
private void checkMultipleScoreAggregations(Failures failures) {
259257
Holder<Boolean> hasScoringAggs = new Holder<>();
260258
forEachExpression(FilteredExpression.class, fe -> {
@@ -353,22 +351,6 @@ private void checkCategorizeGrouping(Failures failures) {
353351
})));
354352
}
355353

356-
private static void checkRateAggregates(Expression expr, int nestedLevel, Failures failures) {
357-
if (expr instanceof AggregateFunction) {
358-
nestedLevel++;
359-
}
360-
if (expr instanceof Rate r) {
361-
if (nestedLevel != 2) {
362-
failures.add(
363-
fail(expr, "the rate aggregate [{}] can only be used with the TS command and inside another aggregate", r.sourceText())
364-
);
365-
}
366-
}
367-
for (Expression child : expr.children()) {
368-
checkRateAggregates(child, nestedLevel, failures);
369-
}
370-
}
371-
372354
// traverse the expression and look either for an agg function or a grouping match
373355
// stop either when no children are left, the leafs are literals or a reference attribute is given
374356
private static void checkInvalidNamedExpressionUsage(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,24 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.xpack.esql.common.Failures;
14+
import org.elasticsearch.xpack.esql.core.expression.Alias;
1315
import org.elasticsearch.xpack.esql.core.expression.Expression;
1416
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1517
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1618
import org.elasticsearch.xpack.esql.core.tree.Source;
19+
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
20+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
21+
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
1722
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
23+
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
1824

1925
import java.io.IOException;
2026
import java.util.List;
2127
import java.util.Objects;
2228

29+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
30+
2331
/**
2432
* An extension of {@link Aggregate} to perform time-series aggregation per time-series, such as rate or _over_time.
2533
* The grouping must be `_tsid` and `tbucket` or just `_tsid`.
@@ -106,4 +114,129 @@ public boolean equals(Object obj) {
106114
&& Objects.equals(child(), other.child())
107115
&& Objects.equals(timeBucket, other.timeBucket);
108116
}
117+
118+
@Override
119+
public void postAnalysisVerification(Failures failures) {
120+
super.postAnalysisVerification(failures);
121+
child().forEachDown(p -> {
122+
// reject `TS metrics | SORT BY ... | STATS ...`
123+
if (p instanceof OrderBy orderBy) {
124+
failures.add(
125+
fail(
126+
orderBy,
127+
"sorting [{}] between the time-series source and the first aggregation [{}] is not allowed",
128+
orderBy.sourceText(),
129+
this.sourceText()
130+
)
131+
);
132+
}
133+
// reject `TS metrics | LIMIT ... | STATS ...`
134+
if (p instanceof Limit limit) {
135+
failures.add(
136+
fail(
137+
limit,
138+
"limiting [{}] the time-series source before the first aggregation [{}] is not allowed; "
139+
+ "filter data with a WHERE command instead",
140+
limit.sourceText(),
141+
this.sourceText()
142+
)
143+
);
144+
}
145+
// reject `TS metrics | LOOKUP JOIN ... | STATS ...`
146+
if (p instanceof LookupJoin lookupJoin) {
147+
failures.add(
148+
fail(
149+
lookupJoin,
150+
"lookup join [{}] in the time-series before the first aggregation [{}] is not allowed",
151+
lookupJoin.sourceText(),
152+
this.sourceText()
153+
)
154+
);
155+
}
156+
// reject `TS metrics | ENRICH ... | STATS ...`
157+
if (p instanceof Enrich enrich) {
158+
failures.add(
159+
fail(
160+
enrich,
161+
"enrich [{}] in the time-series before the first aggregation [{}] is not allowed",
162+
enrich.sourceText(),
163+
this.sourceText()
164+
)
165+
);
166+
}
167+
// reject `TS metrics | CHANGE POINT ... | STATS ...`
168+
if (p instanceof ChangePoint changePoint) {
169+
failures.add(
170+
fail(
171+
changePoint,
172+
"change_point [{}] in the time-series the first aggregation [{}] is not allowed",
173+
changePoint.sourceText(),
174+
this.sourceText()
175+
)
176+
);
177+
}
178+
});
179+
}
180+
181+
@Override
182+
protected void checkTimeSeriesAggregates(Failures failures) {
183+
for (NamedExpression aggregate : aggregates) {
184+
if (aggregate instanceof Alias alias && Alias.unwrap(alias) instanceof AggregateFunction outer) {
185+
if (outer instanceof Count count && count.field().foldable()) {
186+
// reject `TS metrics | STATS COUNT(*)`
187+
failures.add(
188+
fail(count, "count_star [{}] can't be used with TS command; use count on a field instead", outer.sourceText())
189+
);
190+
}
191+
if (outer instanceof TimeSeriesAggregateFunction ts) {
192+
outer.field()
193+
.forEachDown(
194+
AggregateFunction.class,
195+
nested -> failures.add(
196+
fail(
197+
this,
198+
"cannot use aggregate function [{}] inside time-series aggregation function [{}]",
199+
nested.sourceText(),
200+
outer.sourceText()
201+
)
202+
)
203+
);
204+
// reject `TS metrics | STATS rate(requests)`
205+
// TODO: support this
206+
failures.add(
207+
fail(
208+
ts,
209+
"time-series aggregate function [{}] can only be used with the TS command "
210+
+ "and inside another aggregate function",
211+
ts.sourceText()
212+
)
213+
);
214+
} else {
215+
outer.field().forEachDown(AggregateFunction.class, nested -> {
216+
if (nested instanceof TimeSeriesAggregateFunction == false) {
217+
fail(
218+
this,
219+
"cannot use aggregate function [{}] inside aggregation function [{}];"
220+
+ "only time-series aggregation function can be used inside another aggregation function",
221+
nested.sourceText(),
222+
outer.sourceText()
223+
);
224+
}
225+
nested.field()
226+
.forEachDown(
227+
AggregateFunction.class,
228+
nested2 -> failures.add(
229+
fail(
230+
this,
231+
"cannot use aggregate function [{}] inside over-time aggregation function [{}]",
232+
nested.sourceText(),
233+
nested2.sourceText()
234+
)
235+
)
236+
);
237+
});
238+
}
239+
}
240+
}
241+
}
109242
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,25 +1191,52 @@ public void testNotAllowRateOutsideMetrics() {
11911191
);
11921192
}
11931193

1194-
public void testRateNotEnclosedInAggregate() {
1194+
public void testTimeseriesAggregate() {
11951195
assertThat(
11961196
error("TS tests | STATS rate(network.bytes_in)", tsdb),
1197-
equalTo("1:18: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command and inside another aggregate")
1197+
equalTo(
1198+
"1:18: time-series aggregate function [rate(network.bytes_in)] can only be used with the TS command "
1199+
+ "and inside another aggregate function"
1200+
)
1201+
);
1202+
assertThat(
1203+
error("TS tests | STATS avg_over_time(network.connections)", tsdb),
1204+
equalTo(
1205+
"1:18: time-series aggregate function [avg_over_time(network.connections)] can only be used "
1206+
+ "with the TS command and inside another aggregate function"
1207+
)
11981208
);
11991209
assertThat(
12001210
error("TS tests | STATS avg(rate(network.bytes_in)), rate(network.bytes_in)", tsdb),
1201-
equalTo("1:47: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command and inside another aggregate")
1211+
equalTo(
1212+
"1:47: time-series aggregate function [rate(network.bytes_in)] can only be used "
1213+
+ "with the TS command and inside another aggregate function"
1214+
)
12021215
);
1216+
12031217
assertThat(error("TS tests | STATS max(avg(rate(network.bytes_in)))", tsdb), equalTo("""
1204-
1:22: nested aggregations [avg(rate(network.bytes_in))] not allowed inside other aggregations\
1205-
[max(avg(rate(network.bytes_in)))]
1206-
line 1:26: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command\
1207-
and inside another aggregate"""));
1218+
1:22: nested aggregations [avg(rate(network.bytes_in))] \
1219+
not allowed inside other aggregations [max(avg(rate(network.bytes_in)))]
1220+
line 1:12: cannot use aggregate function [avg(rate(network.bytes_in))] \
1221+
inside over-time aggregation function [rate(network.bytes_in)]"""));
1222+
12081223
assertThat(error("TS tests | STATS max(avg(rate(network.bytes_in)))", tsdb), equalTo("""
1209-
1:22: nested aggregations [avg(rate(network.bytes_in))] not allowed inside other aggregations\
1210-
[max(avg(rate(network.bytes_in)))]
1211-
line 1:26: the rate aggregate [rate(network.bytes_in)] can only be used with the TS command\
1212-
and inside another aggregate"""));
1224+
1:22: nested aggregations [avg(rate(network.bytes_in))] \
1225+
not allowed inside other aggregations [max(avg(rate(network.bytes_in)))]
1226+
line 1:12: cannot use aggregate function [avg(rate(network.bytes_in))] \
1227+
inside over-time aggregation function [rate(network.bytes_in)]"""));
1228+
1229+
assertThat(
1230+
error("TS tests | STATS rate(network.bytes_in) BY bucket(@timestamp, 1 hour)", tsdb),
1231+
equalTo(
1232+
"1:18: time-series aggregate function [rate(network.bytes_in)] can only be used "
1233+
+ "with the TS command and inside another aggregate function"
1234+
)
1235+
);
1236+
assertThat(
1237+
error("TS tests | STATS COUNT(*)", tsdb),
1238+
equalTo("1:18: count_star [COUNT(*)] can't be used with TS command; use count on a field instead")
1239+
);
12131240
}
12141241

12151242
public void testWeightedAvg() {
@@ -2626,6 +2653,28 @@ public void testFuse() {
26262653
);
26272654
}
26282655

2656+
public void testSortInTimeSeries() {
2657+
assertThat(
2658+
error("TS test | SORT host | STATS avg(last_over_time(network.connections))", tsdb),
2659+
equalTo(
2660+
"1:11: sorting [SORT host] between the time-series source "
2661+
+ "and the first aggregation [STATS avg(last_over_time(network.connections))] is not allowed"
2662+
)
2663+
);
2664+
assertThat(
2665+
error("TS test | LIMIT 10 | STATS avg(network.connections)", tsdb),
2666+
equalTo(
2667+
"1:11: limiting [LIMIT 10] the time-series source before the first aggregation "
2668+
+ "[STATS avg(network.connections)] is not allowed; filter data with a WHERE command instead"
2669+
)
2670+
);
2671+
assertThat(error("TS test | SORT host | LIMIT 10 | STATS avg(network.connections)", tsdb), equalTo("""
2672+
1:23: limiting [LIMIT 10] the time-series source \
2673+
before the first aggregation [STATS avg(network.connections)] is not allowed; filter data with a WHERE command instead
2674+
line 1:11: sorting [SORT host] between the time-series source \
2675+
and the first aggregation [STATS avg(network.connections)] is not allowed"""));
2676+
}
2677+
26292678
private void checkVectorFunctionsNullArgs(String functionInvocation) throws Exception {
26302679
query("from test | eval similarity = " + functionInvocation, fullTextAnalyzer);
26312680
}

0 commit comments

Comments
 (0)