Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,15 @@ yield switch ((randomIntBetween(0, 6))) {
if (counterField == null) {
yield null;
}
yield "rate(" + counterField + ")";
yield switch ((randomIntBetween(0, 2))) {
case 0 -> "rate(" + counterField + ")";
case 1 -> "first_over_time(" + counterField + ")";
default -> "last_over_time(" + counterField + ")";
};
}
case 2 -> {
// numerics except aggregate_metric_double
// TODO: add to case 0 when support for aggregate_metric_double is added to these functions
// TODO: add to case 1 when support for counters is added
String numericFieldName = randomNumericField(previousOutput);
if (numericFieldName == null) {
yield null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,42 @@ events:long | pod:keyword | time_bucket:datetime
6 | two | 2024-05-10T00:20:00.000Z
;

first_over_time_counter_double
required_capability: ts_command_v0
required_capability: first_last_over_time_counter_support
TS k8s
| STATS sum = sum(first_over_time(network.total_cost)) by pod, time_bucket = bucket(@timestamp, 10minute)
| SORT time_bucket, pod
;

sum:double | pod:keyword | time_bucket:datetime
35.75 | one | 2024-05-10T00:00:00.000Z
34.375 | three | 2024-05-10T00:00:00.000Z
30.375 | two | 2024-05-10T00:00:00.000Z
96.5 | one | 2024-05-10T00:10:00.000Z
191.75 | three | 2024-05-10T00:10:00.000Z
163.25 | two | 2024-05-10T00:10:00.000Z
224.875 | one | 2024-05-10T00:20:00.000Z
142.875 | three | 2024-05-10T00:20:00.000Z
163.625 | two | 2024-05-10T00:20:00.000Z
;

first_over_time_counter_long
required_capability: ts_command_v0
required_capability: first_last_over_time_counter_support
TS k8s
| STATS max = max(first_over_time(network.total_bytes_in)) by pod, time_bucket = bucket(@timestamp, 10minute)
| SORT time_bucket, pod
;

max:long | pod:keyword | time_bucket:datetime
1103 | one | 2024-05-10T00:00:00.000Z
1441 | three | 2024-05-10T00:00:00.000Z
1395 | two | 2024-05-10T00:00:00.000Z
6077 | one | 2024-05-10T00:10:00.000Z
4200 | three | 2024-05-10T00:10:00.000Z
3478 | two | 2024-05-10T00:10:00.000Z
10207 | one | 2024-05-10T00:20:00.000Z
9969 | three | 2024-05-10T00:20:00.000Z
8709 | two | 2024-05-10T00:20:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,42 @@ events:long | pod:keyword | time_bucket:datetime
5 | two | 2024-05-10T00:20:00.000Z
;

last_over_time_counter_double
required_capability: ts_command_v0
required_capability: first_last_over_time_counter_support
TS k8s
| STATS sum = sum(last_over_time(network.total_cost)) by pod, time_bucket = bucket(@timestamp, 10minute)
| SORT time_bucket, pod
;

sum:double | pod:keyword | time_bucket:datetime
190.125 | one | 2024-05-10T00:00:00.000Z
158.75 | three | 2024-05-10T00:00:00.000Z
148.25 | two | 2024-05-10T00:00:00.000Z
220.125 | one | 2024-05-10T00:10:00.000Z
271.75 | three | 2024-05-10T00:10:00.000Z
160.75 | two | 2024-05-10T00:10:00.000Z
190.625 | one | 2024-05-10T00:20:00.000Z
180.375 | three | 2024-05-10T00:20:00.000Z
191.0 | two | 2024-05-10T00:20:00.000Z
;

last_over_time_counter_long
required_capability: ts_command_v0
required_capability: first_last_over_time_counter_support
TS k8s
| STATS max = max(last_over_time(network.total_bytes_in)) by pod, time_bucket = bucket(@timestamp, 10minute)
| SORT time_bucket, pod
;

max:long | pod:keyword | time_bucket:datetime
5963 | one | 2024-05-10T00:00:00.000Z
4169 | three | 2024-05-10T00:00:00.000Z
7900 | two | 2024-05-10T00:00:00.000Z
9254 | one | 2024-05-10T00:10:00.000Z
9195 | three | 2024-05-10T00:10:00.000Z
8031 | two | 2024-05-10T00:10:00.000Z
7286 | one | 2024-05-10T00:20:00.000Z
10797 | three | 2024-05-10T00:20:00.000Z
10277 | two | 2024-05-10T00:20:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,11 @@ public enum Cap {
*/
TS_COMMAND_V0(),

/**
* Add support for counter doubles, ints, and longs in first_ and last_over_time
*/
FIRST_LAST_OVER_TIME_COUNTER_SUPPORT,

FIX_ALIAS_ID_WHEN_DROP_ALL_AGGREGATES,

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.compute.aggregation.FirstIntByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.FirstLongByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
Expand Down Expand Up @@ -55,7 +56,10 @@ public class FirstOverTime extends TimeSeriesAggregateFunction implements Option
appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.PREVIEW, version = "9.2.0") },
examples = { @Example(file = "k8s-timeseries", tag = "first_over_time") }
)
public FirstOverTime(Source source, @Param(name = "field", type = { "long", "integer", "double" }) Expression field) {
public FirstOverTime(
Source source,
@Param(name = "field", type = { "counter_long", "counter_integer", "counter_double", "long", "integer", "double" }) Expression field
) {
this(source, field, new UnresolvedAttribute(source, "@timestamp"));
}

Expand Down Expand Up @@ -108,21 +112,29 @@ public FirstOverTime withFilter(Expression filter) {

@Override
public DataType dataType() {
return field().dataType();
var dataType = field().dataType();
if (dataType.isCounter()) {
return switch (dataType) {
case COUNTER_DOUBLE -> DataType.DOUBLE;
case COUNTER_INTEGER -> DataType.INTEGER;
case COUNTER_LONG -> DataType.LONG;
default -> throw new InvalidArgumentException("Received an unsupported counter type in FirstOverTime");
};
}
return dataType;
}

@Override
protected TypeResolution resolveType() {
return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long")
.and(
isType(
timestamp,
dt -> dt == DataType.DATETIME || dt == DataType.DATE_NANOS,
sourceText(),
SECOND,
"date_nanos or datetime"
)
);
return isType(
field(),
dt -> (dt.isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.AGGREGATE_METRIC_DOUBLE || dt.isCounter(),
sourceText(),
DEFAULT,
"numeric except unsigned_long"
).and(
isType(timestamp, dt -> dt == DataType.DATETIME || dt == DataType.DATE_NANOS, sourceText(), SECOND, "date_nanos or datetime")
);
}

@Override
Expand All @@ -131,9 +143,9 @@ public AggregatorFunctionSupplier supplier() {
// we can read the first encountered value for each group of `_tsid` and time bucket.
final DataType type = field().dataType();
return switch (type) {
case LONG -> new FirstLongByTimestampAggregatorFunctionSupplier();
case INTEGER -> new FirstIntByTimestampAggregatorFunctionSupplier();
case DOUBLE -> new FirstDoubleByTimestampAggregatorFunctionSupplier();
case LONG, COUNTER_LONG -> new FirstLongByTimestampAggregatorFunctionSupplier();
case INTEGER, COUNTER_INTEGER -> new FirstIntByTimestampAggregatorFunctionSupplier();
case DOUBLE, COUNTER_DOUBLE -> new FirstDoubleByTimestampAggregatorFunctionSupplier();
case FLOAT -> new FirstFloatByTimestampAggregatorFunctionSupplier();
default -> throw EsqlIllegalArgumentException.illegalDataType(type);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.compute.aggregation.LastIntByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastLongByTimestampAggregatorFunctionSupplier;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
Expand Down Expand Up @@ -56,7 +57,13 @@ public class LastOverTime extends TimeSeriesAggregateFunction implements Optiona
appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.PREVIEW, version = "9.2.0") },
examples = { @Example(file = "k8s-timeseries", tag = "last_over_time") }
)
public LastOverTime(Source source, @Param(name = "field", type = { "long", "integer", "double", "_tsid" }) Expression field) {
public LastOverTime(
Source source,
@Param(
name = "field",
type = { "counter_long", "counter_integer", "counter_double", "long", "integer", "double", "_tsid" }
) Expression field
) {
this(source, field, new UnresolvedAttribute(source, "@timestamp"));
}

Expand Down Expand Up @@ -109,14 +116,23 @@ public LastOverTime withFilter(Expression filter) {

@Override
public DataType dataType() {
return field().dataType();
var dataType = field().dataType();
if (dataType.isCounter()) {
return switch (dataType) {
case COUNTER_DOUBLE -> DataType.DOUBLE;
case COUNTER_INTEGER -> DataType.INTEGER;
case COUNTER_LONG -> DataType.LONG;
default -> throw new InvalidArgumentException("Received an unsupported counter type in LastOverTime");
};
}
return dataType;
}

@Override
protected TypeResolution resolveType() {
return isType(
field(),
dt -> (dt.isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.TSID_DATA_TYPE,
dt -> (dt.isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.TSID_DATA_TYPE || dt.isCounter(),
sourceText(),
DEFAULT,
"numeric except unsigned_long"
Expand All @@ -131,9 +147,9 @@ public AggregatorFunctionSupplier supplier() {
// we can read the first encountered value for each group of `_tsid` and time bucket.
final DataType type = field().dataType();
return switch (type) {
case LONG -> new LastLongByTimestampAggregatorFunctionSupplier();
case INTEGER -> new LastIntByTimestampAggregatorFunctionSupplier();
case DOUBLE -> new LastDoubleByTimestampAggregatorFunctionSupplier();
case LONG, COUNTER_LONG -> new LastLongByTimestampAggregatorFunctionSupplier();
case INTEGER, COUNTER_INTEGER -> new LastIntByTimestampAggregatorFunctionSupplier();
case DOUBLE, COUNTER_DOUBLE -> new LastDoubleByTimestampAggregatorFunctionSupplier();
case FLOAT -> new LastFloatByTimestampAggregatorFunctionSupplier();
case TSID_DATA_TYPE -> new LastBytesRefByTimestampAggregatorFunctionSupplier();
default -> throw EsqlIllegalArgumentException.illegalDataType(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ public static Iterable<Object[]> parameters() {
);
for (List<TestCaseSupplier.TypedDataSupplier> valuesSupplier : valuesSuppliers) {
for (TestCaseSupplier.TypedDataSupplier fieldSupplier : valuesSupplier) {
TestCaseSupplier testCaseSupplier = makeSupplier(fieldSupplier);
suppliers.add(testCaseSupplier);
DataType type = fieldSupplier.type();
suppliers.add(makeSupplier(fieldSupplier, type));
switch (type) {
case LONG -> suppliers.add(makeSupplier(fieldSupplier, DataType.COUNTER_LONG));
case DOUBLE -> suppliers.add(makeSupplier(fieldSupplier, DataType.COUNTER_DOUBLE));
case INTEGER -> suppliers.add(makeSupplier(fieldSupplier, DataType.COUNTER_INTEGER));
}
}
}
return parameterSuppliersFromTypedDataWithDefaultChecksNoErrors(suppliers);
Expand All @@ -65,11 +70,11 @@ public void testAggregateIntermediate() {
assumeTrue("time-series aggregation doesn't support ungrouped", false);
}

private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSupplier) {
DataType type = fieldSupplier.type();
private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSupplier, DataType type) {
return new TestCaseSupplier(fieldSupplier.name(), List.of(type, DataType.DATETIME), () -> {
TestCaseSupplier.TypedData fieldTypedData = fieldSupplier.get();
List<Object> dataRows = fieldTypedData.multiRowData();
fieldTypedData = TestCaseSupplier.TypedData.multiRow(dataRows, type, fieldTypedData.name());
List<Long> timestamps = IntStream.range(0, dataRows.size()).mapToLong(unused -> randomNonNegativeLong()).boxed().toList();
TestCaseSupplier.TypedData timestampsField = TestCaseSupplier.TypedData.multiRow(timestamps, DataType.DATETIME, "timestamps");
Object expected = null;
Expand All @@ -85,8 +90,8 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier
}
return new TestCaseSupplier.TestCase(
List.of(fieldTypedData, timestampsField),
standardAggregatorName("First", fieldSupplier.type()) + "ByTimestamp",
type,
standardAggregatorName("First", type) + "ByTimestamp",
fieldSupplier.type(),
equalTo(expected)
);
});
Expand Down
Loading
Loading