Skip to content

Commit 29fa4d1

Browse files
committed
Fix SeenGroupIds and added extra tests for it
1 parent c9635f9 commit 29fa4d1

File tree

3 files changed

+119
-12
lines changed

3 files changed

+119
-12
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongTopNBlockHash.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,18 @@ public IntVector nonEmpty() {
289289

290290
@Override
291291
public BitArray seenGroupIds(BigArrays bigArrays) {
292-
return new Range(hasNull ? 0 : 1, Math.toIntExact(hash.size() + 1)).seenGroupIds(bigArrays);
292+
BitArray seenGroups = new BitArray(111, bigArrays);
293+
if (hasNull) {
294+
seenGroups.set(0);
295+
}
296+
// TODO: Can we instead iterate the top and take the ids from the hash? To avoid checking unused values
297+
for (int i = 1; i < hash.size() + 1; i++) {
298+
long value = hash.get(i - 1);
299+
if (isInTop(value)) {
300+
seenGroups.set(i);
301+
}
302+
}
303+
return seenGroups;
293304
}
294305

295306
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.breaker.CircuitBreaker;
1212
import org.elasticsearch.common.unit.ByteSizeValue;
1313
import org.elasticsearch.common.util.BigArrays;
14+
import org.elasticsearch.common.util.BitArray;
1415
import org.elasticsearch.common.util.MockBigArrays;
1516
import org.elasticsearch.common.util.PageCacheRecycler;
1617
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
@@ -108,6 +109,7 @@ public void close() {
108109
fail("hashes should not close AddInput");
109110
}
110111
});
112+
assertSeenGroupIdsAndNonEmpty(blockHash);
111113
if (blockHash instanceof LongLongBlockHash == false
112114
&& blockHash instanceof BytesRefLongBlockHash == false
113115
&& blockHash instanceof BytesRef2BlockHash == false
@@ -127,6 +129,21 @@ public void close() {
127129
}
128130
}
129131

132+
private static void assertSeenGroupIdsAndNonEmpty(BlockHash blockHash) {
133+
try (BitArray seenGroupIds = blockHash.seenGroupIds(BigArrays.NON_RECYCLING_INSTANCE); IntVector nonEmpty = blockHash.nonEmpty()) {
134+
assertThat(
135+
"seenGroupIds cardinality doesn't match with nonEmpty size",
136+
seenGroupIds.cardinality(),
137+
equalTo((long) nonEmpty.getPositionCount())
138+
);
139+
140+
for (int position = 0; position < nonEmpty.getPositionCount(); position++) {
141+
int groupId = nonEmpty.getInt(position);
142+
assertThat("group " + groupId + " from nonEmpty isn't set in seenGroupIds", seenGroupIds.get(groupId), is(true));
143+
}
144+
}
145+
}
146+
130147
protected void assertOrds(IntBlock ordsBlock, Integer... expectedOrds) {
131148
assertOrds(ordsBlock, Arrays.stream(expectedOrds).map(l -> l == null ? null : new int[] { l }).toArray(int[][]::new));
132149
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/TopNBlockHashTests.java

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import org.elasticsearch.core.Releasables;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.function.Consumer;
2425

2526
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.greaterThan;
2628

2729
public class TopNBlockHashTests extends BlockHashTestCase {
2830

@@ -73,7 +75,7 @@ public void testLongHash() {
7375
} else {
7476
assertThat(
7577
ordsAndKeys.description(),
76-
equalTo("LongTopNBlockHash{channel=0, " + topNParametersString(4) + ", hasNull=false}")
78+
equalTo("LongTopNBlockHash{channel=0, " + topNParametersString(4, 0) + ", hasNull=false}")
7779
);
7880
if (limit == LIMIT_HIGH) {
7981
assertKeys(ordsAndKeys.keys(), 2L, 1L, 4L, 3L);
@@ -94,6 +96,40 @@ public void testLongHash() {
9496
}, blockFactory.newLongArrayVector(values, values.length).asBlock());
9597
}
9698

99+
public void testLongHashBatched() {
100+
long[][] arrays = { new long[] { 2, 1, 4, 2 }, new long[] { 4, 1, 3, 4 } };
101+
102+
hashBatchesCallbackOnLast(ordsAndKeys -> {
103+
if (forcePackedHash) {
104+
// TODO: Not tested yet
105+
} else {
106+
assertThat(
107+
ordsAndKeys.description(),
108+
equalTo("LongTopNBlockHash{channel=0, " + topNParametersString(4, asc ? 0 : 1) + ", hasNull=false}")
109+
);
110+
if (limit == LIMIT_HIGH) {
111+
assertKeys(ordsAndKeys.keys(), 2L, 1L, 4L, 3L);
112+
assertOrds(ordsAndKeys.ords(), 3, 2, 4, 3);
113+
assertThat(ordsAndKeys.nonEmpty(), equalTo(intRange(1, 5)));
114+
} else {
115+
if (asc) {
116+
assertKeys(ordsAndKeys.keys(), 2L, 1L);
117+
assertOrds(ordsAndKeys.ords(), null, 2, null, null);
118+
assertThat(ordsAndKeys.nonEmpty(), equalTo(intVector(1, 2)));
119+
} else {
120+
assertKeys(ordsAndKeys.keys(), 4L, 3L);
121+
assertOrds(ordsAndKeys.ords(), 2, null, 3, 2);
122+
assertThat(ordsAndKeys.nonEmpty(), equalTo(intVector(2, 3)));
123+
}
124+
}
125+
}
126+
},
127+
Arrays.stream(arrays)
128+
.map(array -> new Block[] { blockFactory.newLongArrayVector(array, array.length).asBlock() })
129+
.toArray(Block[][]::new)
130+
);
131+
}
132+
97133
public void testLongHashWithNulls() {
98134
try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(4)) {
99135
builder.appendLong(0);
@@ -111,7 +147,7 @@ public void testLongHashWithNulls() {
111147
ordsAndKeys.description(),
112148
equalTo(
113149
"LongTopNBlockHash{channel=0, "
114-
+ topNParametersString(hasTwoNonNullValues ? 2 : 1)
150+
+ topNParametersString(hasTwoNonNullValues ? 2 : 1, 0)
115151
+ ", hasNull="
116152
+ hasNull
117153
+ "}"
@@ -172,7 +208,7 @@ public void testLongHashWithMultiValuedFields() {
172208
if (limit == LIMIT_HIGH) {
173209
assertThat(
174210
ordsAndKeys.description(),
175-
equalTo("LongTopNBlockHash{channel=0, " + topNParametersString(3) + ", hasNull=true}")
211+
equalTo("LongTopNBlockHash{channel=0, " + topNParametersString(3, 0) + ", hasNull=true}")
176212
);
177213
assertOrds(
178214
ordsAndKeys.ords(),
@@ -188,7 +224,11 @@ public void testLongHashWithMultiValuedFields() {
188224
assertThat(
189225
ordsAndKeys.description(),
190226
equalTo(
191-
"LongTopNBlockHash{channel=0, " + topNParametersString(nullsFirst ? 1 : 2) + ", hasNull=" + nullsFirst + "}"
227+
"LongTopNBlockHash{channel=0, "
228+
+ topNParametersString(nullsFirst ? 1 : 2, 0)
229+
+ ", hasNull="
230+
+ nullsFirst
231+
+ "}"
192232
)
193233
);
194234
if (nullsFirst) {
@@ -279,12 +319,44 @@ private void hash(Consumer<OrdsAndKeys> callback, Block... values) {
279319
}
280320
}
281321

282-
private void hash(Consumer<OrdsAndKeys> callback, int emitBatchSize, Block.Builder... values) {
283-
Block[] blocks = Block.Builder.buildAll(values);
284-
try (BlockHash hash = buildBlockHash(emitBatchSize, blocks)) {
285-
hash(true, hash, callback, blocks);
322+
// TODO: Randomize this instead?
323+
/**
324+
* Hashes multiple separated batches of values.
325+
*
326+
* @param callback Callback with the OrdsAndKeys for the last batch
327+
*/
328+
private void hashBatchesCallbackOnLast(Consumer<OrdsAndKeys> callback, Block[]... batches) {
329+
// Ensure all batches share the same specs
330+
assertThat(batches.length, greaterThan(0));
331+
for (Block[] batch : batches) {
332+
assertThat(batch.length, equalTo(batches[0].length));
333+
for (int i = 0; i < batch.length; i++) {
334+
assertThat(batches[0][i].elementType(), equalTo(batch[i].elementType()));
335+
}
336+
}
337+
338+
boolean[] called = new boolean[] { false };
339+
try (BlockHash hash = buildBlockHash(16 * 1024, batches[0])) {
340+
for (Block[] batch : batches) {
341+
called[0] = false;
342+
hash(true, hash, ordsAndKeys -> {
343+
if (called[0]) {
344+
throw new IllegalStateException("hash produced more than one block");
345+
}
346+
called[0] = true;
347+
if (batch == batches[batches.length - 1]) {
348+
callback.accept(ordsAndKeys);
349+
}
350+
try (ReleasableIterator<IntBlock> lookup = hash.lookup(new Page(batch), ByteSizeValue.ofKb(between(1, 100)))) {
351+
assertThat(lookup.hasNext(), equalTo(true));
352+
try (IntBlock ords = lookup.next()) {
353+
assertThat(ords, equalTo(ordsAndKeys.ords()));
354+
}
355+
}
356+
}, batch);
357+
}
286358
} finally {
287-
Releasables.closeExpectNoException(blocks);
359+
Releasables.close(Arrays.stream(batches).flatMap(Arrays::stream).toList());
288360
}
289361
}
290362

@@ -304,7 +376,14 @@ private BlockHash buildBlockHash(int emitBatchSize, Block... values) {
304376
/**
305377
* Returns the common toString() part of the TopNBlockHash using the test parameters.
306378
*/
307-
private String topNParametersString(int differentValues) {
308-
return "asc=" + asc + ", nullsFirst=" + nullsFirst + ", limit=" + limit + ", entries=" + Math.min(differentValues, limit);
379+
private String topNParametersString(int differentValues, int unusedInsertedValues) {
380+
return "asc="
381+
+ asc
382+
+ ", nullsFirst="
383+
+ nullsFirst
384+
+ ", limit="
385+
+ limit
386+
+ ", entries="
387+
+ Math.min(differentValues, limit + unusedInsertedValues);
309388
}
310389
}

0 commit comments

Comments
 (0)