From 74a5feca273ccb02d8c71b2e323bf93a1bbe38b3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 10 Apr 2025 21:44:01 -0700 Subject: [PATCH 1/4] Add last_over_time --- x-pack/plugin/esql/compute/build.gradle | 23 ++ .../LastOverTimeDoubleAggregator.java | 176 ++++++++++++++ .../LastOverTimeFloatAggregator.java | 176 ++++++++++++++ .../LastOverTimeIntAggregator.java | 176 ++++++++++++++ .../LastOverTimeLongAggregator.java | 174 +++++++++++++ ...rTimeDoubleAggregatorFunctionSupplier.java | 46 ++++ ...rTimeDoubleGroupingAggregatorFunction.java | 228 ++++++++++++++++++ ...erTimeFloatAggregatorFunctionSupplier.java | 46 ++++ ...erTimeFloatGroupingAggregatorFunction.java | 228 ++++++++++++++++++ ...OverTimeIntAggregatorFunctionSupplier.java | 46 ++++ ...OverTimeIntGroupingAggregatorFunction.java | 226 +++++++++++++++++ ...verTimeLongAggregatorFunctionSupplier.java | 46 ++++ ...verTimeLongGroupingAggregatorFunction.java | 226 +++++++++++++++++ .../X-LastOverTimeAggregator.java.st | 180 ++++++++++++++ .../main/resources/k8s-timeseries.csv-spec | 19 ++ .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../function/EsqlFunctionRegistry.java | 2 + .../aggregate/AggregateWritables.java | 1 + .../function/aggregate/LastOverTime.java | 141 +++++++++++ .../rest-api-spec/test/esql/60_usage.yml | 4 +- 20 files changed, 2168 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index f2eca7aee058f..2d8a2e445ca7e 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -862,4 +862,27 @@ tasks.named('stringTemplates').configure { it.outputFile = "org/elasticsearch/xpack/compute/operator/lookup/EnrichResultBuilderForBoolean.java" } + // TODO: add last_over_time for other types: boolean, bytes_refs + File lastOverTimeAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st") + template { + it.properties = intProperties + it.inputFile = lastOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java" + } + template { + it.properties = longProperties + it.inputFile = lastOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java" + } + template { + it.properties = floatProperties + it.inputFile = lastOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java" + } + template { + it.properties = doubleProperties + it.inputFile = lastOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java" + } + } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java new file mode 100644 index 0000000000000..dab550a82e7c2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the last value of each time series in each grouping + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK") } +) +public class LastOverTimeDoubleAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + public static void combine(GroupingState current, int groupId, long timestamp, double value) { + current.maybeCollect(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + DoubleBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.maybeCollect(groupId, timestamp, values.getDouble(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.maybeCollect(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState implements GroupingAggregatorState, Releasable { + private final BigArrays bigArrays; + private LongArray timestamps; + private DoubleArray values; + private BitArray hasValues = null; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays) { + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + DoubleArray values = null; + try { + timestamps = bigArrays.newLongArray(1, false); + values = bigArrays.newDoubleArray(1, false); + this.timestamps = timestamps; + this.values = values; + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values); + } + } + } + + void maybeCollect(int groupId, long timestamp, double value) { + if (groupId > maxGroupId) { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } else { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } + maybeTrackGroup(groupId); + } + + private void maybeTrackGroup(int groupId) { + if (hasValues != null) { + hasValues.set(groupId, true); + } else { + if (groupId > maxGroupId + 1) { + hasValues = new BitArray(groupId + 1, bigArrays); + if (maxGroupId >= 0) { + hasValues.fill(0, maxGroupId + 1, true); + } + hasValues.set(groupId, true); + } + } + maxGroupId = Math.max(maxGroupId, groupId); + } + + boolean hasValue(long groupId) { + return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + } + + @Override + public void close() { + Releasables.close(timestamps, values, hasValues); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendDouble(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + builder.appendDouble(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // tracking via hasValues + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java new file mode 100644 index 0000000000000..a63084347f64e --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.FloatArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the last value of each time series in each grouping + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK") } +) +public class LastOverTimeFloatAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + public static void combine(GroupingState current, int groupId, long timestamp, float value) { + current.maybeCollect(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + FloatBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.maybeCollect(groupId, timestamp, values.getFloat(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.maybeCollect(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState implements GroupingAggregatorState, Releasable { + private final BigArrays bigArrays; + private LongArray timestamps; + private FloatArray values; + private BitArray hasValues = null; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays) { + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + FloatArray values = null; + try { + timestamps = bigArrays.newLongArray(1, false); + values = bigArrays.newFloatArray(1, false); + this.timestamps = timestamps; + this.values = values; + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values); + } + } + } + + void maybeCollect(int groupId, long timestamp, float value) { + if (groupId > maxGroupId) { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } else { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } + maybeTrackGroup(groupId); + } + + private void maybeTrackGroup(int groupId) { + if (hasValues != null) { + hasValues.set(groupId, true); + } else { + if (groupId > maxGroupId + 1) { + hasValues = new BitArray(groupId + 1, bigArrays); + if (maxGroupId >= 0) { + hasValues.fill(0, maxGroupId + 1, true); + } + hasValues.set(groupId, true); + } + } + maxGroupId = Math.max(maxGroupId, groupId); + } + + boolean hasValue(long groupId) { + return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + } + + @Override + public void close() { + Releasables.close(timestamps, values, hasValues); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendFloat(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + builder.appendFloat(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // tracking via hasValues + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java new file mode 100644 index 0000000000000..2b395ec6b713f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the last value of each time series in each grouping + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK") } +) +public class LastOverTimeIntAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + public static void combine(GroupingState current, int groupId, long timestamp, int value) { + current.maybeCollect(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + IntBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.maybeCollect(groupId, timestamp, values.getInt(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.maybeCollect(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState implements GroupingAggregatorState, Releasable { + private final BigArrays bigArrays; + private LongArray timestamps; + private IntArray values; + private BitArray hasValues = null; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays) { + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + IntArray values = null; + try { + timestamps = bigArrays.newLongArray(1, false); + values = bigArrays.newIntArray(1, false); + this.timestamps = timestamps; + this.values = values; + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values); + } + } + } + + void maybeCollect(int groupId, long timestamp, int value) { + if (groupId > maxGroupId) { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } else { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } + maybeTrackGroup(groupId); + } + + private void maybeTrackGroup(int groupId) { + if (hasValues != null) { + hasValues.set(groupId, true); + } else { + if (groupId > maxGroupId + 1) { + hasValues = new BitArray(groupId + 1, bigArrays); + if (maxGroupId >= 0) { + hasValues.fill(0, maxGroupId + 1, true); + } + hasValues.set(groupId, true); + } + } + maxGroupId = Math.max(maxGroupId, groupId); + } + + boolean hasValue(long groupId) { + return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + } + + @Override + public void close() { + Releasables.close(timestamps, values, hasValues); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendInt(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + builder.appendInt(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // tracking via hasValues + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java new file mode 100644 index 0000000000000..9af5d27f5523b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the last value of each time series in each grouping + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK") } +) +public class LastOverTimeLongAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + public static void combine(GroupingState current, int groupId, long timestamp, long value) { + current.maybeCollect(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + LongBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.maybeCollect(groupId, timestamp, values.getLong(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.maybeCollect(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState implements GroupingAggregatorState, Releasable { + private final BigArrays bigArrays; + private LongArray timestamps; + private LongArray values; + private BitArray hasValues = null; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays) { + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + LongArray values = null; + try { + timestamps = bigArrays.newLongArray(1, false); + values = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = values; + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values); + } + } + } + + void maybeCollect(int groupId, long timestamp, long value) { + if (groupId > maxGroupId) { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } else { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } + maybeTrackGroup(groupId); + } + + private void maybeTrackGroup(int groupId) { + if (hasValues != null) { + hasValues.set(groupId, true); + } else { + if (groupId > maxGroupId + 1) { + hasValues = new BitArray(groupId + 1, bigArrays); + if (maxGroupId >= 0) { + hasValues.fill(0, maxGroupId + 1, true); + } + hasValues.set(groupId, true); + } + } + maxGroupId = Math.max(maxGroupId, groupId); + } + + boolean hasValue(long groupId) { + return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + } + + @Override + public void close() { + Releasables.close(timestamps, values, hasValues); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendLong(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + builder.appendLong(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // tracking via hasValues + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..b45b106373539 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeDoubleAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeDoubleAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeDoubleGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeDoubleGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return LastOverTimeDoubleGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of doubles"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..2ca6ab02875a2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeDoubleGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeDoubleAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.DOUBLE) ); + + private final LastOverTimeDoubleAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeDoubleGroupingAggregatorFunction(List channels, + LastOverTimeDoubleAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeDoubleGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeDoubleGroupingAggregatorFunction(channels, LastOverTimeDoubleAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + DoubleBlock valuesBlock = page.getBlock(channels.get(0)); + DoubleVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeDoubleAggregator.GroupingState inState = ((LastOverTimeDoubleGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeDoubleAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeDoubleAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..17a714939a460 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeFloatAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeFloatAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeFloatGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeFloatGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeFloatGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of floats"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..38a3b23ee8cc5 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeFloatGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.FloatVector; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeFloatAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeFloatGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.FLOAT) ); + + private final LastOverTimeFloatAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeFloatGroupingAggregatorFunction(List channels, + LastOverTimeFloatAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeFloatGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeFloatGroupingAggregatorFunction(channels, LastOverTimeFloatAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + FloatBlock valuesBlock = page.getBlock(channels.get(0)); + FloatVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeFloatAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeFloatAggregator.GroupingState inState = ((LastOverTimeFloatGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeFloatAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeFloatAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..55230f932c681 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeIntAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeIntAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeIntGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeIntGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of ints"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..f03728a905ac3 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeIntGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeIntAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeIntGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.INT) ); + + private final LastOverTimeIntAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeIntGroupingAggregatorFunction(List channels, + LastOverTimeIntAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeIntGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeIntGroupingAggregatorFunction(channels, LastOverTimeIntAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + IntBlock valuesBlock = page.getBlock(channels.get(0)); + IntVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeIntAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeIntAggregator.GroupingState inState = ((LastOverTimeIntGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeIntAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeIntAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..4223c27b21216 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeLongAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastOverTimeLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastOverTimeLongAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastOverTimeLongGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public LastOverTimeLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return LastOverTimeLongGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "last_over_time of longs"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..c9ee5fbad3707 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastOverTimeLongGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeLongAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastOverTimeLongGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.LONG) ); + + private final LastOverTimeLongAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastOverTimeLongGroupingAggregatorFunction(List channels, + LastOverTimeLongAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastOverTimeLongGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastOverTimeLongGroupingAggregatorFunction(channels, LastOverTimeLongAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + LongBlock valuesBlock = page.getBlock(channels.get(0)); + LongVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + LastOverTimeLongAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + LastOverTimeLongAggregator.GroupingState inState = ((LastOverTimeLongGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + LastOverTimeLongAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = LastOverTimeLongAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st new file mode 100644 index 0000000000000..dbef41ab17584 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +$if(int||double||float)$ +import org.elasticsearch.common.util.$Type$Array; +$endif$ +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +$if(int||double||float)$ +import org.elasticsearch.compute.data.$Type$Block; +$endif$ +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the last value of each time series in each grouping + * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") } +) +public class LastOverTime$Type$Aggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + public static void combine(GroupingState current, int groupId, long timestamp, $type$ value) { + current.maybeCollect(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + $Type$Block values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.maybeCollect(groupId, timestamp, values.get$Type$(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.maybeCollect(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState implements GroupingAggregatorState, Releasable { + private final BigArrays bigArrays; + private LongArray timestamps; + private $Type$Array values; + private BitArray hasValues = null; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays) { + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + $Type$Array values = null; + try { + timestamps = bigArrays.newLongArray(1, false); + values = bigArrays.new$Type$Array(1, false); + this.timestamps = timestamps; + this.values = values; + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values); + } + } + } + + void maybeCollect(int groupId, long timestamp, $type$ value) { + if (groupId > maxGroupId) { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } else { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } + maybeTrackGroup(groupId); + } + + private void maybeTrackGroup(int groupId) { + if (hasValues != null) { + hasValues.set(groupId, true); + } else { + if (groupId > maxGroupId + 1) { + hasValues = new BitArray(groupId + 1, bigArrays); + if (maxGroupId >= 0) { + hasValues.fill(0, maxGroupId + 1, true); + } + hasValues.set(groupId, true); + } + } + maxGroupId = Math.max(maxGroupId, groupId); + } + + boolean hasValue(long groupId) { + return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + } + + @Override + public void close() { + Releasables.close(timestamps, values, hasValues); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.append$Type$(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (hasValue(group)) { + builder.append$Type$(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // tracking via hasValues + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 4e701744cb749..cbccc0c61e416 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -219,3 +219,22 @@ avg_cost:double | cluster:keyword | time_bucket:datetime 8.71875 | prod | 2024-05-10T00:22:00.000Z 8.5625 | qa | 2024-05-10T00:22:00.000Z ; + + +max_of_last_over_time +required_capability: metrics_command +required_capability: last_over_time +TS k8s | STATS max_cost=max(last_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10; + +max_cost:double | cluster:keyword | time_bucket:datetime +12.5 | staging | 2024-05-10T00:09:00.000Z +12.375 | prod | 2024-05-10T00:17:00.000Z +12.375 | qa | 2024-05-10T00:06:00.000Z +12.375 | qa | 2024-05-10T00:01:00.000Z +12.25 | prod | 2024-05-10T00:19:00.000Z +12.125 | qa | 2024-05-10T00:17:00.000Z +12.125 | prod | 2024-05-10T00:00:00.000Z +12.0 | prod | 2024-05-10T00:08:00.000Z +12.0 | qa | 2024-05-10T00:08:00.000Z +11.875 | qa | 2024-05-10T00:21:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 912f3fe34eba9..3d3200b092bf4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -979,7 +979,12 @@ public enum Cap { /** * Support avg_over_time aggregation that gets evaluated per time-series */ - AVG_OVER_TIME(Build.current().isSnapshot()); + AVG_OVER_TIME(Build.current().isSnapshot()), + + /** + * Support last_over_time aggregation that gets evaluated per time-series + */ + LAST_OVER_TIME(Build.current().isSnapshot()); private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 764f83a67e925..608eb86f83184 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; +import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Median; @@ -436,6 +437,7 @@ private static FunctionDefinition[][] snapshotFunctions() { def(Rate.class, Rate::withUnresolvedTimestamp, "rate"), def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), + def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"), def(Term.class, bi(Term::new), "term") } }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 0923dbc5d6853..aedd976b69762 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -32,6 +32,7 @@ public static List getNamedWriteables() { Values.ENTRY, MaxOverTime.ENTRY, AvgOverTime.ENTRY, + LastOverTime.ENTRY, // internal functions ToPartial.ENTRY, FromPartial.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java new file mode 100644 index 0000000000000..d236310b57d32 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeDoubleAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeFloatAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeIntAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastOverTimeLongAggregatorFunctionSupplier; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +public class LastOverTime extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "LastOverTime", + LastOverTime::new + ); + + private final Expression timestamp; + + @FunctionInfo( + returnType = { "int", "double", "integer", "long" }, + description = "Collect the last value of a time-series. Available with TS command only", + type = FunctionType.AGGREGATE + ) + public LastOverTime( + Source source, + @Param(name = "field", type = { "long|int|double|float" }, description = "field") Expression field, + Expression timestamp + ) { + this(source, field, Literal.TRUE, timestamp); + } + + // compatibility constructor used when reading from the stream + private LastOverTime(Source source, Expression field, Expression filter, List children) { + this(source, field, filter, children.getFirst()); + } + + private LastOverTime(Source source, Expression field, Expression filter, Expression timestamp) { + super(source, field, filter, List.of(timestamp)); + this.timestamp = timestamp; + } + + public LastOverTime(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + in.readNamedWriteableCollectionAsList(Expression.class) + ); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + public static LastOverTime withUnresolvedTimestamp(Source source, Expression field) { + return new LastOverTime(source, field, new UnresolvedAttribute(source, "@timestamp")); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LastOverTime::new, field(), timestamp); + } + + @Override + public LastOverTime replaceChildren(List newChildren) { + if (newChildren.size() != 3) { + assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; + throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); + } + return new LastOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + public LastOverTime withFilter(Expression filter) { + return new LastOverTime(source(), field(), filter, timestamp); + } + + @Override + public DataType dataType() { + return field().dataType(); + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long"); + } + + @Override + public AggregatorFunctionSupplier supplier() { + final DataType type = field().dataType(); + return switch (type) { + case LONG -> new LastOverTimeLongAggregatorFunctionSupplier(); + case INTEGER -> new LastOverTimeIntAggregatorFunctionSupplier(); + case DOUBLE -> new LastOverTimeDoubleAggregatorFunctionSupplier(); + case FLOAT -> new LastOverTimeFloatAggregatorFunctionSupplier(); + default -> throw EsqlIllegalArgumentException.illegalDataType(type); + }; + } + + @Override + public LastOverTime perTimeSeriesAggregation() { + return this; + } + + @Override + public String toString() { + return "last_over_time(" + field() + ")"; + } + + Expression timestamp() { + return timestamp; + } +} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 25bae65958f2f..af6a80d6b027e 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -33,7 +33,7 @@ setup: path: /_query parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. - capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time] + capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time] reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} @@ -101,7 +101,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 136} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 137} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version": From dc058047731e14f984aa5b2ca326f434620c75e4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 11 Apr 2025 07:19:20 -0700 Subject: [PATCH 2/4] Simplify --- .../LastOverTimeDoubleAggregator.java | 65 +++++-------------- .../LastOverTimeFloatAggregator.java | 65 +++++-------------- .../LastOverTimeIntAggregator.java | 65 +++++-------------- .../LastOverTimeLongAggregator.java | 65 +++++-------------- .../X-LastOverTimeAggregator.java.st | 65 +++++-------------- .../function/aggregate/LastOverTime.java | 2 +- 6 files changed, 91 insertions(+), 236 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java index dab550a82e7c2..d33c64dd2b630 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.compute.ann.GroupingAggregator; @@ -18,11 +17,10 @@ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the last value of each time series in each grouping + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. */ @GroupingAggregator( @@ -36,7 +34,7 @@ public static GroupingState initGrouping(DriverContext driverContext) { } public static void combine(GroupingState current, int groupId, long timestamp, double value) { - current.maybeCollect(groupId, timestamp, value); + current.collectValue(groupId, timestamp, value); } public static void combineIntermediate( @@ -51,7 +49,7 @@ public static void combineIntermediate( long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = values.getFirstValueIndex(otherPosition); for (int i = 0; i < valueCount; i++) { - current.maybeCollect(groupId, timestamp, values.getDouble(firstIndex + i)); + current.collectValue(groupId, timestamp, values.getDouble(firstIndex + i)); } } } @@ -60,7 +58,7 @@ public static void combineStates(GroupingState current, int currentGroupId, Grou if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { var timestamp = otherState.timestamps.get(otherGroupId); var value = otherState.values.get(otherGroupId); - current.maybeCollect(currentGroupId, timestamp, value); + current.collectValue(currentGroupId, timestamp, value); } } @@ -68,69 +66,47 @@ public static Block evaluateFinal(GroupingState state, IntVector selected, Group return state.evaluateFinal(selected, evalContext); } - public static final class GroupingState implements GroupingAggregatorState, Releasable { + public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private DoubleArray values; - private BitArray hasValues = null; - private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { + super(bigArrays); this.bigArrays = bigArrays; boolean success = false; LongArray timestamps = null; - DoubleArray values = null; try { timestamps = bigArrays.newLongArray(1, false); - values = bigArrays.newDoubleArray(1, false); this.timestamps = timestamps; - this.values = values; + this.values = bigArrays.newDoubleArray(1, false); success = true; } finally { if (success == false) { - Releasables.close(timestamps, values); + Releasables.close(timestamps, values, super::close); } } } - void maybeCollect(int groupId, long timestamp, double value) { - if (groupId > maxGroupId) { - timestamps = bigArrays.grow(timestamps, groupId + 1); - values = bigArrays.grow(values, groupId + 1); - timestamps.set(groupId, timestamp); - values.set(groupId, value); - } else { + void collectValue(int groupId, long timestamp, double value) { + if (groupId < timestamps.size()) { // TODO: handle multiple values? if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } - } - maybeTrackGroup(groupId); - } - - private void maybeTrackGroup(int groupId) { - if (hasValues != null) { - hasValues.set(groupId, true); } else { - if (groupId > maxGroupId + 1) { - hasValues = new BitArray(groupId + 1, bigArrays); - if (maxGroupId >= 0) { - hasValues.fill(0, maxGroupId + 1, true); - } - hasValues.set(groupId, true); - } + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); } - maxGroupId = Math.max(maxGroupId, groupId); - } - - boolean hasValue(long groupId) { - return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + trackGroupId(groupId); } @Override public void close() { - Releasables.close(timestamps, values, hasValues); + Releasables.close(timestamps, values, super::close); } @Override @@ -141,7 +117,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive ) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); valuesBuilder.appendDouble(values.get(group)); } else { @@ -158,7 +134,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval try (var builder = evalContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { builder.appendDouble(values.get(group)); } else { builder.appendNull(); @@ -167,10 +143,5 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval return builder.build(); } } - - @Override - public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { - // tracking via hasValues - } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java index a63084347f64e..ad663a3bcc01c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.FloatArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.compute.ann.GroupingAggregator; @@ -18,11 +17,10 @@ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the last value of each time series in each grouping + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. */ @GroupingAggregator( @@ -36,7 +34,7 @@ public static GroupingState initGrouping(DriverContext driverContext) { } public static void combine(GroupingState current, int groupId, long timestamp, float value) { - current.maybeCollect(groupId, timestamp, value); + current.collectValue(groupId, timestamp, value); } public static void combineIntermediate( @@ -51,7 +49,7 @@ public static void combineIntermediate( long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = values.getFirstValueIndex(otherPosition); for (int i = 0; i < valueCount; i++) { - current.maybeCollect(groupId, timestamp, values.getFloat(firstIndex + i)); + current.collectValue(groupId, timestamp, values.getFloat(firstIndex + i)); } } } @@ -60,7 +58,7 @@ public static void combineStates(GroupingState current, int currentGroupId, Grou if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { var timestamp = otherState.timestamps.get(otherGroupId); var value = otherState.values.get(otherGroupId); - current.maybeCollect(currentGroupId, timestamp, value); + current.collectValue(currentGroupId, timestamp, value); } } @@ -68,69 +66,47 @@ public static Block evaluateFinal(GroupingState state, IntVector selected, Group return state.evaluateFinal(selected, evalContext); } - public static final class GroupingState implements GroupingAggregatorState, Releasable { + public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private FloatArray values; - private BitArray hasValues = null; - private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { + super(bigArrays); this.bigArrays = bigArrays; boolean success = false; LongArray timestamps = null; - FloatArray values = null; try { timestamps = bigArrays.newLongArray(1, false); - values = bigArrays.newFloatArray(1, false); this.timestamps = timestamps; - this.values = values; + this.values = bigArrays.newFloatArray(1, false); success = true; } finally { if (success == false) { - Releasables.close(timestamps, values); + Releasables.close(timestamps, values, super::close); } } } - void maybeCollect(int groupId, long timestamp, float value) { - if (groupId > maxGroupId) { - timestamps = bigArrays.grow(timestamps, groupId + 1); - values = bigArrays.grow(values, groupId + 1); - timestamps.set(groupId, timestamp); - values.set(groupId, value); - } else { + void collectValue(int groupId, long timestamp, float value) { + if (groupId < timestamps.size()) { // TODO: handle multiple values? if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } - } - maybeTrackGroup(groupId); - } - - private void maybeTrackGroup(int groupId) { - if (hasValues != null) { - hasValues.set(groupId, true); } else { - if (groupId > maxGroupId + 1) { - hasValues = new BitArray(groupId + 1, bigArrays); - if (maxGroupId >= 0) { - hasValues.fill(0, maxGroupId + 1, true); - } - hasValues.set(groupId, true); - } + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); } - maxGroupId = Math.max(maxGroupId, groupId); - } - - boolean hasValue(long groupId) { - return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + trackGroupId(groupId); } @Override public void close() { - Releasables.close(timestamps, values, hasValues); + Releasables.close(timestamps, values, super::close); } @Override @@ -141,7 +117,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive ) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); valuesBuilder.appendFloat(values.get(group)); } else { @@ -158,7 +134,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval try (var builder = evalContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { builder.appendFloat(values.get(group)); } else { builder.appendNull(); @@ -167,10 +143,5 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval return builder.build(); } } - - @Override - public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { - // tracking via hasValues - } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java index 2b395ec6b713f..6ede9c32eb3a4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.compute.ann.GroupingAggregator; @@ -18,11 +17,10 @@ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the last value of each time series in each grouping + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. */ @GroupingAggregator( @@ -36,7 +34,7 @@ public static GroupingState initGrouping(DriverContext driverContext) { } public static void combine(GroupingState current, int groupId, long timestamp, int value) { - current.maybeCollect(groupId, timestamp, value); + current.collectValue(groupId, timestamp, value); } public static void combineIntermediate( @@ -51,7 +49,7 @@ public static void combineIntermediate( long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = values.getFirstValueIndex(otherPosition); for (int i = 0; i < valueCount; i++) { - current.maybeCollect(groupId, timestamp, values.getInt(firstIndex + i)); + current.collectValue(groupId, timestamp, values.getInt(firstIndex + i)); } } } @@ -60,7 +58,7 @@ public static void combineStates(GroupingState current, int currentGroupId, Grou if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { var timestamp = otherState.timestamps.get(otherGroupId); var value = otherState.values.get(otherGroupId); - current.maybeCollect(currentGroupId, timestamp, value); + current.collectValue(currentGroupId, timestamp, value); } } @@ -68,69 +66,47 @@ public static Block evaluateFinal(GroupingState state, IntVector selected, Group return state.evaluateFinal(selected, evalContext); } - public static final class GroupingState implements GroupingAggregatorState, Releasable { + public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private IntArray values; - private BitArray hasValues = null; - private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { + super(bigArrays); this.bigArrays = bigArrays; boolean success = false; LongArray timestamps = null; - IntArray values = null; try { timestamps = bigArrays.newLongArray(1, false); - values = bigArrays.newIntArray(1, false); this.timestamps = timestamps; - this.values = values; + this.values = bigArrays.newIntArray(1, false); success = true; } finally { if (success == false) { - Releasables.close(timestamps, values); + Releasables.close(timestamps, values, super::close); } } } - void maybeCollect(int groupId, long timestamp, int value) { - if (groupId > maxGroupId) { - timestamps = bigArrays.grow(timestamps, groupId + 1); - values = bigArrays.grow(values, groupId + 1); - timestamps.set(groupId, timestamp); - values.set(groupId, value); - } else { + void collectValue(int groupId, long timestamp, int value) { + if (groupId < timestamps.size()) { // TODO: handle multiple values? if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } - } - maybeTrackGroup(groupId); - } - - private void maybeTrackGroup(int groupId) { - if (hasValues != null) { - hasValues.set(groupId, true); } else { - if (groupId > maxGroupId + 1) { - hasValues = new BitArray(groupId + 1, bigArrays); - if (maxGroupId >= 0) { - hasValues.fill(0, maxGroupId + 1, true); - } - hasValues.set(groupId, true); - } + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); } - maxGroupId = Math.max(maxGroupId, groupId); - } - - boolean hasValue(long groupId) { - return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + trackGroupId(groupId); } @Override public void close() { - Releasables.close(timestamps, values, hasValues); + Releasables.close(timestamps, values, super::close); } @Override @@ -141,7 +117,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive ) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); valuesBuilder.appendInt(values.get(group)); } else { @@ -158,7 +134,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval try (var builder = evalContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { builder.appendInt(values.get(group)); } else { builder.appendNull(); @@ -167,10 +143,5 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval return builder.build(); } } - - @Override - public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { - // tracking via hasValues - } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java index 9af5d27f5523b..fca94a45469a1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.compute.ann.GroupingAggregator; import org.elasticsearch.compute.ann.IntermediateState; @@ -16,11 +15,10 @@ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the last value of each time series in each grouping + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. */ @GroupingAggregator( @@ -34,7 +32,7 @@ public static GroupingState initGrouping(DriverContext driverContext) { } public static void combine(GroupingState current, int groupId, long timestamp, long value) { - current.maybeCollect(groupId, timestamp, value); + current.collectValue(groupId, timestamp, value); } public static void combineIntermediate( @@ -49,7 +47,7 @@ public static void combineIntermediate( long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = values.getFirstValueIndex(otherPosition); for (int i = 0; i < valueCount; i++) { - current.maybeCollect(groupId, timestamp, values.getLong(firstIndex + i)); + current.collectValue(groupId, timestamp, values.getLong(firstIndex + i)); } } } @@ -58,7 +56,7 @@ public static void combineStates(GroupingState current, int currentGroupId, Grou if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { var timestamp = otherState.timestamps.get(otherGroupId); var value = otherState.values.get(otherGroupId); - current.maybeCollect(currentGroupId, timestamp, value); + current.collectValue(currentGroupId, timestamp, value); } } @@ -66,69 +64,47 @@ public static Block evaluateFinal(GroupingState state, IntVector selected, Group return state.evaluateFinal(selected, evalContext); } - public static final class GroupingState implements GroupingAggregatorState, Releasable { + public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private LongArray values; - private BitArray hasValues = null; - private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { + super(bigArrays); this.bigArrays = bigArrays; boolean success = false; LongArray timestamps = null; - LongArray values = null; try { timestamps = bigArrays.newLongArray(1, false); - values = bigArrays.newLongArray(1, false); this.timestamps = timestamps; - this.values = values; + this.values = bigArrays.newLongArray(1, false); success = true; } finally { if (success == false) { - Releasables.close(timestamps, values); + Releasables.close(timestamps, values, super::close); } } } - void maybeCollect(int groupId, long timestamp, long value) { - if (groupId > maxGroupId) { - timestamps = bigArrays.grow(timestamps, groupId + 1); - values = bigArrays.grow(values, groupId + 1); - timestamps.set(groupId, timestamp); - values.set(groupId, value); - } else { + void collectValue(int groupId, long timestamp, long value) { + if (groupId < timestamps.size()) { // TODO: handle multiple values? if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } - } - maybeTrackGroup(groupId); - } - - private void maybeTrackGroup(int groupId) { - if (hasValues != null) { - hasValues.set(groupId, true); } else { - if (groupId > maxGroupId + 1) { - hasValues = new BitArray(groupId + 1, bigArrays); - if (maxGroupId >= 0) { - hasValues.fill(0, maxGroupId + 1, true); - } - hasValues.set(groupId, true); - } + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); } - maxGroupId = Math.max(maxGroupId, groupId); - } - - boolean hasValue(long groupId) { - return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + trackGroupId(groupId); } @Override public void close() { - Releasables.close(timestamps, values, hasValues); + Releasables.close(timestamps, values, super::close); } @Override @@ -139,7 +115,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive ) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); valuesBuilder.appendLong(values.get(group)); } else { @@ -156,7 +132,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval try (var builder = evalContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { builder.appendLong(values.get(group)); } else { builder.appendNull(); @@ -165,10 +141,5 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval return builder.build(); } } - - @Override - public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { - // tracking via hasValues - } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st index dbef41ab17584..e7cb3c4b829d8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.BitArray; $if(int||double||float)$ import org.elasticsearch.common.util.$Type$Array; $endif$ @@ -22,11 +21,10 @@ $endif$ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the last value of each time series in each grouping + * A time-series aggregation function that collects the most recent value of a time series in a specified interval. * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. */ @GroupingAggregator( @@ -40,7 +38,7 @@ public class LastOverTime$Type$Aggregator { } public static void combine(GroupingState current, int groupId, long timestamp, $type$ value) { - current.maybeCollect(groupId, timestamp, value); + current.collectValue(groupId, timestamp, value); } public static void combineIntermediate( @@ -55,7 +53,7 @@ public class LastOverTime$Type$Aggregator { long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = values.getFirstValueIndex(otherPosition); for (int i = 0; i < valueCount; i++) { - current.maybeCollect(groupId, timestamp, values.get$Type$(firstIndex + i)); + current.collectValue(groupId, timestamp, values.get$Type$(firstIndex + i)); } } } @@ -64,7 +62,7 @@ public class LastOverTime$Type$Aggregator { if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { var timestamp = otherState.timestamps.get(otherGroupId); var value = otherState.values.get(otherGroupId); - current.maybeCollect(currentGroupId, timestamp, value); + current.collectValue(currentGroupId, timestamp, value); } } @@ -72,69 +70,47 @@ public class LastOverTime$Type$Aggregator { return state.evaluateFinal(selected, evalContext); } - public static final class GroupingState implements GroupingAggregatorState, Releasable { + public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private $Type$Array values; - private BitArray hasValues = null; - private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { + super(bigArrays); this.bigArrays = bigArrays; boolean success = false; LongArray timestamps = null; - $Type$Array values = null; try { timestamps = bigArrays.newLongArray(1, false); - values = bigArrays.new$Type$Array(1, false); this.timestamps = timestamps; - this.values = values; + this.values = bigArrays.new$Type$Array(1, false); success = true; } finally { if (success == false) { - Releasables.close(timestamps, values); + Releasables.close(timestamps, values, super::close); } } } - void maybeCollect(int groupId, long timestamp, $type$ value) { - if (groupId > maxGroupId) { - timestamps = bigArrays.grow(timestamps, groupId + 1); - values = bigArrays.grow(values, groupId + 1); - timestamps.set(groupId, timestamp); - values.set(groupId, value); - } else { + void collectValue(int groupId, long timestamp, $type$ value) { + if (groupId < timestamps.size()) { // TODO: handle multiple values? if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } - } - maybeTrackGroup(groupId); - } - - private void maybeTrackGroup(int groupId) { - if (hasValues != null) { - hasValues.set(groupId, true); } else { - if (groupId > maxGroupId + 1) { - hasValues = new BitArray(groupId + 1, bigArrays); - if (maxGroupId >= 0) { - hasValues.fill(0, maxGroupId + 1, true); - } - hasValues.set(groupId, true); - } + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); } - maxGroupId = Math.max(maxGroupId, groupId); - } - - boolean hasValue(long groupId) { - return groupId <= maxGroupId && (hasValues == null || hasValues.get(groupId)); + trackGroupId(groupId); } @Override public void close() { - Releasables.close(timestamps, values, hasValues); + Releasables.close(timestamps, values, super::close); } @Override @@ -145,7 +121,7 @@ public class LastOverTime$Type$Aggregator { ) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); valuesBuilder.append$Type$(values.get(group)); } else { @@ -162,7 +138,7 @@ public class LastOverTime$Type$Aggregator { try (var builder = evalContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())) { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); - if (hasValue(group)) { + if (group < timestamps.size() && hasValue(group)) { builder.append$Type$(values.get(group)); } else { builder.appendNull(); @@ -171,10 +147,5 @@ public class LastOverTime$Type$Aggregator { return builder.build(); } } - - @Override - public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { - // tracking via hasValues - } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java index d236310b57d32..2c9645b035017 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java @@ -45,7 +45,7 @@ public class LastOverTime extends TimeSeriesAggregateFunction implements Optiona @FunctionInfo( returnType = { "int", "double", "integer", "long" }, - description = "Collect the last value of a time-series. Available with TS command only", + description = "Collect the most recent value of a time-series in the specified interval. Available with TS command only", type = FunctionType.AGGREGATE ) public LastOverTime( From 9e57f6c1db731305366493f1239664edecec8e43 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 16 Apr 2025 23:52:21 -0700 Subject: [PATCH 3/4] Add comment --- .../compute/aggregation/LastOverTimeDoubleAggregator.java | 2 ++ .../compute/aggregation/LastOverTimeFloatAggregator.java | 2 ++ .../compute/aggregation/LastOverTimeIntAggregator.java | 2 ++ .../compute/aggregation/LastOverTimeLongAggregator.java | 2 ++ .../compute/aggregation/X-LastOverTimeAggregator.java.st | 2 ++ 5 files changed, 10 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java index d33c64dd2b630..9231d345b7c84 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java @@ -33,6 +33,8 @@ public static GroupingState initGrouping(DriverContext driverContext) { return new GroupingState(driverContext.bigArrays()); } + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. public static void combine(GroupingState current, int groupId, long timestamp, double value) { current.collectValue(groupId, timestamp, value); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java index ad663a3bcc01c..f6d47c9b98ed6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java @@ -33,6 +33,8 @@ public static GroupingState initGrouping(DriverContext driverContext) { return new GroupingState(driverContext.bigArrays()); } + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. public static void combine(GroupingState current, int groupId, long timestamp, float value) { current.collectValue(groupId, timestamp, value); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java index 6ede9c32eb3a4..8764a86d03a20 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -33,6 +33,8 @@ public static GroupingState initGrouping(DriverContext driverContext) { return new GroupingState(driverContext.bigArrays()); } + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. public static void combine(GroupingState current, int groupId, long timestamp, int value) { current.collectValue(groupId, timestamp, value); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java index fca94a45469a1..94787db738bf2 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -31,6 +31,8 @@ public static GroupingState initGrouping(DriverContext driverContext) { return new GroupingState(driverContext.bigArrays()); } + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. public static void combine(GroupingState current, int groupId, long timestamp, long value) { current.collectValue(groupId, timestamp, value); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st index e7cb3c4b829d8..b189a83873dd7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st @@ -37,6 +37,8 @@ public class LastOverTime$Type$Aggregator { return new GroupingState(driverContext.bigArrays()); } + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. public static void combine(GroupingState current, int groupId, long timestamp, $type$ value) { current.collectValue(groupId, timestamp, value); } From 427faa27447ae7accdcfd49d98e83d975ca8cb49 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 17 Apr 2025 07:12:44 -0700 Subject: [PATCH 4/4] fix tests --- .../yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index af6a80d6b027e..48912ce1bbade 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -101,7 +101,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 137} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version":