Skip to content

Commit fd313fb

Browse files
committed
ESQL: Fix a bug in TOP (elastic#121552)
Fix a bug in TOP which surfaces when merging results from ordinals. We weren't always accounting for oversized arrays when checking whether we'd ever seen a field. This changes the oversizing itself to always size on a bucket boundary. The test for this required a random `bucketSize` — without that, the oversizing frequently wouldn't cause trouble.
1 parent 4efc72b commit fd313fb

File tree

9 files changed

+146
-40
lines changed

9 files changed

+146
-40
lines changed

docs/changelog/121552.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121552
2+
summary: Fix a bug in TOP
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package org.elasticsearch.compute.data.sort;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.apache.lucene.util.RamUsageEstimator;
1112
import org.elasticsearch.common.breaker.CircuitBreaker;
1213
import org.elasticsearch.common.util.BigArrays;
1314
import org.elasticsearch.common.util.ByteUtils;
1415
import org.elasticsearch.common.util.ObjectArray;
16+
import org.elasticsearch.common.util.PageCacheRecycler;
1517
import org.elasticsearch.compute.data.Block;
1618
import org.elasticsearch.compute.data.BlockFactory;
1719
import org.elasticsearch.compute.data.IntVector;
@@ -29,6 +31,11 @@
2931
/**
3032
* Aggregates the top N variable length {@link BytesRef} values per bucket.
3133
* See {@link BucketedSort} for more information.
34+
* <p>
35+
* This is substantially different from {@link IpBucketedSort} because
36+
* this has to handle variable length byte strings. To do that it allocates
37+
* a heap of {@link BreakingBytesRefBuilder}s.
38+
* </p>
3239
*/
3340
public class BytesRefBucketedSort implements Releasable {
3441
private final BucketedSortCommon common;
@@ -123,7 +130,7 @@ public void collect(BytesRef value, int bucket) {
123130
// Gathering mode
124131
long requiredSize = common.endIndex(rootIndex);
125132
if (values.size() < requiredSize) {
126-
grow(requiredSize);
133+
grow(bucket);
127134
}
128135
int next = getNextGatherOffset(rootIndex);
129136
common.assertValidNextOffset(next);
@@ -271,13 +278,23 @@ private void swap(long lhs, long rhs) {
271278

272279
/**
273280
* Allocate storage for more buckets and store the "next gather offset"
274-
* for those new buckets.
281+
* for those new buckets. We always grow the storage by whole bucket's
282+
* worth of slots at a time. We never allocate space for partial buckets.
275283
*/
276-
private void grow(long requiredSize) {
284+
private void grow(int bucket) {
277285
long oldMax = values.size();
278-
values = common.bigArrays.grow(values, requiredSize);
286+
assert oldMax % common.bucketSize == 0;
287+
288+
long newSize = BigArrays.overSize(
289+
((long) bucket + 1) * common.bucketSize,
290+
PageCacheRecycler.OBJECT_PAGE_SIZE,
291+
RamUsageEstimator.NUM_BYTES_OBJECT_REF
292+
);
293+
// Round up to the next full bucket.
294+
newSize = (newSize + common.bucketSize - 1) / common.bucketSize;
295+
values = common.bigArrays.resize(values, newSize * common.bucketSize);
279296
// Set the next gather offsets for all newly allocated buckets.
280-
fillGatherOffsets(oldMax - (oldMax % common.bucketSize));
297+
fillGatherOffsets(oldMax);
281298
}
282299

283300
/**
@@ -296,6 +313,7 @@ private void fillGatherOffsets(long startingAt) {
296313
bytes.grow(Integer.BYTES);
297314
bytes.setLength(Integer.BYTES);
298315
ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0);
316+
checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize));
299317
}
300318
}
301319

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.util.BigArrays;
1212
import org.elasticsearch.common.util.ByteArray;
1313
import org.elasticsearch.common.util.ByteUtils;
14+
import org.elasticsearch.common.util.PageCacheRecycler;
1415
import org.elasticsearch.compute.data.Block;
1516
import org.elasticsearch.compute.data.BlockFactory;
1617
import org.elasticsearch.compute.data.IntVector;
@@ -26,6 +27,11 @@
2627
/**
2728
* Aggregates the top N IP values per bucket.
2829
* See {@link BucketedSort} for more information.
30+
* <p>
31+
* This is substantially different from {@link BytesRefBucketedSort} because
32+
* this takes advantage of IPs having a fixed length and allocates a dense
33+
* storage for them.
34+
* </p>
2935
*/
3036
public class IpBucketedSort implements Releasable {
3137
private static final int IP_LENGTH = 16; // Bytes. It's ipv6.
@@ -101,7 +107,7 @@ public void collect(BytesRef value, int bucket) {
101107
// Gathering mode
102108
long requiredSize = common.endIndex(rootIndex) * IP_LENGTH;
103109
if (values.size() < requiredSize) {
104-
grow(requiredSize);
110+
grow(bucket);
105111
}
106112
int next = getNextGatherOffset(rootIndex);
107113
common.assertValidNextOffset(next);
@@ -268,17 +274,23 @@ private void swap(long lhs, long rhs) {
268274
* Allocate storage for more buckets and store the "next gather offset"
269275
* for those new buckets.
270276
*/
271-
private void grow(long minSize) {
277+
private void grow(int bucket) {
272278
long oldMax = values.size() / IP_LENGTH;
273-
values = common.bigArrays.grow(values, minSize);
279+
assert oldMax % common.bucketSize == 0;
280+
281+
int bucketBytes = common.bucketSize * IP_LENGTH;
282+
long newSize = BigArrays.overSize(((long) bucket + 1) * bucketBytes, PageCacheRecycler.BYTE_PAGE_SIZE, 1);
283+
// Round up to the next full bucket.
284+
newSize = (newSize + bucketBytes - 1) / bucketBytes;
285+
values = common.bigArrays.resize(values, newSize * bucketBytes);
274286
// Set the next gather offsets for all newly allocated buckets.
275-
setNextGatherOffsets(oldMax - (oldMax % common.bucketSize));
287+
fillGatherOffsets(oldMax);
276288
}
277289

278290
/**
279291
* Maintain the "next gather offsets" for newly allocated buckets.
280292
*/
281-
private void setNextGatherOffsets(long startingAt) {
293+
private void fillGatherOffsets(long startingAt) {
282294
int nextOffset = common.bucketSize - 1;
283295
for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += common.bucketSize) {
284296
setNextGatherOffset(bucketRoot, nextOffset);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
1010
import org.elasticsearch.common.util.BigArrays;
1111
import org.elasticsearch.common.util.BitArray;
1212
import org.elasticsearch.common.util.$Type$Array;
13+
import org.elasticsearch.common.util.PageCacheRecycler;
1314
import org.elasticsearch.compute.data.Block;
1415
import org.elasticsearch.compute.data.BlockFactory;
1516
import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable {
101102
// Gathering mode
102103
long requiredSize = rootIndex + bucketSize;
103104
if (values.size() < requiredSize) {
104-
grow(requiredSize);
105+
grow(bucket);
105106
}
106107
int next = getNextGatherOffset(rootIndex);
107108
assert 0 <= next && next < bucketSize
@@ -261,19 +262,25 @@ $endif$
261262

262263
/**
263264
* Allocate storage for more buckets and store the "next gather offset"
264-
* for those new buckets.
265+
* for those new buckets. We always grow the storage by whole bucket's
266+
* worth of slots at a time. We never allocate space for partial buckets.
265267
*/
266-
private void grow(long minSize) {
268+
private void grow(int bucket) {
267269
long oldMax = values.size();
268-
values = bigArrays.grow(values, minSize);
270+
assert oldMax % bucketSize == 0;
271+
272+
long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.$TYPE$_PAGE_SIZE, $BYTES$);
273+
// Round up to the next full bucket.
274+
newSize = (newSize + bucketSize - 1) / bucketSize;
275+
values = bigArrays.resize(values, newSize * bucketSize);
269276
// Set the next gather offsets for all newly allocated buckets.
270-
setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
277+
fillGatherOffsets(oldMax);
271278
}
272279

273280
/**
274281
* Maintain the "next gather offsets" for newly allocated buckets.
275282
*/
276-
private void setNextGatherOffsets(long startingAt) {
283+
private void fillGatherOffsets(long startingAt) {
277284
int nextOffset = getBucketSize() - 1;
278285
for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
279286
setNextGatherOffset(bucketRoot, nextOffset);

0 commit comments

Comments (0)