1111import org .elasticsearch .action .ActionListenerResponseHandler ;
1212import org .elasticsearch .action .OriginalIndices ;
1313import org .elasticsearch .action .support .ChannelActionListener ;
14- import org .elasticsearch .compute .EsqlRefCountingListener ;
1514import org .elasticsearch .compute .operator .exchange .ExchangeService ;
1615import org .elasticsearch .compute .operator .exchange .ExchangeSourceHandler ;
1716import org .elasticsearch .core .Releasable ;
17+ import org .elasticsearch .core .TimeValue ;
1818import org .elasticsearch .tasks .CancellableTask ;
1919import org .elasticsearch .tasks .Task ;
2020import org .elasticsearch .tasks .TaskCancelledException ;
2525import org .elasticsearch .transport .TransportRequestHandler ;
2626import org .elasticsearch .transport .TransportRequestOptions ;
2727import org .elasticsearch .transport .TransportService ;
28- import org .elasticsearch .xpack .esql .action .EsqlExecutionInfo ;
2928import org .elasticsearch .xpack .esql .plan .physical .ExchangeSinkExec ;
3029import org .elasticsearch .xpack .esql .plan .physical .PhysicalPlan ;
3130import org .elasticsearch .xpack .esql .session .Configuration ;
3231
3332import java .util .ArrayList ;
34- import java .util .Arrays ;
3533import java .util .List ;
3634import java .util .Map ;
35+ import java .util .Objects ;
3736import java .util .Set ;
3837import java .util .concurrent .Executor ;
38+ import java .util .concurrent .atomic .AtomicReference ;
3939
4040/**
4141 * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
@@ -63,46 +63,54 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
6363 transportService .registerRequestHandler (ComputeService .CLUSTER_ACTION_NAME , esqlExecutor , ClusterComputeRequest ::new , this );
6464 }
6565
/**
 * Starts the compute on a single remote cluster.
 * <p>
 * Diff view: lines tagged {@code -} are the removed implementation, which looped over a list of clusters under an
 * {@code EsqlRefCountingListener}; lines tagged {@code +} are the replacement, which handles exactly one cluster.
 * <p>
 * The replacement opens an exchange on {@code cluster.connection} under a fresh child session id, registers a
 * remote sink on {@code exchangeSource}, and then sends a {@link ClusterComputeRequest} (plan plus the cluster's
 * concrete and original indices) using {@code ComputeService.CLUSTER_ACTION_NAME}.
 *
 * @param sessionId            session id from which the child session id is derived
 * @param rootTask             task handed to the remote sink and used as the parent of the child request
 * @param configuration        supplies the query pragmas and is forwarded inside the cluster request
 * @param plan                 physical plan wrapped into the {@code RemoteClusterPlan}
 * @param exchangeSource       source handler that receives an empty sink and the remote sink
 * @param cluster              remote cluster to run on (connection, alias, concrete and original indices)
 * @param cancelQueryOnFailure passed to {@code ComputeListener}; presumably run when a compute fails - TODO confirm
 * @param listener             receives the {@code ComputeResponse} recorded from the remote cluster, or one built
 *                             from the collected profiles when no response was recorded
 */
66- void startComputeOnRemoteClusters (
66+ void startComputeOnRemoteCluster (
6767 String sessionId ,
6868 CancellableTask rootTask ,
6969 Configuration configuration ,
7070 PhysicalPlan plan ,
7171 ExchangeSourceHandler exchangeSource ,
72- List <RemoteCluster > clusters ,
73- ComputeListener computeListener
72+ RemoteCluster cluster ,
73+ Runnable cancelQueryOnFailure ,
74+ ActionListener <ComputeResponse > listener
7475 ) {
7576 var queryPragmas = configuration .pragmas ();
76- var linkExchangeListeners = ActionListener .releaseAfter (computeListener .acquireAvoid (), exchangeSource .addEmptySink ());
77- try (EsqlRefCountingListener refs = new EsqlRefCountingListener (linkExchangeListeners )) {
78- for (RemoteCluster cluster : clusters ) {
79- final var childSessionId = computeService .newChildSession (sessionId );
80- ExchangeService .openExchange (
81- transportService ,
82- cluster .connection ,
83- childSessionId ,
84- queryPragmas .exchangeBufferSize (),
85- esqlExecutor ,
86- refs .acquire ().delegateFailureAndWrap ((l , unused ) -> {
87- var remoteSink = exchangeService .newRemoteSink (rootTask , childSessionId , transportService , cluster .connection );
88- exchangeSource .addRemoteSink (remoteSink , true , queryPragmas .concurrentExchangeClients (), ActionListener .noop ());
89- var remotePlan = new RemoteClusterPlan (plan , cluster .concreteIndices , cluster .originalIndices );
90- var clusterRequest = new ClusterComputeRequest (cluster .clusterAlias , childSessionId , configuration , remotePlan );
91- var clusterListener = ActionListener .runBefore (
92- computeListener .acquireCompute (cluster .clusterAlias ()),
93- () -> l .onResponse (null )
94- );
95- transportService .sendChildRequest (
96- cluster .connection ,
97- ComputeService .CLUSTER_ACTION_NAME ,
98- clusterRequest ,
99- rootTask ,
100- TransportRequestOptions .EMPTY ,
101- new ActionListenerResponseHandler <>(clusterListener , ComputeResponse ::new , esqlExecutor )
102- );
103- })
104- );
105- }
 // The empty sink is added to exchangeSource right here (the method-reference receiver is evaluated eagerly);
 // its close() is wired to run before the caller's listener is notified.
77+ listener = ActionListener .runBefore (listener , exchangeSource .addEmptySink ()::close );
78+ final var childSessionId = computeService .newChildSession (sessionId );
 // Holds the ComputeResponse received from the remote cluster so the final mapping below can return it as-is.
79+ final AtomicReference <ComputeResponse > finalResponse = new AtomicReference <>();
 // The mapping falls back to a response built only from the profiles when no remote response was recorded.
80+ try (var computeListener = new ComputeListener (transportService .getThreadPool (), cancelQueryOnFailure , listener .map (profiles -> {
81+ var resp = finalResponse .get ();
82+ return Objects .requireNonNullElseGet (resp , () -> new ComputeResponse (profiles ));
83+ }))) {
84+ ExchangeService .openExchange (
85+ transportService ,
86+ cluster .connection ,
87+ childSessionId ,
88+ queryPragmas .exchangeBufferSize (),
89+ esqlExecutor ,
 // Callback for the open-exchange step: links the remote sink, then dispatches the cluster request.
 // NOTE(review): presumably only reached when opening the exchange succeeded - confirm delegateFailureAndWrap.
90+ computeListener .acquireCompute ().delegateFailureAndWrap ((l , unused ) -> {
91+ var remoteSink = exchangeService .newRemoteSink (rootTask , childSessionId , transportService , cluster .connection );
92+ exchangeSource .addRemoteSink (
93+ remoteSink ,
94+ true ,
95+ queryPragmas .concurrentExchangeClients (),
96+ computeListener .acquireAvoid ()
97+ );
98+ var remotePlan = new RemoteClusterPlan (plan , cluster .concreteIndices , cluster .originalIndices );
99+ var clusterRequest = new ClusterComputeRequest (cluster .clusterAlias , childSessionId , configuration , remotePlan );
 // Record the full remote response, but pass only its profiles on to the acquired compute listener.
100+ final ActionListener <ComputeResponse > clusterListener = l .map (r -> {
101+ finalResponse .set (r );
102+ return r .getProfiles ();
103+ });
104+ transportService .sendChildRequest (
105+ cluster .connection ,
106+ ComputeService .CLUSTER_ACTION_NAME ,
107+ clusterRequest ,
108+ rootTask ,
109+ TransportRequestOptions .EMPTY ,
110+ new ActionListenerResponseHandler <>(clusterListener , ComputeResponse ::new , esqlExecutor )
111+ );
112+ })
113+ );
106114 }
107115 }
108116
@@ -141,28 +149,16 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
141149 listener .onFailure (new IllegalStateException ("expected exchange sink for a remote compute; got " + plan ));
142150 return ;
143151 }
144- String clusterAlias = request .clusterAlias ();
145- /*
146- * This handler runs only on remote cluster coordinators, so it creates a new local EsqlExecutionInfo object to record
147- * execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
148- * ComputeResponse that is sent back to the primary coordinating cluster.
149- */
150- EsqlExecutionInfo execInfo = new EsqlExecutionInfo (true );
151- execInfo .swapCluster (clusterAlias , (k , v ) -> new EsqlExecutionInfo .Cluster (clusterAlias , Arrays .toString (request .indices ())));
152- CancellableTask cancellable = (CancellableTask ) task ;
153- try (var computeListener = ComputeListener .create (clusterAlias , transportService , cancellable , execInfo , listener )) {
154- runComputeOnRemoteCluster (
155- clusterAlias ,
156- request .sessionId (),
157- (CancellableTask ) task ,
158- request .configuration (),
159- (ExchangeSinkExec ) plan ,
160- Set .of (remoteClusterPlan .targetIndices ()),
161- remoteClusterPlan .originalIndices (),
162- execInfo ,
163- computeListener
164- );
165- }
152+ runComputeOnRemoteCluster (
153+ request .clusterAlias (),
154+ request .sessionId (),
155+ (CancellableTask ) task ,
156+ request .configuration (),
157+ (ExchangeSinkExec ) plan ,
158+ Set .of (remoteClusterPlan .targetIndices ()),
159+ remoteClusterPlan .originalIndices (),
160+ listener
161+ );
166162 }
167163
168164 /**
@@ -182,48 +178,59 @@ void runComputeOnRemoteCluster(
182178 ExchangeSinkExec plan ,
183179 Set <String > concreteIndices ,
184180 OriginalIndices originalIndices ,
185- EsqlExecutionInfo executionInfo ,
186- ComputeListener computeListener
181+ ActionListener <ComputeResponse > listener
187182 ) {
188183 final var exchangeSink = exchangeService .getSinkHandler (globalSessionId );
189184 parentTask .addListener (
190185 () -> exchangeService .finishSinkHandler (globalSessionId , new TaskCancelledException (parentTask .getReasonCancelled ()))
191186 );
192187 final String localSessionId = clusterAlias + ":" + globalSessionId ;
193188 final PhysicalPlan coordinatorPlan = ComputeService .reductionPlan (plan , true );
194- var exchangeSource = new ExchangeSourceHandler (
195- configuration .pragmas ().exchangeBufferSize (),
196- transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH ),
197- computeListener .acquireAvoid ()
198- );
199- try (Releasable ignored = exchangeSource .addEmptySink ()) {
200- exchangeSink .addCompletionListener (computeListener .acquireAvoid ());
201- computeService .runCompute (
202- parentTask ,
203- new ComputeContext (
189+ final AtomicReference <ComputeResponse > finalResponse = new AtomicReference <>();
190+ final long startTimeInNanos = System .nanoTime ();
191+ final Runnable cancelQueryOnFailure = computeService .cancelQueryOnFailure (parentTask );
192+ try (var computeListener = new ComputeListener (transportService .getThreadPool (), cancelQueryOnFailure , listener .map (profiles -> {
193+ final TimeValue took = TimeValue .timeValueNanos (System .nanoTime () - startTimeInNanos );
194+ final ComputeResponse r = finalResponse .get ();
195+ return new ComputeResponse (profiles , took , r .totalShards , r .successfulShards , r .skippedShards , r .failedShards );
196+ }))) {
197+ var exchangeSource = new ExchangeSourceHandler (
198+ configuration .pragmas ().exchangeBufferSize (),
199+ transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH ),
200+ computeListener .acquireAvoid ()
201+ );
202+ try (Releasable ignored = exchangeSource .addEmptySink ()) {
203+ exchangeSink .addCompletionListener (computeListener .acquireAvoid ());
204+ computeService .runCompute (
205+ parentTask ,
206+ new ComputeContext (
207+ localSessionId ,
208+ clusterAlias ,
209+ List .of (),
210+ configuration ,
211+ configuration .newFoldContext (),
212+ exchangeSource ,
213+ exchangeSink
214+ ),
215+ coordinatorPlan ,
216+ computeListener .acquireCompute ()
217+ );
218+ dataNodeComputeHandler .startComputeOnDataNodes (
204219 localSessionId ,
205220 clusterAlias ,
206- List . of () ,
221+ parentTask ,
207222 configuration ,
208- configuration .newFoldContext (),
223+ plan ,
224+ concreteIndices ,
225+ originalIndices ,
209226 exchangeSource ,
210- exchangeSink
211- ),
212- coordinatorPlan ,
213- computeListener .acquireCompute (clusterAlias )
214- );
215- dataNodeComputeHandler .startComputeOnDataNodes (
216- localSessionId ,
217- clusterAlias ,
218- parentTask ,
219- configuration ,
220- plan ,
221- concreteIndices ,
222- originalIndices ,
223- exchangeSource ,
224- executionInfo ,
225- computeListener
226- );
227+ cancelQueryOnFailure ,
228+ computeListener .acquireCompute ().map (r -> {
229+ finalResponse .set (r );
230+ return r .getProfiles ();
231+ })
232+ );
233+ }
227234 }
228235 }
229236
0 commit comments