Merged. Changes from 8 commits.
docs/changelog/121552.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
+pr: 121552
+summary: Fix a bug in TOP
+area: ES|QL
+type: bug
+issues: []

Some generated files are not rendered by default.

BytesRefBucketedSort.java
@@ -8,10 +8,12 @@
package org.elasticsearch.compute.data.sort;

import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -29,6 +31,11 @@
/**
* Aggregates the top N variable length {@link BytesRef} values per bucket.
* See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link IpBucketedSort} because
+ * this has to handle variable length byte strings. To do that it allocates
+ * a heap of {@link BreakingBytesRefBuilder}s.
+ * </p>
*/
public class BytesRefBucketedSort implements Releasable {
private final BucketedSortCommon common;
@@ -123,7 +130,7 @@ public void collect(BytesRef value, int bucket) {
// Gathering mode
long requiredSize = common.endIndex(rootIndex);
if (values.size() < requiredSize) {
-                grow(requiredSize);
+                grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
common.assertValidNextOffset(next);
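
The call-site change above loses no information: under the layout these classes use, requiredSize works out to (bucket + 1) * bucketSize, so passing the bucket id lets grow compute sizes in whole-bucket units instead of raw slots. A minimal standalone sketch of that arithmetic (plain Java; rootIndex = bucket * bucketSize is an assumption from the surrounding classes, not shown in this hunk):

// Illustrative arithmetic only, not code from the PR.
final class CollectSizingSketch {
    public static void main(String[] args) {
        int bucketSize = 3;
        int bucket = 5;
        long rootIndex = (long) bucket * bucketSize; // assumed: bucket b's slots start at b * bucketSize
        long requiredSize = rootIndex + bucketSize;  // endIndex(rootIndex) here; rootIndex + bucketSize in the template
        System.out.println(requiredSize == (long) (bucket + 1) * bucketSize); // true: grow(bucket) is enough
    }
}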
@@ -271,13 +278,21 @@ private void swap(long lhs, long rhs) {

/**
* Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by a whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
*/
-    private void grow(long requiredSize) {
+    private void grow(int bucket) {
long oldMax = values.size();
-        values = common.bigArrays.grow(values, requiredSize);
+        assert oldMax % common.bucketSize == 0;
+
+        long newSizeInBuckets = BigArrays.overSize(
+            bucket + 1,
+            PageCacheRecycler.OBJECT_PAGE_SIZE,
+            RamUsageEstimator.NUM_BYTES_OBJECT_REF * common.bucketSize
+        );
+        values = common.bigArrays.resize(values, newSizeInBuckets * common.bucketSize);
// Set the next gather offsets for all newly allocated buckets.
-        fillGatherOffsets(oldMax - (oldMax % common.bucketSize));
+        fillGatherOffsets(oldMax);
}

/**
@@ -296,6 +311,7 @@ private void fillGatherOffsets(long startingAt) {
bytes.grow(Integer.BYTES);
bytes.setLength(Integer.BYTES);
ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0);
+            checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize));
}
}

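The grow rewrite above is the heart of the fix. The old code asked BigArrays.grow for the exact requiredSize and relied on the rewind fillGatherOffsets(oldMax - (oldMax % bucketSize)), because grow's internal oversizing could leave values.size() somewhere inside a bucket. The new code oversizes in units of whole buckets, so values.size() % bucketSize == 0 becomes an invariant (note the new assert) and filling starts exactly at oldMax. A standalone sketch of the new sizing, assuming a simplified stand-in for BigArrays.overSize (the real one also weighs page size and bytes per element):

// Minimal sketch of whole-bucket growth; plain Java, no Elasticsearch types.
final class WholeBucketGrowth {
    // Stand-in for BigArrays.overSize: at least minBuckets, oversized by ~1/8.
    static long overSizeInBuckets(long minBuckets) {
        return minBuckets + (minBuckets >>> 3);
    }

    static long grow(long oldSizeInSlots, int bucket, int bucketSize) {
        assert oldSizeInSlots % bucketSize == 0; // the invariant the PR introduces
        long newSizeInBuckets = overSizeInBuckets(bucket + 1L); // buckets 0..bucket must fit
        return newSizeInBuckets * bucketSize; // always a whole number of buckets
    }

    public static void main(String[] args) {
        int bucketSize = 3;
        long slots = grow(0, 5, bucketSize); // needs at least 6 buckets = 18 slots
        System.out.println(slots + " slots; multiple of bucketSize: " + (slots % bucketSize == 0));
    }
}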
IpBucketedSort.java
@@ -26,6 +26,11 @@
/**
* Aggregates the top N IP values per bucket.
* See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link BytesRefBucketedSort} because
+ * this takes advantage of IPs having a fixed length and allocates dense
+ * storage for them.
+ * </p>
*/
public class IpBucketedSort implements Releasable {
private static final int IP_LENGTH = 16; // Bytes. It's ipv6.
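The javadoc added above captures the design split: because IPs are always IP_LENGTH = 16 bytes, IpBucketedSort can address slots with plain arithmetic instead of keeping a heap of BreakingBytesRefBuilders. A hedged illustration of dense fixed-width addressing (plain Java; the offset formula is an assumption derived from the fixed length, not copied from IpBucketedSort):

// Illustrative only: where a fixed-length value lives in dense storage.
final class DenseIpLayout {
    static final int IP_LENGTH = 16; // bytes; IPv6-sized, as in IpBucketedSort

    // Byte offset of slot `slot` within bucket `bucket` (assumed layout).
    static long byteOffset(int bucket, int slot, int bucketSize) {
        long rootIndex = (long) bucket * bucketSize; // first slot of the bucket
        return (rootIndex + slot) * IP_LENGTH;
    }

    public static void main(String[] args) {
        // Bucket 2, slot 1, bucketSize 4: slot index 9, byte offset 144.
        System.out.println(byteOffset(2, 1, 4));
    }
}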
X-BucketedSort.java.st
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.$Type$Array;
+import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable {
// Gathering mode
long requiredSize = rootIndex + bucketSize;
if (values.size() < requiredSize) {
-                grow(requiredSize);
+                grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
assert 0 <= next && next < bucketSize
@@ -261,19 +262,27 @@ $endif$

/**
* Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by a whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
*/
-    private void grow(long minSize) {
+    private void grow(int bucket) {
long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSizeInBuckets = BigArrays.overSize(
+            bucket + 1,
+            PageCacheRecycler.$TYPE$_PAGE_SIZE,
+            $BYTES$ * bucketSize
+        );
+        values = bigArrays.resize(values, newSizeInBuckets * bucketSize);
// Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
}

/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
int nextOffset = getBucketSize() - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
setNextGatherOffset(bucketRoot, nextOffset);
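The renamed fillGatherOffsets (formerly setNextGatherOffsets) seeds every newly allocated bucket with its starting "next gather offset", bucketSize - 1, which counts down as values arrive. A standalone sketch of that loop over a plain array (an assumption for illustration: the offset is modeled as living in the bucket's root slot, where the real class encodes it via setNextGatherOffset):

// Plain-Java model of the fill loop; not the templated BigArrays code.
final class FillGatherOffsetsSketch {
    static void fillGatherOffsets(long[] values, int startingAt, int bucketSize) {
        int nextOffset = bucketSize - 1;
        for (int bucketRoot = startingAt; bucketRoot < values.length; bucketRoot += bucketSize) {
            values[bucketRoot] = nextOffset; // mirrors setNextGatherOffset(bucketRoot, nextOffset)
        }
    }

    public static void main(String[] args) {
        int bucketSize = 4;
        long[] values = new long[3 * bucketSize]; // three whole buckets, as the fixed grow guarantees
        fillGatherOffsets(values, 0, bucketSize);
        System.out.println(values[0] + " " + values[4] + " " + values[8]); // 3 3 3
    }
}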
BucketedSortTestCase.java
@@ -409,6 +409,42 @@ public final void testMergeThisBigger() {
}
}

+    public final void testMergePastEnd() {
+        int buckets = 10000;
+        int bucketSize = between(1, 1000);
+        int target = between(0, buckets);
+        List<V> values = randomList(buckets, buckets, this::randomValue);
+        Collections.sort(values);
+        try (T sort = build(SortOrder.ASC, bucketSize)) {
+            // Add a single value to each bucket of the main sort.
+            for (int b = 0; b < buckets; b++) {
+                collect(sort, values.get(b), b);
+            }
+
+            try (T other = build(SortOrder.ASC, bucketSize)) {
+                // Add all values except the target's own to the target bucket of the secondary sort.
+                for (int i = 0; i < values.size(); i++) {
+                    if (i != target) {
+                        collect(other, values.get(i), target);
+                    }
+                }
+
+                // Merge all buckets pairwise. Most of the secondary ones are empty.
+                for (int b = 0; b < buckets; b++) {
+                    merge(sort, b, other, b);
+                }
+            }
+
+            for (int b = 0; b < buckets; b++) {
+                if (b == target) {
+                    assertBlock(sort, b, values.subList(0, bucketSize));
+                } else {
+                    assertBlock(sort, b, List.of(values.get(b)));
+                }
+            }
+        }
+    }

protected void assertBlock(T sort, int groupId, List<V> values) {
var blockFactory = TestBlockFactory.getNonBreakingInstance();

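The new testMergePastEnd pins down the behavior the fix restores: merging an empty or never-allocated bucket is a no-op, and the one target bucket, which ends up seeing every value, keeps only the smallest bucketSize of them under SortOrder.ASC. A toy model of that property over plain collections (illustrative only; the real merge operates on the sort's internal storage):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Models only the observable property asserted by testMergePastEnd.
final class MergeModel {
    static List<Integer> merge(List<Integer> mine, List<Integer> theirs, int bucketSize) {
        List<Integer> all = new ArrayList<>(mine);
        all.addAll(theirs); // merging an empty bucket changes nothing
        Collections.sort(all); // SortOrder.ASC
        return all.subList(0, Math.min(bucketSize, all.size()));
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(7), List.of(), 3));        // [7]
        System.out.println(merge(List.of(7), List.of(1, 2, 9), 3)); // [1, 2, 7]
    }
}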