@@ -295,3 +295,44 @@ sum_cost:double | cluster:keyword | time_bucket:datetime

;

count_over_time
required_capability: metrics_command
required_capability: count_over_time

TS k8s
| STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute)
| SORT count DESC, time_bucket DESC, cluster
| LIMIT 10;

count:long | cluster:keyword | time_bucket:datetime
3 | staging | 2024-05-10T00:22:00.000Z
3 | prod | 2024-05-10T00:20:00.000Z
3 | prod | 2024-05-10T00:19:00.000Z
3 | prod | 2024-05-10T00:18:00.000Z
3 | qa | 2024-05-10T00:18:00.000Z
3 | staging | 2024-05-10T00:18:00.000Z
3 | prod | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:15:00.000Z
3 | staging | 2024-05-10T00:15:00.000Z

;

count_distinct_over_time
required_capability: metrics_command
required_capability: count_distinct_over_time

TS k8s
| STATS distincts=count_distinct(count_distinct_over_time(network.cost)), distincts_imprecise=count_distinct(count_distinct_over_time(network.cost, 100)) BY cluster, time_bucket = bucket(@timestamp,1minute)
| SORT distincts DESC, time_bucket DESC, cluster
| LIMIT 10;

distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:datetime
3 | 3 | qa | 2024-05-10T00:17:00.000Z
3 | 3 | qa | 2024-05-10T00:15:00.000Z
3 | 3 | prod | 2024-05-10T00:09:00.000Z
3 | 3 | qa | 2024-05-10T00:09:00.000Z
2 | 2 | prod | 2024-05-10T00:22:00.000Z
2 | 2 | staging | 2024-05-10T00:22:00.000Z
2 | 2 | prod | 2024-05-10T00:20:00.000Z
2 | 2 | prod | 2024-05-10T00:18:00.000Z
2 | 2 | qa | 2024-05-10T00:18:00.000Z
2 | 2 | staging | 2024-05-10T00:18:00.000Z

;
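The nesting in these queries is two-stage: the inner `count_over_time` / `count_distinct_over_time` is evaluated once per time series within each time bucket, and the outer `count` / `count_distinct` then aggregates those per-series values by `cluster` and `time_bucket`. A standalone Java sketch of that two-stage grouping, using plain collections and made-up sample data rather than any ES|QL internals:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TwoStageAggSketch {

    // One raw sample from a single time series; tsid identifies the series (illustrative fields).
    record Sample(String tsid, String cluster, long bucket, double cost) {}

    public static void main(String[] args) {
        List<Sample> samples = List.of(
            new Sample("pod-1", "prod", 0, 1.0),
            new Sample("pod-1", "prod", 0, 2.0),
            new Sample("pod-2", "prod", 0, 3.0),
            new Sample("pod-3", "qa", 0, 4.0)
        );

        // Stage 1 (count_over_time): one count per (series, cluster, bucket).
        Map<List<String>, Long> perSeries = samples.stream()
            .collect(Collectors.groupingBy(
                s -> List.of(s.tsid(), s.cluster(), Long.toString(s.bucket())),
                Collectors.counting()
            ));

        // Stage 2 (count(...) BY cluster, bucket): count the per-series rows.
        Map<List<String>, Long> byClusterAndBucket = perSeries.keySet().stream()
            .collect(Collectors.groupingBy(
                key -> List.of(key.get(1), key.get(2)),
                Collectors.counting()
            ));

        // prod has two series in bucket 0 -> 2; qa has one -> 1.
        byClusterAndBucket.forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```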
@@ -1088,6 +1088,16 @@ public enum Cap {
*/
SUM_OVER_TIME(Build.current().isSnapshot()),

/**
* Support for the count_over_time aggregation that gets evaluated per time series
*/
COUNT_OVER_TIME(Build.current().isSnapshot()),

/**
* Support for the count_distinct_over_time aggregation that gets evaluated per time series
*/
COUNT_DISTINCT_OVER_TIME(Build.current().isSnapshot()),

/**
* Resolve groupings before resolving references to groupings in the aggregations.
*/
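Both new capabilities are gated on `Build.current().isSnapshot()`, which is what lets the csv-spec and YAML tests require them by their lower-cased names while keeping the functions out of release builds. A simplified, self-contained sketch of that gating pattern (illustrative names only, not the real `EsqlCapabilities` enum):

```java
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Simplified stand-in for a build-gated capability enum; not the real class.
enum DemoCapability {
    // In the real enum the flag comes from Build.current().isSnapshot();
    // here it is hard-coded so the sketch stands on its own. On a release
    // build the snapshot-gated constants would be constructed with false.
    COUNT_OVER_TIME(true),
    COUNT_DISTINCT_OVER_TIME(true),
    ALWAYS_ON_FEATURE(true);

    private final boolean enabled;

    DemoCapability(boolean enabled) {
        this.enabled = enabled;
    }

    // Capability string as the tests reference it, e.g. "count_over_time".
    String capabilityName() {
        return name().toLowerCase(Locale.ROOT);
    }

    boolean isEnabled() {
        return enabled;
    }

    // Names a node would advertise; required_capability checks against this set.
    static Set<String> advertisedNames() {
        return Stream.of(values())
            .filter(DemoCapability::isEnabled)
            .map(DemoCapability::capabilityName)
            .collect(Collectors.toSet());
    }
}
```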
@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.expression.function;

import org.elasticsearch.Build;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
@@ -22,6 +23,8 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
@@ -455,6 +458,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"),
def(CountDistinctOverTime.class, bi(CountDistinctOverTime::new), "count_distinct_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),
@@ -914,9 +919,13 @@ public static <T extends Function> FunctionDefinition def(Class<T> function, Bin
FunctionBuilder builder = (source, children, cfg) -> {
boolean isBinaryOptionalParamFunction = OptionalArgument.class.isAssignableFrom(function);
if (isBinaryOptionalParamFunction && (children.size() > 2 || children.size() < 1)) {
throw new QlIllegalArgumentException("expects one or two arguments");
throw new QlIllegalArgumentException(
Strings.format("function %s expects one or two arguments but it received %d", Arrays.toString(names), children.size())
);
} else if (isBinaryOptionalParamFunction == false && children.size() != 2) {
throw new QlIllegalArgumentException("expects exactly two arguments");
throw new QlIllegalArgumentException(
Strings.format("function %s expects exactly two arguments, it received %d", Arrays.toString(names), children.size())
);
}

return ctorRef.build(source, children.get(0), children.size() == 2 ? children.get(1) : null);
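The reworked messages now name the function and report the actual argument count. A stripped-down, standalone version of the same arity check, using `String.format` and a plain `IllegalArgumentException` in place of the ES|QL helpers (class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public final class ArityCheckSketch {

    // Mirrors the builder's check: a binary function whose second argument is
    // optional accepts 1 or 2 children; otherwise exactly 2 are required.
    static void checkArity(String[] names, List<?> children, boolean secondArgumentOptional) {
        int size = children.size();
        if (secondArgumentOptional && (size < 1 || size > 2)) {
            throw new IllegalArgumentException(
                String.format("function %s expects one or two arguments but it received %d", Arrays.toString(names), size)
            );
        }
        if (secondArgumentOptional == false && size != 2) {
            throw new IllegalArgumentException(
                String.format("function %s expects exactly two arguments but it received %d", Arrays.toString(names), size)
            );
        }
    }

    public static void main(String[] args) {
        // count_distinct_over_time has an optional precision argument: one child is fine.
        checkArity(new String[] { "count_distinct_over_time" }, List.of("field"), true);
        // A strictly binary function with a single child should fail with the new message.
        try {
            checkArity(new String[] { "some_binary_fn" }, List.of("field"), false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```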
@@ -37,6 +37,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
LastOverTime.ENTRY,
FirstOverTime.ENTRY,
SumOverTime.ENTRY,
CountOverTime.ENTRY,
CountDistinctOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
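Each new aggregate is also registered as a `NamedWriteableRegistry.Entry` that pairs a wire name with a `StreamInput` constructor reference, so the receiving node can look up the reader by name and rebuild the expression. A minimal standalone sketch of that name-to-reader lookup (plain Java stand-ins, not the real registry API):

```java
import java.util.Map;
import java.util.function.Function;

public final class NamedReaderSketch {

    // Stand-in for a writeable: the "payload" string plays the role of the stream.
    interface WireReadable {
        String writeableName();
    }

    record DemoCountOverTime(String payload) implements WireReadable {
        @Override
        public String writeableName() {
            return "CountOverTime";
        }
    }

    // Registry: wire name -> reader, analogous to Entry(Expression.class, name, reader).
    static final Map<String, Function<String, WireReadable>> READERS =
        Map.of("CountOverTime", DemoCountOverTime::new);

    static WireReadable read(String wireName, String payload) {
        Function<String, WireReadable> reader = READERS.get(wireName);
        if (reader == null) {
            throw new IllegalArgumentException("unknown writeable: " + wireName);
        }
        return reader.apply(payload);
    }

    public static void main(String[] args) {
        WireReadable restored = read("CountOverTime", "field=network.cost");
        System.out.println(restored.writeableName() + " restored from wire");
    }
}
```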
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

/**
* Similar to {@link CountDistinct}, but it calculates the number of distinct values of the given field over each time series.
*/
public class CountDistinctOverTime extends TimeSeriesAggregateFunction implements OptionalArgument {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"DistinctOverTime",
CountDistinctOverTime::new
);

private final Expression precision;

@FunctionInfo(
returnType = { "long" },
description = "The count of distinct values over time for a field.",
type = FunctionType.AGGREGATE
)
public CountDistinctOverTime(
Source source,
@Param(
name = "field",
type = { "boolean", "date", "date_nanos", "double", "integer", "ip", "keyword", "long", "text", "version" }
) Expression field,
@Param(
optional = true,
name = "precision",
type = { "integer", "long", "unsigned_long" },
description = "Precision threshold. Refer to <<esql-agg-count-distinct-approximate>>. "
+ "The maximum supported value is 40000. Thresholds above this number will have the "
+ "same effect as a threshold of 40000. The default value is 3000."
) Expression precision
) {
this(source, field, Literal.TRUE, precision);
}

public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression precision) {
super(source, field, filter, precision == null ? List.of() : List.of(precision));
this.precision = precision;
}

private CountDistinctOverTime(StreamInput in) throws IOException {
super(in);
this.precision = parameters().isEmpty() ? null : parameters().getFirst();
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public CountDistinctOverTime withFilter(Expression filter) {
return new CountDistinctOverTime(source(), field(), filter, precision);
}

@Override
protected NodeInfo<CountDistinctOverTime> info() {
return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), precision);
}

@Override
public CountDistinctOverTime replaceChildren(List<Expression> newChildren) {
if (newChildren.size() < 3) {
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), null);
}
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public CountDistinct perTimeSeriesAggregation() {
return new CountDistinct(source(), field(), filter(), precision);
}
}
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
* Similar to {@link Count}, but it calculates the count of values of the given field over each time series.
*/
public class CountOverTime extends TimeSeriesAggregateFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"CountOverTime",
CountOverTime::new
);

@FunctionInfo(returnType = { "long" }, description = "The count over time value of a field.", type = FunctionType.AGGREGATE)
public CountOverTime(
Source source,
@Param(
name = "field",
type = {
"aggregate_metric_double",
"boolean",
"cartesian_point",
"date",
"double",
"geo_point",
"integer",
"ip",
"keyword",
"long",
"text",
"unsigned_long",
"version" }
) Expression field
) {
this(source, field, Literal.TRUE);
}

public CountOverTime(Source source, Expression field, Expression filter) {
super(source, field, filter, emptyList());
}

private CountOverTime(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public CountOverTime withFilter(Expression filter) {
return new CountOverTime(source(), field(), filter);
}

@Override
protected NodeInfo<CountOverTime> info() {
return NodeInfo.create(this, CountOverTime::new, field(), filter());
}

@Override
public CountOverTime replaceChildren(List<Expression> newChildren) {
return new CountOverTime(source(), newChildren.get(0), newChildren.get(1));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public Count perTimeSeriesAggregation() {
return new Count(source(), field(), filter());
}
}
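Both new classes delegate `resolveType()` and `dataType()` to `perTimeSeriesAggregation()`, so the over-time variants stay in lockstep with the regular `Count` / `CountDistinct` they wrap. A toy, self-contained sketch of that delegation idea (illustrative interfaces, not the ES|QL expression tree):

```java
public final class DelegatingAggSketch {

    // Toy stand-in for an aggregation that knows its result type and can validate itself.
    interface Agg {
        String dataType();
        String resolveType(); // returns "ok" or an error message in this sketch
    }

    static final class DemoCountDistinct implements Agg {
        @Override
        public String dataType() {
            return "long";
        }

        @Override
        public String resolveType() {
            return "ok";
        }
    }

    // The *_over_time flavour defers both checks to the per-time-series aggregation,
    // mirroring how CountDistinctOverTime defers to CountDistinct.
    static final class DemoCountDistinctOverTime implements Agg {
        private final Agg perTimeSeriesAggregation = new DemoCountDistinct();

        @Override
        public String dataType() {
            return perTimeSeriesAggregation.dataType();
        }

        @Override
        public String resolveType() {
            return perTimeSeriesAggregation.resolveType();
        }
    }

    public static void main(String[] args) {
        Agg overTime = new DemoCountDistinctOverTime();
        System.out.println(overTime.dataType() + " / " + overTime.resolveType());
    }
}
```

Keeping the resolution logic in one place means any future change to the type rules of `Count` or `CountDistinct` automatically applies to the over-time variants as well.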
@@ -29,7 +29,7 @@
import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
import static org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry.def;
import static org.elasticsearch.xpack.esql.expression.function.FunctionResolutionStrategy.DEFAULT;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

@@ -54,11 +54,11 @@ public void testBinaryFunction() {

// No children aren't supported
ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def));
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
assertThat(e.getMessage(), containsString("expects exactly two arguments"));

// One child isn't supported
e = expectThrows(ParsingException.class, () -> uf(DEFAULT, mock(Expression.class)).buildResolved(randomConfiguration(), def));
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
assertThat(e.getMessage(), containsString("expects exactly two arguments"));

// Many children aren't supported
e = expectThrows(
@@ -68,7 +68,7 @@
def
)
);
assertThat(e.getMessage(), endsWith("expects exactly two arguments"));
assertThat(e.getMessage(), containsString("expects exactly two arguments"));
}

public void testAliasNameIsTheSameAsAFunctionName() {
@@ -138,14 +138,14 @@ public void testUnaryFunction() {

// No children aren't supported
ParsingException e = expectThrows(ParsingException.class, () -> uf(DEFAULT).buildResolved(randomConfiguration(), def));
assertThat(e.getMessage(), endsWith("expects exactly one argument"));
assertThat(e.getMessage(), containsString("expects exactly one argument"));

// Multiple children aren't supported
e = expectThrows(
ParsingException.class,
() -> uf(DEFAULT, mock(Expression.class), mock(Expression.class)).buildResolved(randomConfiguration(), def)
);
assertThat(e.getMessage(), endsWith("expects exactly one argument"));
assertThat(e.getMessage(), containsString("expects exactly one argument"));
}

public void testConfigurationOptionalFunction() {
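Because the formatted messages now continue past the asserted phrase (they append the received argument count), the assertions switch from `endsWith` to `containsString`. A small self-contained illustration of why the old matcher no longer holds, assuming Hamcrest is on the classpath and using a made-up message literal:

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;

public class MatcherChangeSketch {
    public static void main(String[] args) {
        // Example shape of the new message; the trailing count follows the phrase.
        String message = "function [foo] expects exactly two arguments but it received 1";

        // The phrase is still present, so containsString matches ...
        assertThat(message, containsString("expects exactly two arguments"));
        // ... but the message no longer *ends* with it, so endsWith would now fail.
        assertThat(message, not(endsWith("expects exactly two arguments")));

        System.out.println("matcher expectations hold");
    }
}
```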
@@ -33,7 +33,14 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
capabilities:
- snapshot_test_for_telemetry
- fn_byte_length
- match_function_options
- first_over_time
- sum_over_time
- count_over_time
- count_distinct_over_time
reason: "Test that should only be executed on snapshot versions"

- do: {xpack.usage: {}}
@@ -123,7 +130,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 146} # check the "sister" test below for a likely update to the same esql.functions length check

---
"Basic ESQL usage output (telemetry) non-snapshot version":