Skip to content

Commit 2aecdf5

Browse files
pabloem and mridula-s109
authored and committed
Count and Count distinct functions for tsds (#128530)
* Count and Count distinct functions for tsds * issues with test * fixup of tests * fixup of output types * fix naming and helper fns * fixup * fix other test * ugh mistake
1 parent 9663e45 commit 2aecdf5

File tree

8 files changed

+288
-10
lines changed

8 files changed

+288
-10
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,3 +295,44 @@ sum_cost:double | cluster:keyword | time_bucket:datetime
295295

296296
;
297297

298+
count_over_time
299+
required_capability: metrics_command
300+
required_capability: count_over_time
301+
302+
TS k8s
303+
| STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute)
304+
| SORT count DESC, time_bucket DESC, cluster
305+
| LIMIT 10;
306+
307+
count:long | cluster:keyword | time_bucket:datetime
308+
3 | staging | 2024-05-10T00:22:00.000Z
309+
3 | prod | 2024-05-10T00:20:00.000Z
310+
3 | prod | 2024-05-10T00:19:00.000Z
311+
3 | prod | 2024-05-10T00:18:00.000Z
312+
3 | qa | 2024-05-10T00:18:00.000Z
313+
3 | staging | 2024-05-10T00:18:00.000Z
314+
3 | prod | 2024-05-10T00:17:00.000Z
315+
3 | qa | 2024-05-10T00:17:00.000Z
316+
3 | qa | 2024-05-10T00:15:00.000Z
317+
3 | staging | 2024-05-10T00:15:00.000Z
318+
319+
;
320+
count_distinct_over_time
321+
required_capability: metrics_command
322+
required_capability: count_distinct_over_time
323+
324+
TS k8s | STATS distincts=count_distinct(count_distinct_over_time(network.cost)), distincts_imprecise=count_distinct(count_distinct_over_time(network.cost, 100)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT distincts DESC, time_bucket DESC, cluster | LIMIT 10;
325+
326+
distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:datetime
327+
3 |3 | qa | 2024-05-10T00:17:00.000Z
328+
3 |3 | qa | 2024-05-10T00:15:00.000Z
329+
3 |3 | prod | 2024-05-10T00:09:00.000Z
330+
3 |3 | qa | 2024-05-10T00:09:00.000Z
331+
2 |2 | prod | 2024-05-10T00:22:00.000Z
332+
2 |2 | staging | 2024-05-10T00:22:00.000Z
333+
2 |2 | prod | 2024-05-10T00:20:00.000Z
334+
2 |2 | prod | 2024-05-10T00:18:00.000Z
335+
2 |2 | qa | 2024-05-10T00:18:00.000Z
336+
2 |2 | staging | 2024-05-10T00:18:00.000Z
337+
338+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,16 @@ public enum Cap {
10881088
*/
10891089
SUM_OVER_TIME(Build.current().isSnapshot()),
10901090

1091+
/**
1092+
* Support for count_over_time aggregation that gets evaluated per time-series
1093+
*/
1094+
COUNT_OVER_TIME(Build.current().isSnapshot()),
1095+
1096+
/**
1097+
* Support for count_distinct_over_time aggregation that gets evaluated per time-series
1098+
*/
1099+
COUNT_DISTINCT_OVER_TIME(Build.current().isSnapshot()),
1100+
10911101
/**
10921102
* Resolve groupings before resolving references to groupings in the aggregations.
10931103
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.expression.function;
99

1010
import org.elasticsearch.Build;
11+
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.util.CollectionUtils;
1213
import org.elasticsearch.common.util.FeatureFlag;
1314
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
@@ -22,6 +23,8 @@
2223
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
2324
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2425
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
26+
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime;
27+
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
2528
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
2629
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
2730
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
@@ -455,6 +458,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
455458
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
456459
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
457460
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
461+
def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"),
462+
def(CountDistinctOverTime.class, bi(CountDistinctOverTime::new), "count_distinct_over_time"),
458463
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
459464
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
460465
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),
@@ -914,9 +919,13 @@ public static <T extends Function> FunctionDefinition def(Class<T> function, Bin
914919
FunctionBuilder builder = (source, children, cfg) -> {
915920
boolean isBinaryOptionalParamFunction = OptionalArgument.class.isAssignableFrom(function);
916921
if (isBinaryOptionalParamFunction && (children.size() > 2 || children.size() < 1)) {
917-
throw new QlIllegalArgumentException("expects one or two arguments");
922+
throw new QlIllegalArgumentException(
923+
Strings.format("function %s expects one or two arguments but it received %d", Arrays.toString(names), children.size())
924+
);
918925
} else if (isBinaryOptionalParamFunction == false && children.size() != 2) {
919-
throw new QlIllegalArgumentException("expects exactly two arguments");
926+
throw new QlIllegalArgumentException(
927+
Strings.format("function %s expects exactly two arguments, it received %d", Arrays.toString(names), children.size())
928+
);
920929
}
921930

922931
return ctorRef.build(source, children.get(0), children.size() == 2 ? children.get(1) : null);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3737
LastOverTime.ENTRY,
3838
FirstOverTime.ENTRY,
3939
SumOverTime.ENTRY,
40+
CountOverTime.ENTRY,
41+
CountDistinctOverTime.ENTRY,
4042
// internal functions
4143
ToPartial.ENTRY,
4244
FromPartial.ENTRY,
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
20+
import org.elasticsearch.xpack.esql.expression.function.Param;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
/**
26+
* Similar to {@link CountDistinct}, but it is used to calculate the distinct count of values over a time series from the given field.
27+
*/
28+
public class CountDistinctOverTime extends TimeSeriesAggregateFunction implements OptionalArgument {
29+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
30+
Expression.class,
31+
"DistinctOverTime",
32+
CountDistinctOverTime::new
33+
);
34+
35+
private final Expression precision;
36+
37+
@FunctionInfo(
38+
returnType = { "long" },
39+
description = "The count of distinct values over time for a field.",
40+
type = FunctionType.AGGREGATE
41+
)
42+
public CountDistinctOverTime(
43+
Source source,
44+
@Param(
45+
name = "field",
46+
type = { "boolean", "date", "date_nanos", "double", "integer", "ip", "keyword", "long", "text", "version" }
47+
) Expression field,
48+
@Param(
49+
optional = true,
50+
name = "precision",
51+
type = { "integer", "long", "unsigned_long" },
52+
description = "Precision threshold. Refer to <<esql-agg-count-distinct-approximate>>. "
53+
+ "The maximum supported value is 40000. Thresholds above this number will have the "
54+
+ "same effect as a threshold of 40000. The default value is 3000."
55+
) Expression precision
56+
) {
57+
this(source, field, Literal.TRUE, precision);
58+
}
59+
60+
public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression precision) {
61+
super(source, field, filter, precision == null ? List.of() : List.of(precision));
62+
this.precision = precision;
63+
}
64+
65+
private CountDistinctOverTime(StreamInput in) throws IOException {
66+
super(in);
67+
this.precision = parameters().isEmpty() ? null : parameters().getFirst();
68+
}
69+
70+
@Override
71+
public String getWriteableName() {
72+
return ENTRY.name;
73+
}
74+
75+
@Override
76+
public CountDistinctOverTime withFilter(Expression filter) {
77+
return new CountDistinctOverTime(source(), field(), filter, precision);
78+
}
79+
80+
@Override
81+
protected NodeInfo<CountDistinctOverTime> info() {
82+
return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), precision);
83+
}
84+
85+
@Override
86+
public CountDistinctOverTime replaceChildren(List<Expression> newChildren) {
87+
if (newChildren.size() < 3) {
88+
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), null);
89+
}
90+
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
91+
}
92+
93+
@Override
94+
protected TypeResolution resolveType() {
95+
return perTimeSeriesAggregation().resolveType();
96+
}
97+
98+
@Override
99+
public DataType dataType() {
100+
return perTimeSeriesAggregation().dataType();
101+
}
102+
103+
@Override
104+
public CountDistinct perTimeSeriesAggregation() {
105+
return new CountDistinct(source(), field(), filter(), precision);
106+
}
107+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Count}, but it is used to calculate the count of values over a time series from the given field.
28+
*/
29+
public class CountOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"CountOverTime",
33+
CountOverTime::new
34+
);
35+
36+
@FunctionInfo(returnType = { "long" }, description = "The count over time value of a field.", type = FunctionType.AGGREGATE)
37+
public CountOverTime(
38+
Source source,
39+
@Param(
40+
name = "field",
41+
type = {
42+
"aggregate_metric_double",
43+
"boolean",
44+
"cartesian_point",
45+
"date",
46+
"double",
47+
"geo_point",
48+
"integer",
49+
"ip",
50+
"keyword",
51+
"long",
52+
"text",
53+
"unsigned_long",
54+
"version" }
55+
) Expression field
56+
) {
57+
this(source, field, Literal.TRUE);
58+
}
59+
60+
public CountOverTime(Source source, Expression field, Expression filter) {
61+
super(source, field, filter, emptyList());
62+
}
63+
64+
private CountOverTime(StreamInput in) throws IOException {
65+
super(in);
66+
}
67+
68+
@Override
69+
public String getWriteableName() {
70+
return ENTRY.name;
71+
}
72+
73+
@Override
74+
public CountOverTime withFilter(Expression filter) {
75+
return new CountOverTime(source(), field(), filter);
76+
}
77+
78+
@Override
79+
protected NodeInfo<CountOverTime> info() {
80+
return NodeInfo.create(this, CountOverTime::new, field(), filter());
81+
}
82+
83+
@Override
84+
public CountOverTime replaceChildren(List<Expression> newChildren) {
85+
return new CountOverTime(source(), newChildren.get(0), newChildren.get(1));
86+
}
87+
88+
@Override
89+
protected TypeResolution resolveType() {
90+
return perTimeSeriesAggregation().resolveType();
91+
}
92+
93+
@Override
94+
public DataType dataType() {
95+
return perTimeSeriesAggregation().dataType();
96+
}
97+
98+
@Override
99+
public Count perTimeSeriesAggregation() {
100+
return new Count(source(), field(), filter());
101+
}
102+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistryTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
3030
import static org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry.def;
3131
import static org.elasticsearch.xpack.esql.expression.function.FunctionResolutionStrategy.DEFAULT;
32-
import static org.hamcrest.Matchers.endsWith;
32+
import static org.hamcrest.Matchers.containsString;
3333
import static org.hamcrest.Matchers.is;
3434
import static org.mockito.Mockito.mock;
3535

@@ -54,11 +54,11 @@ public void testBinaryFunction() {
5454

5555
// No children aren't supported
5656
ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def));
57-
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
57+
assertThat(e.getMessage(), containsString("expects exactly two arguments"));
5858

5959
// One child isn't supported
6060
e = expectThrows(ParsingException.class, () -> uf(DEFAULT, mock(Expression.class)).buildResolved(randomConfiguration(), def));
61-
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
61+
assertThat(e.getMessage(), containsString("expects exactly two arguments"));
6262

6363
// Many children aren't supported
6464
e = expectThrows(
@@ -68,7 +68,7 @@ public void testBinaryFunction() {
6868
def
6969
)
7070
);
71-
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
71+
assertThat(e.getMessage(), containsString("expects exactly two arguments"));
7272
}
7373

7474
public void testAliasNameIsTheSameAsAFunctionName() {
@@ -138,14 +138,14 @@ public void testUnaryFunction() {
138138

139139
// No children aren't supported
140140
ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def));
141-
assertThat(e.getMessage(), endsWith("expects exactly one argument"));
141+
assertThat(e.getMessage(), containsString("expects exactly one argument"));
142142

143143
// Multiple children aren't supported
144144
e = expectThrows(
145145
ParsingException.class,
146146
() -> uf(DEFAULT, mock(Expression.class), mock(Expression.class)).buildResolved(randomConfiguration(), def)
147147
);
148-
assertThat(e.getMessage(), endsWith("expects exactly one argument"));
148+
assertThat(e.getMessage(), containsString("expects exactly one argument"));
149149
}
150150

151151
public void testConfigurationOptionalFunction() {

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ setup:
3333
path: /_query
3434
parameters: []
3535
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
36-
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
36+
capabilities:
37+
- snapshot_test_for_telemetry
38+
- fn_byte_length
39+
- match_function_options
40+
- first_over_time
41+
- sum_over_time
42+
- count_over_time
43+
- count_distinct_over_time
3744
reason: "Test that should only be executed on snapshot versions"
3845

3946
- do: {xpack.usage: {}}
@@ -123,7 +130,7 @@ setup:
123130
- match: {esql.functions.coalesce: $functions_coalesce}
124131
- gt: {esql.functions.categorize: $functions_categorize}
125132
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
126-
- length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check
133+
- length: {esql.functions: 146} # check the "sister" test below for a likely update to the same esql.functions length check
127134

128135
---
129136
"Basic ESQL usage output (telemetry) non-snapshot version":

0 commit comments

Comments
 (0)