diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/first.md b/docs/reference/query-languages/esql/_snippets/functions/types/first.md index 4571d108fc722..ca982dfb580bc 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/types/first.md +++ b/docs/reference/query-languages/esql/_snippets/functions/types/first.md @@ -8,6 +8,10 @@ | double | date_nanos | double | | integer | date | integer | | integer | date_nanos | integer | +| keyword | date | keyword | +| keyword | date_nanos | keyword | | long | date | long | | long | date_nanos | long | +| text | date | keyword | +| text | date_nanos | keyword | diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/last.md b/docs/reference/query-languages/esql/_snippets/functions/types/last.md index 4571d108fc722..ca982dfb580bc 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/types/last.md +++ b/docs/reference/query-languages/esql/_snippets/functions/types/last.md @@ -8,6 +8,10 @@ | double | date_nanos | double | | integer | date | integer | | integer | date_nanos | integer | +| keyword | date | keyword | +| keyword | date_nanos | keyword | | long | date | long | | long | date_nanos | long | +| text | date | keyword | +| text | date_nanos | keyword | diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/first.json b/docs/reference/query-languages/esql/kibana/definition/functions/first.json index 06e68c53c3bde..336afa2f43a2b 100644 --- a/docs/reference/query-languages/esql/kibana/definition/functions/first.json +++ b/docs/reference/query-languages/esql/kibana/definition/functions/first.json @@ -76,6 +76,42 @@ "variadic" : false, "returnType" : "integer" }, + { + "params" : [ + { + "name" : "value", + "type" : "keyword", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : 
"keyword" + }, + { + "params" : [ + { + "name" : "value", + "type" : "keyword", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date_nanos", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" + }, { "params" : [ { @@ -111,6 +147,42 @@ ], "variadic" : false, "returnType" : "long" + }, + { + "params" : [ + { + "name" : "value", + "type" : "text", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" + }, + { + "params" : [ + { + "name" : "value", + "type" : "text", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date_nanos", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" } ], "examples" : [ diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/last.json b/docs/reference/query-languages/esql/kibana/definition/functions/last.json index b1bf80789f9a5..2525423825096 100644 --- a/docs/reference/query-languages/esql/kibana/definition/functions/last.json +++ b/docs/reference/query-languages/esql/kibana/definition/functions/last.json @@ -76,6 +76,42 @@ "variadic" : false, "returnType" : "integer" }, + { + "params" : [ + { + "name" : "value", + "type" : "keyword", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" + }, + { + "params" : [ + { + "name" : "value", + "type" : "keyword", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date_nanos", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" + }, { "params" : 
[ { @@ -111,6 +147,42 @@ ], "variadic" : false, "returnType" : "long" + }, + { + "params" : [ + { + "name" : "value", + "type" : "text", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" + }, + { + "params" : [ + { + "name" : "value", + "type" : "text", + "optional" : false, + "description" : "Values to return" + }, + { + "name" : "sort", + "type" : "date_nanos", + "optional" : false, + "description" : "Sort key" + } + ], + "variadic" : false, + "returnType" : "keyword" } ], "examples" : [ diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index 2d30ad7debb19..8acb7697b9f15 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -477,7 +477,7 @@ tasks.named('stringTemplates').configure { */ File twoStateInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-2State.java.st") [longProperties].forEach { v1 -> - [intProperties, longProperties, floatProperties, doubleProperties].forEach { v2 -> + [intProperties, longProperties, floatProperties, doubleProperties, bytesRefProperties].forEach { v2 -> { var properties = [:] v1.forEach { k, v -> properties["v1_" + k] = v} @@ -945,6 +945,11 @@ tasks.named('stringTemplates').configure { it.inputFile = valueByTimestampAggregatorInputFile it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}DoubleByTimestampAggregator.java" } + template { + it.properties = addOccurrence(bytesRefProperties, Occurrence) + it.inputFile = valueByTimestampAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}BytesRefByTimestampAggregator.java" + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregator.java 
b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregator.java new file mode 100644 index 0000000000000..2859c456ce1f7 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregator.java @@ -0,0 +1,209 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A time-series aggregation function that collects the First occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueByTimestampAggregator.java.st` instead. 
+ */ +@Aggregator( + { + @IntermediateState(name = "timestamps", type = "LONG"), + @IntermediateState(name = "values", type = "BYTES_REF"), + @IntermediateState(name = "seen", type = "BOOLEAN") } +) +@GroupingAggregator( + { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "BYTES_REF_BLOCK") } +) +public class FirstBytesRefByTimestampAggregator { + public static String describe() { + return "first_BytesRef_by_timestamp"; + } + + public static LongBytesRefState initSingle(DriverContext driverContext) { + return new LongBytesRefState(0, new BytesRef(), driverContext.breaker(), describe()); + } + + public static void first(LongBytesRefState current, BytesRef value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + + public static void combine(LongBytesRefState current, BytesRef value, long timestamp) { + if (timestamp < current.v1()) { + current.v1(timestamp); + current.v2(value); + } + } + + public static void combineIntermediate(LongBytesRefState current, long timestamp, BytesRef value, boolean seen) { + if (seen) { + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } + } + } + + public static Block evaluateFinal(LongBytesRefState current, DriverContext ctx) { + return ctx.blockFactory().newConstantBytesRefBlockWith(current.v2(), 1); + } + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(GroupingState current, int groupId, BytesRef value, long timestamp) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + BytesRefBlock values, + int otherPosition + ) { + // TODO seen should probably be part of the intermediate representation + int valueCount = 
values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + BytesRef bytesScratch = new BytesRef(); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getBytesRef(firstIndex + i, bytesScratch)); + } + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext ctx) { + return state.evaluateFinal(selected, ctx); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private ObjectArray values; + private final CircuitBreaker breaker; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + this.breaker = breaker; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newObjectArray(1); + /* + * Enable group id tracking because we use has hasValue in the + * collection itself to detect the when a value first arrives. + */ + enableGroupIdTracking(new SeenGroupIds.Empty()); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, BytesRef value) { + boolean updated = false; + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+ if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + timestamps.set(groupId, timestamp); + updated = true; + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + timestamps.set(groupId, timestamp); + updated = true; + } + if (updated) { + values = bigArrays.grow(values, groupId + 1); + BreakingBytesRefBuilder builder = values.get(groupId); + if (builder == null) { + builder = new BreakingBytesRefBuilder(breaker, "First", value.length); + } + builder.copyBytes(value); + values.set(groupId, builder); + } + maxGroupId = Math.max(maxGroupId, groupId); + trackGroupId(groupId); + } + + @Override + public void close() { + for (long i = 0; i < values.size(); i++) { + Releasables.close(values.get(i)); + } + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newBytesRefBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendBytesRef(values.get(group).bytesRefView()); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newBytesRefBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + 
builder.appendBytesRef(values.get(group).bytesRefView()); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java index d0b2220c8ca11..e1cdfd78e2375 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java index ade7d17d2ae47..c31b2ba50fe44 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import 
org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FloatArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java index 89af438983a48..a95eebc397ce4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java index 96f378dd8ab2c..9b0e183fa2172 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import 
org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregator.java new file mode 100644 index 0000000000000..afb8a4b8d30f7 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregator.java @@ -0,0 +1,209 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A time-series aggregation function that collects the Last occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueByTimestampAggregator.java.st` instead. 
+ */ +@Aggregator( + { + @IntermediateState(name = "timestamps", type = "LONG"), + @IntermediateState(name = "values", type = "BYTES_REF"), + @IntermediateState(name = "seen", type = "BOOLEAN") } +) +@GroupingAggregator( + { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "BYTES_REF_BLOCK") } +) +public class LastBytesRefByTimestampAggregator { + public static String describe() { + return "last_BytesRef_by_timestamp"; + } + + public static LongBytesRefState initSingle(DriverContext driverContext) { + return new LongBytesRefState(0, new BytesRef(), driverContext.breaker(), describe()); + } + + public static void first(LongBytesRefState current, BytesRef value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + + public static void combine(LongBytesRefState current, BytesRef value, long timestamp) { + if (timestamp > current.v1()) { + current.v1(timestamp); + current.v2(value); + } + } + + public static void combineIntermediate(LongBytesRefState current, long timestamp, BytesRef value, boolean seen) { + if (seen) { + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } + } + } + + public static Block evaluateFinal(LongBytesRefState current, DriverContext ctx) { + return ctx.blockFactory().newConstantBytesRefBlockWith(current.v2(), 1); + } + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(GroupingState current, int groupId, BytesRef value, long timestamp) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + BytesRefBlock values, + int otherPosition + ) { + // TODO seen should probably be part of the intermediate representation + int valueCount = 
values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + BytesRef bytesScratch = new BytesRef(); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getBytesRef(firstIndex + i, bytesScratch)); + } + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext ctx) { + return state.evaluateFinal(selected, ctx); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private ObjectArray values; + private final CircuitBreaker breaker; + private int maxGroupId = -1; + + GroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + this.breaker = breaker; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newObjectArray(1); + /* + * Enable group id tracking because we use has hasValue in the + * collection itself to detect the when a value first arrives. + */ + enableGroupIdTracking(new SeenGroupIds.Empty()); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, BytesRef value) { + boolean updated = false; + if (groupId < timestamps.size()) { + // TODO: handle multiple values? 
+ if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + timestamps.set(groupId, timestamp); + updated = true; + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + timestamps.set(groupId, timestamp); + updated = true; + } + if (updated) { + values = bigArrays.grow(values, groupId + 1); + BreakingBytesRefBuilder builder = values.get(groupId); + if (builder == null) { + builder = new BreakingBytesRefBuilder(breaker, "Last", value.length); + } + builder.copyBytes(value); + values.set(groupId, builder); + } + maxGroupId = Math.max(maxGroupId, groupId); + trackGroupId(groupId); + } + + @Override + public void close() { + for (long i = 0; i < values.size(); i++) { + Releasables.close(values.get(i)); + } + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newBytesRefBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendBytesRef(values.get(group).bytesRefView()); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newBytesRefBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + 
builder.appendBytesRef(values.get(group).bytesRefView()); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java index cb37ffef8683b..42207b1acfa36 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java index 1010433a7b785..8bac613a7e788 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import 
org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FloatArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java index 59f57281675d1..9472484e02646 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java index 0c0118c321837..5790c33620b46 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import 
org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongArray; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongBytesRefState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongBytesRefState.java new file mode 100644 index 0000000000000..8c9fa72d956fb --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongBytesRefState.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * Aggregator state for a single {@code long} and a single {@code BytesRef}. + * This class is generated. Edit {@code X-2State.java.st} instead. 
 */
final class LongBytesRefState implements AggregatorState {
    // Sort key (timestamp) of the currently held value.
    private long v1;
    // Currently held value; BreakingBytesRefBuilder charges the bytes to the circuit breaker.
    private final BreakingBytesRefBuilder v2;
    // Whether any input row has been observed yet.
    private boolean seen;

    LongBytesRefState(long v1, BytesRef v2, CircuitBreaker breaker, String label) {
        this.v1 = v1;
        this.v2 = new BreakingBytesRefBuilder(breaker, label, v2.length);
        this.v2.copyBytes(v2);
    }

    long v1() {
        return v1;
    }

    void v1(long v1) {
        this.v1 = v1;
    }

    // Returns bytesRefView() — presumably a live view into the builder's buffer,
    // invalidated by the next v2(BytesRef) call; callers should copy if retaining.
    // TODO(review): confirm against BreakingBytesRefBuilder.
    BytesRef v2() {
        return v2.bytesRefView();
    }

    // Copies the incoming bytes into the builder; the argument is not retained.
    void v2(BytesRef v2) {
        this.v2.copyBytes(v2);
    }

    boolean seen() {
        return seen;
    }

    void seen(boolean seen) {
        this.seen = seen;
    }

    /** Extracts an intermediate view of the contents of this state. */
    @Override
    public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
        // Layout: [timestamp, value, seen] as three single-position constant blocks.
        assert blocks.length >= offset + 3;
        blocks[offset + 0] = driverContext.blockFactory().newConstantLongBlockWith(v1, 1);
        blocks[offset + 1] = driverContext.blockFactory().newConstantBytesRefBlockWith(v2.bytesRefView(), 1);
        blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
    }

    @Override
    public void close() {
        // Releases the builder's breaker-tracked bytes.
        Releasables.close(this.v2);
    }
}
org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports /** * Aggregator state for a single {@code long} and a single {@code double}. diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFloatState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFloatState.java index 0c96f31bca7de..3f5f724db298c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFloatState.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFloatState.java @@ -7,8 +7,14 @@ package org.elasticsearch.compute.aggregation; +// begin generated imports +import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports /** * Aggregator state for a single {@code long} and a single {@code float}. 
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongIntState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongIntState.java index 11ad9acc19334..40772bfd3521e 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongIntState.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongIntState.java @@ -7,8 +7,14 @@ package org.elasticsearch.compute.aggregation; +// begin generated imports +import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports /** * Aggregator state for a single {@code long} and a single {@code int}. diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongLongState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongLongState.java index ce5867bdaa884..f21b053d4f64b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongLongState.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongLongState.java @@ -7,8 +7,14 @@ package org.elasticsearch.compute.aggregation; +// begin generated imports +import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports /** * Aggregator state for a single {@code long} and a single {@code 
long}. diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunction.java new file mode 100644 index 0000000000000..08fefc3216150 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunction.java @@ -0,0 +1,267 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunction} implementation for {@link FirstBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. 
+ */ +public final class FirstBytesRefByTimestampAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.BYTES_REF), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final DriverContext driverContext; + + private final LongBytesRefState state; + + private final List channels; + + public FirstBytesRefByTimestampAggregatorFunction(DriverContext driverContext, + List channels, LongBytesRefState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static FirstBytesRefByTimestampAggregatorFunction create(DriverContext driverContext, + List channels) { + return new FirstBytesRefByTimestampAggregatorFunction(driverContext, channels, FirstBytesRefByTimestampAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + BytesRefBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + addRawBlock(valueBlock, timestampBlock, mask); + return; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + addRawBlock(valueBlock, timestampBlock, mask); + return; + } + addRawVector(valueVector, timestampVector, mask); + } + + private void addRawInputNotMasked(Page page) { + BytesRefBlock 
valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + addRawBlock(valueBlock, timestampBlock); + return; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + addRawBlock(valueBlock, timestampBlock); + return; + } + addRawVector(valueVector, timestampVector); + } + + private void addRawVector(BytesRefVector valueVector, LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; + } + } + + private void addRawVector(BytesRefVector valueVector, LongVector timestampVector, + BooleanVector mask) { + BytesRef valueScratch = new BytesRef(); + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + 
FirstBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; + } + } + + private void addRawBlock(BytesRefBlock valueBlock, LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (valueBlock.isNull(p)) { + continue; + } + if (timestampBlock.isNull(p)) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueBlock.getValueCount(p); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampBlock.getValueCount(p); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + } + } + } + } + } + + private void addRawBlock(BytesRefBlock valueBlock, LongBlock timestampBlock, BooleanVector mask) { + BytesRef valueScratch = new BytesRef(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + if (valueBlock.isNull(p)) { + continue; + } + if 
(timestampBlock.isNull(p)) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueBlock.getValueCount(p); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampBlock.getValueCount(p); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + assert timestamps.getPositionCount() == 1; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefVector values = ((BytesRefBlock) valuesUncast).asVector(); + assert values.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + BytesRef scratch = new BytesRef(); + FirstBytesRefByTimestampAggregator.combineIntermediate(state, timestamps.getLong(0), values.getBytesRef(0, scratch), seen.getBoolean(0)); + } + + @Override + public void 
evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + if (state.seen() == false) { + blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1); + return; + } + blocks[offset] = FirstBytesRefByTimestampAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..0a44288d7a8d8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class FirstBytesRefByTimestampAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstBytesRefByTimestampAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return FirstBytesRefByTimestampAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstBytesRefByTimestampGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public FirstBytesRefByTimestampAggregatorFunction aggregator(DriverContext driverContext, + List channels) { + return FirstBytesRefByTimestampAggregatorFunction.create(driverContext, channels); + } + + @Override + public FirstBytesRefByTimestampGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return FirstBytesRefByTimestampGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return FirstBytesRefByTimestampAggregator.describe(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..aee8d6296dc02 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstBytesRefByTimestampGroupingAggregatorFunction.java @@ -0,0 +1,400 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. 
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstBytesRefByTimestampGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.BYTES_REF) ); + + private final FirstBytesRefByTimestampAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstBytesRefByTimestampGroupingAggregatorFunction(List channels, + FirstBytesRefByTimestampAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstBytesRefByTimestampGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstBytesRefByTimestampGroupingAggregatorFunction(channels, FirstBytesRefByTimestampAggregator.initGrouping(driverContext), driverContext); + } + + public 
static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + BytesRefBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + 
addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef 
valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + FirstBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, BytesRefBlock valueBlock, + LongBlock 
timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long 
timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + FirstBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = 
valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + 
positionOffset; + FirstBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = FirstBytesRefByTimestampAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunction.java new file mode 100644 index 0000000000000..ad67c8c8fd549 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunction.java @@ -0,0 +1,267 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunction} implementation for {@link LastBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. 
+ */ +public final class LastBytesRefByTimestampAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.BYTES_REF), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final DriverContext driverContext; + + private final LongBytesRefState state; + + private final List channels; + + public LastBytesRefByTimestampAggregatorFunction(DriverContext driverContext, + List channels, LongBytesRefState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static LastBytesRefByTimestampAggregatorFunction create(DriverContext driverContext, + List channels) { + return new LastBytesRefByTimestampAggregatorFunction(driverContext, channels, LastBytesRefByTimestampAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + BytesRefBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + addRawBlock(valueBlock, timestampBlock, mask); + return; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + addRawBlock(valueBlock, timestampBlock, mask); + return; + } + addRawVector(valueVector, timestampVector, mask); + } + + private void addRawInputNotMasked(Page page) { + BytesRefBlock valueBlock = 
page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + addRawBlock(valueBlock, timestampBlock); + return; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + addRawBlock(valueBlock, timestampBlock); + return; + } + addRawVector(valueVector, timestampVector); + } + + private void addRawVector(BytesRefVector valueVector, LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; + } + } + + private void addRawVector(BytesRefVector valueVector, LongVector timestampVector, + BooleanVector mask) { + BytesRef valueScratch = new BytesRef(); + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.first(state, 
valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; + } + } + + private void addRawBlock(BytesRefBlock valueBlock, LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (valueBlock.isNull(p)) { + continue; + } + if (timestampBlock.isNull(p)) { + continue; + } + int valueStart = valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueBlock.getValueCount(p); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampBlock.getValueCount(p); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + } + } + } + } + } + + private void addRawBlock(BytesRefBlock valueBlock, LongBlock timestampBlock, BooleanVector mask) { + BytesRef valueScratch = new BytesRef(); + for (int p = 0; p < valueBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + if (valueBlock.isNull(p)) { + continue; + } + if (timestampBlock.isNull(p)) { + continue; + } + int valueStart = 
valueBlock.getFirstValueIndex(p); + int valueEnd = valueStart + valueBlock.getValueCount(p); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampBlock.getValueCount(p); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastBytesRefByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastBytesRefByTimestampAggregator.first(state, valueValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + assert timestamps.getPositionCount() == 1; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefVector values = ((BytesRefBlock) valuesUncast).asVector(); + assert values.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + BytesRef scratch = new BytesRef(); + LastBytesRefByTimestampAggregator.combineIntermediate(state, timestamps.getLong(0), values.getBytesRef(0, scratch), seen.getBoolean(0)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + 
state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + if (state.seen() == false) { + blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1); + return; + } + blocks[offset] = LastBytesRefByTimestampAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..7953f76a49913 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. 
+ */ +public final class LastBytesRefByTimestampAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastBytesRefByTimestampAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return LastBytesRefByTimestampAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastBytesRefByTimestampGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public LastBytesRefByTimestampAggregatorFunction aggregator(DriverContext driverContext, + List channels) { + return LastBytesRefByTimestampAggregatorFunction.create(driverContext, channels); + } + + @Override + public LastBytesRefByTimestampGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return LastBytesRefByTimestampGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return LastBytesRefByTimestampAggregator.describe(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..c5ee2afaf04c7 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastBytesRefByTimestampGroupingAggregatorFunction.java @@ -0,0 +1,400 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. 
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastBytesRefByTimestampAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastBytesRefByTimestampGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.BYTES_REF) ); + + private final LastBytesRefByTimestampAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastBytesRefByTimestampGroupingAggregatorFunction(List channels, + LastBytesRefByTimestampAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastBytesRefByTimestampGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastBytesRefByTimestampGroupingAggregatorFunction(channels, LastBytesRefByTimestampAggregator.initGrouping(driverContext), driverContext); + } + + public static 
List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + BytesRefBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + BytesRefVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + 
addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef 
valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + LastBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, BytesRefBlock valueBlock, + LongBlock 
timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long 
timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + LastBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = 
valueStart; valueOffset < valueEnd; valueOffset++) { + BytesRef valueValue = valueBlock.getBytesRef(valueOffset, valueScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefVector valueVector, + LongVector timestampVector) { + BytesRef valueScratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + BytesRef valueValue = valueVector.getBytesRef(valuesPosition, valueScratch); + long timestampValue = timestampVector.getLong(valuesPosition); + LastBytesRefByTimestampAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + BytesRefBlock values = (BytesRefBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + 
positionOffset; + LastBytesRefByTimestampAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, BytesRefBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = LastBytesRefByTimestampAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-2State.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-2State.java.st index 87cd4db893eec..fa17594b6db97 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-2State.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-2State.java.st @@ -7,8 +7,14 @@ package org.elasticsearch.compute.aggregation; +// begin generated imports +import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.operator.DriverContext; +import 
org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.core.Releasables; +// end generated imports /** * Aggregator state for a single {@code $v1_type$} and a single {@code $v2_type$}. @@ -16,12 +22,21 @@ import org.elasticsearch.compute.operator.DriverContext; */ final class $v1_Type$$v2_Type$State implements AggregatorState { private $v1_type$ v1; + $if(v2_BytesRef)$ + private final BreakingBytesRefBuilder v2; + $else$ private $v2_type$ v2; + $endif$ private boolean seen; - $v1_Type$$v2_Type$State($v1_type$ v1, $v2_type$ v2) { + $v1_Type$$v2_Type$State($v1_type$ v1, $v2_type$ v2$if(v2_BytesRef)$, CircuitBreaker breaker, String label$endif$) { this.v1 = v1; + $if(v2_BytesRef)$ + this.v2 = new BreakingBytesRefBuilder(breaker, label, v2.length); + this.v2.copyBytes(v2); + $else$ this.v2 = v2; + $endif$ } $v1_type$ v1() { @@ -33,11 +48,15 @@ final class $v1_Type$$v2_Type$State implements AggregatorState { } $v2_type$ v2() { - return v2; + return v2$if(v2_BytesRef)$.bytesRefView()$endif$; } void v2($v2_type$ v2) { + $if(v2_BytesRef)$ + this.v2.copyBytes(v2); + $else$ this.v2 = v2; + $endif$ } boolean seen() { @@ -53,10 +72,16 @@ final class $v1_Type$$v2_Type$State implements AggregatorState { public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) { assert blocks.length >= offset + 3; blocks[offset + 0] = driverContext.blockFactory().newConstant$v1_Type$BlockWith(v1, 1); - blocks[offset + 1] = driverContext.blockFactory().newConstant$v2_Type$BlockWith(v2, 1); + blocks[offset + 1] = driverContext.blockFactory().newConstant$v2_Type$BlockWith(v2$if(v2_BytesRef)$.bytesRefView()$endif$, 1); blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1); } @Override + $if(v2_BytesRef)$ + public void close() { + Releasables.close(this.v2); + } + $else$ public void close() {} + $endif$ } diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st index 1dede7ac6de4e..f27bc00257518 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st @@ -8,6 +8,10 @@ package org.elasticsearch.compute.aggregation; // begin generated imports +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.$Type$Array; import org.elasticsearch.common.util.LongArray; @@ -42,7 +46,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { } public static Long$Type$State initSingle(DriverContext driverContext) { + $if(BytesRef)$ + return new LongBytesRefState(0, new BytesRef(), driverContext.breaker(), describe()); + $else$ return new Long$Type$State(0, 0); + $endif$ } public static void first(Long$Type$State current, $type$ value, long timestamp) { @@ -73,7 +81,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { } public static GroupingState initGrouping(DriverContext driverContext) { + $if(BytesRef)$ + return new GroupingState(driverContext.bigArrays(), driverContext.breaker()); + $else$ return new GroupingState(driverContext.bigArrays()); + $endif$ } public static void combine(GroupingState current, int groupId, $type$ value, long timestamp) { @@ -92,8 +104,15 @@ public class $Occurrence$$Type$ByTimestampAggregator { if (valueCount > 0) { long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); int firstIndex = 
values.getFirstValueIndex(otherPosition); + $if(BytesRef)$ + BytesRef bytesScratch = new BytesRef(); + $endif$ for (int i = 0; i < valueCount; i++) { + $if(BytesRef)$ + current.collectValue(groupId, timestamp, values.getBytesRef(firstIndex + i, bytesScratch)); + $else$ current.collectValue(groupId, timestamp, values.get$Type$(firstIndex + i)); + $endif$ } } } @@ -105,18 +124,32 @@ public class $Occurrence$$Type$ByTimestampAggregator { public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; + $if(BytesRef)$ + private ObjectArray values; + $else$ private $Type$Array values; + $endif$ + $if(BytesRef)$ + private final CircuitBreaker breaker; + $endif$ private int maxGroupId = -1; - GroupingState(BigArrays bigArrays) { + GroupingState(BigArrays bigArrays$if(BytesRef)$, CircuitBreaker breaker$endif$) { super(bigArrays); this.bigArrays = bigArrays; boolean success = false; + $if(BytesRef)$ + this.breaker = breaker; + $endif$ LongArray timestamps = null; try { timestamps = bigArrays.newLongArray(1, false); this.timestamps = timestamps; + $if(BytesRef)$ + this.values = bigArrays.newObjectArray(1); + $else$ this.values = bigArrays.new$Type$Array(1, false); + $endif$ /* * Enable group id tracking because we use has hasValue in the * collection itself to detect the when a value first arrives. 
@@ -145,7 +178,16 @@ public class $Occurrence$$Type$ByTimestampAggregator { } if (updated) { values = bigArrays.grow(values, groupId + 1); + $if(BytesRef)$ + BreakingBytesRefBuilder builder = values.get(groupId); + if (builder == null) { + builder = new BreakingBytesRefBuilder(breaker, "$Occurrence$", value.length); + } + builder.copyBytes(value); + values.set(groupId, builder); + $else$ values.set(groupId, value); + $endif$ } maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); @@ -153,6 +195,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { @Override public void close() { + $if(BytesRef)$ + for (long i = 0; i < values.size(); i++) { + Releasables.close(values.get(i)); + } + $endif$ Releasables.close(timestamps, values, super::close); } @@ -166,7 +213,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { int group = selected.getInt(p); if (group < timestamps.size() && hasValue(group)) { timestampsBuilder.appendLong(timestamps.get(group)); + $if(BytesRef)$ + valuesBuilder.append$Type$(values.get(group).bytesRefView()); + $else$ valuesBuilder.append$Type$(values.get(group)); + $endif$ } else { timestampsBuilder.appendNull(); valuesBuilder.appendNull(); @@ -182,7 +233,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { for (int p = 0; p < selected.getPositionCount(); p++) { int group = selected.getInt(p); if (group < timestamps.size() && hasValue(group)) { + $if(BytesRef)$ + builder.append$Type$(values.get(group).bytesRefView()); + $else$ builder.append$Type$(values.get(group)); + $endif$ } else { builder.appendNull(); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/logs.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/logs.csv index 2fd232a3ed33d..2fbcc2f4f4f78 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/logs.csv +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/logs.csv @@ -1,5 +1,8 @@ -@timestamp:date ,system:keyword,message:keyword 
+@timestamp:date ,system:keyword,message:text 2023-10-23T13:55:01.543Z, ping,Pinging 192.168.86.046 -2023-10-23T13:55:01.544Z, cron,Running cats +2023-10-23T13:55:01.544Z, cron,Running cats (cycle 1) 2023-10-23T13:55:01.545Z, java,Doing java stuff for 192.168.86.038 2023-10-23T13:55:01.546Z, java,More java stuff +2023-10-23T13:56:01.543Z, ping,No response +2023-10-23T13:56:01.544Z, cron,Running cats (cycle 2) +2023-10-23T13:57:01.544Z, cron,Running cats (cycle 3) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec index 351fec4ee7a46..afb5baefab7b8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec @@ -301,20 +301,20 @@ convertFromStringFancy required_capability: to_ip_leading_zeros FROM logs | KEEP @timestamp, system, message +| SORT @timestamp +| LIMIT 4 | EVAL client = CASE( system == "ping", TO_IP(REPLACE(message, "Pinging ", ""), {"leading_zeros": "octal"}), system == "java" AND STARTS_WITH(message, "Doing java stuff for "), TO_IP(REPLACE(message, "Doing java stuff for ", ""), {"leading_zeros": "decimal"})) -| SORT @timestamp -| LIMIT 4 ; -@timestamp:date |system:keyword|message:keyword |client:ip -2023-10-23T13:55:01.543Z| ping|Pinging 192.168.86.046 |192.168.86.38 -2023-10-23T13:55:01.544Z| cron|Running cats |null -2023-10-23T13:55:01.545Z| java|Doing java stuff for 192.168.86.038|192.168.86.38 -2023-10-23T13:55:01.546Z| java|More java stuff |null +@timestamp:datetime | system:keyword | message:text | client:ip +2023-10-23T13:55:01.543Z | ping | Pinging 192.168.86.046 | 192.168.86.38 +2023-10-23T13:55:01.544Z | cron | Running cats (cycle 1) | null +2023-10-23T13:55:01.545Z | java | Doing java stuff for 192.168.86.038 | 192.168.86.38 +2023-10-23T13:55:01.546Z | java | More java stuff | null ; toIpInAgg#[skip:-8.12.99,reason:StatsNestedExp introduced in v8.13.0] diff 
--git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-logs.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-logs.json index 8b1ddb0e299db..779b899bc3195 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-logs.json +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-logs.json @@ -7,7 +7,7 @@ "type" : "keyword" }, "message" : { - "type" : "keyword" + "type" : "text" } } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec index 4ebdd9ea98e2f..149aa668f11ed 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec @@ -89,3 +89,54 @@ ROW v = ["null", "3", "2"] FIRST(v, @timestamp):integer 2 ; + + +first_keywords +required_capability: agg_first_last_string +FROM employees +| STATS + first_employee = FIRST(first_name, hire_date) +; + +first_employee:keyword +Sumant +; + +first_keywords_by_year +required_capability: agg_first_last_string +FROM employees +| EVAL year = date_trunc(1 year, hire_date) +| STATS + first_employee = FIRST(first_name, hire_date) BY year +| SORT year ASC +; + +first_employee:keyword | year:datetime +Sumant | 1985-01-01T00:00:00.000Z +Sanjiv | 1986-01-01T00:00:00.000Z +Claudi | 1987-01-01T00:00:00.000Z +Ramzi | 1988-01-01T00:00:00.000Z +Tzvetan | 1989-01-01T00:00:00.000Z +Parviz | 1990-01-01T00:00:00.000Z +Mayuko | 1991-01-01T00:00:00.000Z +Ebbe | 1992-01-01T00:00:00.000Z +Weiyi | 1993-01-01T00:00:00.000Z +Kenroku | 1994-01-01T00:00:00.000Z +Kazuhito | 1995-01-01T00:00:00.000Z +Sailaja | 1996-01-01T00:00:00.000Z +Suzette | 1997-01-01T00:00:00.000Z +Lillian | 1999-01-01T00:00:00.000Z +; + +first_text_by_system +required_capability: agg_first_last_string +FROM logs +| STATS message = FIRST(message, @timestamp) BY system +| SORT system ASC +; + 
+message:keyword | system:keyword +Running cats (cycle 1) | cron +Doing java stuff for 192.168.86.038 | java +Pinging 192.168.86.046 | ping +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec index 2f82bb7f83ce0..5905f6f686dd9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec @@ -89,3 +89,54 @@ ROW v = ["null", "1", "2"] LAST(v, @timestamp):integer 2 ; + + +last_keywords +required_capability: agg_first_last_string +FROM employees +| STATS + last_employee = LAST(first_name, hire_date) +; + +last_employee:keyword +Lillian +; + +last_keywords_by_year +required_capability: agg_first_last_string +FROM employees +| EVAL year = date_trunc(1 year, hire_date) +| STATS + last_employee = LAST(first_name, hire_date) BY year +| SORT year ASC +; + +last_employee:keyword | year:datetime +Bezalel | 1985-01-01T00:00:00.000Z +Chirstian | 1986-01-01T00:00:00.000Z +Breannda | 1987-01-01T00:00:00.000Z +Valter | 1988-01-01T00:00:00.000Z +Bojan | 1989-01-01T00:00:00.000Z +Yinghua | 1990-01-01T00:00:00.000Z +Shir | 1991-01-01T00:00:00.000Z +Patricio | 1992-01-01T00:00:00.000Z +Cristinel | 1993-01-01T00:00:00.000Z +Saniya | 1994-01-01T00:00:00.000Z +Tuval | 1995-01-01T00:00:00.000Z +Sailaja | 1996-01-01T00:00:00.000Z +Suzette | 1997-01-01T00:00:00.000Z +Lillian | 1999-01-01T00:00:00.000Z +; + +last_text_by_system +required_capability: agg_first_last_string +FROM logs +| STATS message = LAST(message, @timestamp) BY system +| SORT system ASC +; + +message:keyword | system:keyword +Running cats (cycle 3) | cron +More java stuff | java +No response | ping +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 
b39135097f1f9..c3c4121b095f4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1365,6 +1365,7 @@ public enum Cap { * FIRST and LAST aggregate functions. */ AGG_FIRST_LAST(Build.current().isSnapshot()), + AGG_FIRST_LAST_STRING(Build.current().isSnapshot()), /** * Support correct counting of skipped shards. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java index 95ca792634ed9..b4add744fd005 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstBytesRefByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.FirstDoubleByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.FirstFloatByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.FirstIntByTimestampAggregatorFunctionSupplier; @@ -32,7 +33,7 @@ import java.io.IOException; import java.util.List; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; @@ -44,14 +45,18 @@ 
public class First extends AggregateFunction implements ToAggregator { // TODO: support all types of values @FunctionInfo( type = FunctionType.AGGREGATE, - returnType = { "long", "integer", "double" }, + returnType = { "long", "integer", "double", "keyword" }, description = "The earliest value of a field.", appliesTo = @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE), examples = @Example(file = "stats_first", tag = "first") ) public First( Source source, - @Param(name = "value", type = { "long", "integer", "double" }, description = "Values to return") Expression field, + @Param( + name = "value", + type = { "long", "integer", "double", "keyword", "text" }, + description = "Values to return" + ) Expression field, @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort ) { this(source, field, Literal.TRUE, sort); @@ -93,21 +98,37 @@ public First withFilter(Expression filter) { @Override public DataType dataType() { - return field().dataType(); + return field().dataType().noText(); } @Override protected TypeResolution resolveType() { - return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long") - .and( - isType( - sort, - dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS, - sourceText(), - SECOND, - "long or date_nanos or datetime" - ) - ); + if (childrenResolved() == false) { + return new TypeResolution("Unresolved children"); + } + + return isType( + field(), + dt -> dt == DataType.BOOLEAN + || dt == DataType.DATETIME + || DataType.isString(dt) + || (dt.isNumeric() && dt != DataType.UNSIGNED_LONG), + sourceText(), + FIRST, + "boolean", + "date", + "ip", + "string", + "numeric except unsigned_long or counter types" + ).and( + isType( + sort, + dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS, + sourceText(), + SECOND, + "long or date_nanos or datetime" + ) + ); } 
@Override @@ -118,6 +139,7 @@ public AggregatorFunctionSupplier supplier() { case INTEGER -> new FirstIntByTimestampAggregatorFunctionSupplier(); case DOUBLE -> new FirstDoubleByTimestampAggregatorFunctionSupplier(); case FLOAT -> new FirstFloatByTimestampAggregatorFunctionSupplier(); + case KEYWORD, TEXT -> new FirstBytesRefByTimestampAggregatorFunctionSupplier(); default -> throw EsqlIllegalArgumentException.illegalDataType(type); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java index d775ed089714e..d0245b435fd59 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastBytesRefByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.LastDoubleByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.LastFloatByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.LastIntByTimestampAggregatorFunctionSupplier; @@ -32,7 +33,7 @@ import java.io.IOException; import java.util.List; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; @@ -44,14 +45,18 @@ public class Last extends 
AggregateFunction implements ToAggregator { // TODO: support all types @FunctionInfo( type = FunctionType.AGGREGATE, - returnType = { "long", "integer", "double" }, + returnType = { "long", "integer", "double", "keyword" }, description = "The latest value of a field.", appliesTo = @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE), examples = @Example(file = "stats_last", tag = "last") ) public Last( Source source, - @Param(name = "value", type = { "long", "integer", "double" }, description = "Values to return") Expression field, + @Param( + name = "value", + type = { "long", "integer", "double", "keyword", "text" }, + description = "Values to return" + ) Expression field, @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort ) { this(source, field, Literal.TRUE, sort); @@ -93,21 +98,33 @@ public Last withFilter(Expression filter) { @Override public DataType dataType() { - return field().dataType(); + return field().dataType().noText(); } @Override protected TypeResolution resolveType() { - return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long") - .and( - isType( - sort, - dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS, - sourceText(), - SECOND, - "long or date_nanos or datetime" - ) - ); + return isType( + field(), + dt -> dt == DataType.BOOLEAN + || dt == DataType.DATETIME + || DataType.isString(dt) + || (dt.isNumeric() && dt != DataType.UNSIGNED_LONG), + sourceText(), + FIRST, + "boolean", + "date", + "ip", + "string", + "numeric except unsigned_long or counter types" + ).and( + isType( + sort, + dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS, + sourceText(), + SECOND, + "long or date_nanos or datetime" + ) + ); } @Override @@ -118,6 +135,7 @@ public AggregatorFunctionSupplier supplier() { case INTEGER -> new LastIntByTimestampAggregatorFunctionSupplier(); 
case DOUBLE -> new LastDoubleByTimestampAggregatorFunctionSupplier(); case FLOAT -> new LastFloatByTimestampAggregatorFunctionSupplier(); + case KEYWORD, TEXT -> new LastBytesRefByTimestampAggregatorFunctionSupplier(); default -> throw EsqlIllegalArgumentException.illegalDataType(type); }; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java index 1e86516133f76..677eb2389b3fc 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java @@ -48,6 +48,7 @@ public static List unlimitedSuppliers(DataType type, int minR case LONG -> longCases(minRows, maxRows, Long.MIN_VALUE, Long.MAX_VALUE, true); case UNSIGNED_LONG -> ulongCases(minRows, maxRows, BigInteger.ZERO, UNSIGNED_LONG_MAX, true); case DOUBLE -> doubleCases(minRows, maxRows, -Double.MAX_VALUE, Double.MAX_VALUE, true); + case KEYWORD, TEXT -> stringCases(minRows, maxRows, type); // If a type is missing here it's safe to them as you need them default -> throw new IllegalArgumentException("unsupported type [" + type + "]"); }; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java index 1577ca2872b75..ae875cea3f72b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java @@ -37,7 +37,7 @@ public static Iterable parameters() { int rows = 1000; List suppliers = new ArrayList<>(); - for (DataType valueType 
: List.of(DataType.INTEGER, DataType.LONG, DataType.DOUBLE)) { + for (DataType valueType : List.of(DataType.INTEGER, DataType.LONG, DataType.DOUBLE, DataType.KEYWORD, DataType.TEXT)) { for (TestCaseSupplier.TypedDataSupplier valueSupplier : unlimitedSuppliers(valueType, rows, rows)) { for (DataType sortType : List.of(DataType.DATETIME, DataType.DATE_NANOS)) { for (TestCaseSupplier.TypedDataSupplier sortSupplier : unlimitedSuppliers(sortType, rows, rows)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastTests.java index cd315d26ef4ef..abba5631373e1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastTests.java @@ -33,7 +33,7 @@ public static Iterable parameters() { int rows = 1000; List suppliers = new ArrayList<>(); - for (DataType valueType : List.of(DataType.INTEGER, DataType.LONG, DataType.DOUBLE)) { + for (DataType valueType : List.of(DataType.INTEGER, DataType.LONG, DataType.DOUBLE, DataType.KEYWORD, DataType.TEXT)) { for (TestCaseSupplier.TypedDataSupplier valueSupplier : unlimitedSuppliers(valueType, rows, rows)) { for (DataType sortType : List.of(DataType.DATETIME, DataType.DATE_NANOS)) { for (TestCaseSupplier.TypedDataSupplier sortSupplier : unlimitedSuppliers(sortType, rows, rows)) {