Skip to content

Commit fbeed15

Browse files
authored
Optimize ordinal inputs in Values aggregation (#127849) (#129009)
Currently, time-series aggregations use the `values` aggregation to collect dimension values. While we might introduce a specialized aggregation for this in the future, for now, we are using `values`, and the inputs are likely ordinal blocks. This change speeds up the `values` aggregation when the inputs are ordinal-based. Execution time reduced from 461ms to 192ms for 1000 groups. ``` ValuesAggregatorBenchmark.run BytesRef 10000 avgt 7 461.938 ± 6.089 ms/op ValuesAggregatorBenchmark.run BytesRef 10000 avgt 7 192.898 ± 1.781 ms/op ```
1 parent 4dd371d commit fbeed15

File tree

8 files changed

+250
-19
lines changed

8 files changed

+250
-19
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import org.elasticsearch.compute.data.Block;
2222
import org.elasticsearch.compute.data.BlockFactory;
2323
import org.elasticsearch.compute.data.BytesRefBlock;
24+
import org.elasticsearch.compute.data.BytesRefVector;
2425
import org.elasticsearch.compute.data.ElementType;
2526
import org.elasticsearch.compute.data.IntBlock;
27+
import org.elasticsearch.compute.data.IntVector;
2628
import org.elasticsearch.compute.data.LongBlock;
2729
import org.elasticsearch.compute.data.LongVector;
30+
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2831
import org.elasticsearch.compute.data.Page;
2932
import org.elasticsearch.compute.operator.AggregationOperator;
3033
import org.elasticsearch.compute.operator.DriverContext;
@@ -275,11 +278,18 @@ private static Block dataBlock(int groups, String dataType) {
275278
int blockLength = blockLength(groups);
276279
return switch (dataType) {
277280
case BYTES_REF -> {
278-
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(blockLength)) {
281+
try (
282+
BytesRefVector.Builder dict = blockFactory.newBytesRefVectorBuilder(blockLength);
283+
IntVector.Builder ords = blockFactory.newIntVectorBuilder(blockLength)
284+
) {
285+
final int dictLength = Math.min(blockLength, KEYWORDS.length);
286+
for (int i = 0; i < dictLength; i++) {
287+
dict.appendBytesRef(KEYWORDS[i]);
288+
}
279289
for (int i = 0; i < blockLength; i++) {
280-
builder.appendBytesRef(KEYWORDS[i % KEYWORDS.length]);
290+
ords.appendInt(i % dictLength);
281291
}
282-
yield builder.build();
292+
yield new OrdinalBytesRefVector(ords.build(), dict.build()).asBlock();
283293
}
284294
}
285295
case INT -> {

docs/changelog/127849.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127849
2+
summary: Optimize ordinal inputs in Values aggregation
3+
area: "ES|QL"
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import static java.util.stream.Collectors.joining;
3737
import static org.elasticsearch.compute.gen.AggregatorImplementer.capitalize;
38+
import static org.elasticsearch.compute.gen.Methods.optionalStaticMethod;
3839
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
3940
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
4041
import static org.elasticsearch.compute.gen.Methods.requireArgs;
@@ -332,10 +333,32 @@ private MethodSpec prepareProcessPage() {
332333
builder.beginControlFlow("if (valuesBlock.mayHaveNulls())");
333334
builder.addStatement("state.enableGroupIdTracking(seenGroupIds)");
334335
builder.endControlFlow();
335-
builder.addStatement("return $L", addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra)));
336+
if (shouldWrapAddInput(blockType(aggParam.type()))) {
337+
builder.addStatement(
338+
"var addInput = $L",
339+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra))
340+
);
341+
builder.addStatement("return $T.wrapAddInput(addInput, state, valuesBlock)", declarationType);
342+
} else {
343+
builder.addStatement(
344+
"return $L",
345+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra))
346+
);
347+
}
336348
}
337349
builder.endControlFlow();
338-
builder.addStatement("return $L", addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra)));
350+
if (shouldWrapAddInput(vectorType(aggParam.type()))) {
351+
builder.addStatement(
352+
"var addInput = $L",
353+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra))
354+
);
355+
builder.addStatement("return $T.wrapAddInput(addInput, state, valuesVector)", declarationType);
356+
} else {
357+
builder.addStatement(
358+
"return $L",
359+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra))
360+
);
361+
}
339362
return builder.build();
340363
}
341364

@@ -525,6 +548,15 @@ private void combineRawInputForArray(MethodSpec.Builder builder, String arrayVar
525548
warningsBlock(builder, () -> builder.addStatement("$T.combine(state, groupId, $L)", declarationType, arrayVariable));
526549
}
527550

551+
private boolean shouldWrapAddInput(TypeName valuesType) {
552+
return optionalStaticMethod(
553+
declarationType,
554+
requireType(GROUPING_AGGREGATOR_FUNCTION_ADD_INPUT),
555+
requireName("wrapAddInput"),
556+
requireArgs(requireType(GROUPING_AGGREGATOR_FUNCTION_ADD_INPUT), requireType(aggState.declaredType()), requireType(valuesType))
557+
) != null;
558+
}
559+
528560
private void warningsBlock(MethodSpec.Builder builder, Runnable block) {
529561
if (warnExceptions.isEmpty() == false) {
530562
builder.beginControlFlow("try");

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Methods.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,31 @@ static ExecutableElement requireStaticMethod(
5959
TypeMatcher returnTypeMatcher,
6060
NameMatcher nameMatcher,
6161
ArgumentMatcher argumentMatcher
62+
) {
63+
ExecutableElement method = optionalStaticMethod(declarationType, returnTypeMatcher, nameMatcher, argumentMatcher);
64+
if (method == null) {
65+
var message = nameMatcher.names.size() == 1 ? "Requires method: " : "Requires one of methods: ";
66+
var signatures = nameMatcher.names.stream()
67+
.map(name -> "public static " + returnTypeMatcher + " " + declarationType + "#" + name + "(" + argumentMatcher + ")")
68+
.collect(joining(" or "));
69+
throw new IllegalArgumentException(message + signatures);
70+
}
71+
return method;
72+
}
73+
74+
static ExecutableElement optionalStaticMethod(
75+
TypeElement declarationType,
76+
TypeMatcher returnTypeMatcher,
77+
NameMatcher nameMatcher,
78+
ArgumentMatcher argumentMatcher
6279
) {
6380
return typeAndSuperType(declarationType).flatMap(type -> ElementFilter.methodsIn(type.getEnclosedElements()).stream())
6481
.filter(method -> method.getModifiers().contains(Modifier.STATIC))
6582
.filter(method -> nameMatcher.test(method.getSimpleName().toString()))
6683
.filter(method -> returnTypeMatcher.test(TypeName.get(method.getReturnType())))
6784
.filter(method -> argumentMatcher.test(method.getParameters().stream().map(it -> TypeName.get(it.asType())).toList()))
6885
.findFirst()
69-
.orElseThrow(() -> {
70-
var message = nameMatcher.names.size() == 1 ? "Requires method: " : "Requires one of methods: ";
71-
var signatures = nameMatcher.names.stream()
72-
.map(name -> "public static " + returnTypeMatcher + " " + declarationType + "#" + name + "(" + argumentMatcher + ")")
73-
.collect(joining(" or "));
74-
return new IllegalArgumentException(message + signatures);
75-
});
86+
.orElse(null);
7687
}
7788

7889
static NameMatcher requireName(String... names) {

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java

Lines changed: 19 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunction.java

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.aggregation;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
12+
import org.elasticsearch.compute.data.BytesRefBlock;
13+
import org.elasticsearch.compute.data.BytesRefVector;
14+
import org.elasticsearch.compute.data.IntBlock;
15+
import org.elasticsearch.compute.data.IntVector;
16+
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
17+
import org.elasticsearch.core.Releasables;
18+
19+
final class ValuesBytesRefAggregators {
20+
static GroupingAggregatorFunction.AddInput wrapAddInput(
21+
GroupingAggregatorFunction.AddInput delegate,
22+
ValuesBytesRefAggregator.GroupingState state,
23+
BytesRefBlock values
24+
) {
25+
OrdinalBytesRefBlock valuesOrdinal = values.asOrdinals();
26+
if (valuesOrdinal == null) {
27+
return delegate;
28+
}
29+
BytesRefVector dict = valuesOrdinal.getDictionaryVector();
30+
final IntVector hashIds;
31+
BytesRef spare = new BytesRef();
32+
try (var hashIdsBuilder = values.blockFactory().newIntVectorFixedBuilder(dict.getPositionCount())) {
33+
for (int p = 0; p < dict.getPositionCount(); p++) {
34+
hashIdsBuilder.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(state.bytes.add(dict.getBytesRef(p, spare)))));
35+
}
36+
hashIds = hashIdsBuilder.build();
37+
}
38+
IntBlock ordinalIds = valuesOrdinal.getOrdinalsBlock();
39+
return new GroupingAggregatorFunction.AddInput() {
40+
@Override
41+
public void add(int positionOffset, IntBlock groupIds) {
42+
for (int groupPosition = 0; groupPosition < groupIds.getPositionCount(); groupPosition++) {
43+
if (groupIds.isNull(groupPosition)) {
44+
continue;
45+
}
46+
int groupStart = groupIds.getFirstValueIndex(groupPosition);
47+
int groupEnd = groupStart + groupIds.getValueCount(groupPosition);
48+
for (int g = groupStart; g < groupEnd; g++) {
49+
int groupId = groupIds.getInt(g);
50+
if (ordinalIds.isNull(groupPosition + positionOffset)) {
51+
continue;
52+
}
53+
int valuesStart = ordinalIds.getFirstValueIndex(groupPosition + positionOffset);
54+
int valuesEnd = valuesStart + ordinalIds.getValueCount(groupPosition + positionOffset);
55+
for (int v = valuesStart; v < valuesEnd; v++) {
56+
state.values.add(groupId, hashIds.getInt(ordinalIds.getInt(v)));
57+
}
58+
}
59+
}
60+
}
61+
62+
@Override
63+
public void add(int positionOffset, IntVector groupIds) {
64+
for (int groupPosition = 0; groupPosition < groupIds.getPositionCount(); groupPosition++) {
65+
int groupId = groupIds.getInt(groupPosition);
66+
if (ordinalIds.isNull(groupPosition + positionOffset)) {
67+
continue;
68+
}
69+
int valuesStart = ordinalIds.getFirstValueIndex(groupPosition + positionOffset);
70+
int valuesEnd = valuesStart + ordinalIds.getValueCount(groupPosition + positionOffset);
71+
for (int v = valuesStart; v < valuesEnd; v++) {
72+
state.values.add(groupId, hashIds.getInt(ordinalIds.getInt(v)));
73+
}
74+
}
75+
}
76+
77+
@Override
78+
public void close() {
79+
Releasables.close(hashIds, delegate);
80+
}
81+
};
82+
}
83+
84+
static GroupingAggregatorFunction.AddInput wrapAddInput(
85+
GroupingAggregatorFunction.AddInput delegate,
86+
ValuesBytesRefAggregator.GroupingState state,
87+
BytesRefVector values
88+
) {
89+
var valuesOrdinal = values.asOrdinals();
90+
if (valuesOrdinal == null) {
91+
return delegate;
92+
}
93+
BytesRefVector dict = valuesOrdinal.getDictionaryVector();
94+
final IntVector hashIds;
95+
BytesRef spare = new BytesRef();
96+
try (var hashIdsBuilder = values.blockFactory().newIntVectorFixedBuilder(dict.getPositionCount())) {
97+
for (int p = 0; p < dict.getPositionCount(); p++) {
98+
hashIdsBuilder.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(state.bytes.add(dict.getBytesRef(p, spare)))));
99+
}
100+
hashIds = hashIdsBuilder.build();
101+
}
102+
var ordinalIds = valuesOrdinal.getOrdinalsVector();
103+
return new GroupingAggregatorFunction.AddInput() {
104+
@Override
105+
public void add(int positionOffset, IntBlock groupIds) {
106+
for (int groupPosition = 0; groupPosition < groupIds.getPositionCount(); groupPosition++) {
107+
if (groupIds.isNull(groupPosition)) {
108+
continue;
109+
}
110+
int groupStart = groupIds.getFirstValueIndex(groupPosition);
111+
int groupEnd = groupStart + groupIds.getValueCount(groupPosition);
112+
for (int g = groupStart; g < groupEnd; g++) {
113+
int groupId = groupIds.getInt(g);
114+
state.values.add(groupId, hashIds.getInt(ordinalIds.getInt(groupPosition + positionOffset)));
115+
}
116+
}
117+
}
118+
119+
@Override
120+
public void add(int positionOffset, IntVector groupIds) {
121+
for (int groupPosition = 0; groupPosition < groupIds.getPositionCount(); groupPosition++) {
122+
int groupId = groupIds.getInt(groupPosition);
123+
state.values.add(groupId, hashIds.getInt(ordinalIds.getInt(groupPosition + positionOffset)));
124+
}
125+
}
126+
127+
@Override
128+
public void close() {
129+
Releasables.close(hashIds, delegate);
130+
}
131+
};
132+
}
133+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ import org.elasticsearch.compute.ann.GroupingAggregator;
2828
import org.elasticsearch.compute.ann.IntermediateState;
2929
import org.elasticsearch.compute.data.Block;
3030
import org.elasticsearch.compute.data.BlockFactory;
31-
$if(int||double||float||BytesRef)$
31+
$if(BytesRef)$
32+
import org.elasticsearch.compute.data.$Type$Block;
33+
import org.elasticsearch.compute.data.$Type$Vector;
34+
$elseif(int||double||float)$
3235
import org.elasticsearch.compute.data.$Type$Block;
3336
$endif$
3437
import org.elasticsearch.compute.data.IntVector;
@@ -87,6 +90,24 @@ $endif$
8790
return new GroupingState(bigArrays);
8891
}
8992

93+
$if(BytesRef)$
94+
public static GroupingAggregatorFunction.AddInput wrapAddInput(
95+
GroupingAggregatorFunction.AddInput delegate,
96+
GroupingState state,
97+
BytesRefBlock values
98+
) {
99+
return ValuesBytesRefAggregators.wrapAddInput(delegate, state, values);
100+
}
101+
102+
public static GroupingAggregatorFunction.AddInput wrapAddInput(
103+
GroupingAggregatorFunction.AddInput delegate,
104+
GroupingState state,
105+
BytesRefVector values
106+
) {
107+
return ValuesBytesRefAggregators.wrapAddInput(delegate, state, values);
108+
}
109+
$endif$
110+
90111
public static void combine(GroupingState state, int groupId, $type$ v) {
91112
$if(long)$
92113
state.values.add(groupId, v);
@@ -234,8 +255,8 @@ $if(long||double)$
234255
private final LongLongHash values;
235256

236257
$elseif(BytesRef)$
237-
private final LongLongHash values;
238-
private final BytesRefHash bytes;
258+
final LongLongHash values;
259+
BytesRefHash bytes;
239260

240261
$elseif(int||float)$
241262
private final LongHash values;

0 commit comments

Comments
 (0)