Skip to content

Commit 8f85e2e

Browse files
authored
ESQL: Fix unreleased block in topn (#101648) (#101694)
This fixes a bug in the topn operator where it'll fail to de-track blocks if there's a failure while building the results from topn.
1 parent 8cb540f commit 8f85e2e

File tree

8 files changed

+110
-32
lines changed

8 files changed

+110
-32
lines changed

docs/changelog/101648.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 101648
2+
summary: "ESQL: Fix unreleased block in topn"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 101588

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1313
import org.elasticsearch.core.Nullable;
1414
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.core.Releasables;
1516

1617
import java.util.List;
1718

@@ -207,6 +208,24 @@ interface Builder extends Releasable {
207208
* Builds the block. This method can be called multiple times.
208209
*/
209210
Block build();
211+
212+
/**
213+
* Build many {@link Block}s at once, releasing any partially built blocks
214+
* if any fail.
215+
*/
216+
static Block[] buildAll(Block.Builder... builders) {
217+
Block[] blocks = new Block[builders.length];
218+
try {
219+
for (int b = 0; b < blocks.length; b++) {
220+
blocks[b] = builders[b].build();
221+
}
222+
} finally {
223+
if (blocks[blocks.length - 1] == null) {
224+
Releasables.closeExpectNoException(blocks);
225+
}
226+
}
227+
return blocks;
228+
}
210229
}
211230

212231
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,6 @@ public long ramBytesUsed() {
9191
return SHALLOW_SIZE + keys.ramBytesUsed() + orderByCompositeKeyAscending.size() / Byte.SIZE + values.ramBytesUsed();
9292
}
9393

94-
private void clear() {
95-
keys.clear();
96-
orderByCompositeKeyAscending.clear();
97-
values.clear();
98-
}
99-
10094
@Override
10195
public void close() {
10296
Releasables.closeExpectNoException(keys, values);
@@ -405,7 +399,17 @@ private Iterator<Page> toPages() {
405399

406400
p++;
407401
if (p == size) {
408-
result.add(new Page(Arrays.stream(builders).map(ResultBuilder::build).toArray(Block[]::new)));
402+
Block[] blocks = new Block[builders.length];
403+
try {
404+
for (int b = 0; b < blocks.length; b++) {
405+
blocks[b] = builders[b].build();
406+
}
407+
} finally {
408+
if (blocks[blocks.length - 1] == null) {
409+
Releasables.closeExpectNoException(blocks);
410+
}
411+
}
412+
result.add(new Page(blocks));
409413
Releasables.closeExpectNoException(builders);
410414
builders = null;
411415
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
1717
import org.elasticsearch.compute.data.BlockFactory;
1818
import org.elasticsearch.compute.data.MockBlockFactory;
19+
import org.elasticsearch.indices.CrankyCircuitBreakerService;
1920
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
2021
import org.elasticsearch.test.ESTestCase;
2122
import org.junit.After;
@@ -107,6 +108,16 @@ protected DriverContext driverContext() { // TODO make this final and return a b
107108
private final List<CircuitBreaker> breakers = new ArrayList<>();
108109
private final List<BlockFactory> blockFactories = new ArrayList<>();
109110

111+
protected final DriverContext crankyDriverContext() {
112+
CrankyCircuitBreakerService cranky = new CrankyCircuitBreakerService();
113+
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, cranky).withCircuitBreaking();
114+
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
115+
breakers.add(breaker);
116+
BlockFactory blockFactory = new MockBlockFactory(breaker, bigArrays);
117+
blockFactories.add(blockFactory);
118+
return new DriverContext(bigArrays, blockFactory);
119+
}
120+
110121
/**
111122
* A {@link DriverContext} with a breaking {@link BigArrays} and {@link BlockFactory}.
112123
*/

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.compute.data.Block;
1011
import org.elasticsearch.compute.data.BlockFactory;
1112
import org.elasticsearch.compute.data.Page;
1213
import org.elasticsearch.core.Tuple;
@@ -44,23 +45,23 @@ public TupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Long
4445

4546
@Override
4647
protected Page createPage(int positionOffset, int length) {
47-
var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
48-
var blockBuilder2 = blockFactory.newLongBlockBuilder(length);
49-
for (int i = 0; i < length; i++) {
50-
Tuple<Long, Long> item = values.get(positionOffset + i);
51-
if (item.v1() == null) {
52-
blockBuilder1.appendNull();
53-
} else {
54-
blockBuilder1.appendLong(item.v1());
55-
}
56-
if (item.v2() == null) {
57-
blockBuilder2.appendNull();
58-
} else {
59-
blockBuilder2.appendLong(item.v2());
48+
try (var blockBuilder1 = blockFactory.newLongBlockBuilder(length); var blockBuilder2 = blockFactory.newLongBlockBuilder(length)) {
49+
for (int i = 0; i < length; i++) {
50+
Tuple<Long, Long> item = values.get(positionOffset + i);
51+
if (item.v1() == null) {
52+
blockBuilder1.appendNull();
53+
} else {
54+
blockBuilder1.appendLong(item.v1());
55+
}
56+
if (item.v2() == null) {
57+
blockBuilder2.appendNull();
58+
} else {
59+
blockBuilder2.appendLong(item.v2());
60+
}
6061
}
62+
currentPosition += length;
63+
return new Page(Block.Builder.buildAll(blockBuilder1, blockBuilder2));
6164
}
62-
currentPosition += length;
63-
return new Page(blockBuilder1.build(), blockBuilder2.build());
6465
}
6566

6667
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.tests.util.RamUsageTester;
1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.common.breaker.CircuitBreaker;
14+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1415
import org.elasticsearch.common.network.NetworkAddress;
1516
import org.elasticsearch.common.unit.ByteSizeValue;
1617
import org.elasticsearch.common.util.BigArrays;
@@ -36,6 +37,7 @@
3637
import org.elasticsearch.compute.operator.SourceOperator;
3738
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
3839
import org.elasticsearch.core.Tuple;
40+
import org.elasticsearch.indices.CrankyCircuitBreakerService;
3941
import org.elasticsearch.test.ESTestCase;
4042
import org.elasticsearch.test.ListMatcher;
4143
import org.elasticsearch.xpack.versionfield.Version;
@@ -234,15 +236,29 @@ public long accumulateObject(Object o, long shallowSize, Map<Field, Object> fiel
234236

235237
public void testRandomTopN() {
236238
for (boolean asc : List.of(true, false)) {
237-
int limit = randomIntBetween(1, 20);
238-
List<Long> inputValues = randomList(0, 5000, ESTestCase::randomLong);
239-
Comparator<Long> comparator = asc ? naturalOrder() : reverseOrder();
240-
List<Long> expectedValues = inputValues.stream().sorted(comparator).limit(limit).toList();
241-
List<Long> outputValues = topNLong(inputValues, limit, asc, false);
242-
assertThat(outputValues, equalTo(expectedValues));
239+
testRandomTopN(asc, driverContext());
243240
}
244241
}
245242

243+
public void testRandomTopNCranky() {
244+
try {
245+
testRandomTopN(randomBoolean(), crankyDriverContext());
246+
logger.info("cranky didn't break us");
247+
} catch (CircuitBreakingException e) {
248+
logger.info("broken", e);
249+
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
250+
}
251+
}
252+
253+
private void testRandomTopN(boolean asc, DriverContext context) {
254+
int limit = randomIntBetween(1, 20);
255+
List<Long> inputValues = randomList(0, 5000, ESTestCase::randomLong);
256+
Comparator<Long> comparator = asc ? naturalOrder() : reverseOrder();
257+
List<Long> expectedValues = inputValues.stream().sorted(comparator).limit(limit).toList();
258+
List<Long> outputValues = topNLong(context, inputValues, limit, asc, false);
259+
assertThat(outputValues, equalTo(expectedValues));
260+
}
261+
246262
public void testBasicTopN() {
247263
List<Long> values = Arrays.asList(2L, 1L, 4L, null, 5L, 10L, null, 20L, 4L, 100L);
248264
assertThat(topNLong(values, 1, true, false), equalTo(Arrays.asList(1L)));
@@ -267,8 +283,15 @@ public void testBasicTopN() {
267283
assertThat(topNLong(values, 100, false, true), equalTo(Arrays.asList(null, null, 100L, 20L, 10L, 5L, 4L, 4L, 2L, 1L)));
268284
}
269285

270-
private List<Long> topNLong(List<Long> inputValues, int limit, boolean ascendingOrder, boolean nullsFirst) {
286+
private List<Long> topNLong(
287+
DriverContext driverContext,
288+
List<Long> inputValues,
289+
int limit,
290+
boolean ascendingOrder,
291+
boolean nullsFirst
292+
) {
271293
return topNTwoColumns(
294+
driverContext,
272295
inputValues.stream().map(v -> tuple(v, 0L)).toList(),
273296
limit,
274297
List.of(LONG, LONG),
@@ -277,6 +300,10 @@ private List<Long> topNLong(List<Long> inputValues, int limit, boolean ascending
277300
).stream().map(Tuple::v1).toList();
278301
}
279302

303+
private List<Long> topNLong(List<Long> inputValues, int limit, boolean ascendingOrder, boolean nullsFirst) {
304+
return topNLong(driverContext(), inputValues, limit, ascendingOrder, nullsFirst);
305+
}
306+
280307
public void testCompareInts() {
281308
testCompare(
282309
new Page(
@@ -422,6 +449,7 @@ public void testTopNTwoColumns() {
422449
List<Tuple<Long, Long>> values = Arrays.asList(tuple(1L, 1L), tuple(1L, 2L), tuple(null, null), tuple(null, 1L), tuple(1L, null));
423450
assertThat(
424451
topNTwoColumns(
452+
driverContext(),
425453
values,
426454
5,
427455
List.of(LONG, LONG),
@@ -432,6 +460,7 @@ public void testTopNTwoColumns() {
432460
);
433461
assertThat(
434462
topNTwoColumns(
463+
driverContext(),
435464
values,
436465
5,
437466
List.of(LONG, LONG),
@@ -442,6 +471,7 @@ public void testTopNTwoColumns() {
442471
);
443472
assertThat(
444473
topNTwoColumns(
474+
driverContext(),
445475
values,
446476
5,
447477
List.of(LONG, LONG),
@@ -613,13 +643,13 @@ public void testCollectAllValues_RandomMultiValues() {
613643
}
614644

615645
private List<Tuple<Long, Long>> topNTwoColumns(
646+
DriverContext driverContext,
616647
List<Tuple<Long, Long>> inputValues,
617648
int limit,
618649
List<ElementType> elementTypes,
619650
List<TopNEncoder> encoder,
620651
List<TopNOperator.SortOrder> sortOrders
621652
) {
622-
DriverContext driverContext = driverContext();
623653
List<Tuple<Long, Long>> outputValues = new ArrayList<>();
624654
try (
625655
Driver driver = new Driver(

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ public void ensureBlocksReleased() {
4848
CircuitBreakerService breakerService = internalCluster().getInstance(CircuitBreakerService.class, node);
4949
CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
5050
try {
51-
assertBusy(() -> assertThat("Request breaker not reset to 0 on node: " + node, reqBreaker.getUsed(), equalTo(0L)));
51+
assertBusy(() -> {
52+
logger.info("running tasks: {}", client().admin().cluster().prepareListTasks().get());
53+
assertThat("Request breaker not reset to 0 on node: " + node, reqBreaker.getUsed(), equalTo(0L));
54+
});
5255
} catch (Exception e) {
5356
assertThat("Request breaker not reset to 0 on node: " + node, reqBreaker.getUsed(), equalTo(0L));
5457
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.elasticsearch.compute.operator.topn.TopNOperator.TopNOperatorFactory;
4747
import org.elasticsearch.core.Releasables;
4848
import org.elasticsearch.core.TimeValue;
49+
import org.elasticsearch.logging.LogManager;
50+
import org.elasticsearch.logging.Logger;
4951
import org.elasticsearch.search.internal.SearchContext;
5052
import org.elasticsearch.tasks.CancellableTask;
5153
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
@@ -111,6 +113,7 @@
111113
* drivers that are used to execute the given plan.
112114
*/
113115
public class LocalExecutionPlanner {
116+
private static final Logger logger = LogManager.getLogger(LocalExecutionPlanner.class);
114117

115118
private final String sessionId;
116119
private final CancellableTask parentTask;
@@ -813,14 +816,15 @@ public List<Driver> createDrivers(String sessionId) {
813816
try {
814817
for (DriverFactory df : driverFactories) {
815818
for (int i = 0; i < df.driverParallelism.instanceCount; i++) {
819+
logger.trace("building {} {}", i, df);
816820
drivers.add(df.driverSupplier.apply(sessionId));
817821
}
818822
}
819823
success = true;
820824
return drivers;
821825
} finally {
822826
if (success == false) {
823-
Releasables.close(() -> Releasables.close(drivers));
827+
Releasables.close(Releasables.wrap(drivers));
824828
}
825829
}
826830
}

0 commit comments

Comments
 (0)