Skip to content

Commit 0409da1

Browse files
committed
Move, explain, rename
1 parent 2d39c12 commit 0409da1

File tree

1 file changed

+16
-9
lines changed
  • x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn

1 file changed

+16
-9
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,7 @@ public TopNOperator(
346346
this.elementTypes = elementTypes;
347347
this.encoders = encoders;
348348
this.sortOrders = sortOrders;
349-
breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn");
350-
this.inputQueue = new Queue(breaker, topCount);
349+
this.inputQueue = Queue.build(breaker, topCount);
351350
}
352351

353352
static int compareRows(Row r1, Row r2) {
@@ -636,12 +635,20 @@ public String toString() {
636635
private static class Queue extends PriorityQueue<Row> implements Accountable, Releasable {
637636
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Queue.class);
638637
private final CircuitBreaker breaker;
639-
private final int maxSize;
638+
private final int topCount;
640639

641-
Queue(CircuitBreaker breaker, int maxSize) {
642-
super(maxSize);
640+
/**
641+
* Track memory usage in the breaker then build the {@link Queue}.
642+
*/
643+
static Queue build(CircuitBreaker breaker, int topCount) {
644+
breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn");
645+
return new Queue(breaker, topCount);
646+
}
647+
648+
private Queue(CircuitBreaker breaker, int topCount) {
649+
super(topCount);
643650
this.breaker = breaker;
644-
this.maxSize = maxSize;
651+
this.topCount = topCount;
645652
}
646653

647654
@Override
@@ -651,14 +658,14 @@ protected boolean lessThan(Row r1, Row r2) {
651658

652659
@Override
653660
public String toString() {
654-
return size() + "/" + maxSize;
661+
return size() + "/" + topCount;
655662
}
656663

657664
@Override
658665
public long ramBytesUsed() {
659666
long total = SHALLOW_SIZE;
660667
total += RamUsageEstimator.alignObjectSize(
661-
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * (maxSize + 1)
668+
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1)
662669
);
663670
for (Row r : this) {
664671
total += r == null ? 0 : r.ramBytesUsed();
@@ -668,7 +675,7 @@ public long ramBytesUsed() {
668675

669676
@Override
670677
public void close() {
671-
Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(maxSize)));
678+
Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(topCount)));
672679

673680
}
674681

0 commit comments

Comments
 (0)