Skip to content

Commit ca9a217

Browse files
committed
Add avg_over_time
1 parent c888b2c commit ca9a217

File tree

9 files changed

+179
-4
lines changed

9 files changed

+179
-4
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,39 @@ cost:double | cluster:keyword | time_bucket:datetime
183183
22.75 | prod | 2024-05-10T00:13:00.000Z
184184
22.75 | qa | 2024-05-10T00:08:00.000Z
185185
;
186+
187+
max_of_avg_over_time
188+
required_capability: metrics_command
189+
required_capability: avg_over_time
190+
TS k8s | STATS max_cost=max(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10;
191+
192+
max_cost:double | cluster:keyword | time_bucket:datetime
193+
12.375 | prod | 2024-05-10T00:17:00.000Z
194+
12.375 | qa | 2024-05-10T00:01:00.000Z
195+
12.25 | prod | 2024-05-10T00:19:00.000Z
196+
12.0625 | qa | 2024-05-10T00:06:00.000Z
197+
11.875 | prod | 2024-05-10T00:15:00.000Z
198+
11.875 | qa | 2024-05-10T00:09:00.000Z
199+
11.625 | prod | 2024-05-10T00:12:00.000Z
200+
11.5 | prod | 2024-05-10T00:05:00.000Z
201+
11.25 | prod | 2024-05-10T00:13:00.000Z
202+
11.0 | qa | 2024-05-10T00:07:00.000Z
203+
;
204+
205+
avg_of_avg_over_time
206+
required_capability: metrics_command
207+
required_capability: avg_over_time
208+
TS k8s | STATS avg_cost=avg(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT avg_cost DESC, time_bucket DESC, cluster | LIMIT 10;
209+
210+
avg_cost:double | cluster:keyword | time_bucket:datetime
211+
11.625 | prod | 2024-05-10T00:12:00.000Z
212+
10.6875 | prod | 2024-05-10T00:00:00.000Z
213+
10.145833333333332 | qa | 2024-05-10T00:04:00.000Z
214+
10.0 | staging | 2024-05-10T00:11:00.000Z
215+
9.895833333333334 | qa | 2024-05-10T00:06:00.000Z
216+
9.666666666666666 | prod | 2024-05-10T00:19:00.000Z
217+
8.875 | qa | 2024-05-10T00:01:00.000Z
218+
8.805555555555555 | qa | 2024-05-10T00:09:00.000Z
219+
8.71875 | prod | 2024-05-10T00:22:00.000Z
220+
8.5625 | qa | 2024-05-10T00:22:00.000Z
221+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -964,7 +964,12 @@ public enum Cap {
964964
/**
965965
* Support max_over_time aggregation
966966
*/
967-
MAX_OVER_TIME(Build.current().isSnapshot());
967+
MAX_OVER_TIME(Build.current().isSnapshot()),
968+
969+
/**
970+
* Support avg_over_time aggregation
971+
*/
972+
AVG_OVER_TIME(Build.current().isSnapshot());
968973

969974
private final boolean enabled;
970975

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.xpack.esql.core.type.DataType;
1818
import org.elasticsearch.xpack.esql.core.util.Check;
1919
import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
20+
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
2021
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2122
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
2223
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
@@ -434,6 +435,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
434435
def(Delay.class, Delay::new, "delay"),
435436
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
436437
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
438+
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
437439
def(Term.class, bi(Term::new), "term") } };
438440
}
439441

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3131
Top.ENTRY,
3232
Values.ENTRY,
3333
MaxOverTime.ENTRY,
34+
AvgOverTime.ENTRY,
3435
// internal functions
3536
ToPartial.ENTRY,
3637
FromPartial.ENTRY,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Avg}, but it is used to calculate the average value over a time series of values from the given field.
28+
*/
29+
public class AvgOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"AvgOverTime",
33+
AvgOverTime::new
34+
);
35+
36+
@FunctionInfo(returnType = "double", description = "The average of a numeric field.", type = FunctionType.AGGREGATE)
37+
public AvgOverTime(
38+
Source source,
39+
@Param(
40+
name = "number",
41+
type = { "double", "integer", "long" },
42+
description = "Expression that outputs values to average."
43+
) Expression field
44+
) {
45+
this(source, field, Literal.TRUE);
46+
}
47+
48+
public AvgOverTime(Source source, Expression field, Expression filter) {
49+
super(source, field, filter, emptyList());
50+
}
51+
52+
@Override
53+
protected TypeResolution resolveType() {
54+
return perTimeSeriesAggregation().resolveType();
55+
}
56+
57+
private AvgOverTime(StreamInput in) throws IOException {
58+
super(in);
59+
}
60+
61+
@Override
62+
public String getWriteableName() {
63+
return ENTRY.name;
64+
}
65+
66+
@Override
67+
public DataType dataType() {
68+
return DataType.DOUBLE;
69+
}
70+
71+
@Override
72+
protected NodeInfo<AvgOverTime> info() {
73+
return NodeInfo.create(this, AvgOverTime::new, field(), filter());
74+
}
75+
76+
@Override
77+
public AvgOverTime replaceChildren(List<Expression> newChildren) {
78+
return new AvgOverTime(source(), newChildren.get(0), newChildren.get(1));
79+
}
80+
81+
@Override
82+
public AvgOverTime withFilter(Expression filter) {
83+
return new AvgOverTime(source(), field(), filter);
84+
}
85+
86+
@Override
87+
public AggregateFunction perTimeSeriesAggregation() {
88+
return new Avg(source(), field(), filter());
89+
}
90+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,10 @@ protected static Batch<LogicalPlan> substitutions() {
138138
new ReplaceAggregateAggExpressionWithEval(),
139139
// lastly replace surrogate functions
140140
new SubstituteSurrogates(),
141-
// translate metric aggregates after surrogate substitution and replace nested expressions with eval (again)
142141
new TranslateTimeSeriesAggregate(),
143142
new PruneUnusedIndexMode(),
143+
// after translating metric aggregates we need to replace surrogate substitution and nested expressions again.
144+
new SubstituteSurrogates(),
144145
new ReplaceAggregateNestedExpressionWithEval(),
145146
// this one needs to be placed before ReplaceAliasingEvalWithProject, so that any potential aliasing eval (eval x = y)
146147
// is not replaced with a Project before the eval to be copied on the left hand side of an InlineJoin

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,15 @@
124124
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125125
* | STATS sum(max_memory_usage) BY host_values, time_bucket
126126
*
127+
*
128+
* TS k8s | STATS sum(avg_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute)
129+
*
130+
* becomes
131+
*
132+
* TS k8s
133+
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
134+
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
135+
*
127136
* </pre>
128137
*/
129138
public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7100,6 +7100,37 @@ public void testTranslateMaxOverTime() {
71007100
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
71017101
}
71027102

7103+
public void testTranslateAvgOverTime() {
7104+
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
7105+
var query = "TS k8s | STATS sum(avg_over_time(network.bytes_in)) BY bucket(@timestamp, 1h)";
7106+
var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query)));
7107+
Limit limit = as(plan, Limit.class);
7108+
Aggregate finalAgg = as(limit.child(), Aggregate.class);
7109+
assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class)));
7110+
assertThat(finalAgg.aggregates(), hasSize(2));
7111+
Eval evalAvg = as(finalAgg.child(), Eval.class);
7112+
TimeSeriesAggregate aggsByTsid = as(evalAvg.child(), TimeSeriesAggregate.class);
7113+
assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped
7114+
assertNotNull(aggsByTsid.timeBucket());
7115+
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
7116+
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
7117+
assertThat(evalBucket.fields(), hasSize(1));
7118+
as(evalBucket.child(), EsRelation.class);
7119+
7120+
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
7121+
assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id()));
7122+
assertThat(finalAgg.groupings(), hasSize(1));
7123+
assertThat(Expressions.attribute(finalAgg.groupings().get(0)).id(), equalTo(aggsByTsid.aggregates().get(2).id()));
7124+
7125+
Sum sumTs = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Sum.class);
7126+
assertThat(Expressions.attribute(sumTs.field()).name(), equalTo("network.bytes_in"));
7127+
Count countTs = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), Count.class);
7128+
assertThat(Expressions.attribute(countTs.field()).name(), equalTo("network.bytes_in"));
7129+
assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id()));
7130+
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
7131+
assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
7132+
}
7133+
71037134
public void testMetricsWithoutRate() {
71047135
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
71057136
List<String> queries = List.of("""

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ setup:
3333
path: /_query
3434
parameters: []
3535
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
36-
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, max_over_time]
36+
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time]
3737
reason: "Test that should only be executed on snapshot versions"
3838

3939
- do: {xpack.usage: {}}
@@ -101,7 +101,7 @@ setup:
101101
- match: {esql.functions.coalesce: $functions_coalesce}
102102
- gt: {esql.functions.categorize: $functions_categorize}
103103
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
104-
- length: {esql.functions: 135} # check the "sister" test below for a likely update to the same esql.functions length check
104+
- length: {esql.functions: 136} # check the "sister" test below for a likely update to the same esql.functions length check
105105

106106
---
107107
"Basic ESQL usage output (telemetry) non-snapshot version":

0 commit comments

Comments
 (0)