@@ -1342,7 +1342,6 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
13421342 shards .put (shardId , failedState );
13431343 knownFailures .put (shardSnapshotEntry .getKey (), failedState );
13441344 } else if (shardStatus .state ().completed () == false && shardStatus .nodeId () != null ) {
1345- // TODO: This branch applies to assigned-queued shards. It seems OK since it applies to INIT as well. Double check it.
13461345 if (nodes .nodeExists (shardStatus .nodeId ())) {
13471346 shards .put (shardId , shardStatus );
13481347 } else {
@@ -3089,16 +3088,16 @@ private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo
30893088 Runnable changedCallback ,
30903089 Runnable startedCallback
30913090 ) {
3091+ assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
30923092 final List <SnapshotsInProgress .Entry > oldEntries = snapshotsInProgress .forRepo (projectRepo );
3093- if (oldEntries .isEmpty ()) {
3093+ if (oldEntries .isEmpty () || oldEntries . stream (). allMatch ( entry -> entry . hasAssignedQueuedShards () == false ) ) {
30943094 return snapshotsInProgress ;
30953095 }
30963096 final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
30973097 for (SnapshotsInProgress .Entry entry : oldEntries ) {
3098- if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3099- // TODO: Optimize this by checking whether the entry has any assigned-queued shards before building the shards map
3098+ if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
31003099 final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3101- maybeStartAssignedQueuedShardSnapshots (
3100+ final var changed = maybeStartAssignedQueuedShardSnapshots (
31023101 clusterState ,
31033102 entry ,
31043103 snapshotsInProgress ::isNodeIdForRemoval ,
@@ -3108,10 +3107,12 @@ private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo
31083107 changedCallback ,
31093108 startedCallback
31103109 );
3111- final var newEntry = entry . withShardStates ( shardsBuilder . build ());
3112- newEntries . add ( newEntry );
3113- if ( newEntry != entry ) {
3110+ if ( changed ) {
3111+ final var newEntry = entry . withShardStates ( shardsBuilder . build () );
3112+ newEntries . add ( newEntry );
31143113 newEntryConsumer .accept (newEntry );
3114+ } else {
3115+ newEntries .add (entry );
31153116 }
31163117 } else {
31173118 newEntries .add (entry );
@@ -3120,7 +3121,7 @@ private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo
31203121 return snapshotsInProgress .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
31213122 }
31223123
3123- private static void maybeStartAssignedQueuedShardSnapshots (
3124+ private static boolean maybeStartAssignedQueuedShardSnapshots (
31243125 ClusterState clusterState ,
31253126 SnapshotsInProgress .Entry entry ,
31263127 Predicate <String > nodeIdRemovalPredicate ,
@@ -3130,41 +3131,46 @@ private static void maybeStartAssignedQueuedShardSnapshots(
31303131 Runnable changedCallback ,
31313132 Runnable startedCallback
31323133 ) {
3133- if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3134- for (var shardId : shardsBuilder .keys ()) {
3135- final var existingShardSnapshotStatus = shardsBuilder .get (shardId );
3136- if (existingShardSnapshotStatus .isAssignedQueued () == false ) {
3137- continue ;
3138- }
3139- final IndexRoutingTable indexRouting = clusterState .routingTable (entry .projectId ()).index (shardId .getIndex ());
3140- final ShardRouting shardRouting ;
3141- if (indexRouting == null ) {
3142- shardRouting = null ;
3143- } else {
3144- shardRouting = indexRouting .shard (shardId .id ()).primaryShard ();
3145- }
3146- final var newShardSnapshotStatus = initShardSnapshotStatus (
3147- existingShardSnapshotStatus .generation (),
3148- shardRouting ,
3149- nodeIdRemovalPredicate ,
3150- perNodeShardSnapshotCounter
3151- );
3152- if (newShardSnapshotStatus .state ().completed ()) {
3153- // It can become complete if the shard is unassigned or deleted, i.e. state == MISSING.
3154- // We cannot directly update its status here because there maybe another snapshot for
3155- // the same shard that is QUEUED which must be updated as well, i.e. vertical update.
3156- // So we submit the status update to let it be processed in a future cluster state update.
3157- shardStatusUpdateConsumer .apply (entry .snapshot (), shardId , newShardSnapshotStatus );
3158- continue ;
3159- } else if (newShardSnapshotStatus .equals (existingShardSnapshotStatus ) == false ) {
3160- changedCallback .run ();
3161- if (newShardSnapshotStatus .state () == ShardState .INIT ) {
3162- startedCallback .run ();
3163- }
3134+ assert entry .hasAssignedQueuedShards () : "entry has no assigned queued shards: " + entry ;
3135+ assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
3136+ boolean changed = false ;
3137+ for (var shardId : shardsBuilder .keys ()) {
3138+ if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode () == false ) {
3139+ return changed ;
3140+ }
3141+ final var existingShardSnapshotStatus = shardsBuilder .get (shardId );
3142+ if (existingShardSnapshotStatus .isAssignedQueued () == false ) {
3143+ continue ;
3144+ }
3145+ final IndexRoutingTable indexRouting = clusterState .routingTable (entry .projectId ()).index (shardId .getIndex ());
3146+ final ShardRouting shardRouting ;
3147+ if (indexRouting == null ) {
3148+ shardRouting = null ;
3149+ } else {
3150+ shardRouting = indexRouting .shard (shardId .id ()).primaryShard ();
3151+ }
3152+ final var newShardSnapshotStatus = initShardSnapshotStatus (
3153+ existingShardSnapshotStatus .generation (),
3154+ shardRouting ,
3155+ nodeIdRemovalPredicate ,
3156+ perNodeShardSnapshotCounter
3157+ );
3158+ if (newShardSnapshotStatus .state ().completed ()) {
3159+ // It can become complete if the shard is unassigned or deleted, i.e. state == MISSING.
3160+ // We cannot directly update its status here because there may be another snapshot for
3161+ // the same shard that is QUEUED which must be updated as well, i.e. vertical update.
3162+ // So we submit the status update to let it be processed in a future cluster state update.
3163+ shardStatusUpdateConsumer .apply (entry .snapshot (), shardId , newShardSnapshotStatus );
3164+ } else if (newShardSnapshotStatus .equals (existingShardSnapshotStatus ) == false ) {
3165+ changedCallback .run ();
3166+ if (newShardSnapshotStatus .state () == ShardState .INIT ) {
3167+ startedCallback .run ();
31643168 }
31653169 shardsBuilder .put (shardId , newShardSnapshotStatus );
3170+ changed = true ;
31663171 }
31673172 }
3173+ return changed ;
31683174 }
31693175
31703176 /**
@@ -3555,7 +3561,7 @@ SnapshotsInProgress computeUpdatedState() {
35553561 // due to snapshots running for a repository that is update to completion in this batch.
35563562 // (2) Repos that have seen updates in this batch because updates releasing capacity may all belong to later snapshots
35573563 // than the one has assigned-queued shards. These updates could be either for the same repo or a different repo.
3558- for (var repo : existing .repos ()) {
3564+ for (var repo : updated .repos ()) {
35593565 if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode () == false ) {
35603566 break ;
35613567 }
@@ -3690,18 +3696,22 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
36903696 + " as well as "
36913697 + shardsBuilder ;
36923698
3693- // Shard snapshots changed status for this entry, check within the snapshot to see whether any previously limited
3694- // shard snapshots can now start due to newly completed ones.
3695- maybeStartAssignedQueuedShardSnapshots (
3696- initialState ,
3697- entry ,
3698- nodeIdRemovalPredicate ,
3699- shardsBuilder ,
3700- perNodeShardSnapshotCounter ,
3701- shardStatusUpdateConsumer ,
3702- () -> changedCount ++,
3703- () -> startedCount ++
3704- );
3699+ if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3700+ // Shard snapshots changed status for this entry, check within the snapshot to see whether any previously limited
3701+ // shard snapshots can now start due to newly completed ones. This is only necessary if the entry has any
3702+ // assigned-queued shards before the update. If the entry gets any new assigned-queued shards from processing the
3703+ // update, they cannot be started anyway because they already reflect the latest node capacities.
3704+ maybeStartAssignedQueuedShardSnapshots (
3705+ initialState ,
3706+ entry ,
3707+ nodeIdRemovalPredicate ,
3708+ shardsBuilder ,
3709+ perNodeShardSnapshotCounter ,
3710+ shardStatusUpdateConsumer ,
3711+ () -> changedCount ++,
3712+ () -> startedCount ++
3713+ );
3714+ }
37053715 return entry .withShardStates (shardsBuilder .build ());
37063716 } else if (clonesBuilder != null ) {
37073717 return entry .withClones (clonesBuilder .build ());
0 commit comments