@@ -292,79 +292,22 @@ private static class ShardFailedTransportHandler implements TransportRequestHand
292292
293293 @ Override
294294 public void messageReceived (FailedShardEntry request , TransportChannel channel , Task task ) throws Exception {
295- logger .debug (() -> new ParameterizedMessage ("{} received shard failed for {}" , request .shardId , request ), request .failure );
295+ logger .debug (
296+ () -> new ParameterizedMessage ("{} received shard failed for [{}]" , request .getShardId (), request ),
297+ request .failure
298+ );
299+ var update = new FailedShardUpdateTask (request , new ChannelActionListener <>(channel , TASK_SOURCE , request ));
296300 clusterService .submitStateUpdateTask (
297301 TASK_SOURCE ,
298- request ,
302+ update ,
299303 ClusterStateTaskConfig .build (Priority .HIGH ),
300304 shardFailedClusterStateTaskExecutor ,
301- new ClusterStateTaskListener () {
302- @ Override
303- public void onFailure (Exception e ) {
304- final MessageSupplier msg = () -> new ParameterizedMessage (
305- "{} unexpected failure while failing shard [{}]" ,
306- request .shardId ,
307- request
308- );
309- if (e instanceof FailedToCommitClusterStateException ) {
310- logger .debug (msg , e );
311- } else {
312- logger .error (msg , e );
313- }
314- try {
315- channel .sendResponse (e );
316- } catch (Exception channelException ) {
317- channelException .addSuppressed (e );
318- logger .warn (
319- () -> new ParameterizedMessage (
320- "{} failed to send failure [{}] while failing shard [{}]" ,
321- request .shardId ,
322- e ,
323- request
324- ),
325- channelException
326- );
327- }
328- }
329-
330- @ Override
331- public void onNoLongerMaster () {
332- logger .debug ("{} no longer master while failing shard [{}]" , request .shardId , request );
333- try {
334- channel .sendResponse (new NotMasterException (TASK_SOURCE ));
335- } catch (Exception channelException ) {
336- logger .warn (
337- () -> new ParameterizedMessage (
338- "{} failed to send no longer master while failing shard [{}]" ,
339- request .shardId ,
340- request
341- ),
342- channelException
343- );
344- }
345- }
346-
347- @ Override
348- public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
349- try {
350- channel .sendResponse (TransportResponse .Empty .INSTANCE );
351- } catch (Exception channelException ) {
352- logger .warn (
353- () -> new ParameterizedMessage (
354- "{} failed to send response while failing shard [{}]" ,
355- request .shardId ,
356- request
357- ),
358- channelException
359- );
360- }
361- }
362- }
305+ update
363306 );
364307 }
365308 }
366309
367- public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor <FailedShardEntry > {
310+ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor <FailedShardUpdateTask > {
368311 private final AllocationService allocationService ;
369312 private final RerouteService rerouteService ;
370313
@@ -374,17 +317,24 @@ public ShardFailedClusterStateTaskExecutor(AllocationService allocationService,
374317 }
375318
376319 @ Override
377- public ClusterTasksResult <FailedShardEntry > execute (ClusterState currentState , List <FailedShardEntry > tasks ) throws Exception {
378- ClusterTasksResult .Builder <FailedShardEntry > batchResultBuilder = ClusterTasksResult .builder ();
379- List <FailedShardEntry > tasksToBeApplied = new ArrayList <>();
320+ public ClusterTasksResult <FailedShardUpdateTask > execute (ClusterState currentState , List <FailedShardUpdateTask > tasks )
321+ throws Exception {
322+ ClusterTasksResult .Builder <FailedShardUpdateTask > batchResultBuilder = ClusterTasksResult .builder ();
323+ List <FailedShardUpdateTask > tasksToBeApplied = new ArrayList <>();
380324 List <FailedShard > failedShardsToBeApplied = new ArrayList <>();
381325 List <StaleShard > staleShardsToBeApplied = new ArrayList <>();
382326
383- for (FailedShardEntry task : tasks ) {
384- IndexMetadata indexMetadata = currentState .metadata ().index (task .shardId .getIndex ());
327+ for (FailedShardUpdateTask task : tasks ) {
328+ FailedShardEntry entry = task .getEntry ();
329+ IndexMetadata indexMetadata = currentState .metadata ().index (entry .getShardId ().getIndex ());
385330 if (indexMetadata == null ) {
386331 // tasks that correspond to non-existent indices are marked as successful
387- logger .debug ("{} ignoring shard failed task [{}] (unknown index {})" , task .shardId , task , task .shardId .getIndex ());
332+ logger .debug (
333+ "{} ignoring shard failed task [{}] (unknown index {})" ,
334+ entry .getShardId (),
335+ entry ,
336+ entry .getShardId ().getIndex ()
337+ );
388338 batchResultBuilder .success (task );
389339 } else {
390340 // The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be
@@ -395,29 +345,29 @@ public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, L
395345 // We check here that the primary to which the write happened was not already failed in an earlier cluster state update.
396346 // This prevents situations where a new primary has already been selected and replication failures from an old stale
397347 // primary unnecessarily fail currently active shards.
398- if (task .primaryTerm > 0 ) {
399- long currentPrimaryTerm = indexMetadata .primaryTerm (task . shardId .id ());
400- if (currentPrimaryTerm != task .primaryTerm ) {
401- assert currentPrimaryTerm > task .primaryTerm
348+ if (entry .primaryTerm > 0 ) {
349+ long currentPrimaryTerm = indexMetadata .primaryTerm (entry . getShardId () .id ());
350+ if (currentPrimaryTerm != entry .primaryTerm ) {
351+ assert currentPrimaryTerm > entry .primaryTerm
402352 : "received a primary term with a higher term than in the "
403353 + "current cluster state (received ["
404- + task .primaryTerm
354+ + entry .primaryTerm
405355 + "] but current is ["
406356 + currentPrimaryTerm
407357 + "])" ;
408358 logger .debug (
409359 "{} failing shard failed task [{}] (primary term {} does not match current term {})" ,
410- task . shardId ,
411- task ,
412- task .primaryTerm ,
413- indexMetadata .primaryTerm (task . shardId .id ())
360+ entry . getShardId () ,
361+ entry ,
362+ entry .primaryTerm ,
363+ indexMetadata .primaryTerm (entry . getShardId () .id ())
414364 );
415365 batchResultBuilder .failure (
416366 task ,
417367 new NoLongerPrimaryShardException (
418- task . shardId ,
368+ entry . getShardId () ,
419369 "primary term ["
420- + task .primaryTerm
370+ + entry .primaryTerm
421371 + "] did not match current primary term ["
422372 + currentPrimaryTerm
423373 + "]"
@@ -427,26 +377,31 @@ public ClusterTasksResult<FailedShardEntry> execute(ClusterState currentState, L
427377 }
428378 }
429379
430- ShardRouting matched = currentState .getRoutingTable ().getByAllocationId (task . shardId , task . allocationId );
380+ ShardRouting matched = currentState .getRoutingTable ().getByAllocationId (entry . getShardId (), entry . getAllocationId () );
431381 if (matched == null ) {
432- Set <String > inSyncAllocationIds = indexMetadata .inSyncAllocationIds (task . shardId .id ());
382+ Set <String > inSyncAllocationIds = indexMetadata .inSyncAllocationIds (entry . getShardId () .id ());
433383 // mark shard copies without routing entries that are in in-sync allocations set only as stale if the reason why
434384 // they were failed is because a write made it into the primary but not to this copy (which corresponds to
435385 // the check "primaryTerm > 0").
436- if (task .primaryTerm > 0 && inSyncAllocationIds .contains (task .allocationId )) {
437- logger .debug ("{} marking shard {} as stale (shard failed task: [{}])" , task .shardId , task .allocationId , task );
386+ if (entry .primaryTerm > 0 && inSyncAllocationIds .contains (entry .getAllocationId ())) {
387+ logger .debug (
388+ "{} marking shard {} as stale (shard failed task: [{}])" ,
389+ entry .getShardId (),
390+ entry .getAllocationId (),
391+ entry
392+ );
438393 tasksToBeApplied .add (task );
439- staleShardsToBeApplied .add (new StaleShard (task . shardId , task . allocationId ));
394+ staleShardsToBeApplied .add (new StaleShard (entry . getShardId (), entry . getAllocationId () ));
440395 } else {
441396 // tasks that correspond to non-existent shards are marked as successful
442- logger .debug ("{} ignoring shard failed task [{}] (shard does not exist anymore)" , task . shardId , task );
397+ logger .debug ("{} ignoring shard failed task [{}] (shard does not exist anymore)" , entry . getShardId (), entry );
443398 batchResultBuilder .success (task );
444399 }
445400 } else {
446401 // failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
447- logger .debug ("{} failing shard {} (shard failed task: [{}])" , task . shardId , matched , task );
402+ logger .debug ("{} failing shard {} (shard failed task: [{}])" , entry . getShardId () , matched , task );
448403 tasksToBeApplied .add (task );
449- failedShardsToBeApplied .add (new FailedShard (matched , task .message , task .failure , task .markAsStale ));
404+ failedShardsToBeApplied .add (new FailedShard (matched , entry .message , entry .failure , entry .markAsStale ));
450405 }
451406 }
452407 }
@@ -578,6 +533,43 @@ public int hashCode() {
578533 }
579534 }
580535
536+ public static class FailedShardUpdateTask implements ClusterStateTaskListener {
537+
538+ private final FailedShardEntry entry ;
539+ private final ActionListener <TransportResponse .Empty > listener ;
540+
541+ public FailedShardUpdateTask (FailedShardEntry entry , ActionListener <TransportResponse .Empty > listener ) {
542+ this .entry = entry ;
543+ this .listener = listener ;
544+ }
545+
546+ public FailedShardEntry getEntry () {
547+ return entry ;
548+ }
549+
550+ @ Override
551+ public void onFailure (Exception e ) {
552+ if (e instanceof NotMasterException ) {
553+ logger .debug (() -> new ParameterizedMessage ("{} no longer master while failing shard [{}]" , entry .shardId , entry ));
554+ } else if (e instanceof FailedToCommitClusterStateException ) {
555+ logger .debug (() -> new ParameterizedMessage ("{} unexpected failure while failing shard [{}]" , entry .shardId , entry ), e );
556+ } else {
557+ logger .error (() -> new ParameterizedMessage ("{} unexpected failure while failing shard [{}]" , entry .shardId , entry ), e );
558+ }
559+ listener .onFailure (e );
560+ }
561+
562+ @ Override
563+ public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
564+ listener .onResponse (TransportResponse .Empty .INSTANCE );
565+ }
566+
567+ @ Override
568+ public String toString () {
569+ return "FailedShardUpdateTask{entry=" + entry + ", listener=" + listener + "}" ;
570+ }
571+ }
572+
581573 public void shardStarted (
582574 final ShardRouting shardRouting ,
583575 final long primaryTerm ,
0 commit comments