package org.elasticsearch.compute.data.sort;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -123,7 +125,7 @@ public void collect(BytesRef value, int bucket) {
123125 // Gathering mode
124126 long requiredSize = common .endIndex (rootIndex );
125127 if (values .size () < requiredSize ) {
126- grow (requiredSize );
128+ grow (bucket );
127129 }
128130 int next = getNextGatherOffset (rootIndex );
129131 common .assertValidNextOffset (next );
@@ -143,13 +145,13 @@ public void collect(BytesRef value, int bucket) {
143145 */
144146 public void merge (int bucket , BytesRefBucketedSort other , int otherBucket ) {
145147 long otherRootIndex = other .common .rootIndex (otherBucket );
146- long otherEnd = other .common .endIndex (otherRootIndex );
147- if (otherEnd >= other .values .size ()) {
148+ if (otherRootIndex >= other .values .size ()) {
148149 // The value was never collected.
149150 return ;
150151 }
151152 other .checkInvariant (otherBucket );
152153 long otherStart = other .startIndex (otherBucket , otherRootIndex );
154+ long otherEnd = other .common .endIndex (otherRootIndex );
153155 // TODO: This can be improved for heapified buckets by making use of the heap structures
154156 for (long i = otherStart ; i < otherEnd ; i ++) {
155157 collect (other .values .get (i ).bytesRefView (), bucket );
@@ -271,13 +273,21 @@ private void swap(long lhs, long rhs) {
271273
272274 /**
273275 * Allocate storage for more buckets and store the "next gather offset"
274- * for those new buckets.
276+ * for those new buckets. We always grow the storage by whole bucket's
277+ * worth of slots at a time. We never allocate space for partial buckets.
275278 */
276- private void grow (long requiredSize ) {
279+ private void grow (int bucket ) {
277280 long oldMax = values .size ();
278- values = common .bigArrays .grow (values , requiredSize );
281+ assert oldMax % common .bucketSize == 0 ;
282+
283+ long newSizeInBuckets = BigArrays .overSize (
284+ bucket + 1 ,
285+ PageCacheRecycler .OBJECT_PAGE_SIZE ,
286+ RamUsageEstimator .NUM_BYTES_OBJECT_REF * common .bucketSize
287+ );
288+ values = common .bigArrays .resize (values , newSizeInBuckets * common .bucketSize );
279289 // Set the next gather offsets for all newly allocated buckets.
280- fillGatherOffsets (oldMax - ( oldMax % common . bucketSize ) );
290+ fillGatherOffsets (oldMax );
281291 }
282292
283293 /**
@@ -296,6 +306,7 @@ private void fillGatherOffsets(long startingAt) {
296306 bytes .grow (Integer .BYTES );
297307 bytes .setLength (Integer .BYTES );
298308 ByteUtils .writeIntLE (nextOffset , bytes .bytes (), 0 );
309+ checkInvariant (Math .toIntExact (bucketRoot / common .bucketSize ));
299310 }
300311 }
301312
0 commit comments