From b0bd4a8fccbe14a47a2a65e56ab54f5c99a97f74 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 28 May 2025 15:21:51 +0200 Subject: [PATCH] Start polling after data computation is started --- .../xpack/esql/plugin/ClusterComputeHandler.java | 16 ++++++++-------- .../esql/plugin/DataNodeComputeHandler.java | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 64f63b962f169..cd4ff13700515 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -125,14 +125,6 @@ void startComputeOnRemoteCluster( updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get()); return completionInfo; }))) { - var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); - exchangeSource.addRemoteSink( - remoteSink, - failFast, - () -> pagesFetched.set(true), - queryPragmas.concurrentExchangeClients(), - computeListener.acquireAvoid() - ); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); final ActionListener clusterListener = computeListener.acquireCompute().map(r -> { @@ -147,6 +139,14 @@ void startComputeOnRemoteCluster( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); + var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); + exchangeSource.addRemoteSink( + remoteSink, + failFast, + () -> pagesFetched.set(true), + queryPragmas.concurrentExchangeClients(), + computeListener.acquireAvoid() + ); } }) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index de431fdb8929e..0de272502e1c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -176,14 +176,6 @@ protected void sendRequest( try ( var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get())) ) { - final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection); - exchangeSource.addRemoteSink( - remoteSink, - configuration.allowPartialResults() == false, - pagesFetched::incrementAndGet, - queryPragmas.concurrentExchangeClients(), - computeListener.acquireAvoid() - ); final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId()); var dataNodeRequest = new DataNodeRequest( childSessionId, @@ -207,6 +199,14 @@ protected void sendRequest( return r.completionInfo(); }), DataNodeComputeResponse::new, esqlExecutor) ); + final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection); + exchangeSource.addRemoteSink( + remoteSink, + configuration.allowPartialResults() == false, + pagesFetched::incrementAndGet, + queryPragmas.concurrentExchangeClients(), + computeListener.acquireAvoid() + ); } }) );