diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java index ca89e6f999641..c8c6701e68e4a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java @@ -20,7 +20,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -95,7 +94,7 @@ public void collect(double value, int bucket) { if (inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex))) { values.set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, bucketSize); } return; } @@ -111,7 +110,7 @@ public void collect(double value, int bucket) { values.set(index, value); if (next == 0) { heapMode.set(bucket); - heapify(rootIndex); + heapify(rootIndex, bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -172,14 +171,12 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new double[bucketSize]; - try (var builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -192,22 +189,15 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = values.get(bounds.v1() + i); + // If we are in the gathering mode, we need to heapify before sorting. + if (inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendDouble(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendDouble(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.appendDouble(values.get(bounds.v1() + i)); } builder.endPositionEntry(); } @@ -305,10 +295,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -318,22 +326,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex), values.get(leftIndex))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { + if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java index 2bf8edd99f48c..4afaa818855e4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java @@ -20,7 +20,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -95,7 +94,7 @@ public void collect(float value, int bucket) { if (inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex))) { values.set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, bucketSize); } return; } @@ -111,7 +110,7 @@ public void collect(float value, int bucket) { values.set(index, value); if (next == 0) { heapMode.set(bucket); - heapify(rootIndex); + heapify(rootIndex, bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -172,14 +171,12 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new float[bucketSize]; - try (var builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -192,22 +189,15 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = values.get(bounds.v1() + i); + // If we are in the gathering mode, we need to heapify before sorting. + if (inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendFloat(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendFloat(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.appendFloat(values.get(bounds.v1() + i)); } builder.endPositionEntry(); } @@ -305,10 +295,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -318,22 +326,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex), values.get(leftIndex))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { + if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java index 257dfe2ebb0bd..5ba1a3f7138a3 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java @@ -20,7 +20,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -95,7 +94,7 @@ public void collect(int value, int bucket) { if (inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex))) { values.set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, bucketSize); } return; } @@ -111,7 +110,7 @@ public void collect(int value, int bucket) { values.set(index, value); if (next == 0) { heapMode.set(bucket); - heapify(rootIndex); + heapify(rootIndex, bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -172,14 +171,12 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new int[bucketSize]; - try (var builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -192,22 +189,15 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = values.get(bounds.v1() + i); + // If we are in the gathering mode, we need to heapify before sorting. + if (inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendInt(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendInt(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.appendInt(values.get(bounds.v1() + i)); } builder.endPositionEntry(); } @@ -305,10 +295,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -318,22 +326,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex), values.get(leftIndex))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { + if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java index c27467ebb60ff..ac472cc411668 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java @@ -20,7 +20,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -95,7 +94,7 @@ public void collect(long value, int bucket) { if (inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex))) { values.set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, bucketSize); } return; } @@ -111,7 +110,7 @@ public void collect(long value, int bucket) { values.set(index, value); if (next == 0) { heapMode.set(bucket); - heapify(rootIndex); + heapify(rootIndex, bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -172,14 +171,12 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new long[bucketSize]; - try (var builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -192,22 +189,15 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = values.get(bounds.v1() + i); + // If we are in the gathering mode, we need to heapify before sorting. + if (inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendLong(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendLong(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.appendLong(values.get(bounds.v1() + i)); } builder.endPositionEntry(); } @@ -305,10 +295,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -318,22 +326,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex), values.get(leftIndex))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { + if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BooleanBucketedSort.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BooleanBucketedSort.java index eb66d2c836d2f..9420069cdb3c5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BooleanBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BooleanBucketedSort.java @@ -53,9 +53,6 @@ public BooleanBucketedSort(BigArrays bigArrays, SortOrder order, int bucketSize) /** * Collects a {@code value} into a {@code bucket}. - *

- * It may or may not be inserted in the heap, depending on if it is better than the current root. - *

*/ public void collect(boolean value, int bucket) { long rootIndex = (long) bucket * 2; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java index 63d79a9198622..5f0555aca56cf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java @@ -24,7 +24,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -122,7 +121,7 @@ public void collect(BytesRef value, int bucket) { if (common.inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex).bytesRefView())) { clearedBytesAt(rootIndex).append(value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, common.bucketSize); } checkInvariant(bucket); return; @@ -138,7 +137,7 @@ public void collect(BytesRef value, int bucket) { clearedBytesAt(index).append(value); if (next == 0) { common.enableHeapMode(bucket); - heapify(rootIndex); + heapify(rootIndex, common.bucketSize); } else { ByteUtils.writeIntLE(next - 1, values.get(rootIndex).bytes(), 0); } @@ -182,9 +181,6 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - BytesRef[] bucketValues = new BytesRef[common.bucketSize]; - try (var builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); @@ -212,25 +208,18 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { continue; } - for (int i = 0; i < size; i++) { - try (BreakingBytesRefBuilder bytes = values.get(start + i)) { - bucketValues[i] = bytes.bytesRefView(); - } - values.set(start + i, null); + // If we are in the gathering mode, we need to heapify before sorting. + if (common.inHeapMode(bucket) == false) { + heapify(start, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(start, (int) size); builder.beginPositionEntry(); - if (common.order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendBytesRef(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendBytesRef(bucketValues[i]); + for (int i = 0; i < size; i++) { + try (BreakingBytesRefBuilder bytes = values.get(start + i)) { + builder.appendBytesRef(bytes.bytesRefView()); } + values.set(start + i, null); } builder.endPositionEntry(); } @@ -339,10 +328,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = common.bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -352,24 +359,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < common.bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex).bytesRefView(), values.get(leftIndex).bytesRefView())) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < common.bucketSize - && betterThan(values.get(worstIndex).bytesRefView(), values.get(rightIndex).bytesRefView())) { - + if (rightChild < heapSize && betterThan(values.get(worstIndex).bytesRefView(), values.get(rightIndex).bytesRefView())) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java index 4392d3994886c..576f36fec3b21 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java @@ -21,7 +21,6 @@ import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -100,7 +99,7 @@ public void collect(BytesRef value, int bucket) { if (common.inHeapMode(bucket)) { if (betterThan(value, get(rootIndex, scratch1))) { set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, common.bucketSize); } return; } @@ -115,7 +114,7 @@ public void collect(BytesRef value, int bucket) { set(index, value); if (next == 0) { common.enableHeapMode(bucket); - heapify(rootIndex); + heapify(rootIndex, common.bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -163,14 +162,12 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new BytesRef[common.bucketSize]; - try (var builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -179,26 +176,19 @@ public Block toBlock(BlockFactory blockFactory, IntVector selected) { } if (size == 1) { - builder.appendBytesRef(get(bounds.v1(), scratch1)); + builder.appendBytesRef(get(rootIndex, scratch1)); continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = get(bounds.v1() + i, new BytesRef()); + // If we are in the gathering mode, we need to heapify before sorting. + if (common.inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (common.order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.appendBytesRef(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.appendBytesRef(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.appendBytesRef(get(rootIndex + i, new BytesRef())); } builder.endPositionEntry(); } @@ -319,10 +309,28 @@ private void fillGatherOffsets(long startingAt) { * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = common.bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -332,22 +340,27 @@ private void heapify(long rootIndex) { * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < common.bucketSize) { + if (leftChild < heapSize) { if (betterThan(get(worstIndex, scratch1), get(leftIndex, scratch2))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < common.bucketSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) { + if (rightChild < heapSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) { worst = rightChild; worstIndex = rightIndex; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st index 095d48021e9c1..39eead2ed3044 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st @@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; -import java.util.Arrays; import java.util.stream.IntStream; /** @@ -95,7 +94,7 @@ public class $Type$BucketedSort implements Releasable { if (inHeapMode(bucket)) { if (betterThan(value, values.get(rootIndex))) { values.set(rootIndex, value); - downHeap(rootIndex, 0); + downHeap(rootIndex, 0, bucketSize); } return; } @@ -111,7 +110,7 @@ public class $Type$BucketedSort implements Releasable { values.set(index, value); if (next == 0) { heapMode.set(bucket); - heapify(rootIndex); + heapify(rootIndex, bucketSize); } else { setNextGatherOffset(rootIndex, next - 1); } @@ -172,14 +171,12 @@ public class $Type$BucketedSort implements Releasable { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - // Used to sort the values in the bucket. - var bucketValues = new $type$[bucketSize]; - try (var builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) { for (int s = 0; s < selected.getPositionCount(); s++) { int bucket = selected.getInt(s); var bounds = getBucketValuesIndexes(bucket); + var rootIndex = bounds.v1(); var size = bounds.v2() - bounds.v1(); if (size == 0) { @@ -192,22 +189,15 @@ public class $Type$BucketedSort implements Releasable { continue; } - for (int i = 0; i < size; i++) { - bucketValues[i] = values.get(bounds.v1() + i); + // If we are in the gathering mode, we need to heapify before sorting. + if (inHeapMode(bucket) == false) { + heapify(rootIndex, (int) size); } - - // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting - Arrays.sort(bucketValues, 0, (int) size); + heapSort(rootIndex, (int) size); builder.beginPositionEntry(); - if (order == SortOrder.ASC) { - for (int i = 0; i < size; i++) { - builder.append$Type$(bucketValues[i]); - } - } else { - for (int i = (int) size - 1; i >= 0; i--) { - builder.append$Type$(bucketValues[i]); - } + for (int i = 0; i < size; i++) { + builder.append$Type$(values.get(bounds.v1() + i)); } builder.endPositionEntry(); } @@ -309,10 +299,28 @@ $endif$ * * @param rootIndex the index the start of the bucket */ - private void heapify(long rootIndex) { - int maxParent = bucketSize / 2 - 1; + private void heapify(long rootIndex, int heapSize) { + int maxParent = heapSize / 2 - 1; for (int parent = maxParent; parent >= 0; parent--) { - downHeap(rootIndex, parent); + downHeap(rootIndex, parent, heapSize); + } + } + + /** + * Sorts all the values in the heap using heap sort algorithm. + * This runs in {@code O(n log n)} time. + * @param rootIndex index of the start of the bucket + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. + */ + private void heapSort(long rootIndex, int heapSize) { + while (heapSize > 0) { + swap(rootIndex, rootIndex + heapSize - 1); + heapSize--; + downHeap(rootIndex, 0, heapSize); } } @@ -322,22 +330,27 @@ $endif$ * @param rootIndex index of the start of the bucket * @param parent Index within the bucket of the parent to check. * For example, 0 is the "root". + * @param heapSize Number of values that belong to the heap. + * Can be less than bucketSize. + * In such a case, the remaining values in range + * (rootIndex + heapSize, rootIndex + bucketSize) + * are *not* considered part of the heap. */ - private void downHeap(long rootIndex, int parent) { + private void downHeap(long rootIndex, int parent, int heapSize) { while (true) { long parentIndex = rootIndex + parent; int worst = parent; long worstIndex = parentIndex; int leftChild = parent * 2 + 1; long leftIndex = rootIndex + leftChild; - if (leftChild < bucketSize) { + if (leftChild < heapSize) { if (betterThan(values.get(worstIndex), values.get(leftIndex))) { worst = leftChild; worstIndex = leftIndex; } int rightChild = leftChild + 1; long rightIndex = rootIndex + rightChild; - if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { + if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) { worst = rightChild; worstIndex = rightIndex; }