Skip to content

Commit afa6679

Browse files
committed
Tighten up readonly invariants on repositories
Today there are various mechanisms to prevent writes to readonly repositories, but they are scattered across the snapshot codebase and do not obviously prevent writes in all possible circumstances; it'd be easy to add a new operation on a repository that does not check the readonly flag in quite the right way. This commit adds much tighter checks which cannot be circumvented: - Do not allow to start an update of the root `index-N` blob if the repository is marked as readonly in the cluster state. - Conversely, do not allow the readonly flag to be set if an update of the root `index-N` blob is in progress. - Establish the invariant that we never create a `SnapshotsInProgress$Entry`, `SnapshotDeletionsInProgress$Entry`, or `RepositoryCleanupInProgress$Entry` if the repository is marked as readonly in the cluster state.
1 parent 7872e84 commit afa6679

File tree

6 files changed

+267
-4
lines changed

6 files changed

+267
-4
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
171171
@Override
172172
public ClusterState execute(ClusterState currentState) {
173173
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
174+
SnapshotsService.ensureNotReadOnly(currentState, repositoryName);
174175
final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
175176
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
176177
throw new IllegalStateException(

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.index.Index;
5151
import org.elasticsearch.index.IndexVersion;
5252
import org.elasticsearch.repositories.VerifyNodeRepositoryAction.Request;
53+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
5354
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
5455
import org.elasticsearch.snapshots.Snapshot;
5556
import org.elasticsearch.threadpool.ThreadPool;
@@ -289,6 +290,7 @@ public ClusterState execute(ClusterState currentState) {
289290
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
290291
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
291292
if (repositoryMetadata.name().equals(request.name())) {
293+
rejectInvalidReadonlyFlagChange(repositoryMetadata, request.settings());
292294
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
293295
request.name(),
294296
// Copy the UUID from the existing instance rather than resetting it back to MISSING_UUID which would force us to
@@ -601,6 +603,7 @@ public static boolean isDedicatedVotingOnlyNode(Set<DiscoveryNodeRole> roles) {
601603
public void applyClusterState(ClusterChangedEvent event) {
602604
try {
603605
final ClusterState state = event.state();
606+
assert assertReadonlyRepositoriesNotInUseForWrites(state);
604607
final RepositoriesMetadata oldMetadata = RepositoriesMetadata.get(event.previousState());
605608
final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state);
606609

@@ -885,7 +888,7 @@ public static void validateRepositoryName(final String repositoryName) {
885888
}
886889
}
887890

888-
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
891+
private static void ensureRepositoryNotInUseForWrites(ClusterState clusterState, String repository) {
889892
if (SnapshotsInProgress.get(clusterState).forRepo(repository).isEmpty() == false) {
890893
throw newRepositoryConflictException(repository, "snapshot is in progress");
891894
}
@@ -899,13 +902,57 @@ private static void ensureRepositoryNotInUse(ClusterState clusterState, String r
899902
throw newRepositoryConflictException(repository, "repository clean up is in progress");
900903
}
901904
}
905+
}
906+
907+
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
908+
ensureRepositoryNotInUseForWrites(clusterState, repository);
902909
for (RestoreInProgress.Entry entry : RestoreInProgress.get(clusterState)) {
903910
if (repository.equals(entry.snapshot().getRepository())) {
904911
throw newRepositoryConflictException(repository, "snapshot restore is in progress");
905912
}
906913
}
907914
}
908915

916+
public static boolean isReadOnly(Settings repositorySettings) {
917+
return Boolean.TRUE.equals(repositorySettings.getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
918+
}
919+
920+
/**
921+
* Test-only check for the invariant that read-only repositories never have any write activities.
922+
*/
923+
private static boolean assertReadonlyRepositoriesNotInUseForWrites(ClusterState clusterState) {
924+
for (final var repositoryMetadata : RepositoriesMetadata.get(clusterState).repositories()) {
925+
if (isReadOnly(repositoryMetadata.settings())) {
926+
try {
927+
ensureRepositoryNotInUseForWrites(clusterState, repositoryMetadata.name());
928+
} catch (Exception e) {
929+
throw new AssertionError("repository [" + repositoryMetadata + "] is readonly but still in use", e);
930+
}
931+
}
932+
}
933+
return true;
934+
}
935+
936+
/**
937+
* Reject a change to the {@code readonly} setting if there is a pending generation change in progress, i.e. some node somewhere is
938+
* updating the root {@link RepositoryData} blob.
939+
*/
940+
private static void rejectInvalidReadonlyFlagChange(RepositoryMetadata existingRepositoryMetadata, Settings newSettings) {
941+
if (isReadOnly(newSettings)
942+
&& isReadOnly(existingRepositoryMetadata.settings()) == false
943+
&& existingRepositoryMetadata.generation() >= RepositoryData.EMPTY_REPO_GEN
944+
&& existingRepositoryMetadata.generation() != existingRepositoryMetadata.pendingGeneration()) {
945+
throw newRepositoryConflictException(
946+
existingRepositoryMetadata.name(),
947+
Strings.format(
948+
"currently updating root blob generation from [%d] to [%d], cannot update readonly flag",
949+
existingRepositoryMetadata.generation(),
950+
existingRepositoryMetadata.pendingGeneration()
951+
)
952+
);
953+
}
954+
}
955+
909956
private static void ensureNoSearchableSnapshotsIndicesInUse(ClusterState clusterState, RepositoryMetadata repositoryMetadata) {
910957
long count = 0L;
911958
List<Index> indices = null;

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2737,6 +2737,14 @@ protected void writeIndexGen(
27372737
public ClusterState execute(ClusterState currentState) {
27382738
final RepositoryMetadata meta = getRepoMetadata(currentState);
27392739
final String repoName = metadata.name();
2740+
2741+
if (RepositoriesService.isReadOnly(meta.settings())) {
2742+
// Last resort check: we shouldn't have been able to mark the repository as readonly while the operation that led to
2743+
// this writeIndexGen() call was in progress, and conversely shouldn't have started any such operation if the repo
2744+
// was already readonly, but these invariants are not obviously true and it is disastrous to proceed here.
2745+
throw new RepositoryException(meta.name(), "repository is readonly, cannot update root blob");
2746+
}
2747+
27402748
final long genInState = meta.generation();
27412749
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency;
27422750
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ public ClusterState execute(ClusterState currentState) {
338338
ensureRepositoryExists(repositoryName, currentState);
339339
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
340340
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
341+
ensureNotReadOnly(currentState, repositoryName);
341342
final SnapshotsInProgress snapshots = SnapshotsInProgress.get(currentState);
342343
ensureSnapshotNameNotRunning(snapshots, repositoryName, snapshotName);
343344
validate(repositoryName, snapshotName, currentState);
@@ -433,6 +434,13 @@ private static void ensureNoCleanupInProgress(
433434
}
434435
}
435436

437+
public static void ensureNotReadOnly(final ClusterState currentState, final String repositoryName) {
438+
final var repositoryMetadata = RepositoriesMetadata.get(currentState).repository(repositoryName);
439+
if (RepositoriesService.isReadOnly(repositoryMetadata.settings())) {
440+
throw new RepositoryException(repositoryMetadata.name(), "repository is readonly");
441+
}
442+
}
443+
436444
private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) {
437445
// check if the snapshot name already exists in the repository
438446
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
@@ -2166,6 +2174,8 @@ public ClusterState execute(ClusterState currentState) {
21662174
"delete snapshot"
21672175
);
21682176

2177+
ensureNotReadOnly(currentState, repositoryName);
2178+
21692179
final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
21702180

21712181
final RestoreInProgress restoreInProgress = RestoreInProgress.get(currentState);
@@ -4075,11 +4085,16 @@ public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionCo
40754085
for (final var taskContext : batchExecutionContext.taskContexts()) {
40764086
if (taskContext.getTask() instanceof CreateSnapshotTask task) {
40774087
try {
4088+
final var repoMeta = RepositoriesMetadata.get(state).repository(task.snapshot.getRepository());
4089+
if (RepositoriesService.isReadOnly(repoMeta.settings())) {
4090+
taskContext.onFailure(new RepositoryException(repoMeta.name(), "repository is readonly"));
4091+
continue;
4092+
}
4093+
40784094
registeredPolicySnapshots.addIfSnapshotIsSLMInitiated(
40794095
task.createSnapshotRequest.userMetadata(),
40804096
task.snapshot.getSnapshotId()
40814097
);
4082-
final var repoMeta = RepositoriesMetadata.get(state).repository(task.snapshot.getRepository());
40834098
if (Objects.equals(task.initialRepositoryMetadata, repoMeta)) {
40844099
snapshotsInProgress = createSnapshot(task, taskContext, state, snapshotsInProgress);
40854100
} else {

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
1414
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.action.support.ActionTestUtils;
1516
import org.elasticsearch.action.support.SubscribableListener;
1617
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1718
import org.elasticsearch.client.internal.node.NodeClient;
1819
import org.elasticsearch.cluster.ClusterChangedEvent;
1920
import org.elasticsearch.cluster.ClusterName;
2021
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2123
import org.elasticsearch.cluster.metadata.IndexMetadata;
2224
import org.elasticsearch.cluster.metadata.Metadata;
2325
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
@@ -26,6 +28,7 @@
2628
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2729
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2830
import org.elasticsearch.cluster.service.ClusterService;
31+
import org.elasticsearch.common.Strings;
2932
import org.elasticsearch.common.UUIDs;
3033
import org.elasticsearch.common.blobstore.BlobPath;
3134
import org.elasticsearch.common.blobstore.BlobStore;
@@ -42,6 +45,7 @@
4245
import org.elasticsearch.index.store.Store;
4346
import org.elasticsearch.indices.recovery.RecoverySettings;
4447
import org.elasticsearch.indices.recovery.RecoveryState;
48+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
4549
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
4650
import org.elasticsearch.snapshots.SnapshotId;
4751
import org.elasticsearch.snapshots.SnapshotInfo;
@@ -377,6 +381,103 @@ public void testRegisterRepositorySuccessAfterCreationFailed() {
377381
assertThat(repositoriesService.repository(repoName), isA(TestRepository.class));
378382
}
379383

384+
public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() {
385+
final var repoName = randomAlphaOfLengthBetween(10, 25);
386+
final long originalGeneration = randomFrom(RepositoryData.EMPTY_REPO_GEN, 0L, 1L, randomLongBetween(2, Long.MAX_VALUE - 1));
387+
final long newGeneration = originalGeneration + 1;
388+
389+
safeAwait(
390+
SubscribableListener
391+
392+
.newForked(
393+
l -> repositoriesService.registerRepository(
394+
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE),
395+
l.map(ignored -> null)
396+
)
397+
)
398+
.andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l))
399+
.andThenAccept(ignored -> {
400+
final var metadata = repositoriesService.repository(repoName).getMetadata();
401+
assertEquals(originalGeneration, metadata.generation());
402+
assertEquals(newGeneration, metadata.pendingGeneration());
403+
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
404+
})
405+
.andThen(
406+
l -> repositoriesService.registerRepository(
407+
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE)
408+
.settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)),
409+
ActionTestUtils.assertNoSuccessListener(e -> {
410+
assertEquals(
411+
Strings.format(
412+
"""
413+
[%s] trying to modify or unregister repository that is currently used \
414+
(currently updating root blob generation from [%d] to [%d], cannot update readonly flag)""",
415+
repoName,
416+
originalGeneration,
417+
newGeneration
418+
),
419+
asInstanceOf(RepositoryConflictException.class, e).getMessage()
420+
);
421+
l.onResponse(null);
422+
})
423+
)
424+
)
425+
.andThenAccept(ignored -> {
426+
final var metadata = repositoriesService.repository(repoName).getMetadata();
427+
assertEquals(originalGeneration, metadata.generation());
428+
assertEquals(newGeneration, metadata.pendingGeneration());
429+
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
430+
})
431+
.andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l))
432+
.andThenAccept(ignored -> {
433+
final var metadata = repositoriesService.repository(repoName).getMetadata();
434+
assertEquals(newGeneration, metadata.generation());
435+
assertEquals(newGeneration, metadata.pendingGeneration());
436+
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
437+
})
438+
.andThen(
439+
l -> repositoriesService.registerRepository(
440+
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE)
441+
.settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)),
442+
l.map(ignored -> null)
443+
)
444+
)
445+
.andThenAccept(
446+
ignored -> assertTrue(
447+
repositoriesService.repository(repoName)
448+
.getMetadata()
449+
.settings()
450+
.getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)
451+
)
452+
)
453+
);
454+
}
455+
456+
private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener<?> listener) {
457+
clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() {
458+
@Override
459+
public ClusterState execute(ClusterState currentState) {
460+
return new ClusterState.Builder(currentState).metadata(
461+
Metadata.builder(currentState.metadata())
462+
.putCustom(
463+
RepositoriesMetadata.TYPE,
464+
RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration)
465+
)
466+
).build();
467+
}
468+
469+
@Override
470+
public void onFailure(Exception e) {
471+
listener.onFailure(e);
472+
}
473+
474+
@Override
475+
public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
476+
listener.onResponse(null);
477+
}
478+
});
479+
}
480+
380481
private ClusterState createClusterStateWithRepo(String repoName, String repoType) {
381482
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
382483
Metadata.Builder mdBuilder = Metadata.builder();
@@ -406,7 +507,7 @@ private void assertThrowsOnRegister(String repoName) {
406507
private static class TestRepository implements Repository {
407508

408509
private static final String TYPE = "internal";
409-
private final RepositoryMetadata metadata;
510+
private RepositoryMetadata metadata;
410511
private boolean isClosed;
411512
private boolean isStarted;
412513

@@ -514,7 +615,9 @@ public IndexShardSnapshotStatus.Copy getShardSnapshotStatus(SnapshotId snapshotI
514615
}
515616

516617
@Override
517-
public void updateState(final ClusterState state) {}
618+
public void updateState(final ClusterState state) {
619+
metadata = RepositoriesMetadata.get(state).repository(metadata.name());
620+
}
518621

519622
@Override
520623
public void cloneShardSnapshot(

0 commit comments

Comments
 (0)