Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,44 @@ sum_cost:double | cluster:keyword | time_bucket:datetime

;

count_over_time
required_capability: metrics_command
required_capability: count_over_time

TS k8s
| STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute)
| SORT count DESC, time_bucket DESC, cluster
| LIMIT 10;

count:long | cluster:keyword | time_bucket:datetime
3 | staging | 2024-05-10T00:22:00.000Z
3 | prod | 2024-05-10T00:20:00.000Z
3 | prod | 2024-05-10T00:19:00.000Z
3 | prod | 2024-05-10T00:18:00.000Z
3 | qa | 2024-05-10T00:18:00.000Z
3 | staging | 2024-05-10T00:18:00.000Z
3 | prod | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:15:00.000Z
3 | staging | 2024-05-10T00:15:00.000Z

;
count_distinct_over_time
required_capability: metrics_command
required_capability: distinct_over_time

TS k8s | STATS distincts=count_distinct(count_distinct_over_time(network.cost)), distincts_imprecise=count_distinct(count_distinct_over_time(network.cost, 100)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT distincts DESC, time_bucket DESC, cluster | LIMIT 10;

distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:datetime
3 |3 | qa | 2024-05-10T00:17:00.000Z
3 |3 | qa | 2024-05-10T00:15:00.000Z
3 |3 | prod | 2024-05-10T00:09:00.000Z
3 |3 | qa | 2024-05-10T00:09:00.000Z
2 |2 | prod | 2024-05-10T00:22:00.000Z
2 |2 | staging | 2024-05-10T00:22:00.000Z
2 |2 | prod | 2024-05-10T00:20:00.000Z
2 |2 | prod | 2024-05-10T00:18:00.000Z
2 |2 | qa | 2024-05-10T00:18:00.000Z
2 |2 | staging | 2024-05-10T00:18:00.000Z

;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,16 @@ public enum Cap {
*/
SUM_OVER_TIME(Build.current().isSnapshot()),

/**
* Support count_over_time aggregation that gets evaluated per time-series
*/
COUNT_OVER_TIME(Build.current().isSnapshot()),

/**
* Support for count_distinct_over_time aggregation that gets evaluated per time-series
*/
COUNT_DISTINCT_OVER_TIME(Build.current().isSnapshot()),

/**
* Resolve groupings before resolving references to groupings in the aggregations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.expression.function;

import org.elasticsearch.Build;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
Expand All @@ -22,6 +23,8 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
Expand Down Expand Up @@ -455,6 +458,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"),
def(CountDistinctOverTime.class, bi(CountDistinctOverTime::new), "count_distinct_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),
Expand Down Expand Up @@ -914,9 +919,13 @@ public static <T extends Function> FunctionDefinition def(Class<T> function, Bin
FunctionBuilder builder = (source, children, cfg) -> {
boolean isBinaryOptionalParamFunction = OptionalArgument.class.isAssignableFrom(function);
if (isBinaryOptionalParamFunction && (children.size() > 2 || children.size() < 1)) {
throw new QlIllegalArgumentException("expects one or two arguments");
throw new QlIllegalArgumentException(
Strings.format("function %s expects one or two arguments but it received %d", Arrays.toString(names), children.size())
);
} else if (isBinaryOptionalParamFunction == false && children.size() != 2) {
throw new QlIllegalArgumentException("expects exactly two arguments");
throw new QlIllegalArgumentException(
Strings.format("function %s expects exactly two arguments, it received %d", Arrays.toString(names), children.size())
);
}

return ctorRef.build(source, children.get(0), children.size() == 2 ? children.get(1) : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
LastOverTime.ENTRY,
FirstOverTime.ENTRY,
SumOverTime.ENTRY,
CountOverTime.ENTRY,
CountDistinctOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

/**
* Similar to {@link CountDistinct}, but it is used to calculate the distinct count of values over a time series from the given field.
*/
public class CountDistinctOverTime extends TimeSeriesAggregateFunction implements OptionalArgument {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"DistinctOverTime",
CountDistinctOverTime::new
);

private final Expression precision;

@FunctionInfo(
returnType = { "long" },
description = "The count of distinct values over time for a field.",
type = FunctionType.AGGREGATE
)
public CountDistinctOverTime(
Source source,
@Param(
name = "field",
type = { "boolean", "date", "date_nanos", "double", "integer", "ip", "keyword", "long", "text", "version" }
) Expression field,
@Param(
optional = true,
name = "precision",
type = { "integer", "long", "unsigned_long" },
description = "Precision threshold. Refer to <<esql-agg-count-distinct-approximate>>. "
+ "The maximum supported value is 40000. Thresholds above this number will have the "
+ "same effect as a threshold of 40000. The default value is 3000."
) Expression precision
) {
this(source, field, Literal.TRUE, precision);
}

public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression precision) {
super(source, field, filter, precision == null ? List.of() : List.of(precision));
this.precision = precision;
}

private CountDistinctOverTime(StreamInput in) throws IOException {
super(in);
this.precision = parameters().isEmpty() ? null : parameters().getFirst();
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public CountDistinctOverTime withFilter(Expression filter) {
return new CountDistinctOverTime(source(), field(), filter, precision);
}

@Override
protected NodeInfo<CountDistinctOverTime> info() {
return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), precision);
}

@Override
public CountDistinctOverTime replaceChildren(List<Expression> newChildren) {
if (newChildren.size() < 3) {
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), null);
}
return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public CountDistinct perTimeSeriesAggregation() {
return new CountDistinct(source(), field(), filter(), precision);
}
}
Loading