From 4b2955255804ff3379be4de347ca133b498d90af Mon Sep 17 00:00:00 2001 From: Gal Lalouche Date: Mon, 30 Jun 2025 14:49:45 +0300 Subject: [PATCH 1/2] ESQL: TopNOperator, release Row on failure Resolves #130215, #130222, #130270. --- muted-tests.yml | 9 --- .../compute/operator/topn/TopNOperator.java | 73 +++++++++---------- 2 files changed, 36 insertions(+), 46 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 6e318f8949954..bbb0f1d9e3c08 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -552,9 +552,6 @@ tests: - class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT method: test issue: https://github.com/elastic/elasticsearch/issues/130067 -- class: org.elasticsearch.xpack.esql.action.EnrichIT - method: testTopN - issue: https://github.com/elastic/elasticsearch/issues/130122 - class: org.elasticsearch.action.support.ThreadedActionListenerTests method: testRejectionHandling issue: https://github.com/elastic/elasticsearch/issues/130129 @@ -569,9 +566,6 @@ tests: - class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests method: testKMeansNeighbors issue: https://github.com/elastic/elasticsearch/issues/130258 -- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests - method: testSimpleWithCranky - issue: https://github.com/elastic/elasticsearch/issues/130215 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=esql/10_basic/basic with documents_found} issue: https://github.com/elastic/elasticsearch/issues/130256 @@ -581,9 +575,6 @@ tests: - class: org.elasticsearch.index.IndexingPressureIT method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize issue: https://github.com/elastic/elasticsearch/issues/130281 -- class: org.elasticsearch.xpack.esql.action.EnrichIT - method: testProfile - issue: https://github.com/elastic/elasticsearch/issues/130270 - class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT method: test {lookup-join.MvJoinKeyOnFrom SYNC} issue: https://github.com/elastic/elasticsearch/issues/130296 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index fdf88cf8f55b4..fde51d4642ae0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -469,51 +469,50 @@ private Iterator toPages() { p = 0; } - Row row = list.get(i); - BytesRef keys = row.keys.bytesRefView(); - for (SortOrder so : sortOrders) { - if (keys.bytes[keys.offset] == so.nul()) { + try (Row row = list.get(i)) { + BytesRef keys = row.keys.bytesRefView(); + for (SortOrder so : sortOrders) { + if (keys.bytes[keys.offset] == so.nul()) { + keys.offset++; + keys.length--; + continue; + } keys.offset++; keys.length--; - continue; + builders[so.channel].decodeKey(keys); + } + if (keys.length != 0) { + throw new IllegalArgumentException("didn't read all keys"); } - keys.offset++; - keys.length--; - builders[so.channel].decodeKey(keys); - } - if (keys.length != 0) { - throw new IllegalArgumentException("didn't read all keys"); - } - - BytesRef values = row.values.bytesRefView(); - for (ResultBuilder builder : builders) { - builder.setNextRefCounted(row.shardRefCounter); - builder.decodeValue(values); - } - if (values.length != 0) { - throw new IllegalArgumentException("didn't read all values"); - } - list.set(i, null); + BytesRef values = row.values.bytesRefView(); + for (ResultBuilder builder : builders) { + builder.setNextRefCounted(row.shardRefCounter); + builder.decodeValue(values); + } + if (values.length != 0) { + throw new IllegalArgumentException("didn't read all values"); + } - p++; - if (p == size) { - Block[] blocks = new Block[builders.length]; - try { - for (int b = 0; b < blocks.length; b++) { - blocks[b] = builders[b].build(); - } - } finally { - if (blocks[blocks.length - 1] == null) { - Releasables.closeExpectNoException(blocks); + list.set(i, null); + + p++; + if (p == size) { + Block[] blocks = new Block[builders.length]; + try { + for (int b = 0; b < blocks.length; b++) { + blocks[b] = builders[b].build(); + } + } finally { + if (blocks[blocks.length - 1] == null) { + Releasables.closeExpectNoException(blocks); + } } + result.add(new Page(blocks)); + Releasables.closeExpectNoException(builders); + builders = null; } - result.add(new Page(blocks)); - Releasables.closeExpectNoException(builders); - builders = null; } - // It's important to close the row only after we build the new block, so we don't pre-release any shard counter. - row.close(); } assert builders == null; success = true; From f279769e009658ee7acc133d4a4853ebf076e808 Mon Sep 17 00:00:00 2001 From: Gal Lalouche Date: Mon, 30 Jun 2025 15:49:52 +0300 Subject: [PATCH 2/2] Update docs/changelog/130330.yaml --- docs/changelog/130330.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/130330.yaml diff --git a/docs/changelog/130330.yaml b/docs/changelog/130330.yaml new file mode 100644 index 0000000000000..33dd45014575c --- /dev/null +++ b/docs/changelog/130330.yaml @@ -0,0 +1,6 @@ +pr: 130330 +summary: "TopNOperator, release Row on failure" +area: ES|QL +type: bug +issues: + - 130215