Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,41 @@ sum_cost:double | cluster:keyword | time_bucket:datetime

;

count_over_time
required_capability: metrics_command
required_capability: count_over_time

TS k8s | STATS count=count(count_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT count DESC, time_bucket DESC, cluster | LIMIT 10;

count:long | cluster:keyword | time_bucket:datetime
3 | staging | 2024-05-10T00:22:00.000Z
3 | prod | 2024-05-10T00:20:00.000Z
3 | prod | 2024-05-10T00:19:00.000Z
3 | prod | 2024-05-10T00:18:00.000Z
3 | qa | 2024-05-10T00:18:00.000Z
3 | staging | 2024-05-10T00:18:00.000Z
3 | prod | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:15:00.000Z
3 | staging | 2024-05-10T00:15:00.000Z

;
distinct_over_time
required_capability: metrics_command
required_capability: distinct_over_time

TS k8s | STATS distincts=count_distinct(distinct_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT distincts DESC, time_bucket DESC, cluster | LIMIT 10;

distincts:long | cluster:keyword | time_bucket:datetime
3 | qa | 2024-05-10T00:17:00.000Z
3 | qa | 2024-05-10T00:15:00.000Z
3 | prod | 2024-05-10T00:09:00.000Z
3 | qa | 2024-05-10T00:09:00.000Z
2 | prod | 2024-05-10T00:22:00.000Z
2 | staging | 2024-05-10T00:22:00.000Z
2 | prod | 2024-05-10T00:20:00.000Z
2 | prod | 2024-05-10T00:18:00.000Z
2 | qa | 2024-05-10T00:18:00.000Z
2 | staging | 2024-05-10T00:18:00.000Z

;
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,16 @@ public enum Cap {
*/
SUM_OVER_TIME(Build.current().isSnapshot()),

/**
* Support count_over_time aggregation that gets evaluated per time-series
*/
COUNT_OVER_TIME(Build.current().isSnapshot()),

/**
* Support for distinct_over_time aggregation that gets evaluated per time-series
*/
DISTINCT_OVER_TIME(Build.current().isSnapshot()),

/**
* Resolve groupings before resolving references to groupings in the aggregations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.DistinctOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
Expand Down Expand Up @@ -455,6 +457,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
def(CountOverTime.class, uni(CountOverTime::new), "count_over_time"),
def(DistinctOverTime.class, uni(DistinctOverTime::new), "distinct_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
LastOverTime.ENTRY,
FirstOverTime.ENTRY,
SumOverTime.ENTRY,
CountOverTime.ENTRY,
DistinctOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
* Similar to {@link Count}, but it is used to calculate the count of values over a time series from the given field.
*/
public class CountOverTime extends TimeSeriesAggregateFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"CountOverTime",
CountOverTime::new
);

@FunctionInfo(returnType = { "integer", "long" }, description = "The count over time value of a field.", type = FunctionType.AGGREGATE)
public CountOverTime(
Source source,
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field
) {
this(source, field, Literal.TRUE);
}

public CountOverTime(Source source, Expression field, Expression filter) {
super(source, field, filter, emptyList());
}

private CountOverTime(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public CountOverTime withFilter(Expression filter) {
return new CountOverTime(source(), field(), filter);
}

@Override
protected NodeInfo<CountOverTime> info() {
return NodeInfo.create(this, CountOverTime::new, field(), filter());
}

@Override
public CountOverTime replaceChildren(List<Expression> newChildren) {
return new CountOverTime(source(), newChildren.get(0), newChildren.get(1));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public Count perTimeSeriesAggregation() {
return new Count(source(), field(), filter());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
* Similar to {@link CountDistinct}, but it is used to calculate the distinct count of values over a time series from the given field.
*/
public class DistinctOverTime extends TimeSeriesAggregateFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"DistinctOverTime",
DistinctOverTime::new
);

@FunctionInfo(
returnType = { "integer", "long" },
description = "The count of distinct values over time for a field.",
type = FunctionType.AGGREGATE
)
public DistinctOverTime(
Source source,
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long", "float" }) Expression field
) {
this(source, field, Literal.TRUE);
}

public DistinctOverTime(Source source, Expression field, Expression filter) {
super(source, field, filter, emptyList());
}

private DistinctOverTime(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public DistinctOverTime withFilter(Expression filter) {
return new DistinctOverTime(source(), field(), filter);
}

@Override
protected NodeInfo<DistinctOverTime> info() {
return NodeInfo.create(this, DistinctOverTime::new, field(), filter());
}

@Override
public DistinctOverTime replaceChildren(List<Expression> newChildren) {
return new DistinctOverTime(source(), newChildren.get(0), newChildren.get(1));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public CountDistinct perTimeSeriesAggregation() {
// TODO(pabloem): Do we need to take in a precision parameter here?
return new CountDistinct(source(), field(), filter(), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
capabilities:
- snapshot_test_for_telemetry
- fn_byte_length
- match_function_options
- first_over_time
- sum_over_time
- count_over_time
- distinct_over_time
reason: "Test that should only be executed on snapshot versions"

- do: {xpack.usage: {}}
Expand Down Expand Up @@ -123,7 +130,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 146} # check the "sister" test below for a likely update to the same esql.functions length check

---
"Basic ESQL usage output (telemetry) non-snapshot version":
Expand Down