Skip to content

Commit 3b784b8

Browse files
committed
ESQL: Fix a bug in TOP
Fix a bug in TOP which surfaces when merging results from ordinals. We weren't always accounting for oversized arrays when checking if we'd ever seen a field. The test for this required random `bucketSize` - without that the oversizing frequently wouldn't cause trouble.
1 parent 623a6af commit 3b784b8

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,13 @@ public void collect(BytesRef value, int bucket) {
143143
*/
144144
public void merge(int bucket, BytesRefBucketedSort other, int otherBucket) {
145145
long otherRootIndex = other.common.rootIndex(otherBucket);
146-
if (otherRootIndex >= other.values.size()) {
146+
long otherEnd = other.common.endIndex(otherRootIndex);
147+
if (otherEnd >= other.values.size()) {
147148
// The value was never collected.
148149
return;
149150
}
150151
other.checkInvariant(otherBucket);
151152
long otherStart = other.startIndex(otherBucket, otherRootIndex);
152-
long otherEnd = other.common.endIndex(otherRootIndex);
153153
// TODO: This can be improved for heapified buckets by making use of the heap structures
154154
for (long i = otherStart; i < otherEnd; i++) {
155155
collect(other.values.get(i).bytesRefView(), bucket);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/sort/BucketedSortTestCase.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,42 @@ public final void testMergeThisBigger() {
409409
}
410410
}
411411

412+
public final void testMergePastEnd() {
413+
int buckets = 10000;
414+
int bucketSize = between(1, 1000);
415+
int target = between(0, buckets);
416+
List<V> values = randomList(buckets, buckets, this::randomValue);
417+
Collections.sort(values);
418+
try (T sort = build(SortOrder.ASC, bucketSize)) {
419+
// Add a single value to the main sort.
420+
for (int b = 0; b < buckets; b++) {
421+
collect(sort, values.get(b), b);
422+
}
423+
424+
try (T other = build(SortOrder.ASC, bucketSize)) {
425+
// Add *all* values to the target bucket of the secondary sort.
426+
for (int i = 0; i < values.size(); i++) {
427+
if (i != target) {
428+
collect(other, values.get(i), target);
429+
}
430+
}
431+
432+
// Merge all buckets pairwise. Most of the secondary ones are empty.
433+
for (int b = 0; b < buckets; b++) {
434+
merge(sort, b, other, b);
435+
}
436+
}
437+
438+
for (int b = 0; b < buckets; b++) {
439+
if (b == target) {
440+
assertBlock(sort, b, values.subList(0, bucketSize));
441+
} else {
442+
assertBlock(sort, b, List.of(values.get(b)));
443+
}
444+
}
445+
}
446+
}
447+
412448
protected void assertBlock(T sort, int groupId, List<V> values) {
413449
var blockFactory = TestBlockFactory.getNonBreakingInstance();
414450

0 commit comments

Comments
 (0)