Skip to content

Commit c888b2c

Browse files
authored
Add max_over_time aggregation for TSDB (#126498)
This change adds support for the `max_over_time` aggregation for time_series indices. Similar to the `rate` aggregation, this aggregation is translated into two stages: the first stage groups by `_tsid` (and time-bucket), and the second stage groups by the user-specified groups. For example:

```
TS my-metrics
| STATS SUM(max_over_time(memory_usage)) BY cluster, bucket(@timestamp, 1 minute)
```

is translated into:

```
TS my-metrics
| STATS max_memory_usage=max(memory_usage), cluster=VALUES(cluster) BY _tsid, ts=bucket(@timestamp, 1 minute)
| STATS sum(max_memory_usage) BY cluster, ts
```

In this case, we don't need to keep the Lucene source emitted in the order of _tsid/timestamp, but I leave this optimization for the future. Other `{agg}_over_time` functions will be added later.
1 parent f07d943 commit c888b2c

File tree

14 files changed

+241
-38
lines changed

14 files changed

+241
-38
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,22 @@ null | three | 2024-05-10T00:05:00.000
164164
null | three | 2024-05-10T00:02:00.000Z
165165
null | three | 2024-05-10T00:01:00.000Z
166166
;
167+
168+
169+
max_over_time
170+
required_capability: metrics_command
171+
required_capability: max_over_time
172+
TS k8s | STATS cost=sum(max_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT cost DESC, time_bucket DESC, cluster | LIMIT 10;
173+
174+
cost:double | cluster:keyword | time_bucket:datetime
175+
32.75 | qa | 2024-05-10T00:17:00.000Z
176+
32.25 | staging | 2024-05-10T00:09:00.000Z
177+
31.75 | qa | 2024-05-10T00:06:00.000Z
178+
29.0 | prod | 2024-05-10T00:19:00.000Z
179+
28.625 | qa | 2024-05-10T00:09:00.000Z
180+
24.625 | qa | 2024-05-10T00:18:00.000Z
181+
23.25 | qa | 2024-05-10T00:11:00.000Z
182+
23.125 | staging | 2024-05-10T00:08:00.000Z
183+
22.75 | prod | 2024-05-10T00:13:00.000Z
184+
22.75 | qa | 2024-05-10T00:08:00.000Z
185+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -959,7 +959,12 @@ public enum Cap {
959959
/**
960960
* Listing queries and getting information on a specific query.
961961
*/
962-
QUERY_MONITORING;
962+
QUERY_MONITORING,
963+
964+
/**
965+
* Support max_over_time aggregation
966+
*/
967+
MAX_OVER_TIME(Build.current().isSnapshot());
963968

964969
private final boolean enabled;
965970

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2121
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
2222
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
23+
import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime;
2324
import org.elasticsearch.xpack.esql.expression.function.aggregate.Median;
2425
import org.elasticsearch.xpack.esql.expression.function.aggregate.MedianAbsoluteDeviation;
2526
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
@@ -432,6 +433,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
432433
// This is an experimental function and can be removed without notice.
433434
def(Delay.class, Delay::new, "delay"),
434435
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
436+
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
435437
def(Term.class, bi(Term::new), "term") } };
436438
}
437439

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3030
Sum.ENTRY,
3131
Top.ENTRY,
3232
Values.ENTRY,
33+
MaxOverTime.ENTRY,
3334
// internal functions
3435
ToPartial.ENTRY,
3536
FromPartial.ENTRY,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Max}, but it is used to calculate the maximum value over a time series of values from the given field.
28+
*/
29+
public class MaxOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"MaxOverTime",
33+
MaxOverTime::new
34+
);
35+
36+
@FunctionInfo(
37+
returnType = { "boolean", "double", "integer", "long", "date", "date_nanos", "ip", "keyword", "long", "version" },
38+
description = "The maximum over time value of a field.",
39+
type = FunctionType.AGGREGATE
40+
)
41+
public MaxOverTime(
42+
Source source,
43+
@Param(
44+
name = "field",
45+
type = {
46+
"aggregate_metric_double",
47+
"boolean",
48+
"double",
49+
"integer",
50+
"long",
51+
"date",
52+
"date_nanos",
53+
"ip",
54+
"keyword",
55+
"text",
56+
"long",
57+
"version" }
58+
) Expression field
59+
) {
60+
this(source, field, Literal.TRUE);
61+
}
62+
63+
public MaxOverTime(Source source, Expression field, Expression filter) {
64+
super(source, field, filter, emptyList());
65+
}
66+
67+
private MaxOverTime(StreamInput in) throws IOException {
68+
super(in);
69+
}
70+
71+
@Override
72+
public String getWriteableName() {
73+
return ENTRY.name;
74+
}
75+
76+
@Override
77+
public MaxOverTime withFilter(Expression filter) {
78+
return new MaxOverTime(source(), field(), filter);
79+
}
80+
81+
@Override
82+
protected NodeInfo<MaxOverTime> info() {
83+
return NodeInfo.create(this, MaxOverTime::new, field(), filter());
84+
}
85+
86+
@Override
87+
public MaxOverTime replaceChildren(List<Expression> newChildren) {
88+
return new MaxOverTime(source(), newChildren.get(0), newChildren.get(1));
89+
}
90+
91+
@Override
92+
protected TypeResolution resolveType() {
93+
return perTimeSeriesAggregation().resolveType();
94+
}
95+
96+
@Override
97+
public DataType dataType() {
98+
return perTimeSeriesAggregation().dataType();
99+
}
100+
101+
@Override
102+
public Max perTimeSeriesAggregation() {
103+
return new Max(source(), field(), filter());
104+
}
105+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@
2525
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
2626
import org.elasticsearch.xpack.esql.expression.function.Param;
2727
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
28-
import org.elasticsearch.xpack.esql.planner.ToTimeSeriesAggregator;
28+
import org.elasticsearch.xpack.esql.planner.ToAggregator;
2929

3030
import java.io.IOException;
3131
import java.util.List;
3232

3333
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
3434
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
3535

36-
public class Rate extends AggregateFunction implements OptionalArgument, ToTimeSeriesAggregator {
36+
public class Rate extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator {
3737
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Rate", Rate::new);
3838

3939
private final Expression timestamp;
@@ -119,6 +119,11 @@ public AggregatorFunctionSupplier supplier() {
119119
};
120120
}
121121

122+
@Override
123+
public Rate perTimeSeriesAggregation() {
124+
return this;
125+
}
126+
122127
@Override
123128
public String toString() {
124129
return "rate(" + field() + ")";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
8+
9+
import org.elasticsearch.common.io.stream.StreamInput;
10+
import org.elasticsearch.xpack.esql.core.expression.Expression;
11+
import org.elasticsearch.xpack.esql.core.tree.Source;
12+
13+
import java.io.IOException;
14+
import java.util.List;
15+
16+
/**
17+
* Extends {@link AggregateFunction} to support aggregation per time_series,
18+
* such as {@link Rate} or {@link MaxOverTime}.
19+
*/
20+
public abstract class TimeSeriesAggregateFunction extends AggregateFunction {
21+
22+
protected TimeSeriesAggregateFunction(Source source, Expression field, Expression filter, List<? extends Expression> parameters) {
23+
super(source, field, filter, parameters);
24+
}
25+
26+
protected TimeSeriesAggregateFunction(StreamInput in) throws IOException {
27+
super(in);
28+
}
29+
30+
/**
31+
* Returns the aggregation function to be used in the first aggregation stage,
32+
* which is grouped by `_tsid` (and `time_bucket`).
33+
*
34+
* @see org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate
35+
*/
36+
public abstract AggregateFunction perTimeSeriesAggregation();
37+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SubstituteSurrogates.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1818
import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
1919
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
20-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
20+
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
2121
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2222
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2323
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -70,8 +70,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
7070
if (s instanceof AggregateFunction == false) {
7171
// 1. collect all aggregate functions from the expression
7272
var surrogateWithRefs = s.transformUp(AggregateFunction.class, af -> {
73-
// TODO: more generic than this?
74-
if (af instanceof Rate) {
73+
if (af instanceof TimeSeriesAggregateFunction) {
7574
return af;
7675
}
7776
// 2. check if they are already in use; otherwise add them to the Aggregate with some made-up aliases

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.elasticsearch.xpack.esql.core.util.Holder;
2020
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
2121
import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
22-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
22+
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
2323
import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
2424
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
2525
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
@@ -34,7 +34,7 @@
3434
import java.util.Map;
3535

3636
/**
37-
* Rate aggregation is special because it must be computed per time series, regardless of the grouping keys.
37+
* Time-series aggregation is special because it must be computed per time series, regardless of the grouping keys.
3838
* The keys must be `_tsid` or a pair of `_tsid` and `time_bucket`. To support user-defined grouping keys,
3939
* we first execute the time-series aggregation using the time-series keys, then perform another aggregation with
4040
* the resulting values using the user-specified keys.
@@ -113,6 +113,17 @@
113113
* | STATS rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
114114
* | STATS sum(`rate(request)`), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
115115
* | KEEP `min(memory_used)`, `sum(rate(request))`, pod, `bucket(@timestamp, 5m)`
116+
*
117+
* {agg}_over_time time-series aggregations will be rewritten in a similar way
118+
*
119+
* TS k8s | STATS sum(max_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute)
120+
*
121+
* becomes
122+
*
123+
* TS k8s
124+
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
125+
* | STATS sum(max_memory_usage) BY host_values, time_bucket
126+
*
116127
* </pre>
117128
*/
118129
public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {
@@ -131,20 +142,21 @@ protected LogicalPlan rule(Aggregate aggregate) {
131142
}
132143

133144
LogicalPlan translate(TimeSeriesAggregate aggregate) {
134-
Map<Rate, Alias> rateAggs = new HashMap<>();
145+
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
135146
List<NamedExpression> firstPassAggs = new ArrayList<>();
136147
List<NamedExpression> secondPassAggs = new ArrayList<>();
137148
for (NamedExpression agg : aggregate.aggregates()) {
138149
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
139150
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
140-
Expression outerAgg = af.transformDown(Rate.class, rate -> {
151+
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
141152
changed.set(Boolean.TRUE);
142-
Alias rateAgg = rateAggs.computeIfAbsent(rate, k -> {
143-
Alias newRateAgg = new Alias(rate.source(), agg.name(), rate);
144-
firstPassAggs.add(newRateAgg);
145-
return newRateAgg;
153+
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
154+
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
155+
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
156+
firstPassAggs.add(firstStageAlias);
157+
return firstStageAlias;
146158
});
147-
return rateAgg.toAttribute();
159+
return newAgg.toAttribute();
148160
});
149161
if (changed.get()) {
150162
secondPassAggs.add(new Alias(alias.source(), alias.name(), outerAgg, agg.id()));
@@ -156,7 +168,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
156168
}
157169
}
158170
}
159-
if (rateAggs.isEmpty()) {
171+
if (timeSeriesAggs.isEmpty()) {
160172
// no time-series aggregations, run a regular aggregation instead.
161173
return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), aggregate.aggregates());
162174
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
2929
import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression;
3030
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
31+
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
3132
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
3233
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
3334
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -223,8 +224,8 @@ public void postAnalysisVerification(Failures failures) {
223224
aggregates.forEach(a -> checkRateAggregates(a, 0, failures));
224225
} else {
225226
forEachExpression(
226-
Rate.class,
227-
r -> failures.add(fail(r, "the rate aggregate[{}] can only be used with the TS command", r.sourceText()))
227+
TimeSeriesAggregateFunction.class,
228+
r -> failures.add(fail(r, "time_series aggregate[{}] can only be used with the TS command", r.sourceText()))
228229
);
229230
}
230231
checkCategorizeGrouping(failures);
@@ -370,7 +371,7 @@ else if (c instanceof GroupingFunction gf) {
370371
if (e instanceof AggregateFunction af) {
371372
af.field().forEachDown(AggregateFunction.class, f -> {
372373
// time-series aggregates (e.g. rate, max_over_time) are allowed to be inside another aggregate
373-
if (f instanceof Rate == false) {
374+
if (f instanceof TimeSeriesAggregateFunction == false) {
374375
failures.add(fail(f, "nested aggregations [{}] not allowed inside other aggregations [{}]", f, af));
375376
}
376377
});

0 commit comments

Comments
 (0)