Skip to content

Commit 26c4354

Browse files
authored
ESQL: TopNOperator, release Row on failure (elastic#130330)
Handles the case where the Row was released on failure, by moving the declaration to a try-with-resource clause. Resolves elastic#130215, elastic#130222, elastic#130270.
1 parent 80062dc commit 26c4354

File tree

3 files changed

+42
-46
lines changed

3 files changed

+42
-46
lines changed

docs/changelog/130330.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 130330
2+
summary: "TopNOperator, release Row on failure"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 130215

muted-tests.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,6 @@ tests:
543543
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
544544
method: test
545545
issue: https://github.com/elastic/elasticsearch/issues/130067
546-
- class: org.elasticsearch.xpack.esql.action.EnrichIT
547-
method: testTopN
548-
issue: https://github.com/elastic/elasticsearch/issues/130122
549546
- class: org.elasticsearch.action.support.ThreadedActionListenerTests
550547
method: testRejectionHandling
551548
issue: https://github.com/elastic/elasticsearch/issues/130129
@@ -560,9 +557,6 @@ tests:
560557
- class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests
561558
method: testKMeansNeighbors
562559
issue: https://github.com/elastic/elasticsearch/issues/130258
563-
- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests
564-
method: testSimpleWithCranky
565-
issue: https://github.com/elastic/elasticsearch/issues/130215
566560
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
567561
method: test {p0=esql/10_basic/basic with documents_found}
568562
issue: https://github.com/elastic/elasticsearch/issues/130256
@@ -572,9 +566,6 @@ tests:
572566
- class: org.elasticsearch.index.IndexingPressureIT
573567
method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize
574568
issue: https://github.com/elastic/elasticsearch/issues/130281
575-
- class: org.elasticsearch.xpack.esql.action.EnrichIT
576-
method: testProfile
577-
issue: https://github.com/elastic/elasticsearch/issues/130270
578569
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
579570
method: test {lookup-join.MvJoinKeyOnFrom SYNC}
580571
issue: https://github.com/elastic/elasticsearch/issues/130296

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -469,51 +469,50 @@ private Iterator<Page> toPages() {
469469
p = 0;
470470
}
471471

472-
Row row = list.get(i);
473-
BytesRef keys = row.keys.bytesRefView();
474-
for (SortOrder so : sortOrders) {
475-
if (keys.bytes[keys.offset] == so.nul()) {
472+
try (Row row = list.get(i)) {
473+
BytesRef keys = row.keys.bytesRefView();
474+
for (SortOrder so : sortOrders) {
475+
if (keys.bytes[keys.offset] == so.nul()) {
476+
keys.offset++;
477+
keys.length--;
478+
continue;
479+
}
476480
keys.offset++;
477481
keys.length--;
478-
continue;
482+
builders[so.channel].decodeKey(keys);
483+
}
484+
if (keys.length != 0) {
485+
throw new IllegalArgumentException("didn't read all keys");
479486
}
480-
keys.offset++;
481-
keys.length--;
482-
builders[so.channel].decodeKey(keys);
483-
}
484-
if (keys.length != 0) {
485-
throw new IllegalArgumentException("didn't read all keys");
486-
}
487-
488-
BytesRef values = row.values.bytesRefView();
489-
for (ResultBuilder builder : builders) {
490-
builder.setNextRefCounted(row.shardRefCounter);
491-
builder.decodeValue(values);
492-
}
493-
if (values.length != 0) {
494-
throw new IllegalArgumentException("didn't read all values");
495-
}
496487

497-
list.set(i, null);
488+
BytesRef values = row.values.bytesRefView();
489+
for (ResultBuilder builder : builders) {
490+
builder.setNextRefCounted(row.shardRefCounter);
491+
builder.decodeValue(values);
492+
}
493+
if (values.length != 0) {
494+
throw new IllegalArgumentException("didn't read all values");
495+
}
498496

499-
p++;
500-
if (p == size) {
501-
Block[] blocks = new Block[builders.length];
502-
try {
503-
for (int b = 0; b < blocks.length; b++) {
504-
blocks[b] = builders[b].build();
505-
}
506-
} finally {
507-
if (blocks[blocks.length - 1] == null) {
508-
Releasables.closeExpectNoException(blocks);
497+
list.set(i, null);
498+
499+
p++;
500+
if (p == size) {
501+
Block[] blocks = new Block[builders.length];
502+
try {
503+
for (int b = 0; b < blocks.length; b++) {
504+
blocks[b] = builders[b].build();
505+
}
506+
} finally {
507+
if (blocks[blocks.length - 1] == null) {
508+
Releasables.closeExpectNoException(blocks);
509+
}
509510
}
511+
result.add(new Page(blocks));
512+
Releasables.closeExpectNoException(builders);
513+
builders = null;
510514
}
511-
result.add(new Page(blocks));
512-
Releasables.closeExpectNoException(builders);
513-
builders = null;
514515
}
515-
// It's important to close the row only after we build the new block, so we don't pre-release any shard counter.
516-
row.close();
517516
}
518517
assert builders == null;
519518
success = true;

0 commit comments

Comments
 (0)