Skip to content

Commit 54459eb

Browse files
Fix Concurrent Snapshot Repository Remove Issues (#74880) (#75122)
We need to fail repository operations as soon as a repository has been shut down to avoid getting into broken cluster state situations where operations are created in the cluster state but the repository they target was removed concurrently. closes #74858
1 parent 5b5dca2 commit 54459eb

File tree

4 files changed

+78
-6
lines changed

4 files changed

+78
-6
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.indices.recovery.RecoveryState;
4747
import org.elasticsearch.node.Node;
4848
import org.elasticsearch.plugins.Plugin;
49+
import org.elasticsearch.repositories.RepositoryException;
4950
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
5051
import org.elasticsearch.rest.AbstractRestChannel;
5152
import org.elasticsearch.rest.RestRequest;
@@ -89,6 +90,7 @@
8990
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
9091
import static org.hamcrest.Matchers.allOf;
9192
import static org.hamcrest.Matchers.containsString;
93+
import static org.hamcrest.Matchers.either;
9294
import static org.hamcrest.Matchers.empty;
9395
import static org.hamcrest.Matchers.equalTo;
9496
import static org.hamcrest.Matchers.greaterThan;
@@ -1115,6 +1117,45 @@ public void testGetReposWithWildcard() {
11151117
assertThat(repositoryMetadata, empty());
11161118
}
11171119

1120+
public void testConcurrentSnapshotAndRepoDelete() throws Exception {
1121+
internalCluster().startMasterOnlyNodes(1);
1122+
internalCluster().startDataOnlyNode();
1123+
final String repoName = "test-repo";
1124+
createRepository(repoName, "fs");
1125+
1126+
// create a few snapshots so deletes will run for a while
1127+
final int snapshotCount = randomIntBetween(10, 25);
1128+
final List<String> snapshotNames = createNSnapshots(repoName, snapshotCount);
1129+
1130+
// concurrently trigger repository and snapshot deletes
1131+
final List<ActionFuture<AcknowledgedResponse>> deleteFutures = new ArrayList<>(snapshotCount);
1132+
final ActionFuture<AcknowledgedResponse> deleteRepoFuture = clusterAdmin().prepareDeleteRepository(repoName).execute();
1133+
for (String snapshotName : snapshotNames) {
1134+
deleteFutures.add(clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute());
1135+
}
1136+
1137+
try {
1138+
assertAcked(deleteRepoFuture.actionGet());
1139+
} catch (Exception e) {
1140+
assertThat(
1141+
e.getMessage(),
1142+
containsString(
1143+
"trying to modify or unregister repository [test-repo] that is currently used (snapshot deletion is in progress)"
1144+
)
1145+
);
1146+
}
1147+
for (ActionFuture<AcknowledgedResponse> deleteFuture : deleteFutures) {
1148+
try {
1149+
assertAcked(deleteFuture.actionGet());
1150+
} catch (RepositoryException e) {
1151+
assertThat(
1152+
e.getMessage(),
1153+
either(containsString("[test-repo] repository is not in started state")).or(containsString("[test-repo] missing"))
1154+
);
1155+
}
1156+
}
1157+
}
1158+
11181159
private long calculateTotalFilesSize(List<Path> files) {
11191160
return files.stream().mapToLong(f -> {
11201161
try {

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,11 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
161161

162162
@Override
163163
public ClusterState execute(ClusterState currentState) {
164-
final RepositoryCleanupInProgress repositoryCleanupInProgress =
165-
currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
164+
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
165+
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
166+
RepositoryCleanupInProgress.TYPE,
167+
RepositoryCleanupInProgress.EMPTY
168+
);
166169
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
167170
throw new IllegalStateException(
168171
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ protected BlobStore getBlobStore() {
556556
protected BlobContainer blobContainer() {
557557
assertSnapshotOrGenericThread();
558558

559+
if (lifecycle.started() == false) {
560+
throw notStartedException();
561+
}
562+
559563
BlobContainer blobContainer = this.blobContainer.get();
560564
if (blobContainer == null) {
561565
synchronized (lock) {
@@ -583,7 +587,7 @@ public BlobStore blobStore() {
583587
store = blobStore.get();
584588
if (store == null) {
585589
if (lifecycle.started() == false) {
586-
throw new RepositoryException(metadata.name(), "repository is not in started state");
590+
throw notStartedException();
587591
}
588592
try {
589593
store = createBlobStore();
@@ -1365,6 +1369,11 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13651369
// master-eligible or not.
13661370
assert clusterService.localNode().isMasterNode() : "should only load repository data on master nodes";
13671371

1372+
if (lifecycle.started() == false) {
1373+
listener.onFailure(notStartedException());
1374+
return;
1375+
}
1376+
13681377
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
13691378
listener.onFailure(corruptedStateException(null, null));
13701379
return;
@@ -1396,6 +1405,10 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
13961405
}
13971406
}
13981407

1408+
private RepositoryException notStartedException() {
1409+
return new RepositoryException(metadata.name(), "repository is not in started state");
1410+
}
1411+
13991412
// Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking
14001413
private ListenableActionFuture<RepositoryData> repoDataInitialized;
14011414

@@ -2042,9 +2055,11 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
20422055
}
20432056

20442057
private RepositoryMetadata getRepoMetadata(ClusterState state) {
2045-
final RepositoryMetadata repositoryMetadata =
2046-
state.getMetadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).repository(metadata.name());
2047-
assert repositoryMetadata != null;
2058+
final RepositoryMetadata repositoryMetadata = state.getMetadata()
2059+
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
2060+
.repository(metadata.name());
2061+
assert repositoryMetadata != null || lifecycle.stoppedOrClosed()
2062+
: "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]";
20482063
return repositoryMetadata;
20492064
}
20502065

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ public void createSnapshotLegacy(final CreateSnapshotRequest request, final Acti
259259
@Override
260260
public ClusterState execute(ClusterState currentState) {
261261
validate(repositoryName, snapshotName, currentState);
262+
ensureRepositoryExists(repositoryName, currentState);
262263
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
263264
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
264265
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
@@ -413,6 +414,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
413414

414415
@Override
415416
public ClusterState execute(ClusterState currentState) {
417+
ensureRepositoryExists(repositoryName, currentState);
416418
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
417419
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
418420
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
@@ -557,6 +559,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis
557559

558560
@Override
559561
public ClusterState execute(ClusterState currentState) {
562+
ensureRepositoryExists(repositoryName, currentState);
560563
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
561564
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
562565
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
@@ -808,6 +811,15 @@ private void ensureBelowConcurrencyLimit(String repository, String name, Snapsho
808811
}
809812
}
810813

814+
/**
815+
* Throws {@link RepositoryMissingException} if no repository by the given name is found in the given cluster state.
816+
*/
817+
public static void ensureRepositoryExists(String repoName, ClusterState state) {
818+
if (state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY).repository(repoName) == null) {
819+
throw new RepositoryMissingException(repoName);
820+
}
821+
}
822+
811823
/**
812824
* Validates snapshot request
813825
*
@@ -2038,6 +2050,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
20382050
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
20392051
+ "]");
20402052
}
2053+
ensureRepositoryExists(repoName, currentState);
20412054
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
20422055
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
20432056
final List<SnapshotId> snapshotIds = matchingSnapshotIds(

0 commit comments

Comments
 (0)