5959import java .util .Map ;
6060import java .util .Objects ;
6161import java .util .Set ;
62- import java .util .function .BiConsumer ;
6362import java .util .stream .Collectors ;
6463import java .util .stream .Stream ;
6564
@@ -1272,17 +1271,18 @@ public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClone
12721271 * In the special case where this instance has not yet made any progress on any shard this method just returns
12731272 * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright.
12741273 *
1274+ * @param completedAssignedQueuedShards Map to accumulate assigned-queued shards that get aborted in this entry
12751275 * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
12761276 */
12771277 @ Nullable
1278- public Entry abort (String localNodeId , BiConsumer < ShardId , ShardSnapshotStatus > abortedAssignedQueuedShardConsumer ) {
1278+ public Entry abort (Map < ShardId , Tuple < Snapshot , ShardSnapshotStatus >> completedAssignedQueuedShards ) {
12791279 final Map <ShardId , ShardSnapshotStatus > shardsBuilder = new HashMap <>();
12801280 boolean completed = true ;
12811281 boolean allQueued = true ;
12821282 for (Map .Entry <ShardId , ShardSnapshotStatus > shardEntry : shards .entrySet ()) {
12831283 ShardSnapshotStatus status = shardEntry .getValue ();
12841284 final var isAssignedQueued = status .isAssignedQueued ();
1285- allQueued &= ( status .state () == ShardState .QUEUED && isAssignedQueued == false ) ;
1285+ allQueued &= status .state () == ShardState .QUEUED ;
12861286 if (status .state ().completed () == false ) {
12871287 final String nodeId = status .nodeId ();
12881288 if (isAssignedQueued == false ) {
@@ -1293,21 +1293,14 @@ public Entry abort(String localNodeId, BiConsumer<ShardId, ShardSnapshotStatus>
12931293 "aborted by snapshot deletion"
12941294 );
12951295 } else {
1296+ // Record the deleted assigned-queued shard snapshot so that we can reassign to the first QUEUED one
1297+ // in later snapshots.
1298+ final var old = completedAssignedQueuedShards .put (shardEntry .getKey (), new Tuple <>(snapshot (), status ));
1299+ assert old == null : old ;
12961300 assert isClone () == false
12971301 : "The state queued with generation should not be possible for a clone entry [" + this + "]" ;
12981302 final String reason = "assigned-queued aborted by snapshot deletion" ;
1299- // Assigned QUEUED transitions to ABORTED (incomplete) and is completed by a separate cluster state update
1300- status = new ShardSnapshotStatus (
1301- null , // use null nodeId to differentiate it from aborted INIT shard snapshot
1302- ShardState .ABORTED ,
1303- status .generation (),
1304- reason
1305- );
1306- // Accumulate the updates needed to complete the aborted assigned-queued shard snapshots
1307- abortedAssignedQueuedShardConsumer .accept (
1308- shardEntry .getKey (),
1309- new ShardSnapshotStatus (localNodeId , ShardState .FAILED , status .generation , reason )
1310- );
1303+ status = new ShardSnapshotStatus (nodeId , ShardState .FAILED , status .generation (), reason );
13111304 }
13121305 }
13131306 completed &= status .state ().completed ();
0 commit comments