Skip to content

Commit 85a4782

Browse files
authored
[ES|QL] Support first(), last() functions (#137408)
- Register ALL_FIRST as a snapshot function and, for now, restrict it to the long type only.
- As an exception, allow null values to be passed down to the ALL_FIRST agg function by exempting it from the corresponding rule.
- Change the agg and grouping agg implementers to pass all values down to the aggregator, which must handle null values according to its internal logic.
- Allow the final step of SpatialExtentGroupingState and SpatialExtentState to treat infinite values the same way it treats missing ones, by appending null. This lets us skip an attempt at rectangle creation that would fail with an IllegalArgumentException when the seen flag was set but the shape was null.
1 parent d8c455c commit 85a4782

27 files changed

+1278
-92
lines changed

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class AggregatorImplementer {
8787

8888
private final AggregationState aggState;
8989
private final List<Argument> aggParams;
90+
private final boolean hasOnlyBlockArguments;
9091
private final boolean tryToUseVectors;
9192

9293
public AggregatorImplementer(
@@ -120,7 +121,7 @@ public AggregatorImplementer(
120121
}
121122
return a;
122123
}).filter(a -> a instanceof PositionArgument == false).toList();
123-
124+
this.hasOnlyBlockArguments = this.aggParams.stream().allMatch(a -> a instanceof BlockArgument);
124125
this.tryToUseVectors = aggParams.stream().anyMatch(a -> (a instanceof BlockArgument) == false)
125126
&& aggParams.stream().noneMatch(a -> a.supportsVectorReadAccess() == false);
126127

@@ -438,38 +439,25 @@ private MethodSpec addRawBlock(boolean masked) {
438439
if (masked) {
439440
builder.beginControlFlow("if (mask.getBoolean(p) == false)").addStatement("continue").endControlFlow();
440441
}
442+
441443
for (Argument a : aggParams) {
442-
builder.addStatement("int $LValueCount = $L.getValueCount(p)", a.name(), a.blockName());
443-
builder.beginControlFlow("if ($LValueCount == 0)", a.name());
444-
builder.addStatement("continue");
445-
builder.endControlFlow();
444+
a.addContinueIfPositionHasNoValueBlock(builder);
446445
}
447446

448-
if (aggParams.getFirst() instanceof BlockArgument) {
449-
if (aggParams.size() > 1) {
450-
throw new IllegalArgumentException("array mode not supported for multiple args");
451-
}
452-
warningsBlock(
453-
builder,
454-
() -> builder.addStatement("$T.combine(state, p, $L)", declarationType, aggParams.getFirst().blockName())
455-
);
456-
} else {
447+
if (hasOnlyBlockArguments == false) {
457448
if (first == null && aggState.hasSeen()) {
458449
builder.addStatement("state.seen(true)");
459450
}
460-
for (Argument a : aggParams) {
461-
builder.addStatement("int $L = $L.getFirstValueIndex(p)", a.startName(), a.blockName());
462-
builder.addStatement("int $L = $L + $LValueCount", a.endName(), a.startName(), a.name());
463-
builder.beginControlFlow(
464-
"for (int $L = $L; $L < $L; $L++)",
465-
a.offsetName(),
466-
a.startName(),
467-
a.offsetName(),
468-
a.endName(),
469-
a.offsetName()
470-
);
471-
a.read(builder, a.blockName(), a.offsetName());
472-
}
451+
}
452+
453+
for (Argument a : aggParams) {
454+
a.startBlockProcessingLoop(builder);
455+
}
456+
457+
if (hasOnlyBlockArguments) {
458+
String params = aggParams.stream().map(Argument::blockName).collect(joining(", "));
459+
warningsBlock(builder, () -> builder.addStatement("$T.combine(state, p, $L)", declarationType, params));
460+
} else {
473461
if (first != null) {
474462
builder.addComment("Check seen in every iteration to save on complexity in the Block path");
475463
builder.beginControlFlow("if (state.seen())");
@@ -485,9 +473,11 @@ private MethodSpec addRawBlock(boolean masked) {
485473
} else {
486474
combineRawInput(builder, false);
487475
}
488-
for (Argument a : aggParams) {
489-
builder.endControlFlow();
490-
}
476+
}
477+
478+
for (int i = aggParams.size() - 1; i >= 0; --i) {
479+
Argument a = aggParams.get(i);
480+
a.endBlockProcessingLoop(builder);
491481
}
492482
}
493483
builder.endControlFlow();

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,8 @@ private MethodSpec addRawInputLoop(TypeName groupsType, boolean valuesAreVector)
468468
builder.endControlFlow();
469469
}
470470
builder.addStatement("int valuesPosition = groupPosition + positionOffset");
471-
if (valuesAreVector == false) {
471+
472+
if (valuesAreVector == false && hasOnlyBlockArguments == false) {
472473
for (Argument a : aggParams) {
473474
builder.beginControlFlow("if ($L.isNull(valuesPosition))", a.blockName());
474475
builder.addStatement("continue");
@@ -497,16 +498,10 @@ private MethodSpec addRawInputLoop(TypeName groupsType, boolean valuesAreVector)
497498
combineRawInput(builder);
498499
} else {
499500
if (hasOnlyBlockArguments) {
500-
if (aggParams.size() > 1) {
501-
throw new IllegalArgumentException("array mode not supported for multiple args");
502-
}
501+
String params = aggParams.stream().map(Argument::blockName).collect(joining(", "));
503502
warningsBlock(
504503
builder,
505-
() -> builder.addStatement(
506-
"$T.combine(state, groupId, valuesPosition, $L)",
507-
declarationType,
508-
aggParams.getFirst().blockName()
509-
)
504+
() -> builder.addStatement("$T.combine(state, groupId, valuesPosition, $L)", declarationType, params)
510505
);
511506
} else {
512507
for (Argument a : aggParams) {

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/Argument.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,30 @@ default void declareProcessParameter(MethodSpec.Builder builder, boolean blockSt
219219
builder.addParameter(parameterType, parameterName);
220220
}
221221

222+
/**
223+
* Adds a block that reads the number of values at the current position and skips to the next position (without calling the aggregator) if there are none.
224+
*/
225+
default void addContinueIfPositionHasNoValueBlock(MethodSpec.Builder builder) {
226+
builder.addStatement("int $LValueCount = $L.getValueCount(p)", name(), blockName());
227+
builder.beginControlFlow("if ($LValueCount == 0)", name());
228+
builder.addStatement("continue");
229+
builder.endControlFlow();
230+
}
231+
232+
/**
233+
* Starts the loop needed to process this argument's values when passed as a block.
234+
*/
235+
default void startBlockProcessingLoop(MethodSpec.Builder builder) {
236+
throw new UnsupportedOperationException("can't build raw block for " + type());
237+
}
238+
239+
/**
240+
* Ends the loop needed to process this argument's values when passed as a block.
241+
*/
242+
default void endBlockProcessingLoop(MethodSpec.Builder builder) {
243+
throw new UnsupportedOperationException("can't end block for " + type());
244+
}
245+
222246
/**
223247
* Build the invocation of the process method for this parameter.
224248
*/

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/BlockArgument.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
115115
args.add(name);
116116
}
117117

118+
@Override
119+
public void addContinueIfPositionHasNoValueBlock(MethodSpec.Builder builder) {
120+
// nothing to do
121+
// block params don't skip any positions as all values must be passed down to the aggregator
122+
}
123+
118124
@Override
119125
public String closeInvocation() {
120126
return name;
@@ -124,4 +130,14 @@ public String closeInvocation() {
124130
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {
125131
builder.addStatement("baseRamBytesUsed += $L.baseRamBytesUsed()", name);
126132
}
133+
134+
@Override
135+
public void startBlockProcessingLoop(MethodSpec.Builder builder) {
136+
// nothing to do
137+
}
138+
139+
@Override
140+
public void endBlockProcessingLoop(MethodSpec.Builder builder) {
141+
// nothing to do
142+
}
127143
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/StandardArgument.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,19 @@ public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {
156156
builder.addStatement("baseRamBytesUsed += $L.baseRamBytesUsed()", name);
157157
}
158158

159+
@Override
160+
public void startBlockProcessingLoop(MethodSpec.Builder builder) {
161+
builder.addStatement("int $L = $L.getFirstValueIndex(p)", startName(), blockName());
162+
builder.addStatement("int $L = $L + $LValueCount", endName(), startName(), name());
163+
builder.beginControlFlow("for (int $L = $L; $L < $L; $L++)", offsetName(), startName(), offsetName(), endName(), offsetName());
164+
read(builder, blockName(), offsetName());
165+
}
166+
167+
@Override
168+
public void endBlockProcessingLoop(MethodSpec.Builder builder) {
169+
builder.endControlFlow();
170+
}
171+
159172
static void skipNull(MethodSpec.Builder builder, String value) {
160173
builder.beginControlFlow("switch ($N.getValueCount(p))", value);
161174
{

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/AllFirstLongByTimestampAggregatorFunction.java

Lines changed: 155 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/AllFirstLongByTimestampAggregatorFunctionSupplier.java

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)