 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.TriConsumer;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -1874,9 +1875,7 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
             final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository());
             if (repositoriesSeen.add(projectRepo)
                 && entry.state() == SnapshotDeletionsInProgress.State.WAITING
-                && snapshotsInProgress.forRepo(projectRepo)
-                    .stream()
-                    .noneMatch(SnapshotsService::isWritingToRepositoryOrAssignedQueued)) {
+                && snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsService::isWritingToRepositoryOrAssignedQueued)) {
                 changed = true;
                 final SnapshotDeletionsInProgress.Entry newEntry = entry.started();
                 readyDeletions.add(newEntry);
@@ -2307,13 +2306,7 @@ public ClusterState execute(ClusterState currentState) {
                 final SnapshotsInProgress.Entry abortedEntry = existing.abort(
                     currentState.nodes().getLocalNodeId(),
                     ((shardId, shardSnapshotStatus) -> completeAbortedAssignedQueuedRunnables.add(
-                        () -> innerUpdateSnapshotState(
-                            existing.snapshot(),
-                            shardId,
-                            null,
-                            shardSnapshotStatus,
-                            ActionListener.noop()
-                        )
+                        () -> innerUpdateSnapshotState(existing.snapshot(), shardId, shardSnapshotStatus)
                     ))
                 );
                 if (abortedEntry == null) {
@@ -2874,6 +2867,7 @@ public ClusterState execute(ClusterState currentState) {
                 updatedState,
                 updatedSnapshotsInProgress,
                 perNodeShardSnapshotCounter,
+                SnapshotsService.this::innerUpdateSnapshotState,
                 ignore -> {},
                 () -> {},
                 () -> {}
@@ -3109,6 +3103,7 @@ private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo
         ClusterState clusterState,
         SnapshotsInProgress snapshotsInProgress,
         PerNodeShardSnapshotCounter perNodeShardSnapshotCounter,
+        TriConsumer<Snapshot, ShardId, ShardSnapshotStatus> shardStatusUpdateConsumer,
         Consumer<SnapshotsInProgress.Entry> newEntryConsumer,
         Runnable changedCallback,
         Runnable startedCallback
@@ -3127,6 +3122,7 @@ private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo
                 snapshotsInProgress::isNodeIdForRemoval,
                 shardsBuilder,
                 perNodeShardSnapshotCounter,
+                shardStatusUpdateConsumer,
                 changedCallback,
                 startedCallback
             );
@@ -3148,6 +3144,7 @@ private static void maybeStartAssignedQueuedShardSnapshots(
         Predicate<String> nodeIdRemovalPredicate,
         ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder,
         PerNodeShardSnapshotCounter perNodeShardSnapshotCounter,
+        TriConsumer<Snapshot, ShardId, ShardSnapshotStatus> shardStatusUpdateConsumer,
         Runnable changedCallback,
         Runnable startedCallback
     ) {
@@ -3170,7 +3167,14 @@ private static void maybeStartAssignedQueuedShardSnapshots(
                 nodeIdRemovalPredicate,
                 perNodeShardSnapshotCounter
             );
-            if (newShardSnapshotStatus.equals(existingShardSnapshotStatus) == false) {
+            if (newShardSnapshotStatus.state().completed()) {
+                // It can become complete if the shard is unassigned or deleted, i.e. state == MISSING.
+                // We cannot directly update its status here because there may be another snapshot for
+                // the same shard that is QUEUED and must be updated as well, i.e. a vertical update.
+                // So we submit the status update to let it be processed in a future cluster state update.
+                shardStatusUpdateConsumer.apply(entry.snapshot(), shardId, newShardSnapshotStatus);
+                continue;
+            } else if (newShardSnapshotStatus.equals(existingShardSnapshotStatus) == false) {
                 changedCallback.run();
                 if (newShardSnapshotStatus.state() == ShardState.INIT) {
                     startedCallback.run();
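
The comments in the new branch above carry the key reasoning: when the recomputed status of a shard is already terminal (e.g. MISSING because the shard is unassigned or its index was deleted), it is not folded into the in-flight builder; instead it is handed to the injected TriConsumer so that it, together with any QUEUED entries for the same shard in other snapshots, is processed in a later cluster state update. The following is a minimal, standalone sketch of that deferral pattern; every name in it is an illustrative stand-in, not the Elasticsearch code in this diff.

// Minimal, standalone sketch of the deferral pattern; all names here are
// illustrative stand-ins, not the Elasticsearch classes in the diff above.
import java.util.ArrayDeque;
import java.util.Queue;

class DeferredShardUpdateSketch {

    // same three-argument shape as org.elasticsearch.common.TriConsumer
    interface TriConsumer<S, T, U> {
        void apply(S s, T t, U u);
    }

    enum ShardState { QUEUED, INIT, MISSING, SUCCESS }

    // updates that must be applied in a later pass rather than folded into the current one
    private final Queue<Runnable> deferred = new ArrayDeque<>();

    // the consumer handed into the builder loop; invoked when a recomputed status is already terminal
    final TriConsumer<String, Integer, ShardState> shardStatusUpdateConsumer =
        (snapshot, shardId, state) -> deferred.add(
            () -> System.out.println("apply " + state + " to shard " + shardId + " of snapshot " + snapshot)
        );

    // stands in for processing the deferred updates in a later batch
    void drain() {
        Runnable next;
        while ((next = deferred.poll()) != null) {
            next.run();
        }
    }

    public static void main(String[] args) {
        DeferredShardUpdateSketch sketch = new DeferredShardUpdateSketch();
        // a terminal (MISSING) status is deferred instead of being applied in place,
        // mirroring the `continue` branch added in the hunk above
        sketch.shardStatusUpdateConsumer.apply("snap-1", 0, ShardState.MISSING);
        sketch.drain();
    }
}

Running this prints the deferred MISSING update only when drain() is called, which is the point of the continue branch: the terminal status is recorded immediately but applied in a later pass.
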
@@ -3496,6 +3500,8 @@ static final class SnapshotShardsUpdateContext {
         // handles the completion of some shard-snapshot updates, performing the next possible actions
         private final ShardSnapshotUpdateCompletionHandler completionHandler;

+        private final TriConsumer<Snapshot, ShardId, ShardSnapshotStatus> shardStatusUpdateConsumer;
+
         // entries that became complete due to this batch of updates
         private final List<SnapshotsInProgress.Entry> newlyCompletedEntries = new ArrayList<>();

@@ -3507,11 +3513,13 @@ static final class SnapshotShardsUpdateContext {
         SnapshotShardsUpdateContext(
             ClusterStateTaskExecutor.BatchExecutionContext<SnapshotTask> batchExecutionContext,
             ShardSnapshotUpdateCompletionHandler completionHandler,
+            TriConsumer<Snapshot, ShardId, ShardSnapshotStatus> shardStatusUpdateConsumer,
             int shardSnapshotPerNodeLimit,
             boolean isStateless
         ) {
             this.batchExecutionContext = batchExecutionContext;
             this.initialState = batchExecutionContext.initialState();
+            this.shardStatusUpdateConsumer = shardStatusUpdateConsumer;
             this.nodeIdRemovalPredicate = SnapshotsInProgress.get(initialState)::isNodeIdForRemoval;
             this.completionHandler = completionHandler;

@@ -3572,6 +3580,7 @@ SnapshotsInProgress computeUpdatedState() {
                 initialState,
                 updated,
                 perNodeShardSnapshotCounter,
+                shardStatusUpdateConsumer,
                 newEntry -> {
                     if (newEntry.state().completed()) {
                         newlyCompletedEntries.add(newEntry);
@@ -3704,6 +3713,7 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
                 nodeIdRemovalPredicate,
                 shardsBuilder,
                 perNodeShardSnapshotCounter,
+                shardStatusUpdateConsumer,
                 () -> changedCount++,
                 () -> startedCount++
             );
@@ -4076,6 +4086,10 @@ private void innerUpdateSnapshotState(
         masterServiceTaskQueue.submitTask("update snapshot state", update, null);
     }

+    private void innerUpdateSnapshotState(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
+        innerUpdateSnapshotState(snapshot, shardId, null, updatedState, ActionListener.noop());
+    }
+
     /**
      * Maybe kick off new shard clone operations for all repositories from all projects
      */
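
The three-argument overload added above is what allows SnapshotsService.this::innerUpdateSnapshotState to be passed wherever a TriConsumer<Snapshot, ShardId, ShardSnapshotStatus> is expected: it supplies null for the remaining positional argument and ActionListener.noop() as the listener, so the method reference matches the three-argument functional interface. A small self-contained illustration of that overload-as-adapter pattern follows; all names in it are hypothetical, not the Elasticsearch types.

// Self-contained illustration of adapting a wider method to a three-argument
// functional interface via a convenience overload; all names here are hypothetical.
class OverloadAdapterSketch {

    // same three-argument shape as org.elasticsearch.common.TriConsumer
    interface TriConsumer<S, T, U> {
        void apply(S s, T t, U u);
    }

    // the "full" method, standing in for the five-argument innerUpdateSnapshotState
    private void update(String snapshot, int shardId, String extraContext, String status, Runnable listener) {
        System.out.println(snapshot + "[" + shardId + "] -> " + status + " (extraContext=" + extraContext + ")");
        listener.run();
    }

    // convenience overload: fills in null and a no-op listener, mirroring the overload added above
    private void update(String snapshot, int shardId, String status) {
        update(snapshot, shardId, null, status, () -> {});
    }

    public static void main(String[] args) {
        OverloadAdapterSketch sketch = new OverloadAdapterSketch();
        // only the three-argument overload is applicable to the functional interface, so the reference is unambiguous
        TriConsumer<String, Integer, String> consumer = sketch::update;
        consumer.apply("snap-1", 0, "MISSING");
    }
}
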
@@ -4422,6 +4436,7 @@ public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionCo
             final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext(
                 batchExecutionContext,
                 shardSnapshotUpdateCompletionHandler,
+                SnapshotsService.this::innerUpdateSnapshotState,
                 shardSnapshotPerNodeLimit,
                 isStateless
             );