2727import org .elasticsearch .compute .operator .DriverCompletionInfo ;
2828import org .elasticsearch .compute .operator .FailureCollector ;
2929import org .elasticsearch .index .Index ;
30+ import org .elasticsearch .index .IndexNotFoundException ;
3031import org .elasticsearch .index .query .QueryBuilder ;
3132import org .elasticsearch .index .shard .ShardId ;
33+ import org .elasticsearch .index .shard .ShardNotFoundException ;
34+ import org .elasticsearch .logging .LogManager ;
35+ import org .elasticsearch .logging .Logger ;
3236import org .elasticsearch .search .SearchShardTarget ;
3337import org .elasticsearch .search .internal .AliasFilter ;
3438import org .elasticsearch .tasks .CancellableTask ;
6670 */
6771abstract class DataNodeRequestSender {
6872
73+ private static final Logger LOGGER = LogManager .getLogger (DataNodeRequestSender .class );
74+
6975 /**
7076 * Query order according to the
7177 * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/node-roles-overview.html">node roles</a>.
@@ -282,38 +288,51 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
282288 final ActionListener <DriverCompletionInfo > listener = computeListener .acquireCompute ();
283289 sendRequest (request .node , request .shardIds , request .aliasFilters , new NodeListener () {
284290
285- void onAfter ( DriverCompletionInfo info ) {
291+ void onAfterRequest ( ) {
286292 nodePermits .get (request .node ).release ();
287293 if (concurrentRequests != null ) {
288294 concurrentRequests .release ();
289295 }
290296 trySendingRequestsForPendingShards (targetShards , computeListener );
291- listener .onResponse (info );
292297 }
293298
294299 @ Override
295300 public void onResponse (DataNodeComputeResponse response ) {
296- // remove failures of successful shards
297- for (ShardId shardId : request .shardIds ()) {
298- if (response .shardLevelFailures ().containsKey (shardId ) == false ) {
299- shardFailures .remove (shardId );
301+ try {
302+ // remove failures of successful shards
303+ for (ShardId shardId : request .shardIds ()) {
304+ if (response .shardLevelFailures ().containsKey (shardId ) == false ) {
305+ shardFailures .remove (shardId );
306+ }
300307 }
308+ for (var entry : response .shardLevelFailures ().entrySet ()) {
309+ final ShardId shardId = entry .getKey ();
310+ trackShardLevelFailure (shardId , false , entry .getValue ());
311+ pendingShardIds .add (shardId );
312+ }
313+ onAfterRequest ();
314+ } catch (Exception ex ) {
315+ expectNoFailure ("expect no failure while handling data node response" , ex );
316+ listener .onFailure (ex );
317+ return ;
301318 }
302- for (var entry : response .shardLevelFailures ().entrySet ()) {
303- final ShardId shardId = entry .getKey ();
304- trackShardLevelFailure (shardId , false , entry .getValue ());
305- pendingShardIds .add (shardId );
306- }
307- onAfter (response .completionInfo ());
319+ listener .onResponse (response .completionInfo ());
308320 }
309321
310322 @ Override
311323 public void onFailure (Exception e , boolean receivedData ) {
312- for (ShardId shardId : request .shardIds ) {
313- trackShardLevelFailure (shardId , receivedData , e );
314- pendingShardIds .add (shardId );
324+ try {
325+ for (ShardId shardId : request .shardIds ) {
326+ trackShardLevelFailure (shardId , receivedData , e );
327+ pendingShardIds .add (shardId );
328+ }
329+ onAfterRequest ();
330+ } catch (Exception ex ) {
331+ expectNoFailure ("expect no failure while handling failure of data node request" , ex );
332+ listener .onFailure (ex );
333+ return ;
315334 }
316- onAfter (DriverCompletionInfo .EMPTY );
335+ listener . onResponse (DriverCompletionInfo .EMPTY );
317336 }
318337
319338 @ Override
@@ -325,6 +344,11 @@ public void onSkip() {
325344 onResponse (new DataNodeComputeResponse (DriverCompletionInfo .EMPTY , Map .of ()));
326345 }
327346 }
347+
348+ private void expectNoFailure (String message , Exception e ) {
349+ LOGGER .error (message , e );
350+ assert false : new AssertionError (message , e );
351+ }
328352 });
329353 }
330354
@@ -515,15 +539,19 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
515539 var project = projectResolver .getProjectState (clusterService .state ());
516540 var nodes = Maps .<ShardId , List <DiscoveryNode >>newMapWithExpectedSize (shardIds .size ());
517541 for (var shardId : shardIds ) {
518- nodes . put (
519- shardId ,
520- project .routingTable ()
542+ List < DiscoveryNode > allocatedNodes ;
543+ try {
544+ allocatedNodes = project .routingTable ()
521545 .shardRoutingTable (shardId )
522546 .allShards ()
523547 .filter (shard -> shard .active () && shard .isSearchable ())
524548 .map (shard -> project .cluster ().nodes ().get (shard .currentNodeId ()))
525- .toList ()
526- );
549+ .toList ();
550+ } catch (Exception ex ) {
551+ assert ex instanceof IndexNotFoundException || ex instanceof ShardNotFoundException : new AssertionError (ex );
552+ continue ;
553+ }
554+ nodes .put (shardId , allocatedNodes );
527555 }
528556 return nodes ;
529557 }
0 commit comments