diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java index 2edc567f6e744..0d59a0f6d28ed 100644 --- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java +++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java @@ -36,6 +36,7 @@ import javax.lang.model.util.Elements; import static java.util.stream.Collectors.joining; +import static org.elasticsearch.compute.gen.Methods.optionalStaticMethod; import static org.elasticsearch.compute.gen.Methods.requireAnyArgs; import static org.elasticsearch.compute.gen.Methods.requireAnyType; import static org.elasticsearch.compute.gen.Methods.requireArgs; @@ -76,6 +77,7 @@ public class AggregatorImplementer { private final List warnExceptions; private final ExecutableElement init; private final ExecutableElement combine; + private final ExecutableElement first; private final List createParameters; private final ClassName implementation; private final List intermediateState; @@ -114,6 +116,18 @@ public AggregatorImplementer( .filter(f -> false == f.type().equals(BIG_ARRAYS) && false == f.type().equals(DRIVER_CONTEXT)) .toList(); + this.first = aggState.declaredType.isPrimitive() + ? null + : optionalStaticMethod( + declarationType, + requireVoidType(), + requireName("first"), + requireArgs(combine.getParameters().stream().map(p -> requireType(TypeName.get(p.asType()))).toArray(TypeMatcher[]::new)) + ); + if (this.aggState.hasSeen == false && this.first != null) { + throw new IllegalArgumentException("[first] method not supported without [seen] on agg state"); + } + this.implementation = ClassName.get( elements.getPackageOf(declarationType).toString(), (declarationType.getSimpleName() + "AggregatorFunction").replace("AggregatorAggregator", "Aggregator") @@ -339,10 +353,17 @@ private MethodSpec addRawVector(boolean masked) { builder.addComment("This type does not support vectors because all values are multi-valued"); return builder.build(); } + + if (first != null) { + builder.addComment("Find the first value up front in the Vector path which is more complex but should be faster"); + builder.addStatement("int valuesPosition = 0"); + addRawVectorWithFirst(builder, true, masked); + addRawVectorWithFirst(builder, false, masked); + return builder.build(); + } if (aggState.hasSeen()) { builder.addStatement("state.seen(true)"); } - builder.beginControlFlow( "for (int valuesPosition = 0; valuesPosition < $L.getPositionCount(); valuesPosition++)", aggParams.getFirst().vectorName() @@ -354,13 +375,39 @@ private MethodSpec addRawVector(boolean masked) { for (AggregationParameter p : aggParams) { p.read(builder, true); } - combineRawInput(builder); - + combineRawInput(builder, false); } builder.endControlFlow(); return builder.build(); } + private void addRawVectorWithFirst(MethodSpec.Builder builder, boolean firstPass, boolean masked) { + builder.beginControlFlow( + firstPass + ? "while (state.seen() == false && valuesPosition < $L.getPositionCount())" + : "while (valuesPosition < $L.getPositionCount())", + aggParams.getFirst().vectorName() + ); + { + if (masked) { + builder.beginControlFlow("if (mask.getBoolean(valuesPosition) == false)"); + builder.addStatement("valuesPosition++"); + builder.addStatement("continue"); + builder.endControlFlow(); + } + for (AggregationParameter p : aggParams) { + p.read(builder, true); + } + combineRawInput(builder, firstPass); + builder.addStatement("valuesPosition++"); + if (firstPass) { + builder.addStatement("state.seen(true)"); + builder.addStatement("break"); + } + } + builder.endControlFlow(); + } + private MethodSpec addRawBlock(boolean masked) { MethodSpec.Builder builder = initAddRaw(false, masked); @@ -374,9 +421,6 @@ private MethodSpec addRawBlock(boolean masked) { builder.addStatement("continue"); builder.endControlFlow(); } - if (aggState.hasSeen()) { - builder.addStatement("state.seen(true)"); - } if (aggParams.getFirst().isArray()) { if (aggParams.size() > 1) { @@ -399,6 +443,9 @@ private MethodSpec addRawBlock(boolean masked) { builder.endControlFlow(); combineRawInputForArray(builder, "valuesArray"); } else { + if (first == null && aggState.hasSeen()) { + builder.addStatement("state.seen(true)"); + } for (AggregationParameter p : aggParams) { builder.addStatement("int $L = $L.getFirstValueIndex(p)", p.startName(), p.blockName()); builder.addStatement("int $L = $L + $L.getValueCount(p)", p.endName(), p.startName(), p.blockName()); @@ -412,7 +459,21 @@ private MethodSpec addRawBlock(boolean masked) { ); p.read(builder, false); } - combineRawInput(builder); + if (first != null) { + builder.addComment("Check seen in every iteration to save on complexity in the Block path"); + builder.beginControlFlow("if (state.seen())"); + { + combineRawInput(builder, false); + } + builder.nextControlFlow("else"); + { + builder.addStatement("state.seen(true)"); + combineRawInput(builder, true); + } + builder.endControlFlow(); + } else { + combineRawInput(builder, false); + } for (AggregationParameter p : aggParams) { builder.endControlFlow(); } @@ -443,22 +504,26 @@ private MethodSpec.Builder initAddRaw(boolean valuesAreVector, boolean masked) { return builder; } - private void combineRawInput(MethodSpec.Builder builder) { + private void combineRawInput(MethodSpec.Builder builder, boolean useFirst) { TypeName returnType = TypeName.get(combine.getReturnType()); - warningsBlock(builder, () -> invokeCombineRawInput(returnType, builder)); + warningsBlock(builder, () -> invokeCombineRawInput(returnType, builder, useFirst)); } - private void invokeCombineRawInput(TypeName returnType, MethodSpec.Builder builder) { + private void invokeCombineRawInput(TypeName returnType, MethodSpec.Builder builder, boolean useFirst) { StringBuilder pattern = new StringBuilder(); List params = new ArrayList<>(); if (returnType.isPrimitive()) { + if (useFirst) { + throw new IllegalArgumentException("[first] not supported with primitive"); + } pattern.append("state.$TValue($T.combine(state.$TValue()"); params.add(returnType); params.add(declarationType); params.add(returnType); } else if (returnType == TypeName.VOID) { - pattern.append("$T.combine(state"); + pattern.append("$T.$L(state"); params.add(declarationType); + params.add(useFirst ? first.getSimpleName() : combine.getSimpleName()); } else { throw new IllegalArgumentException("combine must return void or a primitive"); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java index 93142168cad93..d0b2220c8ca11 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongDoubleState initSingle(DriverContext driverContext) { return new LongDoubleState(0, 0); } + public static void first(LongDoubleState current, double value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongDoubleState current, double value, long timestamp) { if (timestamp < current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongDoubleState current, double value, long timestamp public static void combineIntermediate(LongDoubleState current, long timestamp, double value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java index c0aae974ae00a..ade7d17d2ae47 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongFloatState initSingle(DriverContext driverContext) { return new LongFloatState(0, 0); } + public static void first(LongFloatState current, float value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongFloatState current, float value, long timestamp) { if (timestamp < current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongFloatState current, float value, long timestamp) public static void combineIntermediate(LongFloatState current, long timestamp, float value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java index 745b72b9b1dcb..89af438983a48 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongIntState initSingle(DriverContext driverContext) { return new LongIntState(0, 0); } + public static void first(LongIntState current, int value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongIntState current, int value, long timestamp) { if (timestamp < current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongIntState current, int value, long timestamp) { public static void combineIntermediate(LongIntState current, long timestamp, int value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java index 28a4335f6469f..96f378dd8ab2c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongLongState initSingle(DriverContext driverContext) { return new LongLongState(0, 0); } + public static void first(LongLongState current, long value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongLongState current, long value, long timestamp) { if (timestamp < current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongLongState current, long value, long timestamp) { public static void combineIntermediate(LongLongState current, long timestamp, long value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java index 1e0b536e04929..cb37ffef8683b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongDoubleState initSingle(DriverContext driverContext) { return new LongDoubleState(0, 0); } + public static void first(LongDoubleState current, double value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongDoubleState current, double value, long timestamp) { if (timestamp > current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongDoubleState current, double value, long timestamp public static void combineIntermediate(LongDoubleState current, long timestamp, double value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java index 32d1c4b818376..1010433a7b785 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongFloatState initSingle(DriverContext driverContext) { return new LongFloatState(0, 0); } + public static void first(LongFloatState current, float value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongFloatState current, float value, long timestamp) { if (timestamp > current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongFloatState current, float value, long timestamp) public static void combineIntermediate(LongFloatState current, long timestamp, float value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java index 6997b0d21bc8f..59f57281675d1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongIntState initSingle(DriverContext driverContext) { return new LongIntState(0, 0); } + public static void first(LongIntState current, int value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongIntState current, int value, long timestamp) { if (timestamp > current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongIntState current, int value, long timestamp) { public static void combineIntermediate(LongIntState current, long timestamp, int value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java index cfc09c3f5670d..0c0118c321837 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java @@ -45,6 +45,11 @@ public static LongLongState initSingle(DriverContext driverContext) { return new LongLongState(0, 0); } + public static void first(LongLongState current, long value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(LongLongState current, long value, long timestamp) { if (timestamp > current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public static void combine(LongLongState current, long value, long timestamp) { public static void combineIntermediate(LongLongState current, long timestamp, long value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunction.java index ca12dd6f36482..ebee4f6f6aeb2 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunction.java @@ -101,24 +101,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(DoubleVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { double valueValue = valueVector.getDouble(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(DoubleVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } double valueValue = valueVector.getDouble(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -130,7 +155,6 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -139,7 +163,13 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -156,7 +186,6 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock, Boole if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -165,7 +194,13 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock, Boole int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunction.java index a843202c3992b..18972e8ba4cac 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunction.java @@ -101,24 +101,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(FloatVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstFloatByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { float valueValue = valueVector.getFloat(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(FloatVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstFloatByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } float valueValue = valueVector.getFloat(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -130,7 +155,6 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -139,7 +163,13 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstFloatByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -156,7 +186,6 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock, Boolea if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -165,7 +194,13 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock, Boolea int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstFloatByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunction.java index bafe723529dd2..2aafbb878f619 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunction.java @@ -101,23 +101,48 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(IntVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstIntByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { int valueValue = valueVector.getInt(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(IntVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstIntByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } int valueValue = valueVector.getInt(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -129,7 +154,6 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -138,7 +162,13 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstIntByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -155,7 +185,6 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock, BooleanV if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -164,7 +193,13 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock, BooleanV int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstIntByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstIntByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunction.java index 2916b1582c4b8..453af7d513f40 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunction.java @@ -99,24 +99,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(LongVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstLongByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { long valueValue = valueVector.getLong(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(LongVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + FirstLongByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } long valueValue = valueVector.getLong(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -128,7 +153,6 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -137,7 +161,13 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstLongByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -154,7 +184,6 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock, Boolean if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -163,7 +192,13 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock, Boolean int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + FirstLongByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + FirstLongByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunction.java index f8ebe1efcbc05..71e21d128d987 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunction.java @@ -101,24 +101,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(DoubleVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { double valueValue = valueVector.getDouble(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(DoubleVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } double valueValue = valueVector.getDouble(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -130,7 +155,6 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -139,7 +163,13 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -156,7 +186,6 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock, Boole if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -165,7 +194,13 @@ private void addRawBlock(DoubleBlock valueBlock, LongBlock timestampBlock, Boole int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastDoubleByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastDoubleByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunction.java index bcee94964dd9a..b5899735dee00 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunction.java @@ -101,24 +101,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(FloatVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastFloatByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { float valueValue = valueVector.getFloat(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(FloatVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastFloatByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } float valueValue = valueVector.getFloat(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -130,7 +155,6 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -139,7 +163,13 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastFloatByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -156,7 +186,6 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock, Boolea if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -165,7 +194,13 @@ private void addRawBlock(FloatBlock valueBlock, LongBlock timestampBlock, Boolea int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastFloatByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastFloatByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunction.java index a8bea89bc3c19..97caf71d60a53 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunction.java @@ -101,23 +101,48 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(IntVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastIntByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { int valueValue = valueVector.getInt(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(IntVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastIntByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } int valueValue = valueVector.getInt(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -129,7 +154,6 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -138,7 +162,13 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastIntByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -155,7 +185,6 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock, BooleanV if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -164,7 +193,13 @@ private void addRawBlock(IntBlock valueBlock, LongBlock timestampBlock, BooleanV int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastIntByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastIntByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunction.java index cc7c96cca77ca..da40f91d43b8f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunction.java @@ -99,24 +99,49 @@ private void addRawInputNotMasked(Page page) { } private void addRawVector(LongVector valueVector, LongVector timestampVector) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastLongByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { long valueValue = valueVector.getLong(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } private void addRawVector(LongVector valueVector, LongVector timestampVector, BooleanVector mask) { - state.seen(true); - for (int valuesPosition = 0; valuesPosition < valueVector.getPositionCount(); valuesPosition++) { + // Find the first value up front in the Vector path which is more complex but should be faster + int valuesPosition = 0; + while (state.seen() == false && valuesPosition < valueVector.getPositionCount()) { if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; + continue; + } + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + LastLongByTimestampAggregator.first(state, valueValue, timestampValue); + valuesPosition++; + state.seen(true); + break; + } + while (valuesPosition < valueVector.getPositionCount()) { + if (mask.getBoolean(valuesPosition) == false) { + valuesPosition++; continue; } long valueValue = valueVector.getLong(valuesPosition); long timestampValue = timestampVector.getLong(valuesPosition); LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + valuesPosition++; } } @@ -128,7 +153,6 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock) { if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -137,7 +161,13 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock) { int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastLongByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } @@ -154,7 +184,6 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock, Boolean if (timestampBlock.isNull(p)) { continue; } - state.seen(true); int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueBlock.getValueCount(p); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { @@ -163,7 +192,13 @@ private void addRawBlock(LongBlock valueBlock, LongBlock timestampBlock, Boolean int timestampEnd = timestampStart + timestampBlock.getValueCount(p); for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { long timestampValue = timestampBlock.getLong(timestampOffset); - LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + // Check seen in every iteration to save on complexity in the Block path + if (state.seen()) { + LastLongByTimestampAggregator.combine(state, valueValue, timestampValue); + } else { + state.seen(true); + LastLongByTimestampAggregator.first(state, valueValue, timestampValue); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st index ad1e383f4e482..1dede7ac6de4e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueByTimestampAggregator.java.st @@ -45,6 +45,11 @@ public class $Occurrence$$Type$ByTimestampAggregator { return new Long$Type$State(0, 0); } + public static void first(Long$Type$State current, $type$ value, long timestamp) { + current.v1(timestamp); + current.v2(value); + } + public static void combine(Long$Type$State current, $type$ value, long timestamp) { if (timestamp $if(First)$<$else$>$endif$ current.v1()) { current.v1(timestamp); @@ -54,8 +59,12 @@ public class $Occurrence$$Type$ByTimestampAggregator { public static void combineIntermediate(Long$Type$State current, long timestamp, $type$ value, boolean seen) { if (seen) { - current.seen(true); - combine(current, value, timestamp); + if (current.seen()) { + combine(current, value, timestamp); + } else { + first(current, value, timestamp); + current.seen(true); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunctionTests.java index 8f8d965aba8d3..89bf9e077e052 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregatorFunctionTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.aggregation; import org.elasticsearch.compute.aggregation.FirstLongByTimestampGroupingAggregatorFunctionTests.ExpectedWork; +import org.elasticsearch.compute.aggregation.FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; @@ -24,10 +25,11 @@ public class FirstDoubleByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + TimestampGen tsgen = randomFrom(TimestampGen.values()); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.DOUBLE, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomDouble(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomDouble(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampGroupingAggregatorFunctionTests.java index b084c166d1d86..074d855aa69e7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class FirstDoubleByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.DOUBLE, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomDouble(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomDouble(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunctionTests.java index 4a2148d7ebf0e..34e8b9089fc74 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class FirstFloatByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.FLOAT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampGroupingAggregatorFunctionTests.java index 8d2027ae57fd2..8226ce5a33c35 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstFloatByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class FirstFloatByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.FLOAT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomFloat(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomFloat(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunctionTests.java index 383bab213cf19..91677a51f4c92 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class FirstIntByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.INT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampGroupingAggregatorFunctionTests.java index b6952cb50e615..c71a5d4c08488 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstIntByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class FirstIntByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.INT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java index a77ad857b9ef9..0c34e36b93e17 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregatorFunctionTests.java @@ -23,9 +23,12 @@ public class FirstLongByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new TupleLongLongBlockSourceOperator( blockFactory, - IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLong(), randomLong())) + IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLong(), tsgen.gen())) ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampGroupingAggregatorFunctionTests.java index 6f4f609b61420..c96cb6c5bf952 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FirstLongByTimestampGroupingAggregatorFunctionTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.ListRowsBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.index.mapper.DateFieldMapper; import java.util.HashSet; import java.util.List; @@ -24,10 +25,11 @@ public class FirstLongByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + TimestampGen tsgen = randomFrom(TimestampGen.values()); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.LONG, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomLong(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomLong(), tsgen.gen())).toList() ); } @@ -98,12 +100,60 @@ void check(Object v) { } } else { if (expected.contains(v) == false) { - String expectedMessage = expected.size() == 1 - ? "expected " + expected.iterator().next() - : "expected one of " + expected.stream().sorted().toList(); - throw new AssertionError(expectedMessage + " but was " + v); + throw new AssertionError("expected " + expectedMessage() + " but was " + v); } } } + + private String expectedMessage() { + if (expected.size() == 1) { + return expected.iterator().next().toString(); + } + if (expected.size() > 10) { + return "one of " + expected.size() + " values"; + } + return "one of " + expected.stream().sorted().toList(); + } + } + + enum TimestampGen { + ANY { + @Override + public long gen() { + return randomLong(); + } + }, + + RECENT { + private static final long RECENT = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01"); + + @Override + public long gen() { + return randomLongBetween(RECENT, Long.MAX_VALUE); + } + }, + + AFTER_EPOCH { + @Override + public long gen() { + return randomLongBetween(0, Long.MAX_VALUE); + } + }, + + BEFORE_EPOCH { + @Override + public long gen() { + return randomLongBetween(Long.MIN_VALUE, 0); + } + }, + + EPOCH { + @Override + public long gen() { + return 0; + } + }; + + public abstract long gen(); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunctionTests.java index 51c82c9bc165c..36c29e6335951 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastDoubleByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.DOUBLE, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomDouble(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomDouble(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampGroupingAggregatorFunctionTests.java index b83244b253a2e..28998e6bbb0f5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastDoubleByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastDoubleByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.DOUBLE, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomDouble(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomDouble(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunctionTests.java index 8dac984842a36..0d44d82420ffa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastFloatByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.FLOAT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampGroupingAggregatorFunctionTests.java index 7e9e44968e586..f6098ccf2dc67 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastFloatByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastFloatByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.FLOAT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomFloat(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomFloat(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunctionTests.java index 52eead78c7fc4..af80187d64dc4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastIntByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.INT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampGroupingAggregatorFunctionTests.java index 125f9c33c9bff..50992d009a3f7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastIntByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastIntByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.INT, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomInt(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomInt(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java index 3207cb533d0aa..27d6d0177ef49 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregatorFunctionTests.java @@ -23,9 +23,12 @@ public class LastLongByTimestampAggregatorFunctionTests extends AggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new TupleLongLongBlockSourceOperator( blockFactory, - IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLong(), randomLong())) + IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLong(), tsgen.gen())) ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampGroupingAggregatorFunctionTests.java index 9ae8ecbeede11..c8fe28c4e6735 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/LastLongByTimestampGroupingAggregatorFunctionTests.java @@ -24,10 +24,13 @@ public class LastLongByTimestampGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { @Override protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen tsgen = randomFrom( + FirstLongByTimestampGroupingAggregatorFunctionTests.TimestampGen.values() + ); return new ListRowsBlockSourceOperator( blockFactory, List.of(ElementType.LONG, ElementType.LONG, ElementType.LONG), - IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomLong(), randomLong())).toList() + IntStream.range(0, size).mapToObj(l -> List.of(randomLongBetween(0, 4), randomLong(), tsgen.gen())).toList() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/package-info.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/package-info.java index ce37e98b292f1..fbfaea592f9ff 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/package-info.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/package-info.java @@ -150,6 +150,11 @@ * {@code Block evaluateFinal(AggregatorState state, DriverContext)} converts the inner state of the aggregation to the result * column * + *
  • + * (optional) {@code void first(AggregatorState state, I input)} if present, this is called the first time a value + * is seen instead of calling {@code combine}. This is more efficient than manually checking for + * uninitialized state on every call to {@code combine}. + *
  • * *

    Grouping aggregation expects:

    *