Skip to content

Commit 3e79b2a

Browse files
ivanceamridula-s109
authored andcommitted
ESQL: Skip unused STATS groups by adding a Top N BlockHash implementation (elastic#127148)
- Add a new `LongTopNBlockHash` implementation taking care of skipping unused values. - Add a `TopNUniqueSet` to take care of storing the top N values (without nulls). - Add a `TopNMultivalueDedupeLong` class helping with it (An adaptation of the existing `MultivalueDedupeLong`). - Add some tests to `HashAggregationOperator`. It wasn't changed much, but helps a bit with the E2E. - Add MicroBenchmarks for TopN groupings, to ensure we're actually improving things with this.
1 parent 1a9e672 commit 3e79b2a

File tree

19 files changed

+2290
-419
lines changed

19 files changed

+2290
-419
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class AggregatorBenchmark {
7373
static final int BLOCK_LENGTH = 8 * 1024;
7474
private static final int OP_COUNT = 1024;
7575
private static final int GROUPS = 5;
76+
private static final int TOP_N_LIMIT = 3;
7677

7778
private static final BlockFactory blockFactory = BlockFactory.getInstance(
7879
new NoopCircuitBreaker("noop"),
@@ -90,6 +91,7 @@ public class AggregatorBenchmark {
9091
private static final String TWO_ORDINALS = "two_" + ORDINALS;
9192
private static final String LONGS_AND_BYTES_REFS = LONGS + "_and_" + BYTES_REFS;
9293
private static final String TWO_LONGS_AND_BYTES_REFS = "two_" + LONGS + "_and_" + BYTES_REFS;
94+
private static final String TOP_N_LONGS = "top_n_" + LONGS;
9395

9496
private static final String VECTOR_DOUBLES = "vector_doubles";
9597
private static final String HALF_NULL_DOUBLES = "half_null_doubles";
@@ -147,7 +149,8 @@ static void selfTest() {
147149
TWO_BYTES_REFS,
148150
TWO_ORDINALS,
149151
LONGS_AND_BYTES_REFS,
150-
TWO_LONGS_AND_BYTES_REFS }
152+
TWO_LONGS_AND_BYTES_REFS,
153+
TOP_N_LONGS }
151154
)
152155
public String grouping;
153156

@@ -161,8 +164,7 @@ static void selfTest() {
161164
public String filter;
162165

163166
private static Operator operator(DriverContext driverContext, String grouping, String op, String dataType, String filter) {
164-
165-
if (grouping.equals("none")) {
167+
if (grouping.equals(NONE)) {
166168
return new AggregationOperator(
167169
List.of(supplier(op, dataType, filter).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
168170
driverContext
@@ -188,6 +190,9 @@ private static Operator operator(DriverContext driverContext, String grouping, S
188190
new BlockHash.GroupSpec(1, ElementType.LONG),
189191
new BlockHash.GroupSpec(2, ElementType.BYTES_REF)
190192
);
193+
case TOP_N_LONGS -> List.of(
194+
new BlockHash.GroupSpec(0, ElementType.LONG, false, new BlockHash.TopNDef(0, true, true, TOP_N_LIMIT))
195+
);
191196
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
192197
};
193198
return new HashAggregationOperator(
@@ -271,10 +276,14 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
271276
case BOOLEANS -> 2;
272277
default -> GROUPS;
273278
};
279+
int availableGroups = switch (grouping) {
280+
case TOP_N_LONGS -> TOP_N_LIMIT;
281+
default -> groups;
282+
};
274283
switch (op) {
275284
case AVG -> {
276285
DoubleBlock dValues = (DoubleBlock) values;
277-
for (int g = 0; g < groups; g++) {
286+
for (int g = 0; g < availableGroups; g++) {
278287
long group = g;
279288
long sum = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum();
280289
long count = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).count();
@@ -286,7 +295,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
286295
}
287296
case COUNT -> {
288297
LongBlock lValues = (LongBlock) values;
289-
for (int g = 0; g < groups; g++) {
298+
for (int g = 0; g < availableGroups; g++) {
290299
long group = g;
291300
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).count() * opCount;
292301
if (lValues.getLong(g) != expected) {
@@ -296,7 +305,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
296305
}
297306
case COUNT_DISTINCT -> {
298307
LongBlock lValues = (LongBlock) values;
299-
for (int g = 0; g < groups; g++) {
308+
for (int g = 0; g < availableGroups; g++) {
300309
long group = g;
301310
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).distinct().count();
302311
long count = lValues.getLong(g);
@@ -310,15 +319,15 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
310319
switch (dataType) {
311320
case LONGS -> {
312321
LongBlock lValues = (LongBlock) values;
313-
for (int g = 0; g < groups; g++) {
322+
for (int g = 0; g < availableGroups; g++) {
314323
if (lValues.getLong(g) != (long) g) {
315324
throw new AssertionError(prefix + "expected [" + g + "] but was [" + lValues.getLong(g) + "]");
316325
}
317326
}
318327
}
319328
case DOUBLES -> {
320329
DoubleBlock dValues = (DoubleBlock) values;
321-
for (int g = 0; g < groups; g++) {
330+
for (int g = 0; g < availableGroups; g++) {
322331
if (dValues.getDouble(g) != (long) g) {
323332
throw new AssertionError(prefix + "expected [" + g + "] but was [" + dValues.getDouble(g) + "]");
324333
}
@@ -331,7 +340,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
331340
switch (dataType) {
332341
case LONGS -> {
333342
LongBlock lValues = (LongBlock) values;
334-
for (int g = 0; g < groups; g++) {
343+
for (int g = 0; g < availableGroups; g++) {
335344
long group = g;
336345
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).max().getAsLong();
337346
if (lValues.getLong(g) != expected) {
@@ -341,7 +350,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
341350
}
342351
case DOUBLES -> {
343352
DoubleBlock dValues = (DoubleBlock) values;
344-
for (int g = 0; g < groups; g++) {
353+
for (int g = 0; g < availableGroups; g++) {
345354
long group = g;
346355
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).max().getAsLong();
347356
if (dValues.getDouble(g) != expected) {
@@ -356,7 +365,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
356365
switch (dataType) {
357366
case LONGS -> {
358367
LongBlock lValues = (LongBlock) values;
359-
for (int g = 0; g < groups; g++) {
368+
for (int g = 0; g < availableGroups; g++) {
360369
long group = g;
361370
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum() * opCount;
362371
if (lValues.getLong(g) != expected) {
@@ -366,7 +375,7 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
366375
}
367376
case DOUBLES -> {
368377
DoubleBlock dValues = (DoubleBlock) values;
369-
for (int g = 0; g < groups; g++) {
378+
for (int g = 0; g < availableGroups; g++) {
370379
long group = g;
371380
long expected = LongStream.range(0, BLOCK_LENGTH).filter(l -> l % groups == group).sum() * opCount;
372381
if (dValues.getDouble(g) != expected) {
@@ -391,6 +400,14 @@ private static void checkGroupingBlock(String prefix, String grouping, Block blo
391400
}
392401
}
393402
}
403+
case TOP_N_LONGS -> {
404+
LongBlock groups = (LongBlock) block;
405+
for (int g = 0; g < TOP_N_LIMIT; g++) {
406+
if (groups.getLong(g) != (long) g) {
407+
throw new AssertionError(prefix + "bad group expected [" + g + "] but was [" + groups.getLong(g) + "]");
408+
}
409+
}
410+
}
394411
case INTS -> {
395412
IntBlock groups = (IntBlock) block;
396413
for (int g = 0; g < GROUPS; g++) {
@@ -495,7 +512,7 @@ private static void checkUngrouped(String prefix, String op, String dataType, Pa
495512

496513
private static Page page(BlockFactory blockFactory, String grouping, String blockType) {
497514
Block dataBlock = dataBlock(blockFactory, blockType);
498-
if (grouping.equals("none")) {
515+
if (grouping.equals(NONE)) {
499516
return new Page(dataBlock);
500517
}
501518
List<Block> blocks = groupingBlocks(grouping, blockType);
@@ -564,7 +581,7 @@ private static Block groupingBlock(String grouping, String blockType) {
564581
default -> throw new UnsupportedOperationException("bad grouping [" + grouping + "]");
565582
};
566583
return switch (grouping) {
567-
case LONGS -> {
584+
case TOP_N_LONGS, LONGS -> {
568585
var builder = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
569586
for (int i = 0; i < BLOCK_LENGTH; i++) {
570587
for (int v = 0; v < valuesPerGroup; v++) {

docs/changelog/127148.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127148
2+
summary: Skip unused STATS groups by adding a Top N `BlockHash` implementation
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public abstract class BinarySearcher {
3838
/**
3939
* @return the index who's underlying value is closest to the value being searched for.
4040
*/
41-
private int getClosestIndex(int index1, int index2) {
41+
protected int getClosestIndex(int index1, int index2) {
4242
if (distance(index1) < distance(index2)) {
4343
return index1;
4444
} else {

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.compute.data.IntBlock;
2424
import org.elasticsearch.compute.data.IntVector;
2525
import org.elasticsearch.compute.data.Page;
26+
import org.elasticsearch.core.Nullable;
2627
import org.elasticsearch.core.Releasable;
2728
import org.elasticsearch.core.ReleasableIterator;
2829
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -113,13 +114,30 @@ public abstract class BlockHash implements Releasable, SeenGroupIds {
113114
@Override
114115
public abstract BitArray seenGroupIds(BigArrays bigArrays);
115116

117+
/**
118+
* Configuration for a BlockHash group spec that is later sorted and limited (Top-N).
119+
* <p>
120+
* Part of a performance improvement to avoid aggregating groups that will not be used.
121+
* </p>
122+
*
123+
* @param order The order of this group in the sort, starting at 0
124+
* @param asc True if this group will be sorted ascending. False if descending.
125+
* @param nullsFirst True if the nulls should be the first elements in the TopN. False if they should be kept last.
126+
* @param limit The number of elements to keep, including nulls.
127+
*/
128+
public record TopNDef(int order, boolean asc, boolean nullsFirst, int limit) {}
129+
116130
/**
117131
* @param isCategorize Whether this group is a CATEGORIZE() or not.
118132
* May be changed in the future when more stateful grouping functions are added.
119133
*/
120-
public record GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
134+
public record GroupSpec(int channel, ElementType elementType, boolean isCategorize, @Nullable TopNDef topNDef) {
121135
public GroupSpec(int channel, ElementType elementType) {
122-
this(channel, elementType, false);
136+
this(channel, elementType, false, null);
137+
}
138+
139+
public GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
140+
this(channel, elementType, isCategorize, null);
123141
}
124142
}
125143

@@ -134,7 +152,12 @@ public GroupSpec(int channel, ElementType elementType) {
134152
*/
135153
public static BlockHash build(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize, boolean allowBrokenOptimizations) {
136154
if (groups.size() == 1) {
137-
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), blockFactory);
155+
GroupSpec group = groups.get(0);
156+
if (group.topNDef() != null && group.elementType() == ElementType.LONG) {
157+
TopNDef topNDef = group.topNDef();
158+
return new LongTopNBlockHash(group.channel(), topNDef.asc(), topNDef.nullsFirst(), topNDef.limit(), blockFactory);
159+
}
160+
return newForElementType(group.channel(), group.elementType(), blockFactory);
138161
}
139162
if (groups.stream().allMatch(g -> g.elementType == ElementType.BYTES_REF)) {
140163
switch (groups.size()) {

0 commit comments

Comments
 (0)