diff --git a/muted-tests.yml b/muted-tests.yml index bb8327adef475..32fcf5a1f3d26 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -555,9 +555,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.EsqlRemoteErrorWrapIT method: testThatRemoteErrorsAreWrapped issue: https://github.com/elastic/elasticsearch/issues/130794 -- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT - method: test {ints.InCast SYNC} - issue: https://github.com/elastic/elasticsearch/issues/130796 - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java index 055359c6a389a..09960a9c48e65 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java @@ -321,6 +321,7 @@ public TransportVersion getMinimalSupportedVersion() { private static class AppendBlocksIterator implements ReleasableIterator { private final Page page; private final ReleasableIterator next; + private boolean closed = false; private int positionOffset; @@ -348,7 +349,15 @@ public final Page next() { for (int b = 0; b < page.getBlockCount(); b++) { page.getBlock(b).incRef(); } - return page.appendBlocks(read); + final Page result = page.appendBlocks(read); + // We need to release the blocks of the page in this iteration instead of delaying to the next, + // because the blocks of this page are now shared with the output page. The output page can be + // passed to a separate driver, which may run concurrently with this driver, leading to data races + // of references in AbstractNonThreadSafeRefCounted, which is not thread-safe. + // An alternative would be to make RefCounted for Vectors/Blocks thread-safe when they are about + // to be shared with other drivers via #allowPassingToDifferentDriver. + close(); + return result; } Block[] newBlocks = new Block[page.getBlockCount() + read.length]; System.arraycopy(read, 0, newBlocks, page.getBlockCount(), read.length); @@ -368,7 +377,10 @@ public final Page next() { @Override public void close() { - Releasables.closeExpectNoException(page::releaseBlocks, next); + if (closed == false) { + closed = true; + Releasables.closeExpectNoException(page::releaseBlocks, next); + } } } }