5959import java .util .Map ;
6060import java .util .Objects ;
6161import java .util .Set ;
62+ import java .util .function .BiConsumer ;
6263import java .util .stream .Collectors ;
6364import java .util .stream .Stream ;
6465
@@ -879,6 +880,10 @@ public boolean isActiveOrAssignedQueued() {
879880 return isActive () || isAssignedQueued ();
880881 }
881882
883+ public boolean isAbortedAssignedQueued () {
884+ return state == ShardState .ABORTED && nodeId == null ;
885+ }
886+
882887 @ Override
883888 public void writeTo (StreamOutput out ) throws IOException {
884889 out .writeOptionalString (nodeId );
@@ -1267,18 +1272,17 @@ public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClone
12671272 * In the special case where this instance has not yet made any progress on any shard this method just returns
12681273 * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright.
12691274 *
1270- * @param completedAssignedQueuedShards Map to accumulate assigned-queued shards that get aborted in this entry
12711275 * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
12721276 */
12731277 @ Nullable
1274- public Entry abort (Map <ShardId , ShardSnapshotStatus > completedAssignedQueuedShards ) {
1278+ public Entry abort (String localNodeId , BiConsumer <ShardId , ShardSnapshotStatus > abortedAssignedQueuedShardConsumer ) {
12751279 final Map <ShardId , ShardSnapshotStatus > shardsBuilder = new HashMap <>();
12761280 boolean completed = true ;
12771281 boolean allQueued = true ;
12781282 for (Map .Entry <ShardId , ShardSnapshotStatus > shardEntry : shards .entrySet ()) {
12791283 ShardSnapshotStatus status = shardEntry .getValue ();
12801284 final var isAssignedQueued = status .isAssignedQueued ();
1281- allQueued &= status .state () == ShardState .QUEUED ;
1285+ allQueued &= ( status .state () == ShardState .QUEUED && isAssignedQueued == false ) ;
12821286 if (status .state ().completed () == false ) {
12831287 final String nodeId = status .nodeId ();
12841288 if (isAssignedQueued == false ) {
@@ -1289,15 +1293,21 @@ public Entry abort(Map<ShardId, ShardSnapshotStatus> completedAssignedQueuedShar
12891293 "aborted by snapshot deletion"
12901294 );
12911295 } else {
1292- // Record the deletion of the assigned-queued shard snapshot so that we can kick off the first QUEUED one
1293- // in later snapshots.
1294- final var old = completedAssignedQueuedShards .put (shardEntry .getKey (), status );
1295- assert old == null : old ;
12961296 assert isClone () == false
12971297 : "The state queued with generation should not be possible for a clone entry [" + this + "]" ;
12981298 final String reason = "assigned-queued aborted by snapshot deletion" ;
12991299 // Assigned QUEUED transitions to ABORTED (incomplete) and is completed by a separate cluster state update
1300- status = new ShardSnapshotStatus (nodeId , ShardState .FAILED , status .generation (), reason );
1300+ status = new ShardSnapshotStatus (
1301+ null , // use null nodeId to differentiate it from aborted INIT shard snapshot
1302+ ShardState .ABORTED ,
1303+ status .generation (),
1304+ reason
1305+ );
1306+ // Accumulate the updates needed to complete the aborted assigned-queued shard snapshots
1307+ abortedAssignedQueuedShardConsumer .accept (
1308+ shardEntry .getKey (),
1309+ new ShardSnapshotStatus (localNodeId , ShardState .FAILED , status .generation , reason )
1310+ );
13011311 }
13021312 }
13031313 completed &= status .state ().completed ();
0 commit comments