Skip to content

Commit e9416df

Browse files
authored
ESQL: TopNOperator, release Row on failure (#130330) (#130349)
Handles the case where the Row was released on failure, by moving the declaration to a try-with-resource clause. Resolves #130215, #130222, #130270.
1 parent 21e4024 commit e9416df

File tree

2 files changed

+36
-43
lines changed

2 files changed

+36
-43
lines changed

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,6 @@ tests:
560560
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
561561
method: test
562562
issue: https://github.com/elastic/elasticsearch/issues/130067
563-
- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests
564-
method: testSimpleWithCranky
565-
issue: https://github.com/elastic/elasticsearch/issues/130215
566-
- class: org.elasticsearch.xpack.esql.action.EnrichIT
567-
method: testProfile
568-
issue: https://github.com/elastic/elasticsearch/issues/130270
569563

570564
# Examples:
571565
#

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -469,51 +469,50 @@ private Iterator<Page> toPages() {
469469
p = 0;
470470
}
471471

472-
Row row = list.get(i);
473-
BytesRef keys = row.keys.bytesRefView();
474-
for (SortOrder so : sortOrders) {
475-
if (keys.bytes[keys.offset] == so.nul()) {
472+
try (Row row = list.get(i)) {
473+
BytesRef keys = row.keys.bytesRefView();
474+
for (SortOrder so : sortOrders) {
475+
if (keys.bytes[keys.offset] == so.nul()) {
476+
keys.offset++;
477+
keys.length--;
478+
continue;
479+
}
476480
keys.offset++;
477481
keys.length--;
478-
continue;
482+
builders[so.channel].decodeKey(keys);
483+
}
484+
if (keys.length != 0) {
485+
throw new IllegalArgumentException("didn't read all keys");
479486
}
480-
keys.offset++;
481-
keys.length--;
482-
builders[so.channel].decodeKey(keys);
483-
}
484-
if (keys.length != 0) {
485-
throw new IllegalArgumentException("didn't read all keys");
486-
}
487-
488-
BytesRef values = row.values.bytesRefView();
489-
for (ResultBuilder builder : builders) {
490-
builder.setNextRefCounted(row.shardRefCounter);
491-
builder.decodeValue(values);
492-
}
493-
if (values.length != 0) {
494-
throw new IllegalArgumentException("didn't read all values");
495-
}
496487

497-
list.set(i, null);
488+
BytesRef values = row.values.bytesRefView();
489+
for (ResultBuilder builder : builders) {
490+
builder.setNextRefCounted(row.shardRefCounter);
491+
builder.decodeValue(values);
492+
}
493+
if (values.length != 0) {
494+
throw new IllegalArgumentException("didn't read all values");
495+
}
498496

499-
p++;
500-
if (p == size) {
501-
Block[] blocks = new Block[builders.length];
502-
try {
503-
for (int b = 0; b < blocks.length; b++) {
504-
blocks[b] = builders[b].build();
505-
}
506-
} finally {
507-
if (blocks[blocks.length - 1] == null) {
508-
Releasables.closeExpectNoException(blocks);
497+
list.set(i, null);
498+
499+
p++;
500+
if (p == size) {
501+
Block[] blocks = new Block[builders.length];
502+
try {
503+
for (int b = 0; b < blocks.length; b++) {
504+
blocks[b] = builders[b].build();
505+
}
506+
} finally {
507+
if (blocks[blocks.length - 1] == null) {
508+
Releasables.closeExpectNoException(blocks);
509+
}
509510
}
511+
result.add(new Page(blocks));
512+
Releasables.closeExpectNoException(builders);
513+
builders = null;
510514
}
511-
result.add(new Page(blocks));
512-
Releasables.closeExpectNoException(builders);
513-
builders = null;
514515
}
515-
// It's important to close the row only after we build the new block, so we don't pre-release any shard counter.
516-
row.close();
517516
}
518517
assert builders == null;
519518
success = true;

0 commit comments

Comments
 (0)