@@ -1056,6 +1056,12 @@ public ClusterState execute(ClusterState currentState) {
10561056 final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress .get (currentState );
10571057 final SnapshotDeletionsInProgress deletesInProgress = SnapshotDeletionsInProgress .get (currentState );
10581058 DiscoveryNodes nodes = currentState .nodes ();
1059+ final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter (
1060+ shardSnapshotPerNodeLimit ,
1061+ snapshotsInProgress ,
1062+ nodes ,
1063+ isStateless
1064+ );
10591065 final EnumSet <SnapshotsInProgress .State > statesToUpdate ;
10601066 if (changedNodes ) {
10611067 // If we are reacting to a change in the cluster node configuration we have to update the shard states of both started
@@ -1148,6 +1154,7 @@ public ClusterState execute(ClusterState currentState) {
11481154 currentState .routingTable (projectId ),
11491155 nodes ,
11501156 snapshotsInProgress ::isNodeIdForRemoval ,
1157+ perNodeShardSnapshotCounter ,
11511158 knownFailures
11521159 );
11531160 if (shards != null ) {
@@ -1240,6 +1247,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
12401247 * failed shard snapshots on the same shard IDs.
12411248 *
12421249 * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode
1250+ * @param perNodeShardSnapshotCounter The counter to keep track the number of shard snapshots running per node
12431251 * @param knownFailures already known failed shard snapshots, but more may be found in this method
12441252 * @return an updated map of shard statuses
12451253 */
@@ -1248,6 +1256,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
12481256 RoutingTable routingTable ,
12491257 DiscoveryNodes nodes ,
12501258 Predicate <String > nodeIdRemovalPredicate ,
1259+ PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
12511260 Map <RepositoryShardId , ShardSnapshotStatus > knownFailures
12521261 ) {
12531262 assert snapshotEntry .isClone () == false : "clones take a different path" ;
@@ -1306,8 +1315,11 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
13061315 Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. Previous \
13071316 shard state [{}]
13081317 """ , shardId , shardStatus .generation (), shardStatus .nodeId (), shardStatus .state ());
1309- // TODO: The following should check node capacity
1310- shards .put (shardId , new ShardSnapshotStatus (primaryNodeId , shardStatus .generation ()));
1318+ if (perNodeShardSnapshotCounter .tryStartShardSnapshotOnNode (primaryNodeId )) {
1319+ shards .put (shardId , new ShardSnapshotStatus (primaryNodeId , shardStatus .generation ()));
1320+ } else {
1321+ shards .put (shardId , ShardSnapshotStatus .assignedQueued (primaryNodeId , shardStatus .generation ()));
1322+ }
13111323 continue ;
13121324 } else if (shardRouting .primaryShard ().initializing () || shardRouting .primaryShard ().relocating ()) {
13131325 // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait
0 commit comments