3030import org .elasticsearch .common .Strings ;
3131import org .elasticsearch .common .settings .ClusterSettings ;
3232import org .elasticsearch .common .settings .Setting ;
33+ import org .elasticsearch .common .time .TimeProvider ;
3334import org .elasticsearch .core .TimeValue ;
3435import org .elasticsearch .core .Tuple ;
3536import org .elasticsearch .gateway .PriorityComparator ;
3637import org .elasticsearch .index .IndexVersions ;
3738import org .elasticsearch .index .shard .ShardId ;
38- import org .elasticsearch .threadpool .ThreadPool ;
3939
4040import java .util .Comparator ;
4141import java .util .Iterator ;
@@ -83,17 +83,16 @@ public class DesiredBalanceReconciler {
8383 private double undesiredAllocationsLogThreshold ;
8484 private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering ();
8585 private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering ();
86+ private final UndesiredAllocationsTracker undesiredAllocationsTracker ;
8687
87- public DesiredBalanceReconciler (ClusterSettings clusterSettings , ThreadPool threadPool ) {
88- this .undesiredAllocationLogInterval = new FrequencyCappedAction (
89- threadPool .relativeTimeInMillisSupplier (),
90- TimeValue .timeValueMinutes (5 )
91- );
88+ public DesiredBalanceReconciler (ClusterSettings clusterSettings , TimeProvider timeProvider ) {
89+ this .undesiredAllocationLogInterval = new FrequencyCappedAction (timeProvider ::relativeTimeInMillis , TimeValue .timeValueMinutes (5 ));
9290 clusterSettings .initializeAndWatch (UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING , this .undesiredAllocationLogInterval ::setMinInterval );
9391 clusterSettings .initializeAndWatch (
9492 UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING ,
9593 value -> this .undesiredAllocationsLogThreshold = value
9694 );
95+ this .undesiredAllocationsTracker = new UndesiredAllocationsTracker (clusterSettings , timeProvider );
9796 }
9897
9998 /**
@@ -114,6 +113,7 @@ public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBal
114113 public void clear () {
115114 allocationOrdering .clear ();
116115 moveOrdering .clear ();
116+ undesiredAllocationsTracker .clear ();
117117 }
118118
119119 /**
@@ -134,6 +134,7 @@ private class Reconciliation {
134134 }
135135
136136 DesiredBalanceMetrics .AllocationStats run () {
137+ undesiredAllocationsTracker .cleanup (routingNodes );
137138 try (var ignored = allocation .withReconcilingFlag ()) {
138139
139140 logger .debug ("Reconciling desired balance for [{}]" , desiredBalance .lastConvergedIndex ());
@@ -493,35 +494,51 @@ private void moveShards() {
493494
494495 if (assignment .nodeIds ().contains (shardRouting .currentNodeId ())) {
495496 // shard is already on a desired node
497+ undesiredAllocationsTracker .removeTracking (shardRouting );
496498 continue ;
497499 }
498500
499- if (allocation .deciders ().canAllocate (shardRouting , allocation ).type () != Decision .Type .YES ) {
500- // cannot allocate anywhere, no point in looking for a target node
501- continue ;
502- }
501+ boolean movedUndesiredShard = false ;
502+ try {
503+ if (allocation .deciders ().canAllocate (shardRouting , allocation ).type () != Decision .Type .YES ) {
504+ // cannot allocate anywhere, no point in looking for a target node
505+ continue ;
506+ }
503507
504- final var routingNode = routingNodes .node (shardRouting .currentNodeId ());
505- final var canRemainDecision = allocation .deciders ().canRemain (shardRouting , routingNode , allocation );
506- if (canRemainDecision .type () != Decision .Type .NO && canRemainDecision .type () != Decision .Type .NOT_PREFERRED ) {
507- // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone.
508- // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already decided
509- // how to handle the situation.
510- continue ;
511- }
508+ final var routingNode = routingNodes .node (shardRouting .currentNodeId ());
509+ final var canRemainDecision = allocation .deciders ().canRemain (shardRouting , routingNode , allocation );
510+ if (canRemainDecision .type () != Decision .Type .NO && canRemainDecision .type () != Decision .Type .NOT_PREFERRED ) {
511+ // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone.
512+ // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already
513+ // decided how to handle the situation.
514+ continue ;
515+ }
512516
513- final var moveTarget = findRelocationTarget (shardRouting , assignment .nodeIds ());
514- if (moveTarget != null ) {
515- logger .debug ("Moving shard {} from {} to {}" , shardRouting .shardId (), shardRouting .currentNodeId (), moveTarget .getId ());
516- routingNodes .relocateShard (
517- shardRouting ,
518- moveTarget .getId (),
519- allocation .clusterInfo ().getShardSize (shardRouting , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE ),
520- "move" ,
521- allocation .changes ()
522- );
523- iterator .dePrioritizeNode (shardRouting .currentNodeId ());
524- moveOrdering .recordAllocation (shardRouting .currentNodeId ());
517+ final var moveTarget = findRelocationTarget (shardRouting , assignment .nodeIds ());
518+ if (moveTarget != null ) {
519+ logger .debug (
520+ "Moving shard {} from {} to {}" ,
521+ shardRouting .shardId (),
522+ shardRouting .currentNodeId (),
523+ moveTarget .getId ()
524+ );
525+ routingNodes .relocateShard (
526+ shardRouting ,
527+ moveTarget .getId (),
528+ allocation .clusterInfo ().getShardSize (shardRouting , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE ),
529+ "move" ,
530+ allocation .changes ()
531+ );
532+ iterator .dePrioritizeNode (shardRouting .currentNodeId ());
533+ moveOrdering .recordAllocation (shardRouting .currentNodeId ());
534+ movedUndesiredShard = true ;
535+ }
536+ } finally {
537+ if (movedUndesiredShard ) {
538+ undesiredAllocationsTracker .removeTracking (shardRouting );
539+ } else {
540+ undesiredAllocationsTracker .trackUndesiredAllocation (shardRouting );
541+ }
525542 }
526543 }
527544 }
@@ -555,51 +572,63 @@ private DesiredBalanceMetrics.AllocationStats balance() {
555572
556573 if (assignment .nodeIds ().contains (shardRouting .currentNodeId ())) {
557574 // shard is already on a desired node
575+ undesiredAllocationsTracker .removeTracking (shardRouting );
558576 continue ;
559577 }
560578
561- if (allocation .metadata ().nodeShutdowns ().contains (shardRouting .currentNodeId ()) == false ) {
562- // shard is not on a shutting down node, nor is it on a desired node per the previous check.
563- undesiredAllocationsExcludingShuttingDownNodes ++;
564- undesiredAllocationsExcludingShuttingDownNodesByRole .addTo (shardRouting .role (), 1 );
565- }
566-
567- if (allocation .deciders ().canRebalance (allocation ).type () != Decision .Type .YES ) {
568- // Rebalancing is disabled, we're just here to collect the AllocationStats to return.
569- continue ;
570- }
579+ boolean movedUndesiredShard = false ;
580+ try {
581+ if (allocation .metadata ().nodeShutdowns ().contains (shardRouting .currentNodeId ()) == false ) {
582+ // shard is not on a shutting down node, nor is it on a desired node per the previous check.
583+ undesiredAllocationsExcludingShuttingDownNodes ++;
584+ undesiredAllocationsExcludingShuttingDownNodesByRole .addTo (shardRouting .role (), 1 );
585+ }
571586
572- if (allocation .deciders ().canRebalance (shardRouting , allocation ).type () != Decision .Type .YES ) {
573- // rebalancing disabled for this shard
574- continue ;
575- }
587+ if (allocation .deciders ().canRebalance (allocation ).type () != Decision .Type .YES ) {
588+ // Rebalancing is disabled, we're just here to collect the AllocationStats to return.
589+ continue ;
590+ }
576591
577- if (allocation .deciders ().canAllocate (shardRouting , allocation ).type () != Decision .Type .YES ) {
578- // cannot allocate anywhere, no point in looking for a target node
579- continue ;
580- }
592+ if (allocation .deciders ().canRebalance (shardRouting , allocation ).type () != Decision .Type .YES ) {
593+ // rebalancing disabled for this shard
594+ continue ;
595+ }
581596
582- final var rebalanceTarget = findRelocationTarget (shardRouting , assignment .nodeIds (), this ::decideCanAllocate );
583- if (rebalanceTarget != null ) {
584- logger .debug (
585- "Rebalancing shard {} from {} to {}" ,
586- shardRouting .shardId (),
587- shardRouting .currentNodeId (),
588- rebalanceTarget .getId ()
589- );
597+ if (allocation .deciders ().canAllocate (shardRouting , allocation ).type () != Decision .Type .YES ) {
598+ // cannot allocate anywhere, no point in looking for a target node
599+ continue ;
600+ }
590601
591- routingNodes .relocateShard (
592- shardRouting ,
593- rebalanceTarget .getId (),
594- allocation .clusterInfo ().getShardSize (shardRouting , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE ),
595- "rebalance" ,
596- allocation .changes ()
597- );
598- iterator .dePrioritizeNode (shardRouting .currentNodeId ());
599- moveOrdering .recordAllocation (shardRouting .currentNodeId ());
602+ final var rebalanceTarget = findRelocationTarget (shardRouting , assignment .nodeIds (), this ::decideCanAllocate );
603+ if (rebalanceTarget != null ) {
604+ logger .debug (
605+ "Rebalancing shard {} from {} to {}" ,
606+ shardRouting .shardId (),
607+ shardRouting .currentNodeId (),
608+ rebalanceTarget .getId ()
609+ );
610+
611+ routingNodes .relocateShard (
612+ shardRouting ,
613+ rebalanceTarget .getId (),
614+ allocation .clusterInfo ().getShardSize (shardRouting , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE ),
615+ "rebalance" ,
616+ allocation .changes ()
617+ );
618+ iterator .dePrioritizeNode (shardRouting .currentNodeId ());
619+ moveOrdering .recordAllocation (shardRouting .currentNodeId ());
620+ movedUndesiredShard = true ;
621+ }
622+ } finally {
623+ if (movedUndesiredShard ) {
624+ undesiredAllocationsTracker .removeTracking (shardRouting );
625+ } else {
626+ undesiredAllocationsTracker .trackUndesiredAllocation (shardRouting );
627+ }
600628 }
601629 }
602630
631+ undesiredAllocationsTracker .maybeLogUndesiredShardsWarning (routingNodes , allocation , desiredBalance );
603632 maybeLogUndesiredAllocationsWarning (totalAllocations , undesiredAllocationsExcludingShuttingDownNodes , routingNodes .size ());
604633 return new DesiredBalanceMetrics .AllocationStats (
605634 unassignedShards ,
0 commit comments