Skip to content

Commit f76b13a

Browse files
committed
Fix deletions
* Treat queued with gen as started that blocks deletion to run * Aborted queued with gen is failed with a separate cluster state update to simulate how abort for init works.
1 parent a74f94b commit f76b13a

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.Map;
5959
import java.util.Objects;
6060
import java.util.Set;
61+
import java.util.function.BiConsumer;
6162
import java.util.stream.Stream;
6263

6364
import static org.elasticsearch.repositories.ProjectRepo.PROJECT_REPO_SERIALIZER;
@@ -1229,7 +1230,7 @@ public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClone
12291230
* @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
12301231
*/
12311232
@Nullable
1232-
public Entry abort() {
1233+
public Entry abort(BiConsumer<ShardId, ShardSnapshotStatus> abortedQueuedWithGenerationShardConsumer) {
12331234
final Map<ShardId, ShardSnapshotStatus> shardsBuilder = new HashMap<>();
12341235
boolean completed = true;
12351236
boolean allQueued = true;
@@ -1238,12 +1239,23 @@ public Entry abort() {
12381239
allQueued &= status.state() == ShardState.QUEUED;
12391240
if (status.state().completed() == false) {
12401241
final String nodeId = status.nodeId();
1242+
final var isQueuedWithGeneration = status.isQueuedWithGeneration();
12411243
status = new ShardSnapshotStatus(
12421244
nodeId,
1243-
nodeId == null ? ShardState.FAILED : ShardState.ABORTED, // TODO: Aborted if assigned queued?
1245+
// QUEUED with generation transitioned to ABORTED (incomplete) and is completed by a separate cluster state update
1246+
(nodeId == null && isQueuedWithGeneration == false) ? ShardState.FAILED : ShardState.ABORTED,
12441247
status.generation(),
12451248
"aborted by snapshot deletion"
12461249
);
1250+
if (isQueuedWithGeneration) {
1251+
// Accumulate the updates needed to complete the aborted QUEUED with generation shard snapshots
1252+
assert isClone() == false
1253+
: "The state queued with generation should not be possible for a clone entry [" + this + "]";
1254+
abortedQueuedWithGenerationShardConsumer.accept(
1255+
shardEntry.getKey(),
1256+
new ShardSnapshotStatus(null, ShardState.FAILED, status.generation, "aborted by snapshot deletion")
1257+
);
1258+
}
12471259
}
12481260
completed &= status.state().completed();
12491261
shardsBuilder.put(shardEntry.getKey(), status);

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,7 +1874,9 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
18741874
final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository());
18751875
if (repositoriesSeen.add(projectRepo)
18761876
&& entry.state() == SnapshotDeletionsInProgress.State.WAITING
1877-
&& snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsService::isWritingToRepository)) {
1877+
&& snapshotsInProgress.forRepo(projectRepo)
1878+
.stream()
1879+
.noneMatch(SnapshotsService::isWritingToRepositoryOrQueueWithGeneration)) {
18781880
changed = true;
18791881
final SnapshotDeletionsInProgress.Entry newEntry = entry.started();
18801882
readyDeletions.add(newEntry);
@@ -2293,14 +2295,26 @@ public ClusterState execute(ClusterState currentState) {
22932295
}
22942296
// Snapshot ids that will have to be physically deleted from the repository
22952297
final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
2298+
2299+
final List<Runnable> completeAbortedQueuedWithGenerationRunnables = new ArrayList<>();
22962300
final SnapshotsInProgress updatedSnapshots = snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(
22972301
projectId,
22982302
repositoryName,
22992303
snapshotsInProgress.forRepo(projectId, repositoryName).stream().map(existing -> {
23002304
if (existing.state() == SnapshotsInProgress.State.STARTED
23012305
&& snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
23022306
// snapshot is started - mark every non completed shard as aborted
2303-
final SnapshotsInProgress.Entry abortedEntry = existing.abort();
2307+
final SnapshotsInProgress.Entry abortedEntry = existing.abort(
2308+
((shardId, shardSnapshotStatus) -> completeAbortedQueuedWithGenerationRunnables.add(
2309+
() -> innerUpdateSnapshotState(
2310+
existing.snapshot(),
2311+
shardId,
2312+
null,
2313+
shardSnapshotStatus,
2314+
ActionListener.noop()
2315+
)
2316+
))
2317+
);
23042318
if (abortedEntry == null) {
23052319
// No work has been done for this snapshot yet so we remove it from the cluster state directly
23062320
final Snapshot existingNotYetStartedSnapshot = existing.snapshot();
@@ -2318,6 +2332,9 @@ public ClusterState execute(ClusterState currentState) {
23182332
return existing;
23192333
}).filter(Objects::nonNull).toList()
23202334
);
2335+
for (var runnable : completeAbortedQueuedWithGenerationRunnables) {
2336+
runnable.run();
2337+
}
23212338
if (snapshotIdsRequiringCleanup.isEmpty()) {
23222339
// We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions
23232340
return updateWithSnapshots(currentState, updatedSnapshots, null);
@@ -2350,7 +2367,9 @@ public ClusterState execute(ClusterState currentState) {
23502367
List.copyOf(snapshotIdsRequiringCleanup),
23512368
threadPool.absoluteTimeInMillis(),
23522369
repositoryData.getGenId(),
2353-
updatedSnapshots.forRepo(projectId, repositoryName).stream().noneMatch(SnapshotsService::isWritingToRepository)
2370+
updatedSnapshots.forRepo(projectId, repositoryName)
2371+
.stream()
2372+
.noneMatch(SnapshotsService::isWritingToRepositoryOrQueueWithGeneration)
23542373
&& deletionsInProgress.hasExecutingDeletion(projectId, repositoryName) == false
23552374
? SnapshotDeletionsInProgress.State.STARTED
23562375
: SnapshotDeletionsInProgress.State.WAITING
@@ -2432,13 +2451,13 @@ public String toString() {
24322451
* @param entry snapshot entry
24332452
* @return true if entry is currently writing to the repository
24342453
*/
2435-
private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) {
2454+
private static boolean isWritingToRepositoryOrQueueWithGeneration(SnapshotsInProgress.Entry entry) {
24362455
if (entry.state().completed()) {
24372456
// Entry is writing to the repo because it's finalizing on master
24382457
return true;
24392458
}
24402459
for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) {
2441-
if (value.isActive()) {
2460+
if (value.isActiveOrQueuedWithGeneration()) {
24422461
// Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard
24432462
return true;
24442463
}

0 commit comments

Comments
 (0)