Skip to content

Commit c3c9689

Browse files
committed
Count and Count distinct functions for tsds
1 parent 8929a64 commit c3c9689

File tree

7 files changed

+240
-2
lines changed

7 files changed

+240
-2
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,3 +295,41 @@ sum_cost:double | cluster:keyword | time_bucket:datetime
295295

296296
;
297297

298+
count_over_time
299+
required_capability: metrics_command
300+
required_capability: count_over_time
301+
302+
TS k8s | STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT count DESC, time_bucket DESC, cluster | LIMIT 10;
303+
304+
count:long | cluster:keyword | time_bucket:datetime
305+
3 | staging | 2024-05-10T00:22:00.000Z
306+
3 | prod | 2024-05-10T00:20:00.000Z
307+
3 | prod | 2024-05-10T00:19:00.000Z
308+
3 | prod | 2024-05-10T00:18:00.000Z
309+
3 | qa | 2024-05-10T00:18:00.000Z
310+
3 | staging | 2024-05-10T00:18:00.000Z
311+
3 | prod | 2024-05-10T00:17:00.000Z
312+
3 | qa | 2024-05-10T00:17:00.000Z
313+
3 | qa | 2024-05-10T00:15:00.000Z
314+
3 | staging | 2024-05-10T00:15:00.000Z
315+
316+
;
317+
distinct_over_time
318+
required_capability: metrics_command
319+
required_capability: distinct_over_time
320+
321+
TS k8s | STATS distincts=count_distinct(distinct_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT distincts DESC, time_bucket DESC, cluster | LIMIT 10;
322+
323+
distincts:long | cluster:keyword | time_bucket:datetime
324+
3 | qa | 2024-05-10T00:17:00.000Z
325+
3 | qa | 2024-05-10T00:15:00.000Z
326+
3 | prod | 2024-05-10T00:09:00.000Z
327+
3 | qa | 2024-05-10T00:09:00.000Z
328+
2 | prod | 2024-05-10T00:22:00.000Z
329+
2 | staging | 2024-05-10T00:22:00.000Z
330+
2 | prod | 2024-05-10T00:20:00.000Z
331+
2 | prod | 2024-05-10T00:18:00.000Z
332+
2 | qa | 2024-05-10T00:18:00.000Z
333+
2 | staging | 2024-05-10T00:18:00.000Z
334+
335+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,16 @@ public enum Cap {
10871087
*/
10881088
SUM_OVER_TIME(Build.current().isSnapshot()),
10891089

1090+
/**
1091+
* Support count_over_time aggregation that gets evaluated per time-series
1092+
*/
1093+
COUNT_OVER_TIME(Build.current().isSnapshot()),
1094+
1095+
/**
1096+
* Support for distinct_over_time aggregation that gets evaluated per time-series
1097+
*/
1098+
DISTINCT_OVER_TIME(Build.current().isSnapshot()),
1099+
10901100
/**
10911101
* Resolve groupings before resolving references to groupings in the aggregations.
10921102
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
2323
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2424
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
25+
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
26+
import org.elasticsearch.xpack.esql.expression.function.aggregate.DistinctOverTime;
2527
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
2628
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
2729
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
@@ -455,6 +457,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
455457
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
456458
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
457459
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
460+
def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"),
461+
def(DistinctOverTime.class, uni(DistinctOverTime::new), "distinct_over_time"),
458462
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
459463
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
460464
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3737
LastOverTime.ENTRY,
3838
FirstOverTime.ENTRY,
3939
SumOverTime.ENTRY,
40+
CountOverTime.ENTRY,
41+
DistinctOverTime.ENTRY,
4042
// internal functions
4143
ToPartial.ENTRY,
4244
FromPartial.ENTRY,
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Count}, but it is used to calculate the count of values over a time series from the given field.
28+
*/
29+
public class CountOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"CountOverTime",
33+
CountOverTime::new
34+
);
35+
36+
@FunctionInfo(returnType = { "integer", "long" }, description = "The count over time value of a field.", type = FunctionType.AGGREGATE)
37+
public CountOverTime(
38+
Source source,
39+
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field
40+
) {
41+
this(source, field, Literal.TRUE);
42+
}
43+
44+
public CountOverTime(Source source, Expression field, Expression filter) {
45+
super(source, field, filter, emptyList());
46+
}
47+
48+
private CountOverTime(StreamInput in) throws IOException {
49+
super(in);
50+
}
51+
52+
@Override
53+
public String getWriteableName() {
54+
return ENTRY.name;
55+
}
56+
57+
@Override
58+
public CountOverTime withFilter(Expression filter) {
59+
return new CountOverTime(source(), field(), filter);
60+
}
61+
62+
@Override
63+
protected NodeInfo<CountOverTime> info() {
64+
return NodeInfo.create(this, CountOverTime::new, field(), filter());
65+
}
66+
67+
@Override
68+
public CountOverTime replaceChildren(List<Expression> newChildren) {
69+
return new CountOverTime(source(), newChildren.get(0), newChildren.get(1));
70+
}
71+
72+
@Override
73+
protected TypeResolution resolveType() {
74+
return perTimeSeriesAggregation().resolveType();
75+
}
76+
77+
@Override
78+
public DataType dataType() {
79+
return perTimeSeriesAggregation().dataType();
80+
}
81+
82+
@Override
83+
public Count perTimeSeriesAggregation() {
84+
return new Count(source(), field(), filter());
85+
}
86+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link CountDistinct}, but it is used to calculate the distinct count of values over a time series from the given field.
28+
*/
29+
public class DistinctOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"DistinctOverTime",
33+
DistinctOverTime::new
34+
);
35+
36+
@FunctionInfo(
37+
returnType = { "integer", "long" },
38+
description = "The count of distinct values over time for a field.",
39+
type = FunctionType.AGGREGATE
40+
)
41+
public DistinctOverTime(
42+
Source source,
43+
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long", "float" }) Expression field
44+
) {
45+
this(source, field, Literal.TRUE);
46+
}
47+
48+
public DistinctOverTime(Source source, Expression field, Expression filter) {
49+
super(source, field, filter, emptyList());
50+
}
51+
52+
private DistinctOverTime(StreamInput in) throws IOException {
53+
super(in);
54+
}
55+
56+
@Override
57+
public String getWriteableName() {
58+
return ENTRY.name;
59+
}
60+
61+
@Override
62+
public DistinctOverTime withFilter(Expression filter) {
63+
return new DistinctOverTime(source(), field(), filter);
64+
}
65+
66+
@Override
67+
protected NodeInfo<DistinctOverTime> info() {
68+
return NodeInfo.create(this, DistinctOverTime::new, field(), filter());
69+
}
70+
71+
@Override
72+
public DistinctOverTime replaceChildren(List<Expression> newChildren) {
73+
return new DistinctOverTime(source(), newChildren.get(0), newChildren.get(1));
74+
}
75+
76+
@Override
77+
protected TypeResolution resolveType() {
78+
return perTimeSeriesAggregation().resolveType();
79+
}
80+
81+
@Override
82+
public DataType dataType() {
83+
return perTimeSeriesAggregation().dataType();
84+
}
85+
86+
@Override
87+
public CountDistinct perTimeSeriesAggregation() {
88+
// TODO(pabloem): Do we need to take in a precision parameter here?
89+
return new CountDistinct(source(), field(), filter(), null);
90+
}
91+
}

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ setup:
3333
path: /_query
3434
parameters: []
3535
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
36-
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
36+
capabilities:
37+
- snapshot_test_for_telemetry
38+
- fn_byte_length
39+
- match_function_options
40+
- first_over_time
41+
- sum_over_time
42+
- count_over_time
43+
- distinct_over_time
3744
reason: "Test that should only be executed on snapshot versions"
3845

3946
- do: {xpack.usage: {}}
@@ -123,7 +130,7 @@ setup:
123130
- match: {esql.functions.coalesce: $functions_coalesce}
124131
- gt: {esql.functions.categorize: $functions_categorize}
125132
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
126-
- length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check
133+
- length: {esql.functions: 146} # check the "sister" test below for a likely update to the same esql.functions length check
127134

128135
---
129136
"Basic ESQL usage output (telemetry) non-snapshot version":

0 commit comments

Comments
 (0)