Skip to content

Commit e5670dc

Browse files
committed
Add "includeTimestamps" to non-grouping aggregator
1 parent 6e6466e commit e5670dc

File tree

61 files changed

+511
-534
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+511
-534
lines changed

x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@
6363
*/
6464
Class<? extends Exception>[] warnExceptions() default {};
6565

66+
/**
67+
* If {@code true} then the @timestamp LongVector will be appended to the input blocks of the aggregation function.
68+
*/
69+
boolean includeTimestamps() default false;
6670
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,14 @@ public class AggregatorImplementer {
8787
private final boolean valuesIsBytesRef;
8888
private final List<IntermediateStateDesc> intermediateState;
8989
private final List<Parameter> createParameters;
90+
private final boolean includeTimestampVector;
9091

9192
public AggregatorImplementer(
9293
Elements elements,
9394
TypeElement declarationType,
9495
IntermediateState[] interStateAnno,
95-
List<TypeMirror> warnExceptions
96+
List<TypeMirror> warnExceptions,
97+
boolean includeTimestampVector
9698
) {
9799
this.declarationType = declarationType;
98100
this.warnExceptions = warnExceptions;
@@ -128,6 +130,7 @@ public AggregatorImplementer(
128130
);
129131
this.valuesIsBytesRef = BYTES_REF.equals(TypeName.get(combine.getParameters().get(combine.getParameters().size() - 1).asType()));
130132
intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
133+
this.includeTimestampVector = includeTimestampVector;
131134
}
132135

133136
ClassName implementation() {
@@ -359,34 +362,44 @@ private MethodSpec addRawInput() {
359362
builder.addStatement("return");
360363
}
361364
builder.endControlFlow();
365+
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
366+
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
367+
if (includeTimestampVector) {
368+
builder.addStatement("$T timestampsBlock = page.getBlock(channels.get(1))", LONG_BLOCK);
369+
builder.addStatement("$T timestampsVector = timestampsBlock.asVector()", LONG_VECTOR);
370+
builder.beginControlFlow("if (timestampsVector == null) ");
371+
builder.addStatement("throw new IllegalStateException($S)", "expected @timestamp vector; but got a block");
372+
builder.endControlFlow();
373+
}
374+
362375
builder.beginControlFlow("if (mask.allTrue())");
376+
String extra = includeTimestampVector ? ", timestampsVector" : "";
363377
{
364378
builder.addComment("No masking");
365-
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
366-
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
367379
builder.beginControlFlow("if (vector != null)");
368-
builder.addStatement("addRawVector(vector)");
380+
builder.addStatement("addRawVector(vector$L)", extra);
369381
builder.nextControlFlow("else");
370-
builder.addStatement("addRawBlock(block)");
382+
builder.addStatement("addRawBlock(block$L)", extra);
371383
builder.endControlFlow();
372384
builder.addStatement("return");
373385
}
374-
builder.endControlFlow();
375-
386+
builder.nextControlFlow("else");
376387
builder.addComment("Some positions masked away, others kept");
377-
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
378-
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
379388
builder.beginControlFlow("if (vector != null)");
380-
builder.addStatement("addRawVector(vector, mask)");
389+
builder.addStatement("addRawVector(vector$L, mask)", extra);
381390
builder.nextControlFlow("else");
382-
builder.addStatement("addRawBlock(block, mask)");
391+
builder.addStatement("addRawBlock(block$L, mask)", extra);
392+
builder.endControlFlow();
383393
builder.endControlFlow();
384394
return builder.build();
385395
}
386396

387397
private MethodSpec addRawVector(boolean masked) {
388398
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawVector");
389399
builder.addModifiers(Modifier.PRIVATE).addParameter(valueVectorType(init, combine), "vector");
400+
if (includeTimestampVector) {
401+
builder.addParameter(LONG_VECTOR, "timestamps");
402+
}
390403
if (masked) {
391404
builder.addParameter(BOOLEAN_VECTOR, "mask");
392405
}
@@ -416,6 +429,9 @@ private MethodSpec addRawVector(boolean masked) {
416429
private MethodSpec addRawBlock(boolean masked) {
417430
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawBlock");
418431
builder.addModifiers(Modifier.PRIVATE).addParameter(valueBlockType(init, combine), "block");
432+
if (includeTimestampVector) {
433+
builder.addParameter(LONG_VECTOR, "timestamps");
434+
}
419435
if (masked) {
420436
builder.addParameter(BOOLEAN_VECTOR, "mask");
421437
}
@@ -455,6 +471,8 @@ private void combineRawInput(MethodSpec.Builder builder, String blockVariable) {
455471
}
456472
if (valuesIsBytesRef) {
457473
combineRawInputForBytesRef(builder, blockVariable);
474+
} else if (includeTimestampVector) {
475+
combineRawInputWithTimestamp(builder, blockVariable);
458476
} else if (returnType.isPrimitive()) {
459477
combineRawInputForPrimitive(returnType, builder, blockVariable);
460478
} else if (returnType == TypeName.VOID) {
@@ -492,6 +510,12 @@ private void combineRawInputForVoid(MethodSpec.Builder builder, String blockVari
492510
);
493511
}
494512

513+
private void combineRawInputWithTimestamp(MethodSpec.Builder builder, String blockVariable) {
514+
TypeName valueType = TypeName.get(combine.getParameters().get(combine.getParameters().size() - 1).asType());
515+
String blockType = valueType.toString().substring(0, 1).toUpperCase(Locale.ROOT) + valueType.toString().substring(1);
516+
builder.addStatement("$T.combine(state, timestamps.getLong(i), $L.get$L(i))", declarationType, blockVariable, blockType);
517+
}
518+
495519
private void combineRawInputForBytesRef(MethodSpec.Builder builder, String blockVariable) {
496520
// scratch is a BytesRef var that must have been defined before the iteration starts
497521
builder.addStatement("$T.combine(state, $L.getBytesRef(i, scratch))", declarationType, blockVariable);

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,14 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
8787
);
8888
if (aggClass.getAnnotation(Aggregator.class) != null) {
8989
IntermediateState[] intermediateState = aggClass.getAnnotation(Aggregator.class).value();
90-
implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState, warnExceptionsTypes);
90+
boolean includeTimestamps = aggClass.getAnnotation(Aggregator.class).includeTimestamps();
91+
implementer = new AggregatorImplementer(
92+
env.getElementUtils(),
93+
aggClass,
94+
intermediateState,
95+
warnExceptionsTypes,
96+
includeTimestamps
97+
);
9198
write(aggClass, "aggregator", implementer.sourceFile(), env);
9299
}
93100
GroupingAggregatorImplementer groupingAggregatorImplementer = null;

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctFloatAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxBooleanAggregatorFunction.java

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)