diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 69aa78e90e5a0..4b43d1da782b8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -295,3 +295,44 @@ sum_cost:double | cluster:keyword | time_bucket:datetime ; +count_over_time +required_capability: metrics_command +required_capability: count_over_time + +TS k8s +| STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) +| SORT count DESC, time_bucket DESC, cluster +| LIMIT 10; + +count:long | cluster:keyword | time_bucket:datetime +3 | staging | 2024-05-10T00:22:00.000Z +3 | prod | 2024-05-10T00:20:00.000Z +3 | prod | 2024-05-10T00:19:00.000Z +3 | prod | 2024-05-10T00:18:00.000Z +3 | qa | 2024-05-10T00:18:00.000Z +3 | staging | 2024-05-10T00:18:00.000Z +3 | prod | 2024-05-10T00:17:00.000Z +3 | qa | 2024-05-10T00:17:00.000Z +3 | qa | 2024-05-10T00:15:00.000Z +3 | staging | 2024-05-10T00:15:00.000Z + +; +count_distinct_over_time +required_capability: metrics_command +required_capability: distinct_over_time + +TS k8s | STATS distincts=count_distinct(count_distinct_over_time(network.cost)), distincts_imprecise=count_distinct(count_distinct_over_time(network.cost, 100)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT distincts DESC, time_bucket DESC, cluster | LIMIT 10; + +distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:datetime +3 |3 | qa | 2024-05-10T00:17:00.000Z +3 |3 | qa | 2024-05-10T00:15:00.000Z +3 |3 | prod | 2024-05-10T00:09:00.000Z +3 |3 | qa | 2024-05-10T00:09:00.000Z +2 |2 | prod | 2024-05-10T00:22:00.000Z +2 |2 | staging | 2024-05-10T00:22:00.000Z +2 |2 | prod | 2024-05-10T00:20:00.000Z +2 |2 | prod | 2024-05-10T00:18:00.000Z +2 |2 | qa | 2024-05-10T00:18:00.000Z +2 |2 | staging | 2024-05-10T00:18:00.000Z + +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index d628b0a601c13..565a6519a831e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1088,6 +1088,16 @@ public enum Cap { */ SUM_OVER_TIME(Build.current().isSnapshot()), + /** + * Support count_over_time aggregation that gets evaluated per time-series + */ + COUNT_OVER_TIME(Build.current().isSnapshot()), + + /** + * Support for count_distinct_over_time aggregation that gets evaluated per time-series + */ + COUNT_DISTINCT_OVER_TIME(Build.current().isSnapshot()), + /** * Resolve groupings before resolving references to groupings in the aggregations. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 011a555ad6ded..695620982b2c0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.expression.function; import org.elasticsearch.Build; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException; @@ -22,6 +23,8 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; +import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; @@ -455,6 +458,8 @@ private static FunctionDefinition[][] snapshotFunctions() { def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"), def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"), + def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"), + def(CountDistinctOverTime.class, bi(CountDistinctOverTime::new), "count_distinct_over_time"), def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"), def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"), @@ -914,9 +919,13 @@ public static FunctionDefinition def(Class function, Bin FunctionBuilder builder = (source, children, cfg) -> { boolean isBinaryOptionalParamFunction = OptionalArgument.class.isAssignableFrom(function); if (isBinaryOptionalParamFunction && (children.size() > 2 || children.size() < 1)) { - throw new QlIllegalArgumentException("expects one or two arguments"); + throw new QlIllegalArgumentException( + Strings.format("function %s expects one or two arguments but it received %d", Arrays.toString(names), children.size()) + ); } else if (isBinaryOptionalParamFunction == false && children.size() != 2) { - throw new QlIllegalArgumentException("expects exactly two arguments"); + throw new QlIllegalArgumentException( + Strings.format("function %s expects exactly two arguments, it received %d", Arrays.toString(names), children.size()) + ); } return ctorRef.build(source, children.get(0), children.size() == 2 ? children.get(1) : null); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index cf2722c0be664..7dfacafbc2c53 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -37,6 +37,8 @@ public static List getNamedWriteables() { LastOverTime.ENTRY, FirstOverTime.ENTRY, SumOverTime.ENTRY, + CountOverTime.ENTRY, + CountDistinctOverTime.ENTRY, // internal functions ToPartial.ENTRY, FromPartial.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java new file mode 100644 index 0000000000000..99e419bbf14f5 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; +import org.elasticsearch.xpack.esql.expression.function.Param; + +import java.io.IOException; +import java.util.List; + +/** + * Similar to {@link CountDistinct}, but it is used to calculate the distinct count of values over a time series from the given field. + */ +public class CountDistinctOverTime extends TimeSeriesAggregateFunction implements OptionalArgument { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "DistinctOverTime", + CountDistinctOverTime::new + ); + + private final Expression precision; + + @FunctionInfo( + returnType = { "long" }, + description = "The count of distinct values over time for a field.", + type = FunctionType.AGGREGATE + ) + public CountDistinctOverTime( + Source source, + @Param( + name = "field", + type = { "boolean", "date", "date_nanos", "double", "integer", "ip", "keyword", "long", "text", "version" } + ) Expression field, + @Param( + optional = true, + name = "precision", + type = { "integer", "long", "unsigned_long" }, + description = "Precision threshold. Refer to <>. " + + "The maximum supported value is 40000. Thresholds above this number will have the " + + "same effect as a threshold of 40000. The default value is 3000." + ) Expression precision + ) { + this(source, field, Literal.TRUE, precision); + } + + public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression precision) { + super(source, field, filter, precision == null ? List.of() : List.of(precision)); + this.precision = precision; + } + + private CountDistinctOverTime(StreamInput in) throws IOException { + super(in); + this.precision = parameters().isEmpty() ? null : parameters().getFirst(); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public CountDistinctOverTime withFilter(Expression filter) { + return new CountDistinctOverTime(source(), field(), filter, precision); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), precision); + } + + @Override + public CountDistinctOverTime replaceChildren(List newChildren) { + if (newChildren.size() < 3) { + return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), null); + } + return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + protected TypeResolution resolveType() { + return perTimeSeriesAggregation().resolveType(); + } + + @Override + public DataType dataType() { + return perTimeSeriesAggregation().dataType(); + } + + @Override + public CountDistinct perTimeSeriesAggregation() { + return new CountDistinct(source(), field(), filter(), precision); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java new file mode 100644 index 0000000000000..8da3aee69d8a4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.Param; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.emptyList; + +/** + * Similar to {@link Count}, but it is used to calculate the count of values over a time series from the given field. + */ +public class CountOverTime extends TimeSeriesAggregateFunction { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "CountOverTime", + CountOverTime::new + ); + + @FunctionInfo(returnType = { "long" }, description = "The count over time value of a field.", type = FunctionType.AGGREGATE) + public CountOverTime( + Source source, + @Param( + name = "field", + type = { + "aggregate_metric_double", + "boolean", + "cartesian_point", + "date", + "double", + "geo_point", + "integer", + "ip", + "keyword", + "long", + "text", + "unsigned_long", + "version" } + ) Expression field + ) { + this(source, field, Literal.TRUE); + } + + public CountOverTime(Source source, Expression field, Expression filter) { + super(source, field, filter, emptyList()); + } + + private CountOverTime(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public CountOverTime withFilter(Expression filter) { + return new CountOverTime(source(), field(), filter); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, CountOverTime::new, field(), filter()); + } + + @Override + public CountOverTime replaceChildren(List newChildren) { + return new CountOverTime(source(), newChildren.get(0), newChildren.get(1)); + } + + @Override + protected TypeResolution resolveType() { + return perTimeSeriesAggregation().resolveType(); + } + + @Override + public DataType dataType() { + return perTimeSeriesAggregation().dataType(); + } + + @Override + public Count perTimeSeriesAggregation() { + return new Count(source(), field(), filter()); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistryTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistryTests.java index 50cbbdf4a9338..d8c1b40e78c48 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistryTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistryTests.java @@ -29,7 +29,7 @@ import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration; import static org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry.def; import static org.elasticsearch.xpack.esql.expression.function.FunctionResolutionStrategy.DEFAULT; -import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; @@ -54,11 +54,11 @@ public void testBinaryFunction() { // No children aren't supported ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def)); - assertThat(e.getMessage(), endsWith("expects exactly two arguments")); + assertThat(e.getMessage(), containsString("expects exactly two arguments")); // One child isn't supported e = expectThrows(ParsingException.class, () -> uf(DEFAULT, mock(Expression.class)).buildResolved(randomConfiguration(), def)); - assertThat(e.getMessage(), endsWith("expects exactly two arguments")); + assertThat(e.getMessage(), containsString("expects exactly two arguments")); // Many children aren't supported e = expectThrows( @@ -68,7 +68,7 @@ public void testBinaryFunction() { def ) ); - assertThat(e.getMessage(), endsWith("expects exactly two arguments")); + assertThat(e.getMessage(), containsString("expects exactly two arguments")); } public void testAliasNameIsTheSameAsAFunctionName() { @@ -138,14 +138,14 @@ public void testUnaryFunction() { // No children aren't supported ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def)); - assertThat(e.getMessage(), endsWith("expects exactly one argument")); + assertThat(e.getMessage(), containsString("expects exactly one argument")); // Multiple children aren't supported e = expectThrows( ParsingException.class, () -> uf(DEFAULT, mock(Expression.class), mock(Expression.class)).buildResolved(randomConfiguration(), def) ); - assertThat(e.getMessage(), endsWith("expects exactly one argument")); + assertThat(e.getMessage(), containsString("expects exactly one argument")); } public void testConfigurationOptionalFunction() { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index d1eb251788490..41c6e64f52a18 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -33,7 +33,14 @@ setup: path: /_query parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. - capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time] + capabilities: + - snapshot_test_for_telemetry + - fn_byte_length + - match_function_options + - first_over_time + - sum_over_time + - count_over_time + - distinct_over_time reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} @@ -123,7 +130,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 146} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version":