Skip to content

Commit 82765b2

Browse files
committed
Change back to assigned queued
Generation can still be null
1 parent 8eb71ef commit 82765b2

File tree

4 files changed

+43
-33
lines changed

4 files changed

+43
-33
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ private static boolean assertShardStateConsistent(
509509
int shardId,
510510
ShardSnapshotStatus shardSnapshotStatus
511511
) {
512-
if (shardSnapshotStatus.isActiveOrQueuedWithGeneration()) {
512+
if (shardSnapshotStatus.isActiveOrAssignedQueued()) {
513513
Tuple<String, Integer> plainShardId = Tuple.tuple(indexName, shardId);
514514
assert assignedShards.add(plainShardId) : plainShardId + " is assigned twice in " + entries;
515515
assert queuedShards.contains(plainShardId) == false : plainShardId + " is queued then assigned in " + entries;
@@ -758,12 +758,13 @@ public static ShardSnapshotStatus success(String nodeId, ShardSnapshotResult sha
758758
}
759759

760760
@SuppressForbidden(reason = "using a private constructor within the same file")
761-
public static ShardSnapshotStatus queuedWithGeneration(ShardGeneration generation) {
762-
return new ShardSnapshotStatus(null, ShardState.QUEUED, generation, null, null);
761+
public static ShardSnapshotStatus assignedQueued(String nodeId, ShardGeneration generation) {
762+
return new ShardSnapshotStatus(nodeId, ShardState.QUEUED, generation, null, null);
763763
}
764764

765-
public boolean isQueuedWithGeneration() {
766-
return state == ShardState.QUEUED && generation != null && nodeId == null;
765+
public boolean isAssignedQueued() {
766+
// generation can still be null if previous shard snapshots all failed
767+
return state == ShardState.QUEUED && nodeId != null;
767768
}
768769

769770
public boolean isUnassignedQueued() {
@@ -790,7 +791,7 @@ private boolean assertConsistent() {
790791
assert state.failed() == false || reason != null;
791792
assert (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL)
792793
|| nodeId != null : "Null node id for state [" + state + "]";
793-
assert state != ShardState.QUEUED || (isUnassignedQueued() || isQueuedWithGeneration())
794+
assert state != ShardState.QUEUED || (isUnassignedQueued() || isAssignedQueued())
794795
: "Found unexpected shard state=["
795796
+ state
796797
+ "], nodeId=["
@@ -853,8 +854,8 @@ public boolean isActive() {
853854
};
854855
}
855856

856-
public boolean isActiveOrQueuedWithGeneration() {
857-
return isActive() || isQueuedWithGeneration();
857+
public boolean isActiveOrAssignedQueued() {
858+
return isActive() || isAssignedQueued();
858859
}
859860

860861
@Override
@@ -1230,30 +1231,38 @@ public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClone
12301231
* @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
12311232
*/
12321233
@Nullable
1233-
public Entry abort(BiConsumer<ShardId, ShardSnapshotStatus> abortedQueuedWithGenerationShardConsumer) {
1234+
public Entry abort(BiConsumer<ShardId, ShardSnapshotStatus> abortedAssignedQueuedShardConsumer) {
12341235
final Map<ShardId, ShardSnapshotStatus> shardsBuilder = new HashMap<>();
12351236
boolean completed = true;
12361237
boolean allQueued = true;
12371238
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : shards.entrySet()) {
12381239
ShardSnapshotStatus status = shardEntry.getValue();
1239-
final var isQueuedWithGeneration = status.isQueuedWithGeneration();
1240-
allQueued &= (status.state() == ShardState.QUEUED && isQueuedWithGeneration == false);
1240+
final var isAssignedQueued = status.isAssignedQueued();
1241+
allQueued &= (status.state() == ShardState.QUEUED && isAssignedQueued == false);
12411242
if (status.state().completed() == false) {
12421243
final String nodeId = status.nodeId();
1243-
status = new ShardSnapshotStatus(
1244-
nodeId,
1245-
// QUEUED with generation transitioned to ABORTED (incomplete) and is completed by a separate cluster state update
1246-
(nodeId == null && isQueuedWithGeneration == false) ? ShardState.FAILED : ShardState.ABORTED,
1247-
status.generation(),
1248-
"aborted by snapshot deletion"
1249-
);
1250-
if (isQueuedWithGeneration) {
1251-
// Accumulate the updates needed to complete the aborted QUEUED with generation shard snapshots
1244+
if (isAssignedQueued == false) {
1245+
status = new ShardSnapshotStatus(
1246+
nodeId,
1247+
nodeId == null ? ShardState.FAILED : ShardState.ABORTED,
1248+
status.generation(),
1249+
"aborted by snapshot deletion"
1250+
);
1251+
} else {
12521252
assert isClone() == false
12531253
: "The state queued with generation should not be possible for a clone entry [" + this + "]";
1254-
abortedQueuedWithGenerationShardConsumer.accept(
1254+
final String reason = "assigned-queued aborted by snapshot deletion";
1255+
status = new ShardSnapshotStatus(
1256+
nodeId,
1257+
// Assigned QUEUED transitions to ABORTED (incomplete) and is completed by a separate cluster state update
1258+
ShardState.ABORTED,
1259+
status.generation(),
1260+
reason
1261+
);
1262+
// Accumulate the updates needed to complete the aborted assigned-QUEUED shard snapshots
1263+
abortedAssignedQueuedShardConsumer.accept(
12551264
shardEntry.getKey(),
1256-
new ShardSnapshotStatus(null, ShardState.FAILED, status.generation, "aborted by snapshot deletion")
1265+
new ShardSnapshotStatus(null, ShardState.FAILED, status.generation, reason)
12571266
);
12581267
}
12591268
}

server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ private static void addStateInformation(
6868
String indexName
6969
) {
7070
// A shard snapshot that is either active or assigned-queued is the one designated to run for this shard, if node capacity allows it
71-
if (shardState.isActiveOrQueuedWithGeneration()) {
71+
if (shardState.isActiveOrAssignedQueued()) {
7272
busyIds.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId);
7373
assert assertGenerationConsistency(generations, indexName, shardId, shardState.generation());
7474
} else if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) {

server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public PerNodeShardSnapshotCounter(
4444
return;
4545
}
4646
for (var shardSnapshotStatus : entry.shards().values()) {
47+
// TODO: consider counting more states as active, e.g. ABORTED shard snapshots still running on a data node?
4748
if (shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT) {
4849
perNodeCounts.computeIfPresent(shardSnapshotStatus.nodeId(), (nodeId, count) -> count + 1);
4950
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) {
10131013
if (value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) {
10141014
assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository()))
10151015
: "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete";
1016-
} else if (value.isActiveOrQueuedWithGeneration()) {
1016+
} else if (value.isActiveOrAssignedQueued()) {
10171017
assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) == false
10181018
: "Found shard snapshot actively executing in ["
10191019
+ entry
@@ -2296,7 +2296,7 @@ public ClusterState execute(ClusterState currentState) {
22962296
// Snapshot ids that will have to be physically deleted from the repository
22972297
final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
22982298

2299-
final List<Runnable> completeAbortedQueuedWithGenerationRunnables = new ArrayList<>();
2299+
final List<Runnable> completeAbortedAssignedQueuedRunnables = new ArrayList<>();
23002300
final SnapshotsInProgress updatedSnapshots = snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(
23012301
projectId,
23022302
repositoryName,
@@ -2305,7 +2305,7 @@ public ClusterState execute(ClusterState currentState) {
23052305
&& snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
23062306
// snapshot is started - mark every non completed shard as aborted
23072307
final SnapshotsInProgress.Entry abortedEntry = existing.abort(
2308-
((shardId, shardSnapshotStatus) -> completeAbortedQueuedWithGenerationRunnables.add(
2308+
((shardId, shardSnapshotStatus) -> completeAbortedAssignedQueuedRunnables.add(
23092309
() -> innerUpdateSnapshotState(
23102310
existing.snapshot(),
23112311
shardId,
@@ -2332,7 +2332,7 @@ public ClusterState execute(ClusterState currentState) {
23322332
return existing;
23332333
}).filter(Objects::nonNull).toList()
23342334
);
2335-
for (var runnable : completeAbortedQueuedWithGenerationRunnables) {
2335+
for (var runnable : completeAbortedAssignedQueuedRunnables) {
23362336
runnable.run();
23372337
}
23382338
if (snapshotIdsRequiringCleanup.isEmpty()) {
@@ -2457,7 +2457,7 @@ private static boolean isWritingToRepositoryOrQueueWithGeneration(SnapshotsInPro
24572457
return true;
24582458
}
24592459
for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) {
2460-
if (value.isActiveOrQueuedWithGeneration()) {
2460+
if (value.isActiveOrAssignedQueued()) {
24612461
// Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard
24622462
return true;
24632463
}
@@ -3059,7 +3059,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
30593059
: "Missing assignment for [" + sid + "]";
30603060
updatedAssignmentsBuilder.put(sid, ShardSnapshotStatus.MISSING);
30613061
} else {
3062-
if (updated.isActiveOrQueuedWithGeneration()) {
3062+
if (updated.isActiveOrAssignedQueued()) {
30633063
markShardReassigned(shardId, reassignedShardIds);
30643064
}
30653065
updatedAssignmentsBuilder.put(sid, updated);
@@ -3151,7 +3151,7 @@ private static void maybeStartQueuedWithGenerationShardSnapshots(
31513151
if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) {
31523152
for (var shardId : shardsBuilder.keys()) {
31533153
final var existingShardSnapshotStatus = shardsBuilder.get(shardId);
3154-
if (existingShardSnapshotStatus.isQueuedWithGeneration() == false) {
3154+
if (existingShardSnapshotStatus.isAssignedQueued() == false) {
31553155
continue;
31563156
}
31573157
final IndexRoutingTable indexRouting = clusterState.routingTable(entry.projectId()).index(shardId.getIndex());
@@ -3336,7 +3336,7 @@ private static ShardSnapshotStatus initShardSnapshotStatus(
33363336
if (perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(primary.currentNodeId())) {
33373337
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration);
33383338
} else {
3339-
shardSnapshotStatus = ShardSnapshotStatus.queuedWithGeneration(shardRepoGeneration);
3339+
shardSnapshotStatus = ShardSnapshotStatus.assignedQueued(primary.currentNodeId(), shardRepoGeneration);
33403340
}
33413341
}
33423342
return shardSnapshotStatus;
@@ -3821,7 +3821,7 @@ private <T> void applyShardSnapshotUpdate(
38213821
}
38223822

38233823
if (shardSnapshotStatusUpdate.isClone() == false
3824-
&& existing.state() == ShardState.INIT
3824+
&& existing.state() == ShardState.INIT // TODO: this is not right, should consider aborted ones on data nodes as well
38253825
&& updatedShardSnapshotStatus.state() != ShardState.INIT) {
38263826
perNodeShardSnapshotCounter.completeShardSnapshotOnNode(updatedShardSnapshotStatus.nodeId());
38273827
}
@@ -3906,7 +3906,7 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g
39063906
if (shardSnapshotStatus.isActive()) {
39073907
startShardOperation(shardsBuilder(), routingShardId, shardSnapshotStatus);
39083908
} else {
3909-
if (shardSnapshotStatus.isQueuedWithGeneration()) {
3909+
if (shardSnapshotStatus.isAssignedQueued()) {
39103910
updatesIterator.remove();
39113911
}
39123912
// update to queued snapshot did not result in an actual update execution so we just record it but keep applying

0 commit comments

Comments
 (0)