Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/113183.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 113183
summary: "ESQL: TOP support for strings"
area: ES|QL
type: feature
issues:
- 109849
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.search.sort.SortOrder;

/**
* Components common to BucketedSort classes.
* Components common to BucketedSort implementations.
*/
class BucketedSortCommon implements Releasable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I yanked this out because it looked like it'd be safe to share at least a little code. I didn't plug this into the X-BucketedSort classes yet. But I think it's just about the same thing.

final BigArrays bigArrays;
Expand All @@ -24,7 +24,7 @@ class BucketedSortCommon implements Releasable {
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
*/
final BitArray heapMode;
private final BitArray heapMode;

BucketedSortCommon(BigArrays bigArrays, SortOrder order, int bucketSize) {
this.bigArrays = bigArrays;
Expand All @@ -48,10 +48,6 @@ long endIndex(long rootIndex) {
return rootIndex + bucketSize;
}

long requiredSize(long rootIndex) {
return rootIndex + bucketSize;
}

boolean inHeapMode(int bucket) {
return heapMode.get(bucket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@

import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

/**
* Aggregates the top N variable length {@link BytesRef} values per bucket.
* See {@link BucketedSort} for more information.
*/
public class BytesRefBucketedSort implements Releasable {
/**
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
*/
private final BucketedSortCommon common;
private final CircuitBreaker breaker;
private final String label;
Expand Down Expand Up @@ -89,17 +86,15 @@ private void checkInvariant(int bucket) {
return;
}
long rootIndex = common.rootIndex(bucket);
long requiredSize = common.requiredSize(rootIndex);
long requiredSize = common.endIndex(rootIndex);
if (values.size() < requiredSize) {
throw new AssertionError("values too short " + values.size() + " < " + requiredSize);
}
if (values.get(rootIndex) == null) {
throw new AssertionError("new gather offset can't be null");
}
if (common.inHeapMode(bucket) == false) {
if (getNextGatherOffset(rootIndex) >= common.bucketSize) {
throw new AssertionError("values too short " + values.size() + " < " + requiredSize);
}
common.assertValidNextOffset(getNextGatherOffset(rootIndex));
} else {
for (long l = rootIndex; l < common.endIndex(rootIndex); l++) {
if (values.get(rootIndex) == null) {
Expand All @@ -126,7 +121,7 @@ public void collect(BytesRef value, int bucket) {
return;
}
// Gathering mode
long requiredSize = common.requiredSize(rootIndex);
long requiredSize = common.endIndex(rootIndex);
if (values.size() < requiredSize) {
grow(requiredSize);
}
Expand Down Expand Up @@ -382,13 +377,10 @@ private BreakingBytesRefBuilder clearedBytesAt(long index) {

@Override
public final void close() {
Releasables.close(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I thought this was an interesting, safe trick. Is there some reason to swap to wrap()? Asking for future cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly paranoia about the case where one close fails: wrap will continue closing the rest even if one of them throws.

I feel like I go to a lot of trouble to call these methods to make sure closing happens right. Partly that's paranoia - it can't fail. But partly that's just so readers see it and say "the normal close code" - they see a call to wrap and such as "normal".

for (int i = 0; i < values.size(); i++) {
BreakingBytesRefBuilder bytes = values.get(i);
if (bytes != null) {
bytes.close();
}
}
}, values, common);
Releasable allValues = values == null ? () -> {} : Releasables.wrap(LongStream.range(0, values.size()).mapToObj(i -> {
BreakingBytesRefBuilder bytes = values.get(i);
return bytes == null ? (Releasable) () -> {} : bytes;
}).toList().iterator());
Releasables.close(allValues, values, common);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.compute.data.Block;
Expand Down Expand Up @@ -39,18 +38,11 @@ public class IpBucketedSort implements Releasable {
*/
private final byte[] scratchBytes = new byte[IP_LENGTH];

private final BigArrays bigArrays;
private final SortOrder order;
private final int bucketSize;
/**
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
*/
private final BitArray heapMode;
private final BucketedSortCommon common;
/**
* An array containing all the values on all buckets. The structure is as follows:
* <p>
* For each bucket, there are {@link #bucketSize} elements, based on the bucket id (0, 1, 2...).
* For each bucket, there are {@link BucketedSortCommon#bucketSize} elements, based on the bucket id (0, 1, 2...).
* Then, for each bucket, it can be in 2 states:
* </p>
* <ul>
Expand All @@ -77,10 +69,7 @@ public class IpBucketedSort implements Releasable {
private ByteArray values;

public IpBucketedSort(BigArrays bigArrays, SortOrder order, int bucketSize) {
this.bigArrays = bigArrays;
this.order = order;
this.bucketSize = bucketSize;
heapMode = new BitArray(0, bigArrays);
this.common = new BucketedSortCommon(bigArrays, order, bucketSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No inheritance? Shouldn't final methods be safe to use?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sure could have inherited it. I started that way because it felt easier, but between the constructor with the sub-types, the closing, and so on, it felt easier to compose — for this case, at least.


boolean success = false;
try {
Expand All @@ -101,26 +90,25 @@ public IpBucketedSort(BigArrays bigArrays, SortOrder order, int bucketSize) {
*/
public void collect(BytesRef value, int bucket) {
assert value.length == IP_LENGTH;
long rootIndex = (long) bucket * bucketSize;
if (inHeapMode(bucket)) {
long rootIndex = common.rootIndex(bucket);
if (common.inHeapMode(bucket)) {
if (betterThan(value, get(rootIndex, scratch1))) {
set(rootIndex, value);
downHeap(rootIndex, 0);
}
return;
}
// Gathering mode
long requiredSize = (rootIndex + bucketSize) * IP_LENGTH;
long requiredSize = common.endIndex(rootIndex) * IP_LENGTH;
if (values.size() < requiredSize) {
grow(requiredSize);
}
int next = getNextGatherOffset(rootIndex);
assert 0 <= next && next < bucketSize
: "Expected next to be in the range of valid buckets [0 <= " + next + " < " + bucketSize + "]";
common.assertValidNextOffset(next);
long index = next + rootIndex;
set(index, value);
if (next == 0) {
heapMode.set(bucket);
common.enableHeapMode(bucket);
heapify(rootIndex);
} else {
setNextGatherOffset(rootIndex, next - 1);
Expand All @@ -132,13 +120,13 @@ public void collect(BytesRef value, int bucket) {
* Returns [0, 0] if the bucket has never been collected.
*/
private Tuple<Long, Long> getBucketValuesIndexes(int bucket) {
long rootIndex = (long) bucket * bucketSize;
long rootIndex = common.rootIndex(bucket);
if (rootIndex >= values.size() / IP_LENGTH) {
// We've never seen this bucket.
return Tuple.tuple(0L, 0L);
}
long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
long end = rootIndex + bucketSize;
long start = startIndex(bucket, rootIndex);
long end = common.endIndex(rootIndex);
return Tuple.tuple(start, end);
}

Expand Down Expand Up @@ -170,7 +158,7 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) {
}

// Used to sort the values in the bucket.
var bucketValues = new BytesRef[bucketSize];
var bucketValues = new BytesRef[common.bucketSize];

try (var builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
for (int s = 0; s < selected.getPositionCount(); s++) {
Expand All @@ -197,7 +185,7 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) {
Arrays.sort(bucketValues, 0, (int) size);

builder.beginPositionEntry();
if (order == SortOrder.ASC) {
if (common.order == SortOrder.ASC) {
for (int i = 0; i < size; i++) {
builder.appendBytesRef(bucketValues[i]);
}
Expand All @@ -212,11 +200,11 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) {
}
}

/**
* Is this bucket a min heap {@code true} or in gathering mode {@code false}?
*/
private boolean inHeapMode(int bucket) {
return heapMode.get(bucket);
private long startIndex(int bucket, long rootIndex) {
if (common.inHeapMode(bucket)) {
return rootIndex;
}
return rootIndex + getNextGatherOffset(rootIndex) + 1;
}

/**
Expand Down Expand Up @@ -253,7 +241,7 @@ private void setNextGatherOffset(long rootIndex, int offset) {
* {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
private boolean betterThan(BytesRef lhs, BytesRef rhs) {
return order.reverseMul() * lhs.compareTo(rhs) < 0;
return common.order.reverseMul() * lhs.compareTo(rhs) < 0;
}

/**
Expand Down Expand Up @@ -282,17 +270,17 @@ private void swap(long lhs, long rhs) {
*/
private void grow(long minSize) {
long oldMax = values.size() / IP_LENGTH;
values = bigArrays.grow(values, minSize);
values = common.bigArrays.grow(values, minSize);
// Set the next gather offsets for all newly allocated buckets.
setNextGatherOffsets(oldMax - (oldMax % bucketSize));
setNextGatherOffsets(oldMax - (oldMax % common.bucketSize));
}

/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
private void setNextGatherOffsets(long startingAt) {
int nextOffset = bucketSize - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += bucketSize) {
int nextOffset = common.bucketSize - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += common.bucketSize) {
setNextGatherOffset(bucketRoot, nextOffset);
}
}
Expand Down Expand Up @@ -320,7 +308,7 @@ private void setNextGatherOffsets(long startingAt) {
* @param rootIndex the index the start of the bucket
*/
private void heapify(long rootIndex) {
int maxParent = bucketSize / 2 - 1;
int maxParent = common.bucketSize / 2 - 1;
for (int parent = maxParent; parent >= 0; parent--) {
downHeap(rootIndex, parent);
}
Expand All @@ -340,14 +328,14 @@ private void downHeap(long rootIndex, int parent) {
long worstIndex = parentIndex;
int leftChild = parent * 2 + 1;
long leftIndex = rootIndex + leftChild;
if (leftChild < bucketSize) {
if (leftChild < common.bucketSize) {
if (betterThan(get(worstIndex, scratch1), get(leftIndex, scratch2))) {
worst = leftChild;
worstIndex = leftIndex;
}
int rightChild = leftChild + 1;
long rightIndex = rootIndex + rightChild;
if (rightChild < bucketSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) {
if (rightChild < common.bucketSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) {
worst = rightChild;
worstIndex = rightIndex;
}
Expand Down Expand Up @@ -386,6 +374,6 @@ private void set(long index, BytesRef value) {

@Override
public final void close() {
Releasables.close(values, heapMode);
Releasables.close(values, common);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import static org.hamcrest.Matchers.equalTo;

public abstract class BucketedSortTestCase<T extends Releasable, V extends Comparable<V>> extends ESTestCase {
// TODO add cranky tests

/**
* Build a {@link T} to test. Sorts built by this method shouldn't need scores.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
Expand All @@ -26,7 +27,14 @@
public class BytesRefBucketedSortTests extends BucketedSortTestCase<BytesRefBucketedSort, BytesRef> {
@Override
protected BytesRefBucketedSort build(SortOrder sortOrder, int bucketSize) {
return new BytesRefBucketedSort(new NoopCircuitBreaker("test"), "test", bigArrays(), sortOrder, bucketSize);
BigArrays bigArrays = bigArrays();
return new BytesRefBucketedSort(
bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST),
"test",
bigArrays,
sortOrder,
bucketSize
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public MixedClusterEsqlSpecIT(
protected void shouldSkipTest(String testName) throws IOException {
super.shouldSkipTest(testName);
assumeTrue("Test " + testName + " is skipped on " + bwcVersion, isEnabled(testName, instructions, bwcVersion));
assumeFalse(
"Skip META tests on mixed version clusters because we change it too quickly",
testCase.requiredCapabilities.contains("meta")
);
if (mode == ASYNC) {
assumeTrue("Async is not supported on " + bwcVersion, supportsAsync());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ protected void shouldSkipTest(String testName) throws IOException {
);
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2"));
assumeFalse(
"Skip META tests on mixed version clusters because we change it too quickly",
testCase.requiredCapabilities.contains("meta")
);
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down
Loading