Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/130330.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 130330
summary: "TopNOperator, release Row on failure"
area: ES|QL
type: bug
issues:
- 130215
9 changes: 0 additions & 9 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/130067
- class: org.elasticsearch.xpack.esql.action.EnrichIT
method: testTopN
issue: https://github.com/elastic/elasticsearch/issues/130122
- class: org.elasticsearch.action.support.ThreadedActionListenerTests
method: testRejectionHandling
issue: https://github.com/elastic/elasticsearch/issues/130129
Expand All @@ -566,9 +563,6 @@ tests:
- class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests
method: testKMeansNeighbors
issue: https://github.com/elastic/elasticsearch/issues/130258
- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests
method: testSimpleWithCranky
issue: https://github.com/elastic/elasticsearch/issues/130215
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=esql/10_basic/basic with documents_found}
issue: https://github.com/elastic/elasticsearch/issues/130256
Expand All @@ -578,9 +572,6 @@ tests:
- class: org.elasticsearch.index.IndexingPressureIT
method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize
issue: https://github.com/elastic/elasticsearch/issues/130281
- class: org.elasticsearch.xpack.esql.action.EnrichIT
method: testProfile
issue: https://github.com/elastic/elasticsearch/issues/130270
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {lookup-join.MvJoinKeyOnFrom SYNC}
issue: https://github.com/elastic/elasticsearch/issues/130296
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,51 +469,50 @@ private Iterator<Page> toPages() {
p = 0;
}

Row row = list.get(i);
Copy link
Contributor Author

@GalLalouche GalLalouche Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line was basically moved to the try-with-resource clause. Everything else is indentation.

BytesRef keys = row.keys.bytesRefView();
for (SortOrder so : sortOrders) {
if (keys.bytes[keys.offset] == so.nul()) {
try (Row row = list.get(i)) {
BytesRef keys = row.keys.bytesRefView();
for (SortOrder so : sortOrders) {
if (keys.bytes[keys.offset] == so.nul()) {
keys.offset++;
keys.length--;
continue;
}
keys.offset++;
keys.length--;
continue;
builders[so.channel].decodeKey(keys);
}
if (keys.length != 0) {
throw new IllegalArgumentException("didn't read all keys");
}
keys.offset++;
keys.length--;
builders[so.channel].decodeKey(keys);
}
if (keys.length != 0) {
throw new IllegalArgumentException("didn't read all keys");
}

BytesRef values = row.values.bytesRefView();
for (ResultBuilder builder : builders) {
builder.setNextRefCounted(row.shardRefCounter);
builder.decodeValue(values);
}
if (values.length != 0) {
throw new IllegalArgumentException("didn't read all values");
}

list.set(i, null);
BytesRef values = row.values.bytesRefView();
for (ResultBuilder builder : builders) {
builder.setNextRefCounted(row.shardRefCounter);
builder.decodeValue(values);
}
if (values.length != 0) {
throw new IllegalArgumentException("didn't read all values");
}

p++;
if (p == size) {
Block[] blocks = new Block[builders.length];
try {
for (int b = 0; b < blocks.length; b++) {
blocks[b] = builders[b].build();
}
} finally {
if (blocks[blocks.length - 1] == null) {
Releasables.closeExpectNoException(blocks);
list.set(i, null);

p++;
if (p == size) {
Block[] blocks = new Block[builders.length];
try {
for (int b = 0; b < blocks.length; b++) {
blocks[b] = builders[b].build();
}
} finally {
if (blocks[blocks.length - 1] == null) {
Releasables.closeExpectNoException(blocks);
}
}
result.add(new Page(blocks));
Releasables.closeExpectNoException(builders);
builders = null;
}
result.add(new Page(blocks));
Releasables.closeExpectNoException(builders);
builders = null;
}
// It's important to close the row only after we build the new block, so we don't pre-release any shard counter.
row.close();
}
assert builders == null;
success = true;
Expand Down
Loading