@@ -8,10 +8,12 @@
package org.elasticsearch.compute.data.sort;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -29,6 +31,11 @@
/**
* Aggregates the top N variable length {@link BytesRef} values per bucket.
* See {@link BucketedSort} for more information.
* <p>
* This is substantially different from {@link IpBucketedSort} because
* this has to handle variable length byte strings. To do that it allocates
* a heap of {@link BreakingBytesRefBuilder}s.
* </p>
*/
public class BytesRefBucketedSort implements Releasable {
private final BucketedSortCommon common;
@@ -123,7 +130,7 @@ public void collect(BytesRef value, int bucket) {
// Gathering mode
long requiredSize = common.endIndex(rootIndex);
if (values.size() < requiredSize) {
grow(requiredSize);
grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
common.assertValidNextOffset(next);
@@ -143,13 +150,13 @@ public void collect(BytesRef value, int bucket) {
*/
public void merge(int bucket, BytesRefBucketedSort other, int otherBucket) {
long otherRootIndex = other.common.rootIndex(otherBucket);
long otherEnd = other.common.endIndex(otherRootIndex);
if (otherEnd >= other.values.size()) {
if (otherRootIndex >= other.values.size()) {
Contributor:
I see we reverted the check, assuming that when there is enough space for the other root index to be contained in the other values, we're gonna say that there is a bucket at otherRootIndex.

I took a look at how grow currently works, and it seems that this expectation is reasonable, because we resize to multiples of the bucket size and call fillGatherOffsets to make sure there's an object at each new bucket root.
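
To make that reasoning concrete, here is a minimal, self-contained sketch of the grow-by-whole-buckets idea the comment describes. It is not the actual BytesRefBucketedSort code; the plain Object[] backing store and the naive sizing (no over-sizing) are simplifications for illustration only.

// Illustrative sketch: storage is always a whole number of buckets, and every
// newly allocated bucket root gets an object, so code that checks
// "rootIndex < values.length" can rely on a bucket existing there.
final class GrowByBucketsSketch {
    private final int bucketSize;
    private Object[] values = new Object[0];

    GrowByBucketsSketch(int bucketSize) {
        this.bucketSize = bucketSize;
    }

    void grow(int bucket) {
        int oldMax = values.length;
        assert oldMax % bucketSize == 0;
        // Size to whole buckets only; a real implementation would also over-size
        // (e.g. via BigArrays.overSize) to amortize repeated growth.
        int newSizeInBuckets = bucket + 1;
        values = java.util.Arrays.copyOf(values, newSizeInBuckets * bucketSize);
        // Seed every new bucket root so later reads never hit an empty root slot.
        for (int root = oldMax; root < values.length; root += bucketSize) {
            values[root] = new Object();
        }
    }
}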

// The value was never collected.
return;
}
other.checkInvariant(otherBucket);
long otherStart = other.startIndex(otherBucket, otherRootIndex);
long otherEnd = other.common.endIndex(otherRootIndex);
// TODO: This can be improved for heapified buckets by making use of the heap structures
for (long i = otherStart; i < otherEnd; i++) {
collect(other.values.get(i).bytesRefView(), bucket);
@@ -271,13 +278,21 @@ private void swap(long lhs, long rhs) {

/**
* Allocate storage for more buckets and store the "next gather offset"
* for those new buckets.
* for those new buckets. We always grow the storage by a whole bucket's
* worth of slots at a time. We never allocate space for partial buckets.
*/
private void grow(long requiredSize) {
private void grow(int bucket) {
long oldMax = values.size();
values = common.bigArrays.grow(values, requiredSize);
assert oldMax % common.bucketSize == 0;

long newSizeInBuckets = BigArrays.overSize(
bucket + 1,
PageCacheRecycler.OBJECT_PAGE_SIZE,
RamUsageEstimator.NUM_BYTES_OBJECT_REF * common.bucketSize
);
values = common.bigArrays.resize(values, newSizeInBuckets * common.bucketSize);
// Set the next gather offsets for all newly allocated buckets.
fillGatherOffsets(oldMax - (oldMax % common.bucketSize));
fillGatherOffsets(oldMax);
}

/**
@@ -296,6 +311,7 @@ private void fillGatherOffsets(long startingAt) {
bytes.grow(Integer.BYTES);
bytes.setLength(Integer.BYTES);
ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0);
checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize));
}
}

@@ -26,6 +26,11 @@
/**
* Aggregates the top N IP values per bucket.
* See {@link BucketedSort} for more information.
* <p>
* This is substantially different from {@link BytesRefBucketedSort} because
* this takes advantage of IPs having a fixed length and allocates a dense
* storage for them.
* </p>
*/
public class IpBucketedSort implements Releasable {
private static final int IP_LENGTH = 16; // Bytes. It's ipv6.
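
As a contrast with the heap-of-builders approach above, here is a minimal illustrative sketch of dense storage for fixed-length 16-byte values. It is not the IpBucketedSort implementation; the plain byte[] (instead of BigArrays-backed storage) and the grow-on-write behavior are assumptions made only for the example.

// Illustrative sketch: fixed-length values stored densely, where slot i lives
// at bytes [i * 16, (i + 1) * 16).
final class DenseIpStorageSketch {
    private static final int IP_LENGTH = 16;
    private byte[] bytes = new byte[0];

    void set(int slot, byte[] ip) {
        assert ip.length == IP_LENGTH;
        int end = (slot + 1) * IP_LENGTH;
        if (bytes.length < end) {
            bytes = java.util.Arrays.copyOf(bytes, end);
        }
        System.arraycopy(ip, 0, bytes, slot * IP_LENGTH, IP_LENGTH);
    }

    byte[] get(int slot) {
        return java.util.Arrays.copyOfRange(bytes, slot * IP_LENGTH, (slot + 1) * IP_LENGTH);
    }
}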
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.$Type$Array;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable {
// Gathering mode
long requiredSize = rootIndex + bucketSize;
if (values.size() < requiredSize) {
grow(requiredSize);
grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
assert 0 <= next && next < bucketSize
@@ -261,19 +262,27 @@ $endif$

/**
* Allocate storage for more buckets and store the "next gather offset"
* for those new buckets.
* for those new buckets. We always grow the storage by a whole bucket's
* worth of slots at a time. We never allocate space for partial buckets.
*/
private void grow(long minSize) {
private void grow(int bucket) {
long oldMax = values.size();
values = bigArrays.grow(values, minSize);
assert oldMax % bucketSize == 0;

long newSizeInBuckets = BigArrays.overSize(
bucket + 1,
PageCacheRecycler.$TYPE$_PAGE_SIZE,
$BYTES$ * bucketSize
);
values = bigArrays.resize(values, newSizeInBuckets * bucketSize);
// Set the next gather offsets for all newly allocated buckets.
setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
fillGatherOffsets(oldMax);
}

/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
private void setNextGatherOffsets(long startingAt) {
private void fillGatherOffsets(long startingAt) {
int nextOffset = getBucketSize() - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
setNextGatherOffset(bucketRoot, nextOffset);