Skip to content

Commit 58c5044

Browse files
authored
ESQL: Fix bug in FIRST/LAST initialization (#132671)
Fixes a bug in the ungrouped FIRST/LAST implementation when it received timestamps always less than 0 (greater than 0 for LAST). We were always returning `0` as the value....
1 parent c91871f commit 58c5044

File tree

35 files changed

+638
-113
lines changed

35 files changed

+638
-113
lines changed

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import javax.lang.model.util.Elements;
3737

3838
import static java.util.stream.Collectors.joining;
39+
import static org.elasticsearch.compute.gen.Methods.optionalStaticMethod;
3940
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
4041
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
4142
import static org.elasticsearch.compute.gen.Methods.requireArgs;
@@ -76,6 +77,7 @@ public class AggregatorImplementer {
7677
private final List<TypeMirror> warnExceptions;
7778
private final ExecutableElement init;
7879
private final ExecutableElement combine;
80+
private final ExecutableElement first;
7981
private final List<Parameter> createParameters;
8082
private final ClassName implementation;
8183
private final List<IntermediateStateDesc> intermediateState;
@@ -114,6 +116,18 @@ public AggregatorImplementer(
114116
.filter(f -> false == f.type().equals(BIG_ARRAYS) && false == f.type().equals(DRIVER_CONTEXT))
115117
.toList();
116118

119+
this.first = aggState.declaredType.isPrimitive()
120+
? null
121+
: optionalStaticMethod(
122+
declarationType,
123+
requireVoidType(),
124+
requireName("first"),
125+
requireArgs(combine.getParameters().stream().map(p -> requireType(TypeName.get(p.asType()))).toArray(TypeMatcher[]::new))
126+
);
127+
if (this.aggState.hasSeen == false && this.first != null) {
128+
throw new IllegalArgumentException("[first] method not supported without [seen] on agg state");
129+
}
130+
117131
this.implementation = ClassName.get(
118132
elements.getPackageOf(declarationType).toString(),
119133
(declarationType.getSimpleName() + "AggregatorFunction").replace("AggregatorAggregator", "Aggregator")
@@ -339,10 +353,17 @@ private MethodSpec addRawVector(boolean masked) {
339353
builder.addComment("This type does not support vectors because all values are multi-valued");
340354
return builder.build();
341355
}
356+
357+
if (first != null) {
358+
builder.addComment("Find the first value up front in the Vector path which is more complex but should be faster");
359+
builder.addStatement("int valuesPosition = 0");
360+
addRawVectorWithFirst(builder, true, masked);
361+
addRawVectorWithFirst(builder, false, masked);
362+
return builder.build();
363+
}
342364
if (aggState.hasSeen()) {
343365
builder.addStatement("state.seen(true)");
344366
}
345-
346367
builder.beginControlFlow(
347368
"for (int valuesPosition = 0; valuesPosition < $L.getPositionCount(); valuesPosition++)",
348369
aggParams.getFirst().vectorName()
@@ -354,13 +375,39 @@ private MethodSpec addRawVector(boolean masked) {
354375
for (AggregationParameter p : aggParams) {
355376
p.read(builder, true);
356377
}
357-
combineRawInput(builder);
358-
378+
combineRawInput(builder, false);
359379
}
360380
builder.endControlFlow();
361381
return builder.build();
362382
}
363383

384+
private void addRawVectorWithFirst(MethodSpec.Builder builder, boolean firstPass, boolean masked) {
385+
builder.beginControlFlow(
386+
firstPass
387+
? "while (state.seen() == false && valuesPosition < $L.getPositionCount())"
388+
: "while (valuesPosition < $L.getPositionCount())",
389+
aggParams.getFirst().vectorName()
390+
);
391+
{
392+
if (masked) {
393+
builder.beginControlFlow("if (mask.getBoolean(valuesPosition) == false)");
394+
builder.addStatement("valuesPosition++");
395+
builder.addStatement("continue");
396+
builder.endControlFlow();
397+
}
398+
for (AggregationParameter p : aggParams) {
399+
p.read(builder, true);
400+
}
401+
combineRawInput(builder, firstPass);
402+
builder.addStatement("valuesPosition++");
403+
if (firstPass) {
404+
builder.addStatement("state.seen(true)");
405+
builder.addStatement("break");
406+
}
407+
}
408+
builder.endControlFlow();
409+
}
410+
364411
private MethodSpec addRawBlock(boolean masked) {
365412
MethodSpec.Builder builder = initAddRaw(false, masked);
366413

@@ -374,9 +421,6 @@ private MethodSpec addRawBlock(boolean masked) {
374421
builder.addStatement("continue");
375422
builder.endControlFlow();
376423
}
377-
if (aggState.hasSeen()) {
378-
builder.addStatement("state.seen(true)");
379-
}
380424

381425
if (aggParams.getFirst().isArray()) {
382426
if (aggParams.size() > 1) {
@@ -399,6 +443,9 @@ private MethodSpec addRawBlock(boolean masked) {
399443
builder.endControlFlow();
400444
combineRawInputForArray(builder, "valuesArray");
401445
} else {
446+
if (first == null && aggState.hasSeen()) {
447+
builder.addStatement("state.seen(true)");
448+
}
402449
for (AggregationParameter p : aggParams) {
403450
builder.addStatement("int $L = $L.getFirstValueIndex(p)", p.startName(), p.blockName());
404451
builder.addStatement("int $L = $L + $L.getValueCount(p)", p.endName(), p.startName(), p.blockName());
@@ -412,7 +459,21 @@ private MethodSpec addRawBlock(boolean masked) {
412459
);
413460
p.read(builder, false);
414461
}
415-
combineRawInput(builder);
462+
if (first != null) {
463+
builder.addComment("Check seen in every iteration to save on complexity in the Block path");
464+
builder.beginControlFlow("if (state.seen())");
465+
{
466+
combineRawInput(builder, false);
467+
}
468+
builder.nextControlFlow("else");
469+
{
470+
builder.addStatement("state.seen(true)");
471+
combineRawInput(builder, true);
472+
}
473+
builder.endControlFlow();
474+
} else {
475+
combineRawInput(builder, false);
476+
}
416477
for (AggregationParameter p : aggParams) {
417478
builder.endControlFlow();
418479
}
@@ -443,22 +504,26 @@ private MethodSpec.Builder initAddRaw(boolean valuesAreVector, boolean masked) {
443504
return builder;
444505
}
445506

446-
private void combineRawInput(MethodSpec.Builder builder) {
507+
private void combineRawInput(MethodSpec.Builder builder, boolean useFirst) {
447508
TypeName returnType = TypeName.get(combine.getReturnType());
448-
warningsBlock(builder, () -> invokeCombineRawInput(returnType, builder));
509+
warningsBlock(builder, () -> invokeCombineRawInput(returnType, builder, useFirst));
449510
}
450511

451-
private void invokeCombineRawInput(TypeName returnType, MethodSpec.Builder builder) {
512+
private void invokeCombineRawInput(TypeName returnType, MethodSpec.Builder builder, boolean useFirst) {
452513
StringBuilder pattern = new StringBuilder();
453514
List<Object> params = new ArrayList<>();
454515
if (returnType.isPrimitive()) {
516+
if (useFirst) {
517+
throw new IllegalArgumentException("[first] not supported with primitive");
518+
}
455519
pattern.append("state.$TValue($T.combine(state.$TValue()");
456520
params.add(returnType);
457521
params.add(declarationType);
458522
params.add(returnType);
459523
} else if (returnType == TypeName.VOID) {
460-
pattern.append("$T.combine(state");
524+
pattern.append("$T.$L(state");
461525
params.add(declarationType);
526+
params.add(useFirst ? first.getSimpleName() : combine.getSimpleName());
462527
} else {
463528
throw new IllegalArgumentException("combine must return void or a primitive");
464529
}

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstDoubleByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstFloatByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstIntByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstLongByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastDoubleByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastFloatByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastIntByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastLongByTimestampAggregator.java

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)