From 924eb89d212f3b4b9371f12e8d0a7d07c126e126 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 5 Feb 2025 22:51:05 -0800 Subject: [PATCH] Keep outstanding pages when finish buffer early (#121857) Today, the exchange buffer of an exchange source is finished in two cases: (1) when the downstream pipeline has received enough data and (2) when all remote sinks have completed. In the first case, outstanding pages could be safely discarded. In the second case, no new pages should be received after finishing. In both scenarios, discarding all outstanding pages was safe if noMoreInputs was switched while adding pages. However, with the stop API, the buffer may now finish while keeping outstanding pages, and new pages may still be received. This change updates the exchange buffer to discard only the incoming page when noMoreInputs is switched, rather than all pages in the buffer. Closes #120757 --- muted-tests.yml | 3 --- .../operator/exchange/ExchangeBuffer.java | 12 +++++++++++- .../exchange/ExchangeBufferTests.java | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 7d950c4f5e59a..7d50ab26b8b1b 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -209,9 +209,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/120668 - class: org.elasticsearch.xpack.security.authc.ldap.ADLdapUserSearchSessionFactoryTests issue: https://github.com/elastic/elasticsearch/issues/119882 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT - method: testEnrichAfterStop - issue: https://github.com/elastic/elasticsearch/issues/120757 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=ml/3rd_party_deployment/Test start deployment fails while model download in progress} issue: https://github.com/elastic/elasticsearch/issues/120810 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java index ce400ddbdd6f9..23c98a1df193d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java @@ -47,7 +47,17 @@ void addPage(Page page) { notifyNotEmpty(); } if (noMoreInputs) { - discardPages(); + // O(N) but acceptable because it only occurs with the stop API, and the queue size should be very small. + if (queue.removeIf(p -> p == page)) { + page.releaseBlocks(); + final int size = queueSize.decrementAndGet(); + if (size == maxSize - 1) { + notifyNotFull(); + } + if (size == 0) { + completionFuture.onResponse(null); + } + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java index bd5b53fb25c8b..7213e0b27aea0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeBufferTests.java @@ -66,6 +66,25 @@ public void testDrainPages() throws Exception { blockFactory.ensureAllBlocksAreReleased(); } + public void testOutstandingPages() throws Exception { + ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(1000, 10000)); + var blockFactory = blockFactory(); + Page p1 = randomPage(blockFactory); + Page p2 = randomPage(blockFactory); + buffer.addPage(p1); + buffer.addPage(p2); + buffer.finish(false); + buffer.addPage(randomPage(blockFactory)); + assertThat(buffer.size(), equalTo(2)); + assertSame(buffer.pollPage(), p1); + p1.releaseBlocks(); + assertSame(buffer.pollPage(), p2); + p2.releaseBlocks(); + assertNull(buffer.pollPage()); + assertTrue(buffer.isFinished()); + blockFactory.ensureAllBlocksAreReleased(); + } + private static MockBlockFactory blockFactory() { BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);