@@ -1013,7 +1013,7 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) {
10131013 if (value .equals (ShardSnapshotStatus .UNASSIGNED_QUEUED )) {
10141014 assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ()))
10151015 : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete" ;
1016- } else if (value .isActiveOrAssignedQueued ()) {
1016+ } else if (value .isActiveOrQueuedWithGeneration ()) {
10171017 assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ())) == false
10181018 : "Found shard snapshot actively executing in ["
10191019 + entry
@@ -2842,7 +2842,6 @@ public ClusterState execute(ClusterState currentState) {
28422842 if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
28432843 final ProjectRepo repoForDeletedEntry = new ProjectRepo (deleteEntry .projectId (), deleteEntry .repository ());
28442844 SnapshotsInProgress updatedSnapshotsInProgress = snapshotsInProgress ;
2845- final Map <ProjectRepo , InFlightShardSnapshotStates > repoToInFlightShardSnapshotStates = new HashMap <>();
28462845 for (var repo : snapshotsInProgress .repos ()) {
28472846 if (repo .equals (repoForDeletedEntry )) {
28482847 continue ;
@@ -2855,16 +2854,12 @@ public ClusterState execute(ClusterState currentState) {
28552854 for (SnapshotsInProgress .Entry entry : oldEntries ) {
28562855 if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
28572856 final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
2858- maybeStartAssignedQueuedShardSnapshot (
2857+ maybeStartQueuedWithGenerationShardSnapshots (
28592858 updatedState ,
28602859 entry ,
28612860 snapshotsInProgress ::isNodeIdForRemoval ,
28622861 shardsBuilder ,
28632862 perNodeShardSnapshotCounter ,
2864- () -> repoToInFlightShardSnapshotStates .computeIfAbsent (
2865- repo ,
2866- r -> InFlightShardSnapshotStates .forEntries (snapshotsInProgress .forRepo (r ))
2867- ),
28682863 () -> {},
28692864 () -> {}
28702865 );
@@ -3063,7 +3058,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
30633058 : "Missing assignment for [" + sid + "]" ;
30643059 updatedAssignmentsBuilder .put (sid , ShardSnapshotStatus .MISSING );
30653060 } else {
3066- if (updated .isActiveOrAssignedQueued ()) {
3061+ if (updated .isActiveOrQueuedWithGeneration ()) {
30673062 markShardReassigned (shardId , reassignedShardIds );
30683063 }
30693064 updatedAssignmentsBuilder .put (sid , updated );
@@ -3105,21 +3100,19 @@ public String toString() {
31053100 }
31063101 }
31073102
3108- private static void maybeStartAssignedQueuedShardSnapshot (
3103+ private static void maybeStartQueuedWithGenerationShardSnapshots (
31093104 ClusterState clusterState ,
31103105 SnapshotsInProgress .Entry entry ,
31113106 Predicate <String > nodeIdRemovalPredicate ,
31123107 ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder ,
31133108 PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3114- Supplier <InFlightShardSnapshotStates > inflightShardStatesFunc ,
31153109 Runnable changedCallback ,
31163110 Runnable startedCallback
31173111 ) {
3118- // TODO check snapshot completed and skip
31193112 if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
31203113 for (var shardId : shardsBuilder .keys ()) {
31213114 final var existingShardSnapshotStatus = shardsBuilder .get (shardId );
3122- if (existingShardSnapshotStatus .isAssignedQueued () == false ) {
3115+ if (existingShardSnapshotStatus .isQueuedWithGeneration () == false ) {
31233116 continue ;
31243117 }
31253118 final IndexRoutingTable indexRouting = clusterState .routingTable (entry .projectId ()).index (shardId .getIndex ());
@@ -3304,7 +3297,7 @@ private static ShardSnapshotStatus initShardSnapshotStatus(
33043297 if (perNodeShardSnapshotCounter .tryStartShardSnapshotOnNode (primary .currentNodeId ())) {
33053298 shardSnapshotStatus = new ShardSnapshotStatus (primary .currentNodeId (), shardRepoGeneration );
33063299 } else {
3307- shardSnapshotStatus = ShardSnapshotStatus .assignedQueued ( primary . currentNodeId (), shardRepoGeneration );
3300+ shardSnapshotStatus = ShardSnapshotStatus .queuedWithGeneration ( shardRepoGeneration );
33083301 }
33093302 }
33103303 return shardSnapshotStatus ;
@@ -3466,8 +3459,6 @@ static final class SnapshotShardsUpdateContext {
34663459
34673460 private final PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ;
34683461
3469- private final Map <ProjectRepo , InFlightShardSnapshotStates > perRepoInFlightShardStates = new HashMap <>();
3470-
34713462 /**
34723463 * Sets up {@link #updatesByRepo} to organize the {@link ShardSnapshotUpdate} tasks by repository name.
34733464 */
@@ -3500,13 +3491,6 @@ static final class SnapshotShardsUpdateContext {
35003491 );
35013492 }
35023493
3503- InFlightShardSnapshotStates getInFlightShardStates (ProjectRepo projectRepo ) {
3504- return perRepoInFlightShardStates .computeIfAbsent (
3505- projectRepo ,
3506- repo -> InFlightShardSnapshotStates .forEntries (SnapshotsInProgress .get (initialState ).forRepo (repo ))
3507- );
3508- }
3509-
35103494 /**
35113495 * Applies the {@link ShardSnapshotUpdate}s in {@link #updatesByRepo} to the relevant {@link SnapshotsInProgress.Entry} entries in
35123496 * the cluster state {@link #initialState}.
@@ -3546,7 +3530,15 @@ SnapshotsInProgress computeUpdatedState() {
35463530 for (SnapshotsInProgress .Entry entry : oldEntries ) {
35473531 if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
35483532 final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3549- maybeStartShardSnapshotHorizontally (entry , shardsBuilder );
3533+ maybeStartQueuedWithGenerationShardSnapshots (
3534+ initialState ,
3535+ entry ,
3536+ nodeIdRemovalPredicate ,
3537+ shardsBuilder ,
3538+ perNodeShardSnapshotCounter ,
3539+ () -> changedCount ++,
3540+ () -> startedCount ++
3541+ );
35503542 final var newEntry = entry .withShardStates (shardsBuilder .build ());
35513543 newEntries .add (newEntry );
35523544 if (newEntry != entry && newEntry .state ().completed ()) {
@@ -3570,22 +3562,6 @@ SnapshotsInProgress computeUpdatedState() {
35703562 return existing ;
35713563 }
35723564
3573- private void maybeStartShardSnapshotHorizontally (
3574- SnapshotsInProgress .Entry entry ,
3575- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder
3576- ) {
3577- maybeStartAssignedQueuedShardSnapshot (
3578- initialState ,
3579- entry ,
3580- nodeIdRemovalPredicate ,
3581- shardsBuilder ,
3582- perNodeShardSnapshotCounter ,
3583- () -> getInFlightShardStates (new ProjectRepo (entry .projectId (), entry .repository ())),
3584- () -> changedCount ++,
3585- () -> startedCount ++
3586- );
3587- }
3588-
35893565 /**
35903566 * Sets up the final callback {@link #completionHandler} to run after the {@link MasterService} successfully publishes the batched
35913567 * update {@link #batchExecutionContext}. Also sets up the callers of the tasks within the {@link #batchExecutionContext} to receive
@@ -3690,7 +3666,15 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
36903666 + " as well as "
36913667 + shardsBuilder ;
36923668
3693- maybeStartShardSnapshotHorizontally (entry , shardsBuilder );
3669+ maybeStartQueuedWithGenerationShardSnapshots (
3670+ initialState ,
3671+ entry ,
3672+ nodeIdRemovalPredicate ,
3673+ shardsBuilder ,
3674+ perNodeShardSnapshotCounter ,
3675+ () -> changedCount ++,
3676+ () -> startedCount ++
3677+ );
36943678 return entry .withShardStates (shardsBuilder .build ());
36953679 } else if (clonesBuilder != null ) {
36963680 return entry .withClones (clonesBuilder .build ());
@@ -3896,7 +3880,7 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g
38963880 if (shardSnapshotStatus .isActive ()) {
38973881 startShardOperation (shardsBuilder (), routingShardId , shardSnapshotStatus );
38983882 } else {
3899- if (shardSnapshotStatus .isAssignedQueued ()) {
3883+ if (shardSnapshotStatus .isQueuedWithGeneration ()) {
39003884 updatesIterator .remove ();
39013885 }
39023886 // update to queued snapshot did not result in an actual update execution so we just record it but keep applying
0 commit comments