Skip to content

Commit 570c6e9

Browse files
dnhatnjfreden
authored andcommitted
Add min_over_time (elastic#127649)
This change adds support for `min_over_time` in time-series data streams. Similar to `max_over_time`, `min_over_time` is translated to compute the min per time-series first. For example: ``` TS my-metrics | STATS SUM(min_over_time(memory_usage)) BY cluster, bucket(@timestamp, 1 minute) ``` is translated into: ``` TS my-metrics | STATS min_memory_usage=min(memory_usage), cluster=VALUES(cluster) BY _tsid, ts=bucket(@timestamp, 1 minute) | STATS sum(min_memory_usage) BY cluster, ts ```
1 parent d628e0e commit 570c6e9

File tree

6 files changed

+134
-3
lines changed

6 files changed

+134
-3
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,24 @@ cost:double | cluster:keyword | time_bucket:datetime
184184
22.75 | qa | 2024-05-10T00:08:00.000Z
185185
;
186186

187+
min_over_time
188+
required_capability: metrics_command
189+
required_capability: min_over_time
190+
TS k8s | STATS cost=sum(min_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT cost DESC, time_bucket DESC, cluster | LIMIT 10;
191+
192+
cost:double | cluster:keyword | time_bucket:datetime
193+
29.0 | prod | 2024-05-10T00:19:00.000Z
194+
27.625 | qa | 2024-05-10T00:06:00.000Z
195+
24.25 | qa | 2024-05-10T00:09:00.000Z
196+
23.125 | staging | 2024-05-10T00:08:00.000Z
197+
22.5 | prod | 2024-05-10T00:13:00.000Z
198+
18.625 | qa | 2024-05-10T00:04:00.000Z
199+
18.0 | qa | 2024-05-10T00:11:00.000Z
200+
17.75 | qa | 2024-05-10T00:01:00.000Z
201+
17.125 | qa | 2024-05-10T00:22:00.000Z
202+
16.875 | qa | 2024-05-10T00:20:00.000Z
203+
;
204+
187205
max_of_avg_over_time
188206
required_capability: metrics_command
189207
required_capability: avg_over_time

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,12 @@ public enum Cap {
10541054
/**
10551055
* Guards a bug fix matching {@code TO_LOWER(f) == ""}.
10561056
*/
1057-
TO_LOWER_EMPTY_STRING;
1057+
TO_LOWER_EMPTY_STRING,
1058+
1059+
/**
1060+
* Support min_over_time aggregation that gets evaluated per time-series
1061+
*/
1062+
MIN_OVER_TIME(Build.current().isSnapshot());
10581063

10591064
private final boolean enabled;
10601065

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.esql.expression.function.aggregate.Median;
2929
import org.elasticsearch.xpack.esql.expression.function.aggregate.MedianAbsoluteDeviation;
3030
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
31+
import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime;
3132
import org.elasticsearch.xpack.esql.expression.function.aggregate.Percentile;
3233
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
3334
import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialCentroid;
@@ -444,6 +445,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
444445
def(Delay.class, Delay::new, "delay"),
445446
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
446447
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
448+
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
447449
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
448450
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
449451
def(Term.class, bi(Term::new), "term") } };

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3030
Sum.ENTRY,
3131
Top.ENTRY,
3232
Values.ENTRY,
33+
MinOverTime.ENTRY,
3334
MaxOverTime.ENTRY,
3435
AvgOverTime.ENTRY,
3536
LastOverTime.ENTRY,
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Min}, but it is used to calculate the minimum value over a time series of values from the given field.
28+
*/
29+
public class MinOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"MinOverTime",
33+
MinOverTime::new
34+
);
35+
36+
@FunctionInfo(
37+
returnType = { "boolean", "double", "integer", "long", "date", "date_nanos", "ip", "keyword", "long", "version" },
38+
description = "The minimum over time value of a field.",
39+
type = FunctionType.AGGREGATE
40+
)
41+
public MinOverTime(
42+
Source source,
43+
@Param(
44+
name = "field",
45+
type = {
46+
"aggregate_metric_double",
47+
"boolean",
48+
"double",
49+
"integer",
50+
"long",
51+
"date",
52+
"date_nanos",
53+
"ip",
54+
"keyword",
55+
"text",
56+
"long",
57+
"version" }
58+
) Expression field
59+
) {
60+
this(source, field, Literal.TRUE);
61+
}
62+
63+
public MinOverTime(Source source, Expression field, Expression filter) {
64+
super(source, field, filter, emptyList());
65+
}
66+
67+
private MinOverTime(StreamInput in) throws IOException {
68+
super(in);
69+
}
70+
71+
@Override
72+
public String getWriteableName() {
73+
return ENTRY.name;
74+
}
75+
76+
@Override
77+
public MinOverTime withFilter(Expression filter) {
78+
return new MinOverTime(source(), field(), filter);
79+
}
80+
81+
@Override
82+
protected NodeInfo<MinOverTime> info() {
83+
return NodeInfo.create(this, MinOverTime::new, field(), filter());
84+
}
85+
86+
@Override
87+
public MinOverTime replaceChildren(List<Expression> newChildren) {
88+
return new MinOverTime(source(), newChildren.get(0), newChildren.get(1));
89+
}
90+
91+
@Override
92+
protected TypeResolution resolveType() {
93+
return perTimeSeriesAggregation().resolveType();
94+
}
95+
96+
@Override
97+
public DataType dataType() {
98+
return perTimeSeriesAggregation().dataType();
99+
}
100+
101+
@Override
102+
public Min perTimeSeriesAggregation() {
103+
return new Min(source(), field(), filter());
104+
}
105+
}

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ setup:
3333
path: /_query
3434
parameters: []
3535
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
36-
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time]
36+
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, min_over_time ]
3737
reason: "Test that should only be executed on snapshot versions"
3838

3939
- do: {xpack.usage: {}}
@@ -123,7 +123,7 @@ setup:
123123
- match: {esql.functions.coalesce: $functions_coalesce}
124124
- gt: {esql.functions.categorize: $functions_categorize}
125125
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
126-
- length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check
126+
- length: {esql.functions: 139} # check the "sister" test below for a likely update to the same esql.functions length check
127127

128128
---
129129
"Basic ESQL usage output (telemetry) non-snapshot version":

0 commit comments

Comments
 (0)