Skip to content

Commit 27a2bcd

Browse files
Merge branch 'main' into feature/timestamp-doc-values-spars-index-setting
2 parents 53fdd68 + 58277e7 commit 27a2bcd

File tree

5 files changed

+253
-118
lines changed

5 files changed

+253
-118
lines changed

x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,8 @@
3939
* <p>
4040
* The generation code also looks for the optional methods {@code combineIntermediate}
4141
* and {@code evaluateFinal} which are used to combine intermediate states and
42-
* produce the final output. If the first is missing then the generated code will
43-
* call the {@code combine} method to combine intermediate states. If the second
44-
* is missing the generated code will make a block containing the primitive from
45-
* the state. If either of those don't have sensible interpretations then the code
46-
* generation code will throw an error, aborting the compilation.
42+
* produce the final output. Please note, those are auto-generated when aggregating
43+
* primitive types such as boolean, int, long, float, double.
4744
* </p>
4845
*/
4946
@Target(ElementType.TYPE)
@@ -58,4 +55,8 @@
5855
*/
5956
Class<? extends Exception>[] warnExceptions() default {};
6057

58+
/**
59+
* If {@code true} then the @timestamp LongVector will be appended to the input blocks of the aggregation function.
60+
*/
61+
boolean includeTimestamps() default false;
6162
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import static org.elasticsearch.compute.gen.Types.INTERMEDIATE_STATE_DESC;
5555
import static org.elasticsearch.compute.gen.Types.LIST_AGG_FUNC_DESC;
5656
import static org.elasticsearch.compute.gen.Types.LIST_INTEGER;
57+
import static org.elasticsearch.compute.gen.Types.LONG_BLOCK;
58+
import static org.elasticsearch.compute.gen.Types.LONG_VECTOR;
5759
import static org.elasticsearch.compute.gen.Types.PAGE;
5860
import static org.elasticsearch.compute.gen.Types.WARNINGS;
5961
import static org.elasticsearch.compute.gen.Types.blockType;
@@ -73,9 +75,10 @@ public class AggregatorImplementer {
7375
private final List<TypeMirror> warnExceptions;
7476
private final ExecutableElement init;
7577
private final ExecutableElement combine;
78+
private final List<Parameter> createParameters;
7679
private final ClassName implementation;
7780
private final List<IntermediateStateDesc> intermediateState;
78-
private final List<Parameter> createParameters;
81+
private final boolean includeTimestampVector;
7982

8083
private final AggregationState aggState;
8184
private final AggregationParameter aggParam;
@@ -84,7 +87,8 @@ public AggregatorImplementer(
8487
Elements elements,
8588
TypeElement declarationType,
8689
IntermediateState[] interStateAnno,
87-
List<TypeMirror> warnExceptions
90+
List<TypeMirror> warnExceptions,
91+
boolean includeTimestampVector
8892
) {
8993
this.declarationType = declarationType;
9094
this.warnExceptions = warnExceptions;
@@ -102,10 +106,10 @@ public AggregatorImplementer(
102106
declarationType,
103107
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
104108
requireName("combine"),
105-
requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"))
109+
combineArgs(aggState, includeTimestampVector)
106110
);
107111
// TODO support multiple parameters
108-
this.aggParam = AggregationParameter.create(combine.getParameters().get(1).asType());
112+
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());
109113

110114
this.createParameters = init.getParameters()
111115
.stream()
@@ -117,7 +121,20 @@ public AggregatorImplementer(
117121
elements.getPackageOf(declarationType).toString(),
118122
(declarationType.getSimpleName() + "AggregatorFunction").replace("AggregatorAggregator", "Aggregator")
119123
);
120-
intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
124+
this.intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
125+
this.includeTimestampVector = includeTimestampVector;
126+
}
127+
128+
private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
129+
if (includeTimestampVector) {
130+
return requireArgs(
131+
requireType(aggState.declaredType()),
132+
requireType(TypeName.LONG), // @timestamp
133+
requireAnyType("<aggregation input column type>")
134+
);
135+
} else {
136+
return requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"));
137+
}
121138
}
122139

123140
ClassName implementation() {
@@ -295,10 +312,18 @@ private MethodSpec addRawInput() {
295312
builder.addComment("No masking");
296313
builder.addStatement("$T block = page.getBlock(channels.get(0))", blockType(aggParam.type()));
297314
builder.addStatement("$T vector = block.asVector()", vectorType(aggParam.type()));
315+
if (includeTimestampVector) {
316+
builder.addStatement("$T timestampsBlock = page.getBlock(channels.get(1))", LONG_BLOCK);
317+
builder.addStatement("$T timestampsVector = timestampsBlock.asVector()", LONG_VECTOR);
318+
319+
builder.beginControlFlow("if (timestampsVector == null) ");
320+
builder.addStatement("throw new IllegalStateException($S)", "expected @timestamp vector; but got a block");
321+
builder.endControlFlow();
322+
}
298323
builder.beginControlFlow("if (vector != null)");
299-
builder.addStatement("addRawVector(vector)");
324+
builder.addStatement(includeTimestampVector ? "addRawVector(vector, timestampsVector)" : "addRawVector(vector)");
300325
builder.nextControlFlow("else");
301-
builder.addStatement("addRawBlock(block)");
326+
builder.addStatement(includeTimestampVector ? "addRawBlock(block, timestampsVector)" : "addRawBlock(block)");
302327
builder.endControlFlow();
303328
builder.addStatement("return");
304329
}
@@ -307,17 +332,28 @@ private MethodSpec addRawInput() {
307332
builder.addComment("Some positions masked away, others kept");
308333
builder.addStatement("$T block = page.getBlock(channels.get(0))", blockType(aggParam.type()));
309334
builder.addStatement("$T vector = block.asVector()", vectorType(aggParam.type()));
335+
if (includeTimestampVector) {
336+
builder.addStatement("$T timestampsBlock = page.getBlock(channels.get(1))", LONG_BLOCK);
337+
builder.addStatement("$T timestampsVector = timestampsBlock.asVector()", LONG_VECTOR);
338+
339+
builder.beginControlFlow("if (timestampsVector == null) ");
340+
builder.addStatement("throw new IllegalStateException($S)", "expected @timestamp vector; but got a block");
341+
builder.endControlFlow();
342+
}
310343
builder.beginControlFlow("if (vector != null)");
311-
builder.addStatement("addRawVector(vector, mask)");
344+
builder.addStatement(includeTimestampVector ? "addRawVector(vector, timestampsVector, mask)" : "addRawVector(vector, mask)");
312345
builder.nextControlFlow("else");
313-
builder.addStatement("addRawBlock(block, mask)");
346+
builder.addStatement(includeTimestampVector ? "addRawBlock(block, timestampsVector, mask)" : "addRawBlock(block, mask)");
314347
builder.endControlFlow();
315348
return builder.build();
316349
}
317350

318351
private MethodSpec addRawVector(boolean masked) {
319352
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawVector");
320353
builder.addModifiers(Modifier.PRIVATE).addParameter(vectorType(aggParam.type()), "vector");
354+
if (includeTimestampVector) {
355+
builder.addParameter(LONG_VECTOR, "timestamps");
356+
}
321357
if (masked) {
322358
builder.addParameter(BOOLEAN_VECTOR, "mask");
323359
}
@@ -348,6 +384,9 @@ private MethodSpec addRawVector(boolean masked) {
348384
private MethodSpec addRawBlock(boolean masked) {
349385
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawBlock");
350386
builder.addModifiers(Modifier.PRIVATE).addParameter(blockType(aggParam.type()), "block");
387+
if (includeTimestampVector) {
388+
builder.addParameter(LONG_VECTOR, "timestamps");
389+
}
351390
if (masked) {
352391
builder.addParameter(BOOLEAN_VECTOR, "mask");
353392
}
@@ -401,33 +440,57 @@ private void combineRawInput(MethodSpec.Builder builder, String blockVariable) {
401440
});
402441
}
403442

404-
private void combineRawInputForPrimitive(TypeName returnType, MethodSpec.Builder builder, String blockVariable) {
405-
builder.addStatement(
406-
"state.$TValue($T.combine(state.$TValue(), $L.get$L(i)))",
407-
returnType,
408-
declarationType,
409-
returnType,
410-
blockVariable,
411-
capitalize(combine.getParameters().get(1).asType().toString())
412-
);
443+
private void combineRawInputForBytesRef(MethodSpec.Builder builder, String blockVariable) {
444+
// scratch is a BytesRef var that must have been defined before the iteration starts
445+
if (includeTimestampVector) {
446+
builder.addStatement("$T.combine(state, timestamps.getLong(i), $L.getBytesRef(i, scratch))", declarationType, blockVariable);
447+
} else {
448+
builder.addStatement("$T.combine(state, $L.getBytesRef(i, scratch))", declarationType, blockVariable);
449+
}
413450
}
414451

415-
private void combineRawInputForArray(MethodSpec.Builder builder, String arrayVariable) {
416-
warningsBlock(builder, () -> builder.addStatement("$T.combine(state, $L)", declarationType, arrayVariable));
452+
private void combineRawInputForPrimitive(TypeName returnType, MethodSpec.Builder builder, String blockVariable) {
453+
if (includeTimestampVector) {
454+
builder.addStatement(
455+
"state.$TValue($T.combine(state.$TValue(), timestamps.getLong(i), $L.get$L(i)))",
456+
returnType,
457+
declarationType,
458+
returnType,
459+
blockVariable,
460+
capitalize(combine.getParameters().get(1).asType().toString())
461+
);
462+
} else {
463+
builder.addStatement(
464+
"state.$TValue($T.combine(state.$TValue(), $L.get$L(i)))",
465+
returnType,
466+
declarationType,
467+
returnType,
468+
blockVariable,
469+
capitalize(combine.getParameters().get(1).asType().toString())
470+
);
471+
}
417472
}
418473

419474
private void combineRawInputForVoid(MethodSpec.Builder builder, String blockVariable) {
420-
builder.addStatement(
421-
"$T.combine(state, $L.get$L(i))",
422-
declarationType,
423-
blockVariable,
424-
capitalize(combine.getParameters().get(1).asType().toString())
425-
);
475+
if (includeTimestampVector) {
476+
builder.addStatement(
477+
"$T.combine(state, timestamps.getLong(i), $L.get$L(i))",
478+
declarationType,
479+
blockVariable,
480+
capitalize(combine.getParameters().get(1).asType().toString())
481+
);
482+
} else {
483+
builder.addStatement(
484+
"$T.combine(state, $L.get$L(i))",
485+
declarationType,
486+
blockVariable,
487+
capitalize(combine.getParameters().get(1).asType().toString())
488+
);
489+
}
426490
}
427491

428-
private void combineRawInputForBytesRef(MethodSpec.Builder builder, String blockVariable) {
429-
// scratch is a BytesRef var that must have been defined before the iteration starts
430-
builder.addStatement("$T.combine(state, $L.getBytesRef(i, scratch))", declarationType, blockVariable);
492+
private void combineRawInputForArray(MethodSpec.Builder builder, String arrayVariable) {
493+
warningsBlock(builder, () -> builder.addStatement("$T.combine(state, $L)", declarationType, arrayVariable));
431494
}
432495

433496
private void warningsBlock(MethodSpec.Builder builder, Runnable block) {

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,13 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
8787
);
8888
if (aggClass.getAnnotation(Aggregator.class) != null) {
8989
IntermediateState[] intermediateState = aggClass.getAnnotation(Aggregator.class).value();
90-
implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState, warnExceptionsTypes);
90+
implementer = new AggregatorImplementer(
91+
env.getElementUtils(),
92+
aggClass,
93+
intermediateState,
94+
warnExceptionsTypes,
95+
aggClass.getAnnotation(Aggregator.class).includeTimestamps()
96+
);
9197
write(aggClass, "aggregator", implementer.sourceFile(), env);
9298
}
9399
GroupingAggregatorImplementer groupingAggregatorImplementer = null;
@@ -96,13 +102,12 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
96102
if (intermediateState.length == 0 && aggClass.getAnnotation(Aggregator.class) != null) {
97103
intermediateState = aggClass.getAnnotation(Aggregator.class).value();
98104
}
99-
boolean includeTimestamps = aggClass.getAnnotation(GroupingAggregator.class).includeTimestamps();
100105
groupingAggregatorImplementer = new GroupingAggregatorImplementer(
101106
env.getElementUtils(),
102107
aggClass,
103108
intermediateState,
104109
warnExceptionsTypes,
105-
includeTimestamps
110+
aggClass.getAnnotation(GroupingAggregator.class).includeTimestamps()
106111
);
107112
write(aggClass, "grouping aggregator", groupingAggregatorImplementer.sourceFile(), env);
108113
}

0 commit comments

Comments
 (0)