Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ cost:double | cluster:keyword | time_bucket:datetime
22.75 | qa | 2024-05-10T00:08:00.000Z
;

min_over_time
required_capability: metrics_command
required_capability: min_over_time
TS k8s | STATS cost=sum(min_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT cost DESC, time_bucket DESC, cluster | LIMIT 10;

cost:double | cluster:keyword | time_bucket:datetime
29.0 | prod | 2024-05-10T00:19:00.000Z
27.625 | qa | 2024-05-10T00:06:00.000Z
24.25 | qa | 2024-05-10T00:09:00.000Z
23.125 | staging | 2024-05-10T00:08:00.000Z
22.5 | prod | 2024-05-10T00:13:00.000Z
18.625 | qa | 2024-05-10T00:04:00.000Z
18.0 | qa | 2024-05-10T00:11:00.000Z
17.75 | qa | 2024-05-10T00:01:00.000Z
17.125 | qa | 2024-05-10T00:22:00.000Z
16.875 | qa | 2024-05-10T00:20:00.000Z
;

max_of_avg_over_time
required_capability: metrics_command
required_capability: avg_over_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,12 @@ public enum Cap {
/**
* Guards a bug fix matching {@code TO_LOWER(f) == ""}.
*/
TO_LOWER_EMPTY_STRING;
TO_LOWER_EMPTY_STRING,

/**
* Support min_over_time aggregation that gets evaluated per time-series
*/
MIN_OVER_TIME(Build.current().isSnapshot());

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.Median;
import org.elasticsearch.xpack.esql.expression.function.aggregate.MedianAbsoluteDeviation;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Percentile;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialCentroid;
Expand Down Expand Up @@ -444,6 +445,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(Delay.class, Delay::new, "delay"),
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(Term.class, bi(Term::new), "term") } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Sum.ENTRY,
Top.ENTRY,
Values.ENTRY,
MinOverTime.ENTRY,
MaxOverTime.ENTRY,
AvgOverTime.ENTRY,
LastOverTime.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
* Similar to {@link Min}, but it is used to calculate the minimum value over a time series of values from the given field.
*/
public class MinOverTime extends TimeSeriesAggregateFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"MinOverTime",
MinOverTime::new
);

@FunctionInfo(
returnType = { "boolean", "double", "integer", "long", "date", "date_nanos", "ip", "keyword", "long", "version" },
description = "The minimum over time value of a field.",
type = FunctionType.AGGREGATE
)
public MinOverTime(
Source source,
@Param(
name = "field",
type = {
"aggregate_metric_double",
"boolean",
"double",
"integer",
"long",
"date",
"date_nanos",
"ip",
"keyword",
"text",
"long",
"version" }
) Expression field
) {
this(source, field, Literal.TRUE);
}

public MinOverTime(Source source, Expression field, Expression filter) {
super(source, field, filter, emptyList());
}

private MinOverTime(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public MinOverTime withFilter(Expression filter) {
return new MinOverTime(source(), field(), filter);
}

@Override
protected NodeInfo<MinOverTime> info() {
return NodeInfo.create(this, MinOverTime::new, field(), filter());
}

@Override
public MinOverTime replaceChildren(List<Expression> newChildren) {
return new MinOverTime(source(), newChildren.get(0), newChildren.get(1));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public Min perTimeSeriesAggregation() {
return new Min(source(), field(), filter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time]
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, min_over_time ]
reason: "Test that should only be executed on snapshot versions"

- do: {xpack.usage: {}}
Expand Down Expand Up @@ -123,7 +123,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 139} # check the "sister" test below for a likely update to the same esql.functions length check

---
"Basic ESQL usage output (telemetry) non-snapshot version":
Expand Down