Skip to content

Commit 43611bc

Browse files
committed
Apply review comments
1 parent cff81d9 commit 43611bc

File tree

1 file changed

+9
-10
lines changed
  • x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn

1 file changed

+9
-10
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.core.Releasable;
2525
import org.elasticsearch.core.Releasables;
2626

27-
import java.nio.charset.Charset;
2827
import java.util.ArrayList;
2928
import java.util.Arrays;
3029
import java.util.Collections;
@@ -282,7 +281,7 @@ public String describe() {
282281

283282
private final BlockFactory blockFactory;
284283
private final CircuitBreaker breaker;
285-
private final Map<String, Queue> inputQueues;
284+
private final Map<BytesRef, Queue> inputQueues;
286285

287286
private final int topCount;
288287
private final int maxPageSize;
@@ -408,7 +407,7 @@ public void addInput(Page page) {
408407
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
409408
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
410409

411-
String partitionKey = getPartitionKey(page, i);
410+
BytesRef partitionKey = getPartitionKey(page, i);
412411
Queue inputQueue = inputQueues.computeIfAbsent(partitionKey, key -> new Queue(topCount));
413412
spare = inputQueue.insertWithOverflow(spare);
414413
}
@@ -426,19 +425,19 @@ public void addInput(Page page) {
426425
* @param i row index
427426
* @return partition key of the i-th row of the given page
428427
*/
429-
private String getPartitionKey(Page page, int i) {
428+
private BytesRef getPartitionKey(Page page, int i) {
430429
if (partitions.isEmpty()) {
431-
return "";
430+
return new BytesRef();
432431
}
433432
assert page.getPositionCount() > 0;
434-
StringBuilder builder = new StringBuilder();
433+
BreakingBytesRefBuilder builder = new BreakingBytesRefBuilder(breaker, "topn");
435434
for (Partition partition : partitions) {
436-
try (var block = page.getBlock(partition.channel).filter(i)) {
435+
try (var block = page.getBlock(partition.channel)) {
437436
BytesRef partitionFieldValue = ((BytesRefBlock) block).getBytesRef(i, new BytesRef());
438-
builder.append(partitionFieldValue.utf8ToString());
437+
builder.append(partitionFieldValue);
439438
}
440439
}
441-
return builder.toString();
440+
return builder.bytesRefView();
442441
}
443442

444443
@Override
@@ -604,7 +603,7 @@ public long ramBytesUsed() {
604603
size += RamUsageEstimator.alignObjectSize(arrHeader + ref * sortOrders.size());
605604
size += sortOrders.size() * SortOrder.SHALLOW_SIZE;
606605
long ramBytesUsedSum = inputQueues.entrySet().stream()
607-
.mapToLong(e -> e.getKey().getBytes(Charset.defaultCharset()).length + e.getValue().ramBytesUsed())
606+
.mapToLong(e -> e.getKey().bytes.length + e.getValue().ramBytesUsed())
608607
.sum();
609608
size += ramBytesUsedSum;
610609
return size;

0 commit comments

Comments
 (0)