Skip to content

Commit 2d39c12

Browse files
committed
More
1 parent 1920982 commit 2d39c12

File tree

6 files changed

+73
-23
lines changed

6 files changed

+73
-23
lines changed

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ public void testSortByManyLongsGiantTopNTooMuchMemory() throws IOException {
212212
assertCircuitBreaks(attempt -> sortBySomeLongsLimit(attempt * 500000));
213213
}
214214

215+
public void testStupidTopN() throws IOException {
216+
initManyLongs(1); // Doesn't actually matter how much data there is.
217+
assertCircuitBreaks(attempt -> sortBySomeLongsLimit(2147483630));
218+
}
219+
215220
private static final int MAX_ATTEMPTS = 5;
216221

217222
interface TryCircuitBreaking {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,16 @@
2424
import org.apache.lucene.util.RamUsageEstimator;
2525
import org.elasticsearch.common.Strings;
2626
import org.elasticsearch.common.breaker.CircuitBreaker;
27-
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
2827
import org.elasticsearch.compute.data.BlockFactory;
2928
import org.elasticsearch.compute.data.DocBlock;
3029
import org.elasticsearch.compute.data.DocVector;
3130
import org.elasticsearch.compute.data.DoubleBlock;
3231
import org.elasticsearch.compute.data.DoubleVector;
33-
import org.elasticsearch.compute.data.ElementType;
3432
import org.elasticsearch.compute.data.IntBlock;
3533
import org.elasticsearch.compute.data.IntVector;
3634
import org.elasticsearch.compute.data.Page;
3735
import org.elasticsearch.compute.operator.DriverContext;
3836
import org.elasticsearch.compute.operator.SourceOperator;
39-
import org.elasticsearch.core.Releasable;
4037
import org.elasticsearch.core.Releasables;
4138
import org.elasticsearch.search.sort.SortAndFormats;
4239
import org.elasticsearch.search.sort.SortBuilder;
@@ -168,7 +165,7 @@ public LuceneTopNSourceOperator(
168165
this.estimatedPerRowSortSize = estimatedPerRowSortSize;
169166
this.limit = limit;
170167
this.needsScore = needsScore;
171-
breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql topn");
168+
breaker.addEstimateBytesAndMaybeBreak(reserveSize(), "esql lucene topn");
172169
}
173170

174171
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,14 @@ public String describe() {
294294

295295
private final BlockFactory blockFactory;
296296
private final CircuitBreaker breaker;
297-
private final Queue inputQueue;
298297

299298
private final int maxPageSize;
300299

301300
private final List<ElementType> elementTypes;
302301
private final List<TopNEncoder> encoders;
303302
private final List<SortOrder> sortOrders;
304303

304+
private Queue inputQueue;
305305
private Row spare;
306306
private int spareValuesPreAllocSize = 0;
307307
private int spareKeysPreAllocSize = 0;
@@ -346,7 +346,8 @@ public TopNOperator(
346346
this.elementTypes = elementTypes;
347347
this.encoders = encoders;
348348
this.sortOrders = sortOrders;
349-
this.inputQueue = new Queue(topCount);
349+
breaker.addEstimateBytesAndMaybeBreak(Queue.sizeOf(topCount), "esql engine topn");
350+
this.inputQueue = new Queue(breaker, topCount);
350351
}
351352

352353
static int compareRows(Row r1, Row r2) {
@@ -457,6 +458,8 @@ private Iterator<Page> toPages() {
457458
list.add(inputQueue.pop());
458459
}
459460
Collections.reverse(list);
461+
inputQueue.close();
462+
inputQueue = null;
460463

461464
int p = 0;
462465
int size = 0;
@@ -563,19 +566,27 @@ public Page getOutput() {
563566

564567
@Override
565568
public void close() {
566-
/*
567-
* If we close before calling finish then spare and inputQueue will be live rows
568-
* that need closing. If we close after calling finish then the output iterator
569-
* will contain pages of results that have yet to be returned.
570-
*/
571569
Releasables.closeExpectNoException(
570+
/*
571+
* The spare is used during most collections. It's cleared when this Operator
572+
* is finish()ed. So it could be null here.
573+
*/
572574
spare,
573-
inputQueue == null ? null : Releasables.wrap(inputQueue),
575+
/*
576+
* The inputQueue is a min heap of all live rows. Closing it will close all
577+
* the rows it contains and all decrement the breaker for the size of
578+
* the heap itself.
579+
*/
580+
inputQueue,
581+
/*
582+
* If we're in the process of outputting pages then output will contain all
583+
* allocated but un-emitted pages.
584+
*/
574585
output == null ? null : Releasables.wrap(() -> Iterators.map(output, p -> p::releaseBlocks))
575586
);
576587
}
577588

578-
private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator
589+
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator
579590
.shallowSizeOfInstance(List.class) * 3;
580591

581592
@Override
@@ -589,7 +600,9 @@ public long ramBytesUsed() {
589600
size += RamUsageEstimator.alignObjectSize(arrHeader + ref * encoders.size());
590601
size += RamUsageEstimator.alignObjectSize(arrHeader + ref * sortOrders.size());
591602
size += sortOrders.size() * SortOrder.SHALLOW_SIZE;
592-
size += inputQueue.ramBytesUsed();
603+
if (inputQueue != null) {
604+
size += inputQueue.ramBytesUsed();
605+
}
593606
return size;
594607
}
595608

@@ -598,7 +611,7 @@ public Status status() {
598611
return new TopNOperatorStatus(
599612
receiveNanos,
600613
emitNanos,
601-
inputQueue.size(),
614+
inputQueue != null ? inputQueue.size() : 0,
602615
ramBytesUsed(),
603616
pagesReceived,
604617
pagesEmitted,
@@ -620,16 +633,14 @@ public String toString() {
620633
+ "]";
621634
}
622635

623-
CircuitBreaker breaker() {
624-
return breaker;
625-
}
626-
627-
private static class Queue extends PriorityQueue<Row> implements Accountable {
636+
private static class Queue extends PriorityQueue<Row> implements Accountable, Releasable {
628637
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Queue.class);
638+
private final CircuitBreaker breaker;
629639
private final int maxSize;
630640

631-
Queue(int maxSize) {
641+
Queue(CircuitBreaker breaker, int maxSize) {
632642
super(maxSize);
643+
this.breaker = breaker;
633644
this.maxSize = maxSize;
634645
}
635646

@@ -654,5 +665,19 @@ public long ramBytesUsed() {
654665
}
655666
return total;
656667
}
668+
669+
@Override
670+
public void close() {
671+
Releasables.close(Releasables.wrap(this), () -> breaker.addWithoutBreaking(-Queue.sizeOf(maxSize)));
672+
673+
}
674+
675+
public static long sizeOf(int topCount) {
676+
long total = SHALLOW_SIZE;
677+
total += RamUsageEstimator.alignObjectSize(
678+
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1)
679+
);
680+
return total;
681+
}
657682
}
658683
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1489,7 +1489,9 @@ public void testRowResizes() {
14891489
block.decRef();
14901490
op.addInput(new Page(blocks));
14911491

1492-
assertThat(breaker.getMemoryRequestCount(), is(94L));
1492+
// 94 are from the collection process
1493+
// 1 is for the min-heap itself
1494+
assertThat(breaker.getMemoryRequestCount(), is(95L));
14931495
}
14941496
}
14951497

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ public interface Sort {
6969
Order.OrderDirection direction();
7070

7171
FieldAttribute field();
72+
73+
/**
74+
* Type of the <strong>result</strong> of the sort. For example,
75+
* geo distance will be {@link DataType#DOUBLE}.
76+
*/
77+
DataType resulType();
7278
}
7379

7480
public record FieldSort(FieldAttribute field, Order.OrderDirection direction, Order.NullsPosition nulls) implements Sort {
@@ -80,6 +86,11 @@ public SortBuilder<?> sortBuilder() {
8086
builder.unmappedType(field.dataType().esType());
8187
return builder;
8288
}
89+
90+
@Override
91+
public DataType resulType() {
92+
return field.dataType();
93+
}
8394
}
8495

8596
public record GeoDistanceSort(FieldAttribute field, Order.OrderDirection direction, double lat, double lon) implements Sort {
@@ -89,6 +100,11 @@ public SortBuilder<?> sortBuilder() {
89100
builder.order(Direction.from(direction).asOrder());
90101
return builder;
91102
}
103+
104+
@Override
105+
public DataType resulType() {
106+
return DataType.DOUBLE;
107+
}
92108
}
93109

94110
public record ScoreSort(Order.OrderDirection direction) implements Sort {
@@ -102,6 +118,11 @@ public FieldAttribute field() {
102118
// TODO: refactor this: not all Sorts are backed by FieldAttributes
103119
return null;
104120
}
121+
122+
@Override
123+
public DataType resulType() {
124+
return DataType.DOUBLE;
125+
}
105126
}
106127

107128
public record QueryBuilderAndTags(QueryBuilder query, List<Object> tags) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
298298
long estimatedPerRowSortSize = 0;
299299
for (Sort sort : sorts) {
300300
sortBuilders.add(sort.sortBuilder());
301-
estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.field().dataType());
301+
estimatedPerRowSortSize += EstimatesRowSize.estimateSize(sort.resulType());
302302
}
303303
/*
304304
* In the worst case Lucene's TopN keeps each value in memory twice. Once

0 commit comments

Comments
 (0)