77
88package org .elasticsearch .xpack .esql .plugin ;
99
10+ import org .elasticsearch .TransportVersions ;
1011import org .elasticsearch .action .ActionListener ;
1112import org .elasticsearch .action .ActionListenerResponseHandler ;
1213import org .elasticsearch .action .ActionRunnable ;
@@ -118,7 +119,8 @@ void startComputeOnDataNodes(
118119 for (DataNode node : dataNodeResult .dataNodes ()) {
119120 var queryPragmas = configuration .pragmas ();
120121 var childSessionId = computeService .newChildSession (sessionId );
121- ActionListener <ComputeResponse > nodeListener = computeListener .acquireCompute ().map (ComputeResponse ::getProfiles );
122+ ActionListener <DataNodeComputeResponse > nodeListener = computeListener .acquireCompute ()
123+ .map (DataNodeComputeResponse ::profiles );
122124 ExchangeService .openExchange (
123125 transportService ,
124126 node .connection ,
@@ -151,7 +153,7 @@ void startComputeOnDataNodes(
151153 dataNodeRequest ,
152154 parentTask ,
153155 TransportRequestOptions .EMPTY ,
154- new ActionListenerResponseHandler <>(nodeListener , ComputeResponse ::new , esqlExecutor )
156+ new ActionListenerResponseHandler <>(nodeListener , DataNodeComputeResponse ::new , esqlExecutor )
155157 );
156158 })
157159 );
@@ -160,72 +162,6 @@ void startComputeOnDataNodes(
160162 }, listener ::onFailure ));
161163 }
162164
163- private void acquireSearchContexts (
164- String clusterAlias ,
165- List <ShardId > shardIds ,
166- Configuration configuration ,
167- Map <Index , AliasFilter > aliasFilters ,
168- ActionListener <List <SearchContext >> listener
169- ) {
170- final List <IndexShard > targetShards = new ArrayList <>();
171- try {
172- for (ShardId shardId : shardIds ) {
173- var indexShard = searchService .getIndicesService ().indexServiceSafe (shardId .getIndex ()).getShard (shardId .id ());
174- targetShards .add (indexShard );
175- }
176- } catch (Exception e ) {
177- listener .onFailure (e );
178- return ;
179- }
180- final var doAcquire = ActionRunnable .supply (listener , () -> {
181- final List <SearchContext > searchContexts = new ArrayList <>(targetShards .size ());
182- boolean success = false ;
183- try {
184- for (IndexShard shard : targetShards ) {
185- var aliasFilter = aliasFilters .getOrDefault (shard .shardId ().getIndex (), AliasFilter .EMPTY );
186- var shardRequest = new ShardSearchRequest (
187- shard .shardId (),
188- configuration .absoluteStartedTimeInMillis (),
189- aliasFilter ,
190- clusterAlias
191- );
192- // TODO: `searchService.createSearchContext` allows opening search contexts without limits,
193- // we need to limit the number of active search contexts here or in SearchService
194- SearchContext context = searchService .createSearchContext (shardRequest , SearchService .NO_TIMEOUT );
195- searchContexts .add (context );
196- }
197- for (SearchContext searchContext : searchContexts ) {
198- searchContext .preProcess ();
199- }
200- success = true ;
201- return searchContexts ;
202- } finally {
203- if (success == false ) {
204- IOUtils .close (searchContexts );
205- }
206- }
207- });
208- final AtomicBoolean waitedForRefreshes = new AtomicBoolean ();
209- try (RefCountingRunnable refs = new RefCountingRunnable (() -> {
210- if (waitedForRefreshes .get ()) {
211- esqlExecutor .execute (doAcquire );
212- } else {
213- doAcquire .run ();
214- }
215- })) {
216- for (IndexShard targetShard : targetShards ) {
217- final Releasable ref = refs .acquire ();
218- targetShard .ensureShardSearchActive (await -> {
219- try (ref ) {
220- if (await ) {
221- waitedForRefreshes .set (true );
222- }
223- }
224- });
225- }
226- }
227- }
228-
229165 record DataNode (Transport .Connection connection , List <ShardId > shardIds , Map <Index , AliasFilter > aliasFilters ) {
230166
231167 }
@@ -317,19 +253,25 @@ private class DataNodeRequestExecutor {
317253 private final ComputeListener computeListener ;
318254 private final int maxConcurrentShards ;
319255 private final ExchangeSink blockingSink ; // block until we have completed on all shards or the coordinator has enough data
256+ private final boolean failFastOnShardFailure ;
257+ private final Map <ShardId , Exception > shardLevelFailures ;
320258
/**
 * Builds an executor for a single data-node request.
 *
 * @param request                the data-node request to execute
 * @param parentTask             the cancellable task this execution belongs to
 * @param exchangeSink           the sink handler; an extra sink is created from it and kept as {@code blockingSink}
 * @param maxConcurrentShards    the configured limit on concurrently processed shards
 * @param failFastOnShardFailure when {@code true}, shard-level failures are not recorded and are reported as fatal instead
 * @param shardLevelFailures     map that receives tolerated shard-level failures; stored by reference, not copied
 * @param computeListener        the listener tracking the computations started by this executor
 */
DataNodeRequestExecutor(
    DataNodeRequest request,
    CancellableTask parentTask,
    ExchangeSinkHandler exchangeSink,
    int maxConcurrentShards,
    boolean failFastOnShardFailure,
    Map<ShardId, Exception> shardLevelFailures,
    ComputeListener computeListener
) {
    // Plain state, captured in parameter order.
    this.request = request;
    this.parentTask = parentTask;
    this.exchangeSink = exchangeSink;
    this.maxConcurrentShards = maxConcurrentShards;
    this.failFastOnShardFailure = failFastOnShardFailure;
    this.shardLevelFailures = shardLevelFailures;
    this.computeListener = computeListener;
    // Derived state: the sink that blocks until we have completed on all shards or the coordinator has enough data.
    this.blockingSink = exchangeSink.createExchangeSink();
}
335277
@@ -369,6 +311,10 @@ public void onFailure(Exception e) {
369311 };
370312 acquireSearchContexts (clusterAlias , shardIds , configuration , request .aliasFilters (), ActionListener .wrap (searchContexts -> {
371313 assert ThreadPool .assertCurrentThreadPool (ThreadPool .Names .SEARCH , ESQL_WORKER_THREAD_POOL_NAME );
314+ if (searchContexts .isEmpty ()) {
315+ batchListener .onResponse (List .of ());
316+ return ;
317+ }
372318 var computeContext = new ComputeContext (
373319 sessionId ,
374320 clusterAlias ,
@@ -382,6 +328,72 @@ public void onFailure(Exception e) {
382328 }, batchListener ::onFailure ));
383329 }
384330
331+ private void acquireSearchContexts (
332+ String clusterAlias ,
333+ List <ShardId > shardIds ,
334+ Configuration configuration ,
335+ Map <Index , AliasFilter > aliasFilters ,
336+ ActionListener <List <SearchContext >> listener
337+ ) {
338+ final List <IndexShard > targetShards = new ArrayList <>();
339+ for (ShardId shardId : shardIds ) {
340+ try {
341+ var indexShard = searchService .getIndicesService ().indexServiceSafe (shardId .getIndex ()).getShard (shardId .id ());
342+ targetShards .add (indexShard );
343+ } catch (Exception e ) {
344+ if (addShardLevelFailure (shardId , e ) == false ) {
345+ listener .onFailure (e );
346+ return ;
347+ }
348+ }
349+ }
350+ final var doAcquire = ActionRunnable .supply (listener , () -> {
351+ final List <SearchContext > searchContexts = new ArrayList <>(targetShards .size ());
352+ SearchContext context = null ;
353+ for (IndexShard shard : targetShards ) {
354+ try {
355+ var aliasFilter = aliasFilters .getOrDefault (shard .shardId ().getIndex (), AliasFilter .EMPTY );
356+ var shardRequest = new ShardSearchRequest (
357+ shard .shardId (),
358+ configuration .absoluteStartedTimeInMillis (),
359+ aliasFilter ,
360+ clusterAlias
361+ );
362+ // TODO: `searchService.createSearchContext` allows opening search contexts without limits,
363+ // we need to limit the number of active search contexts here or in SearchService
364+ context = searchService .createSearchContext (shardRequest , SearchService .NO_TIMEOUT );
365+ context .preProcess ();
366+ } catch (Exception e ) {
367+ if (addShardLevelFailure (shard .shardId (), e ) == false ) {
368+ IOUtils .closeWhileHandlingException (context , () -> IOUtils .close (searchContexts ));
369+ throw e ;
370+ }
371+ }
372+ searchContexts .add (context );
373+ }
374+ return searchContexts ;
375+ });
376+ final AtomicBoolean waitedForRefreshes = new AtomicBoolean ();
377+ try (RefCountingRunnable refs = new RefCountingRunnable (() -> {
378+ if (waitedForRefreshes .get ()) {
379+ esqlExecutor .execute (doAcquire );
380+ } else {
381+ doAcquire .run ();
382+ }
383+ })) {
384+ for (IndexShard targetShard : targetShards ) {
385+ final Releasable ref = refs .acquire ();
386+ targetShard .ensureShardSearchActive (await -> {
387+ try (ref ) {
388+ if (await ) {
389+ waitedForRefreshes .set (true );
390+ }
391+ }
392+ });
393+ }
394+ }
395+ }
396+
385397 private void onBatchCompleted (int lastBatchIndex ) {
386398 if (lastBatchIndex < request .shardIds ().size () && exchangeSink .isFinished () == false ) {
387399 runBatch (lastBatchIndex );
@@ -394,20 +406,30 @@ private void onBatchCompleted(int lastBatchIndex) {
394406 blockingSink .finish ();
395407 }
396408 }
409+
410+ private boolean addShardLevelFailure (ShardId shardId , Exception e ) {
411+ if (failFastOnShardFailure ) {
412+ return false ;
413+ }
414+ shardLevelFailures .put (shardId , e );
415+ return true ;
416+ }
397417 }
398418
399419 private void runComputeOnDataNode (
400420 CancellableTask task ,
401421 String externalId ,
402422 PhysicalPlan reducePlan ,
403423 DataNodeRequest request ,
404- ActionListener <ComputeResponse > listener
424+ boolean failFastOnShardFailure ,
425+ ActionListener <DataNodeComputeResponse > listener
405426 ) {
427+ final Map <ShardId , Exception > shardLevelFailures = new HashMap <>();
406428 try (
407429 ComputeListener computeListener = new ComputeListener (
408430 transportService .getThreadPool (),
409431 computeService .cancelQueryOnFailure (task ),
410- listener .map (ComputeResponse :: new )
432+ listener .map (profiles -> new DataNodeComputeResponse ( profiles , shardLevelFailures ) )
411433 )
412434 ) {
413435 var parentListener = computeListener .acquireAvoid ();
@@ -419,6 +441,8 @@ private void runComputeOnDataNode(
419441 task ,
420442 internalSink ,
421443 request .configuration ().pragmas ().maxConcurrentShardsPerNode (),
444+ failFastOnShardFailure ,
445+ shardLevelFailures ,
422446 computeListener
423447 );
424448 dataNodeRequestExecutor .start ();
@@ -464,7 +488,7 @@ private void runComputeOnDataNode(
464488
465489 @ Override
466490 public void messageReceived (DataNodeRequest request , TransportChannel channel , Task task ) {
467- final ActionListener <ComputeResponse > listener = new ChannelActionListener <>(channel );
491+ final ActionListener <DataNodeComputeResponse > listener = new ChannelActionListener <>(channel );
468492 final PhysicalPlan reductionPlan ;
469493 if (request .plan () instanceof ExchangeSinkExec plan ) {
470494 reductionPlan = ComputeService .reductionPlan (plan , request .runNodeLevelReduction ());
@@ -484,6 +508,8 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
484508 request .indicesOptions (),
485509 request .runNodeLevelReduction ()
486510 );
487- runComputeOnDataNode ((CancellableTask ) task , sessionId , reductionPlan , request , listener );
511+ // the sender doesn't support retry on shard failures, so we need to fail fast here.
512+ final boolean failFastOnShardFailures = channel .getVersion ().before (TransportVersions .ESQL_RETRY_ON_SHARD_LEVEL_FAILURE );
513+ runComputeOnDataNode ((CancellableTask ) task , sessionId , reductionPlan , request , failFastOnShardFailures , listener );
488514 }
489515}
0 commit comments