Skip to content

Commit 05323e7

Browse files
authored
ESQL: Compute support for filtering ungrouped aggs (elastic#112717) (elastic#112763)
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like: ``` | STATS success = COUNT(*) WHERE 200 <= response_code AND response_code < 300, redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400, client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500, server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600, total_count = COUNT(*) ``` We could translate the WHERE expression into an `ExpressionEvaluator` and run it, then plug it into the filtering support added in this PR. The actual filtering is done by creating a `FilteredAggregatorFunction` which wraps a regular `AggregatorFunction` first executing the filter against the incoming `Page` and then passing the resulting mask to the `AggregatorFunction`. We've then added a `mask` to `AggregatorFunction#process` which each aggregation function must use for filtering. We keep the unfiltered behavior by sending a constant block with `true` in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance. Importantly, when you don't turn this on it doesn't effect performance: ``` (blockType) (grouping) (op) Score Error -> Score Error Units vector_longs none count 0.007 ± 0.001 -> 0.007 ± 0.001 ns/op vector_longs none min 0.123 ± 0.004 -> 0.128 ± 0.005 ns/op vector_longs longs count 4.311 ± 0.192 -> 4.218 ± 0.053 ns/op vector_longs longs min 5.476 ± 0.077 -> 5.451 ± 0.074 ns/op ```
1 parent d16a27e commit 05323e7

File tree

63 files changed

+2500
-179
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+2500
-179
lines changed

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,10 @@ private TypeSpec type() {
240240
builder.addMethod(intermediateStateDesc());
241241
builder.addMethod(intermediateBlockCount());
242242
builder.addMethod(addRawInput());
243-
builder.addMethod(addRawVector());
244-
builder.addMethod(addRawBlock());
243+
builder.addMethod(addRawVector(false));
244+
builder.addMethod(addRawVector(true));
245+
builder.addMethod(addRawBlock(false));
246+
builder.addMethod(addRawBlock(true));
245247
builder.addMethod(addIntermediateInput());
246248
builder.addMethod(evaluateIntermediate());
247249
builder.addMethod(evaluateFinal());
@@ -345,22 +347,48 @@ private MethodSpec intermediateBlockCount() {
345347

346348
private MethodSpec addRawInput() {
347349
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
348-
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC).addParameter(PAGE, "page");
350+
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC).addParameter(PAGE, "page").addParameter(BOOLEAN_VECTOR, "mask");
349351
if (stateTypeHasFailed) {
350352
builder.beginControlFlow("if (state.failed())");
351353
builder.addStatement("return");
352354
builder.endControlFlow();
353355
}
356+
builder.beginControlFlow("if (mask.isConstant())");
357+
{
358+
builder.beginControlFlow("if (mask.getBoolean(0) == false)");
359+
{
360+
builder.addComment("Entire page masked away");
361+
builder.addStatement("return");
362+
}
363+
builder.endControlFlow();
364+
builder.addComment("No masking");
365+
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
366+
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
367+
builder.beginControlFlow("if (vector != null)");
368+
builder.addStatement("addRawVector(vector)");
369+
builder.nextControlFlow("else");
370+
builder.addStatement("addRawBlock(block)");
371+
builder.endControlFlow();
372+
builder.addStatement("return");
373+
}
374+
builder.endControlFlow();
375+
builder.addComment("Some positions masked away, others kept");
354376
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
355377
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
356-
builder.beginControlFlow("if (vector != null)").addStatement("addRawVector(vector)");
357-
builder.nextControlFlow("else").addStatement("addRawBlock(block)").endControlFlow();
378+
builder.beginControlFlow("if (vector != null)");
379+
builder.addStatement("addRawVector(vector, mask)");
380+
builder.nextControlFlow("else");
381+
builder.addStatement("addRawBlock(block, mask)");
382+
builder.endControlFlow();
358383
return builder.build();
359384
}
360385

361-
private MethodSpec addRawVector() {
386+
private MethodSpec addRawVector(boolean masked) {
362387
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawVector");
363388
builder.addModifiers(Modifier.PRIVATE).addParameter(valueVectorType(init, combine), "vector");
389+
if (masked) {
390+
builder.addParameter(BOOLEAN_VECTOR, "mask");
391+
}
364392

365393
if (stateTypeHasSeen) {
366394
builder.addStatement("state.seen(true)");
@@ -372,6 +400,9 @@ private MethodSpec addRawVector() {
372400

373401
builder.beginControlFlow("for (int i = 0; i < vector.getPositionCount(); i++)");
374402
{
403+
if (masked) {
404+
builder.beginControlFlow("if (mask.getBoolean(i) == false)").addStatement("continue").endControlFlow();
405+
}
375406
combineRawInput(builder, "vector");
376407
}
377408
builder.endControlFlow();
@@ -381,16 +412,22 @@ private MethodSpec addRawVector() {
381412
return builder.build();
382413
}
383414

384-
private MethodSpec addRawBlock() {
415+
private MethodSpec addRawBlock(boolean masked) {
385416
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawBlock");
386417
builder.addModifiers(Modifier.PRIVATE).addParameter(valueBlockType(init, combine), "block");
418+
if (masked) {
419+
builder.addParameter(BOOLEAN_VECTOR, "mask");
420+
}
387421

388422
if (valuesIsBytesRef) {
389423
// Add bytes_ref scratch var that will only be used for bytes_ref blocks/vectors
390424
builder.addStatement("$T scratch = new $T()", BYTES_REF, BYTES_REF);
391425
}
392426
builder.beginControlFlow("for (int p = 0; p < block.getPositionCount(); p++)");
393427
{
428+
if (masked) {
429+
builder.beginControlFlow("if (mask.getBoolean(p) == false)").addStatement("continue").endControlFlow();
430+
}
394431
builder.beginControlFlow("if (block.isNull(p))");
395432
builder.addStatement("continue");
396433
builder.endControlFlow();

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunction.java

Lines changed: 44 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunction.java

Lines changed: 47 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunction.java

Lines changed: 45 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)