diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index ebbddaeeb0d21..68f684cdf9dcd 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -8,8 +8,6 @@ package org.elasticsearch.compute.operator.exchange; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -19,7 +17,6 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.tasks.TaskCancelledException; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -51,28 +48,12 @@ public final class ExchangeSourceHandler { * @param maxBufferSize the maximum size of the exchange buffer. A larger buffer reduces ``pauses`` but uses more memory, * which could otherwise be allocated for other purposes. * @param fetchExecutor the executor used to fetch pages. - * @param completionListener a listener that will be notified when the exchange source handler completes */ - public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionListener completionListener) { + public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) { this.buffer = new ExchangeBuffer(maxBufferSize); this.fetchExecutor = fetchExecutor; this.outstandingSinks = new PendingInstances(() -> buffer.finish(false)); - final PendingInstances closingSinks = new PendingInstances(() -> {}); - closingSinks.trackNewInstance(); - this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.running(closingSinks::finishInstance))); - buffer.addCompletionListener(ActionListener.running(() -> { - final ActionListener listener = ActionListener.assertAtLeastOnce(completionListener); - try (RefCountingRunnable refs = new RefCountingRunnable(ActionRunnable.run(listener, this::checkFailure))) { - closingSinks.completion.addListener(refs.acquireListener()); - for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) { - // Create an outstanding instance and then finish to complete the completionListener - // if we haven't registered any instances of exchange sinks or exchange sources before. - pending.trackNewInstance(); - pending.completion.addListener(refs.acquireListener()); - pending.finishInstance(); - } - } - })); + this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop())); } private void checkFailure() { @@ -271,7 +252,13 @@ public void addRemoteSink( final ActionListener sinkListener = ActionListener.assertAtLeastOnce( ActionListener.notifyOnce(ActionListener.runBefore(listener, () -> remoteSinks.remove(sinkId))) ); + final Releasable emptySink = addEmptySink(); fetchExecutor.execute(new AbstractRunnable() { + @Override + public void onAfter() { + emptySink.close(); + } + @Override public void onFailure(Exception e) { if (failFast) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index f1f1f77623220..35ccf0da42963 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -334,8 +334,7 @@ public void testResumeOnEarlyFinish() throws Exception { DriverContext driverContext = driverContext(); ThreadPool threadPool = threadPool(); try { - PlainActionFuture sourceFuture = new PlainActionFuture<>(); - var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"), sourceFuture); + var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql")); var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis); var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource()); var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); @@ -351,7 +350,6 @@ public void testResumeOnEarlyFinish() throws Exception { sinkHandler.fetchPageAsync(true, ActionListener.noop()); future.actionGet(5, TimeUnit.SECONDS); assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE)); - sourceFuture.actionGet(5, TimeUnit.SECONDS); } finally { terminate(threadPool); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index 6b036dea5f749..f08552913963d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -216,11 +216,7 @@ List createDriversForInput(List input, List results, boolean randomIntBetween(2, 10), threadPool.relativeTimeInMillisSupplier() ); - ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler( - randomIntBetween(1, 4), - threadPool.executor(ESQL_TEST_EXECUTOR), - ActionListener.noop() - ); + ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(randomIntBetween(1, 4), threadPool.executor(ESQL_TEST_EXECUTOR)); sourceExchanger.addRemoteSink( sinkExchanger::fetchPageAsync, randomBoolean(), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 2927bc5439af6..57dfe65ca485f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.node.VersionInformation; @@ -105,16 +106,16 @@ public void testBasic() throws Exception { AtomicInteger pagesAddedToSink = new AtomicInteger(); ExchangeSink sink1 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet); ExchangeSink sink2 = sinkExchanger.createExchangeSink(pagesAddedToSink::incrementAndGet); - PlainActionFuture sourceCompletion = new PlainActionFuture<>(); - ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR), sourceCompletion); + ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor(ESQL_TEST_EXECUTOR)); ExchangeSource source = sourceExchanger.createExchangeSource(); AtomicInteger pagesAddedToSource = new AtomicInteger(); + PlainActionFuture remoteSinkFuture = new PlainActionFuture<>(); sourceExchanger.addRemoteSink( sinkExchanger::fetchPageAsync, randomBoolean(), pagesAddedToSource::incrementAndGet, 1, - ActionListener.noop() + remoteSinkFuture ); SubscribableListener waitForReading = source.waitForReading().listener(); assertFalse(waitForReading.isDone()); @@ -161,13 +162,12 @@ public void testBasic() throws Exception { sink2.finish(); assertTrue(sink2.isFinished()); assertTrue(source.isFinished()); - assertFalse(sourceCompletion.isDone()); source.finish(); - sourceCompletion.actionGet(10, TimeUnit.SECONDS); ESTestCase.terminate(threadPool); for (Page page : pages) { page.releaseBlocks(); } + safeGet(remoteSinkFuture); } /** @@ -350,47 +350,45 @@ protected void start(Driver driver, ActionListener listener) { public void testConcurrentWithHandlers() { BlockFactory blockFactory = blockFactory(); - PlainActionFuture sourceCompletionFuture = new PlainActionFuture<>(); - var sourceExchanger = new ExchangeSourceHandler( - randomExchangeBuffer(), - threadPool.executor(ESQL_TEST_EXECUTOR), - sourceCompletionFuture - ); - List sinkHandlers = new ArrayList<>(); - Supplier exchangeSink = () -> { - final ExchangeSinkHandler sinkHandler; - if (sinkHandlers.isEmpty() == false && randomBoolean()) { - sinkHandler = randomFrom(sinkHandlers); - } else { - sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier()); - sourceExchanger.addRemoteSink( - sinkHandler::fetchPageAsync, - randomBoolean(), - () -> {}, - randomIntBetween(1, 3), - ActionListener.noop() - ); - sinkHandlers.add(sinkHandler); - } - return sinkHandler.createExchangeSink(() -> {}); - }; - final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); - final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); - Set actualSeqNos = runConcurrentTest(maxInputSeqNo, maxOutputSeqNo, sourceExchanger::createExchangeSource, exchangeSink); - var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet()); - assertThat(actualSeqNos, hasSize(expectedSeqNos.size())); - assertThat(actualSeqNos, equalTo(expectedSeqNos)); - sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS); + var sourceExchanger = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR)); + PlainActionFuture remoteSinksFuture = new PlainActionFuture<>(); + try (RefCountingListener refs = new RefCountingListener(remoteSinksFuture)) { + List sinkHandlers = new ArrayList<>(); + Supplier exchangeSink = () -> { + final ExchangeSinkHandler sinkHandler; + if (sinkHandlers.isEmpty() == false && randomBoolean()) { + sinkHandler = randomFrom(sinkHandlers); + } else { + sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier()); + sourceExchanger.addRemoteSink( + sinkHandler::fetchPageAsync, + randomBoolean(), + () -> {}, + randomIntBetween(1, 3), + refs.acquire() + ); + sinkHandlers.add(sinkHandler); + } + return sinkHandler.createExchangeSink(() -> {}); + }; + final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); + final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); + Set actualSeqNos = runConcurrentTest( + maxInputSeqNo, + maxOutputSeqNo, + sourceExchanger::createExchangeSource, + exchangeSink + ); + var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet()); + assertThat(actualSeqNos, hasSize(expectedSeqNos.size())); + assertThat(actualSeqNos, equalTo(expectedSeqNos)); + } + safeGet(remoteSinksFuture); } public void testExchangeSourceContinueOnFailure() { BlockFactory blockFactory = blockFactory(); - PlainActionFuture sourceCompletionFuture = new PlainActionFuture<>(); - var exchangeSourceHandler = new ExchangeSourceHandler( - randomExchangeBuffer(), - threadPool.executor(ESQL_TEST_EXECUTOR), - sourceCompletionFuture - ); + var exchangeSourceHandler = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR)); final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); Set expectedSeqNos = ConcurrentCollections.newConcurrentSet(); @@ -398,57 +396,65 @@ public void testExchangeSourceContinueOnFailure() { AtomicInteger totalSinks = new AtomicInteger(); AtomicInteger failedSinks = new AtomicInteger(); AtomicInteger completedSinks = new AtomicInteger(); - Supplier exchangeSink = () -> { - var sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier()); - int failAfter = randomBoolean() ? Integer.MAX_VALUE : randomIntBetween(0, 100); - AtomicInteger fetched = new AtomicInteger(); - int instance = randomIntBetween(1, 3); - totalSinks.incrementAndGet(); - AtomicBoolean sinkFailed = new AtomicBoolean(); - exchangeSourceHandler.addRemoteSink((allSourcesFinished, listener) -> { - if (fetched.incrementAndGet() > failAfter) { - sinkHandler.fetchPageAsync(true, listener.delegateFailure((l, r) -> { - failedRequests.incrementAndGet(); - sinkFailed.set(true); - listener.onFailure(new CircuitBreakingException("simulated", CircuitBreaker.Durability.PERMANENT)); - })); - } else { - sinkHandler.fetchPageAsync(allSourcesFinished, listener.delegateFailure((l, r) -> { - Page page = r.takePage(); - if (page != null) { - IntBlock block = page.getBlock(0); - for (int i = 0; i < block.getPositionCount(); i++) { - int v = block.getInt(i); - if (v < maxOutputSeqNo) { - expectedSeqNos.add(v); + PlainActionFuture remoteSinksFuture = new PlainActionFuture<>(); + try (RefCountingListener refs = new RefCountingListener(remoteSinksFuture)) { + Supplier exchangeSink = () -> { + var sinkHandler = new ExchangeSinkHandler(blockFactory, randomExchangeBuffer(), threadPool.relativeTimeInMillisSupplier()); + int failAfter = randomBoolean() ? Integer.MAX_VALUE : randomIntBetween(0, 100); + AtomicInteger fetched = new AtomicInteger(); + int instance = randomIntBetween(1, 3); + totalSinks.incrementAndGet(); + AtomicBoolean sinkFailed = new AtomicBoolean(); + ActionListener oneSinkListener = refs.acquire(); + exchangeSourceHandler.addRemoteSink((allSourcesFinished, listener) -> { + if (fetched.incrementAndGet() > failAfter) { + sinkHandler.fetchPageAsync(true, listener.delegateFailure((l, r) -> { + failedRequests.incrementAndGet(); + sinkFailed.set(true); + listener.onFailure(new CircuitBreakingException("simulated", CircuitBreaker.Durability.PERMANENT)); + })); + } else { + sinkHandler.fetchPageAsync(allSourcesFinished, listener.delegateFailure((l, r) -> { + Page page = r.takePage(); + if (page != null) { + IntBlock block = page.getBlock(0); + for (int i = 0; i < block.getPositionCount(); i++) { + int v = block.getInt(i); + if (v < maxOutputSeqNo) { + expectedSeqNos.add(v); + } } } - } - l.onResponse(new ExchangeResponse(blockFactory, page, r.finished())); - })); - } - }, false, () -> {}, instance, ActionListener.wrap(r -> { - assertFalse(sinkFailed.get()); - completedSinks.incrementAndGet(); - }, e -> { - assertTrue(sinkFailed.get()); - failedSinks.incrementAndGet(); - })); - return sinkHandler.createExchangeSink(() -> {}); - }; - Set actualSeqNos = runConcurrentTest( - maxInputSeqNo, - maxOutputSeqNo, - exchangeSourceHandler::createExchangeSource, - exchangeSink - ); - assertThat(actualSeqNos, equalTo(expectedSeqNos)); - safeGet(sourceCompletionFuture); - assertThat(completedSinks.get() + failedSinks.get(), equalTo(totalSinks.get())); + l.onResponse(new ExchangeResponse(blockFactory, page, r.finished())); + })); + } + }, false, () -> {}, instance, ActionListener.wrap(r -> { + assertFalse(sinkFailed.get()); + completedSinks.incrementAndGet(); + oneSinkListener.onResponse(null); + }, e -> { + assertTrue(sinkFailed.get()); + failedSinks.incrementAndGet(); + oneSinkListener.onFailure(e); + })); + return sinkHandler.createExchangeSink(() -> {}); + }; + Set actualSeqNos = runConcurrentTest( + maxInputSeqNo, + maxOutputSeqNo, + exchangeSourceHandler::createExchangeSource, + exchangeSink + ); + assertThat(actualSeqNos, equalTo(expectedSeqNos)); + } if (failedRequests.get() > 0) { + expectThrows(CircuitBreakingException.class, () -> remoteSinksFuture.actionGet(1, TimeUnit.MINUTES)); assertThat(failedSinks.get(), greaterThan(0)); + assertThat(completedSinks.get() + failedSinks.get(), equalTo(totalSinks.get())); } else { + safeGet(remoteSinksFuture); assertThat(failedSinks.get(), equalTo(0)); + assertThat(completedSinks.get(), equalTo(totalSinks.get())); } } @@ -465,7 +471,7 @@ public void testClosingSinks() { assertFalse(sink.waitForWriting().listener().isDone()); PlainActionFuture future = new PlainActionFuture<>(); sinkExchanger.fetchPageAsync(true, future); - ExchangeResponse resp = future.actionGet(); + ExchangeResponse resp = safeGet(future); assertTrue(resp.finished()); assertNull(resp.takePage()); assertTrue(sink.waitForWriting().listener().isDone()); @@ -473,7 +479,7 @@ public void testClosingSinks() { } public void testFinishEarly() throws Exception { - ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic(), ActionListener.noop()); + ExchangeSourceHandler sourceHandler = new ExchangeSourceHandler(20, threadPool.generic()); Semaphore permits = new Semaphore(between(1, 5)); BlockFactory blockFactory = blockFactory(); Queue pages = ConcurrentCollections.newQueue(); @@ -544,12 +550,7 @@ public void testConcurrentWithTransportActions() { try (exchange0; exchange1; node0; node1) { String exchangeId = "exchange"; Task task = new Task(1, "", "", "", null, Collections.emptyMap()); - PlainActionFuture sourceCompletionFuture = new PlainActionFuture<>(); - var sourceHandler = new ExchangeSourceHandler( - randomExchangeBuffer(), - threadPool.executor(ESQL_TEST_EXECUTOR), - sourceCompletionFuture - ); + var sourceHandler = new ExchangeSourceHandler(randomExchangeBuffer(), threadPool.executor(ESQL_TEST_EXECUTOR)); ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer()); Transport.Connection connection = node0.getConnection(node1.getLocalNode()); sourceHandler.addRemoteSink( @@ -570,7 +571,6 @@ public void testConcurrentWithTransportActions() { var expectedSeqNos = IntStream.range(0, Math.min(maxInputSeqNo, maxOutputSeqNo)).boxed().collect(Collectors.toSet()); assertThat(actualSeqNos, hasSize(expectedSeqNos.size())); assertThat(actualSeqNos, equalTo(expectedSeqNos)); - sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS); } } @@ -620,12 +620,7 @@ public void sendResponse(TransportResponse transportResponse) { try (exchange0; exchange1; node0; node1) { String exchangeId = "exchange"; Task task = new Task(1, "", "", "", null, Collections.emptyMap()); - PlainActionFuture sourceCompletionFuture = new PlainActionFuture<>(); - var sourceHandler = new ExchangeSourceHandler( - randomIntBetween(1, 128), - threadPool.executor(ESQL_TEST_EXECUTOR), - sourceCompletionFuture - ); + var sourceHandler = new ExchangeSourceHandler(randomIntBetween(1, 128), threadPool.executor(ESQL_TEST_EXECUTOR)); ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128)); Transport.Connection connection = node0.getConnection(node1.getLocalNode()); PlainActionFuture remoteSinkFuture = new PlainActionFuture<>(); @@ -652,15 +647,14 @@ public void sendResponse(TransportResponse transportResponse) { assertThat(cause.getMessage(), equalTo("page is too large")); PlainActionFuture sinkCompletionFuture = new PlainActionFuture<>(); sinkHandler.addCompletionListener(sinkCompletionFuture); - assertBusy(() -> assertTrue(sinkCompletionFuture.isDone())); - expectThrows(Exception.class, () -> sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS)); + safeGet(sinkCompletionFuture); } } public void testNoCyclicException() throws Exception { PlainActionFuture future = new PlainActionFuture<>(); try (EsqlRefCountingListener refs = new EsqlRefCountingListener(future)) { - var exchangeSourceHandler = new ExchangeSourceHandler(between(10, 100), threadPool.generic(), refs.acquire()); + var exchangeSourceHandler = new ExchangeSourceHandler(between(10, 100), threadPool.generic()); int numSinks = between(5, 10); for (int i = 0; i < numSinks; i++) { RemoteSink remoteSink = (allSourcesFinished, listener) -> threadPool.schedule( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index a2a5e5175c4ec..e41dd42c7579d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -222,8 +222,7 @@ void runComputeOnRemoteCluster( }))) { var exchangeSource = new ExchangeSourceHandler( configuration.pragmas().exchangeBufferSize(), - transportService.getThreadPool().executor(ThreadPool.Names.SEARCH), - computeListener.acquireAvoid() + transportService.getThreadPool().executor(ThreadPool.Names.SEARCH) ); try (Releasable ignored = exchangeSource.addEmptySink()) { exchangeSink.addCompletionListener(computeListener.acquireAvoid()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index c494c63d0fae8..4279d0114130d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -191,16 +191,16 @@ public void execute( * entire plan. */ List outputAttributes = physicalPlan.output(); + var exchangeSource = new ExchangeSourceHandler( + queryPragmas.exchangeBufferSize(), + transportService.getThreadPool().executor(ThreadPool.Names.SEARCH) + ); + listener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); + exchangeService.addExchangeSourceHandler(sessionId, exchangeSource); try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements return new Result(outputAttributes, collectedPages, profiles, execInfo); }))) { - var exchangeSource = new ExchangeSourceHandler( - queryPragmas.exchangeBufferSize(), - transportService.getThreadPool().executor(ThreadPool.Names.SEARCH), - ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.removeExchangeSourceHandler(sessionId)) - ); - exchangeService.addExchangeSourceHandler(sessionId, exchangeSource); try (Releasable ignored = exchangeSource.addEmptySink()) { // run compute on the coordinator final AtomicBoolean localClusterWasInterrupted = new AtomicBoolean(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index ba2f3c5dfdc2c..ee5b192bf3285 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -171,7 +171,7 @@ protected void sendRequest( originalIndices, PlannerUtils.requestTimestampFilter(dataNodePlan), runOnTaskFailure, - ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close) + ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink()) ); } @@ -391,7 +391,7 @@ private void runComputeOnDataNode( task.addListener( () -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())) ); - var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor, computeListener.acquireAvoid()); + var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor); exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); var reductionListener = computeListener.acquireCompute(); computeService.runCompute( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 263c076c23310..1e21aa3774af7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -613,7 +613,7 @@ void executeSubPlan( bigArrays, ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2)) ); - ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor, ActionListener.noop()); + ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor); ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis); LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index b877937d6397a..55448b7ceaf49 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7596,7 +7596,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP TestBlockFactory.getNonBreakingInstance(), Settings.EMPTY, config, - new ExchangeSourceHandler(10, null, null)::createExchangeSource, + new ExchangeSourceHandler(10, null)::createExchangeSource, () -> exchangeSinkHandler.createExchangeSink(() -> {}), null, null,