diff --git a/muted-tests.yml b/muted-tests.yml index 7ab855ac5c293..0616f4f92034a 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -209,9 +209,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/120668 - class: org.elasticsearch.xpack.security.authc.ldap.ADLdapUserSearchSessionFactoryTests issue: https://github.com/elastic/elasticsearch/issues/119882 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT - method: testEnrichAfterStop - issue: https://github.com/elastic/elasticsearch/issues/120757 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=ml/3rd_party_deployment/Test start deployment fails while model download in progress} issue: https://github.com/elastic/elasticsearch/issues/120810 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index ce400ddbdd6f9..23c98a1df193d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -47,7 +47,17 @@ void addPage(Page page) { notifyNotEmpty(); } if (noMoreInputs) { - discardPages(); + // O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small. + if (queue.removeIf(p -> p == page)) { + page.releaseBlocks(); + final int size = queueSize.decrementAndGet(); + if (size == maxSize - 1) { + notifyNotFull(); + } + if (size == 0) { + completionFuture.onResponse(null); + } + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java index bd5b53fb25c8b..7213e0b27aea0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java @@ -66,6 +66,25 @@ public void testDrainPages() throws Exception { blockFactory.ensureAllBlocksAreReleased(); } + public void testOutstandingPages() throws Exception { + ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000)); + var blockFactory = blockFactory(); + Page p1 = randomPage(blockFactory); + Page p2 = randomPage(blockFactory); + buffer.addPage(p1); + buffer.addPage(p2); + buffer.finish(false); + buffer.addPage(randomPage(blockFactory)); + assertThat(buffer.size(), equalTo(2)); + assertSame(buffer.pollPage(), p1); + p1.releaseBlocks(); + assertSame(buffer.pollPage(), p2); + p2.releaseBlocks(); + assertNull(buffer.pollPage()); + assertTrue(buffer.isFinished()); + blockFactory.ensureAllBlocksAreReleased(); + } + private static MockBlockFactory blockFactory() { BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);