From dc2451fa3e9d5fc57abd93f33bfb11e272598b26 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 18 Jun 2025 11:39:19 +1000 Subject: [PATCH 1/9] wip --- .../repositories/RepositoriesServiceIT.java | 5 +- .../TransportDeleteRepositoryAction.java | 10 +- .../get/TransportGetRepositoriesAction.java | 19 +- .../put/TransportPutRepositoryAction.java | 10 +- .../ReservedRepositoryAction.java | 20 +- .../TransportVerifyRepositoryAction.java | 9 +- .../get/TransportGetSnapshotsAction.java | 2 +- .../TransportSnapshotsStatusAction.java | 8 +- .../cluster/ClusterChangedEvent.java | 40 +- .../tracker/RepositoriesHealthTracker.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 8 +- .../repositories/IndexSnapshotsService.java | 8 +- .../repositories/RepositoriesService.java | 505 +++++++++++------- .../repositories/RepositoryOperation.java | 15 + .../repositories/ResolvedRepositories.java | 8 +- .../blobstore/BlobStoreRepository.java | 4 +- .../snapshots/RestoreService.java | 4 +- .../snapshots/SnapshotShardsService.java | 2 +- .../snapshots/SnapshotsService.java | 10 +- .../ReservedRepositoryActionTests.java | 5 +- .../cluster/ClusterChangedEventTests.java | 2 +- .../RepositoriesHealthTrackerTests.java | 11 +- .../RepositoriesServiceTests.java | 225 ++++++-- .../ResolvedRepositoriesTests.java | 41 +- .../BlobStoreRepositoryRestoreTests.java | 8 +- .../blobstore/BlobStoreRepositoryTests.java | 6 +- .../snapshots/RestoreServiceTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 9 +- .../blobstore/BlobStoreTestUtil.java | 6 +- .../DeleteInternalCcrRepositoryAction.java | 6 +- .../PutInternalCcrRepositoryAction.java | 6 +- .../SourceOnlySnapshotShardTests.java | 2 +- .../store/RepositorySupplier.java | 8 +- .../SearchableSnapshotDirectoryTests.java | 6 +- 34 files changed, 707 insertions(+), 326 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java index ecf910a7ab05f..c64e45dd68276 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; @@ -65,7 +66,7 @@ public void testUpdateRepository() { assertThat(originalRepositoryMetadata.type(), equalTo(FsRepository.TYPE)); - final Repository originalRepository = repositoriesService.repository(repositoryName); + final Repository originalRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(originalRepository, instanceOf(FsRepository.class)); final boolean updated = randomBoolean(); @@ -89,7 +90,7 @@ public void testUpdateRepository() { assertThat(updatedRepositoryMetadata.type(), equalTo(updatedRepositoryType)); - final Repository updatedRepository = repositoriesService.repository(repositoryName); + final Repository updatedRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository)); // check that a noop update does not verify. Since the new data node does not share the same `path.repo`, verification will fail if diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index fc01c0fecca03..d52e7e91d4f43 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/delete"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportDeleteRepositoryAction( @@ -43,7 +45,8 @@ public TransportDeleteRepositoryAction( ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public TransportDeleteRepositoryAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ protected void masterOperation( ClusterState state, final ActionListener listener ) { - repositoriesService.unregisterRepository(request, listener); + repositoriesService.unregisterRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java index dbb448b8153ee..404ebab5edd14 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java @@ -11,11 +11,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -28,14 +29,15 @@ /** * Transport action for get repositories operation */ -public class TransportGetRepositoriesAction extends TransportMasterNodeReadAction { +public class TransportGetRepositoriesAction extends TransportMasterNodeReadProjectAction { @Inject public TransportGetRepositoriesAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( GetRepositoriesAction.NAME, @@ -44,24 +46,25 @@ public TransportGetRepositoriesAction( threadPool, actionFilters, GetRepositoriesRequest::new, + projectResolver, GetRepositoriesResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); } @Override - protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ProjectState state) { + return state.blocks().globalBlockedException(state.projectId(), ClusterBlockLevel.METADATA_READ); } @Override protected void masterOperation( Task task, final GetRepositoriesRequest request, - ClusterState state, + ProjectState state, final ActionListener listener ) { - final var result = ResolvedRepositories.resolve(state, request.repositories()); + final var result = ResolvedRepositories.resolve(state.metadata(), request.repositories()); if (result.hasMissingRepositories()) { listener.onFailure(new RepositoryMissingException(String.join(", ", result.missing()))); } else { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 2456bc6a7c6ce..04de5d876efa5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/put"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportPutRepositoryAction( @@ -43,7 +45,8 @@ public TransportPutRepositoryAction( ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public TransportPutRepositoryAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(PutRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ protected void masterOperation( ClusterState state, final ActionListener listener ) { - repositoriesService.registerRepository(request, listener); + repositoriesService.registerRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java index ee6e7f320f513..3a307c9712ceb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.reservedstate.ReservedClusterStateHandler; import org.elasticsearch.reservedstate.TransformState; @@ -60,7 +62,9 @@ public Collection prepare(Object input) { for (var repositoryRequest : repositories) { validate(repositoryRequest); RepositoriesService.validateRepositoryName(repositoryRequest.name()); - repositoriesService.validateRepositoryCanBeCreated(repositoryRequest); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + repositoriesService.validateRepositoryCanBeCreated(projectId, repositoryRequest); } return repositories; @@ -72,8 +76,14 @@ public TransformState transform(List source, TransformStat ClusterState state = prevState.state(); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; for (var request : requests) { - RepositoriesService.RegisterRepositoryTask task = new RepositoriesService.RegisterRepositoryTask(repositoriesService, request); + RepositoriesService.RegisterRepositoryTask task = new RepositoriesService.RegisterRepositoryTask( + repositoriesService, + projectId, + request + ); state = task.execute(state); } @@ -83,7 +93,11 @@ public TransformState transform(List source, TransformStat toDelete.removeAll(entities); for (var repositoryToDelete : toDelete) { - var task = new RepositoriesService.UnregisterRepositoryTask(RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT, repositoryToDelete); + var task = new RepositoriesService.UnregisterRepositoryTask( + RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT, + projectId, + repositoryToDelete + ); state = task.execute(state); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java index b2344e18ddd67..d51c736d8fb5a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -30,6 +31,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction { private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportVerifyRepositoryAction( @@ -37,7 +39,8 @@ public TransportVerifyRepositoryAction( ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( VerifyRepositoryAction.NAME, @@ -50,11 +53,12 @@ public TransportVerifyRepositoryAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ); } @Override @@ -65,6 +69,7 @@ protected void masterOperation( final ActionListener listener ) { repositoriesService.verifyRepository( + projectResolver.getProjectId(), request.name(), listener.map(verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index eab763165a74c..7393ab23d8896 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -144,7 +144,7 @@ protected void masterOperation( ) { assert task instanceof CancellableTask : task + " not cancellable"; - final var resolvedRepositories = ResolvedRepositories.resolve(state, request.repositories()); + final var resolvedRepositories = ResolvedRepositories.resolve(state.metadata().getProject(), request.repositories()); if (resolvedRepositories.hasMissingRepositories()) { throw new RepositoryMissingException(String.join(", ", resolvedRepositories.missing())); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index b501d64b2cd84..bba666dce0a62 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -24,11 +24,13 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.injection.guice.Inject; @@ -252,7 +254,7 @@ void buildResponse( // BWC behavior, load the stats directly from the repository. shardStatus = new SnapshotIndexShardStatus( shardId, - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.projectId(), entry.repository()) .getShardSnapshotStatus( entry.snapshot().getSnapshotId(), entry.indices().get(shardId.getIndexName()), @@ -297,7 +299,9 @@ private void loadRepositoryData( ) { final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener); final Collection snapshotIdsToLoad = new ArrayList<>(); repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> { task.ensureNotCancelled(); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 8285900f0b00f..e572fa8335dfe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -354,25 +354,43 @@ private static ProjectsDelta calculateProjectDelta(Metadata previousMetadata, Me && previousMetadata.hasProject(ProjectId.DEFAULT) && currentMetadata.projects().size() == 1 && currentMetadata.hasProject(ProjectId.DEFAULT))) { - return ProjectsDelta.EMPTY; + return ProjectsDelta.NO_CHANGE_DEFAULT_PROJECT; } - final Set added = Collections.unmodifiableSet( - Sets.difference(currentMetadata.projects().keySet(), previousMetadata.projects().keySet()) - ); - final Set removed = Collections.unmodifiableSet( - Sets.difference(previousMetadata.projects().keySet(), currentMetadata.projects().keySet()) - ); + final Set currentProjectIds = currentMetadata.projects().keySet(); + final Set previousProjectIds = previousMetadata.projects().keySet(); + + final var added = new HashSet(); + final var common = new HashSet(); + for (var projectId : currentProjectIds) { + if (previousProjectIds.contains(projectId)) { + common.add(projectId); + } else { + added.add(projectId); + } + } + + final Set removed = Sets.difference(previousProjectIds, currentProjectIds); // TODO: Enable the following assertions once tests no longer add or remove default projects // assert added.contains(ProjectId.DEFAULT) == false; // assert removed.contains(ProjectId.DEFAULT) == false; - return new ProjectsDelta(added, removed); + + if (added.isEmpty() && removed.isEmpty()) { + return new ProjectsDelta(Set.of(), Set.of(), currentProjectIds); + } else { + return new ProjectsDelta( + Collections.unmodifiableSet(added), + Collections.unmodifiableSet(removed), + Collections.unmodifiableSet(common) + ); + } } - public record ProjectsDelta(Set added, Set removed) { - private static final ProjectsDelta EMPTY = new ProjectsDelta(Set.of(), Set.of()); + public record ProjectsDelta(Set added, Set removed, Set common) { + + private static final ProjectsDelta NO_CHANGE_DEFAULT_PROJECT = new ProjectsDelta(Set.of(), Set.of(), Set.of(ProjectId.DEFAULT)); - public boolean isEmpty() { + public boolean hasNoChange() { return added.isEmpty() && removed.isEmpty(); } } diff --git a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java index 5fd573a3f665d..a85a9bf73bea0 100644 --- a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java +++ b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java @@ -42,7 +42,7 @@ protected RepositoriesHealthInfo determineCurrentHealth() { var unknown = new ArrayList(); var invalid = new ArrayList(); - repositories.values().forEach(repository -> { + repositories.forEach(repository -> { if (repository instanceof UnknownTypeRepository) { unknown.add(repository.getMetadata().name()); } else if (repository instanceof InvalidRepository) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 01b7e63280d77..1b76f7adfc2b9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -153,6 +154,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; @@ -3523,12 +3525,14 @@ public void startRecovery( } } case SNAPSHOT -> { - final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); + final Snapshot snapshot = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot(); + final ProjectId projectId = snapshot.getProjectId(); + final String repo = snapshot.getRepository(); executeRecovery( "from snapshot", recoveryState, recoveryListener, - l -> restoreFromRepository(repositoriesService.repository(repo), l) + l -> restoreFromRepository(repositoriesService.repository(projectId, repo), l) ); } case LOCAL_SHARDS -> { diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java index bd86ab93d30e7..01c7f521c3306 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; @@ -28,7 +30,6 @@ import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; public class IndexSnapshotsService { @@ -136,8 +137,9 @@ public void getLatestSuccessfulSnapshotForShard( } private Repository getRepository(String repositoryName) { - final Map repositories = repositoriesService.getRepositories(); - return repositories.get(repositoryName); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + return repositoriesService.repositoryOrNull(projectId, repositoryName); } private static class FetchShardSnapshotContext { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 6f009ed92cf94..ebb085fb00b6f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; @@ -66,6 +67,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -73,6 +75,7 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; @@ -84,8 +87,9 @@ * factory information to create new repositories, and provides access to and maintains the lifecycle of repositories. New nodes can easily * find all the repositories via the cluster state after joining a cluster. * - * {@link #repository(String)} can be used to fetch a repository. {@link #createRepository(RepositoryMetadata)} does the heavy lifting of - * creation. {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. + * {@link #repository(ProjectId, String)} can be used to fetch a repository. + * {@link #createRepository(ProjectId, RepositoryMetadata)} does the heavy lifting of creation. + * {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. */ public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -112,8 +116,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final ThreadPool threadPool; private final NodeClient client; - private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); - private volatile Map repositories = Collections.emptyMap(); + private final Map> internalRepositories = ConcurrentCollections.newConcurrentMap(); + private final Map> repositories = ConcurrentCollections.newConcurrentMap(); private final RepositoriesStatsArchive repositoriesStatsArchive; private final List> preRestoreChecks; @@ -154,10 +158,15 @@ public RepositoriesService( * This method can be only called on the master node. * It tries to create a new repository on the master, and if it was successful, it adds a new repository to cluster metadata. * + * @param projectId the project ID to which the repository belongs. * @param request register repository request * @param responseListener register repository listener */ - public void registerRepository(final PutRepositoryRequest request, final ActionListener responseListener) { + public void registerRepository( + final ProjectId projectId, + final PutRepositoryRequest request, + final ActionListener responseListener + ) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; validateRepositoryName(request.name()); @@ -167,7 +176,7 @@ record RegisterRepositoryTaskResult(AcknowledgedResponse ackResponse, boolean ch SubscribableListener // Trying to create the new repository on master to make sure it works - .newForked(validationStep -> validatePutRepositoryRequest(request, validationStep)) + .newForked(validationStep -> validatePutRepositoryRequest(projectId, request, validationStep)) // When publication has completed (and all acks received or timed out) then verify the repository. // (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have @@ -176,11 +185,11 @@ record RegisterRepositoryTaskResult(AcknowledgedResponse ackResponse, boolean ch final ListenableFuture acknowledgementStep = new ListenableFuture<>(); final ListenableFuture publicationStep = new ListenableFuture<>(); // Boolean==changed. submitUnbatchedTask( - "put_repository [" + request.name() + "]", - new RegisterRepositoryTask(this, request, acknowledgementStep) { + "put_repository " + projectRepoString(projectId, request.name()), + new RegisterRepositoryTask(this, projectId, request, acknowledgementStep) { @Override public void onFailure(Exception e) { - logger.warn(() -> "failed to create repository [" + request.name() + "]", e); + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, request.name()), e); publicationStep.onFailure(e); super.onFailure(e); } @@ -195,9 +204,9 @@ public boolean mustAck(DiscoveryNode discoveryNode) { public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { if (changed) { if (found) { - logger.info("updated repository [{}]", request.name()); + logger.info("updated repository {}", projectRepoString(projectId, request.name())); } else { - logger.info("put repository [{}]", request.name()); + logger.info("put repository {}", projectRepoString(projectId, request.name())); } } publicationStep.onResponse(oldState != newState); @@ -220,7 +229,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) .>newForked(verifyRepositoryStep -> { if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) { - verifyRepository(request.name(), verifyRepositoryStep); + verifyRepository(projectId, request.name(), verifyRepositoryStep); } else { verifyRepositoryStep.onResponse(null); } @@ -231,7 +240,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) .execute( ActionRunnable.wrap( getRepositoryDataStep, - ll -> repository(request.name()).getRepositoryData( + ll -> repository(projectId, request.name()).getRepositoryData( // TODO contemplate threading, do we need to fork, see #101445? EsExecutors.DIRECT_EXECUTOR_SERVICE, ll @@ -243,6 +252,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) .andThen( (updateRepoUuidStep, repositoryData) -> updateRepositoryUuidInMetadata( clusterService, + projectId, request.name(), repositoryData, updateRepoUuidStep @@ -263,16 +273,19 @@ public static class RegisterRepositoryTask extends AckedClusterStateUpdateTask { protected boolean found = false; protected boolean changed = false; + private final ProjectId projectId; private final PutRepositoryRequest request; private final RepositoriesService repositoriesService; RegisterRepositoryTask( final RepositoriesService repositoriesService, + final ProjectId projectId, final PutRepositoryRequest request, final ListenableFuture acknowledgementStep ) { super(request, acknowledgementStep); this.repositoriesService = repositoriesService; + this.projectId = projectId; this.request = request; } @@ -281,14 +294,18 @@ public static class RegisterRepositoryTask extends AckedClusterStateUpdateTask { * @param repositoriesService * @param request */ - public RegisterRepositoryTask(final RepositoriesService repositoriesService, final PutRepositoryRequest request) { - this(repositoriesService, request, null); + public RegisterRepositoryTask( + final RepositoriesService repositoriesService, + final ProjectId projectId, + final PutRepositoryRequest request + ) { + this(repositoriesService, projectId, request, null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (repositoryMetadata.name().equals(request.name())) { @@ -304,10 +321,7 @@ public ClusterState execute(ClusterState currentState) { request.type(), request.settings() ); - Repository existing = repositoriesService.repositories.get(request.name()); - if (existing == null) { - existing = repositoriesService.internalRepositories.get(request.name()); - } + Repository existing = repositoriesService.repositoryOrNull(projectId, request.name()); assert existing != null : "repository [" + newRepositoryMetadata.name() + "] must exist"; assert existing.getMetadata() == repositoryMetadata; final RepositoryMetadata updatedMetadata; @@ -316,7 +330,7 @@ public ClusterState execute(ClusterState currentState) { if (repositoryMetadata.generation() == RepositoryData.CORRUPTED_REPO_GEN) { // If recreating a corrupted repository with the same settings, reset the corrupt flag. // Setting the safe generation to unknown, so that a consistent generation is found. - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); logger.info( "repository [{}/{}] is marked as corrupted, resetting the corruption marker", repositoryMetadata.name(), @@ -334,7 +348,7 @@ public ClusterState execute(ClusterState currentState) { // we're updating in place so the updated metadata must point at the same uuid and generations updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings()); } else { - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); updatedMetadata = newRepositoryMetadata; } found = true; @@ -349,7 +363,7 @@ public ClusterState execute(ClusterState currentState) { repositories = new RepositoriesMetadata(repositoriesMetadata); changed = true; return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata(ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories)) .build(); } } @@ -361,17 +375,21 @@ public ClusterState execute(ClusterState currentState) { * * @param request */ - public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) { + public void validateRepositoryCanBeCreated(final ProjectId projectId, final PutRepositoryRequest request) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); // Trying to create the new repository on master to make sure it works - closeRepository(createRepository(newRepositoryMetadata)); + closeRepository(createRepository(projectId, newRepositoryMetadata)); } - private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener resultListener) { + private void validatePutRepositoryRequest( + final ProjectId projectId, + final PutRepositoryRequest request, + ActionListener resultListener + ) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); try { - final var repository = createRepository(newRepositoryMetadata); + final var repository = createRepository(projectId, newRepositoryMetadata); if (request.verify()) { // verify repository on local node only, different from verifyRepository method that runs on other cluster nodes threadPool.executor(ThreadPool.Names.SNAPSHOT) @@ -413,6 +431,7 @@ private static void submitUnbatchedTask( */ public static void updateRepositoryUuidInMetadata( ClusterService clusterService, + final ProjectId projectId, final String repositoryName, RepositoryData repositoryData, ActionListener listener @@ -424,7 +443,8 @@ public static void updateRepositoryUuidInMetadata( return; } - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state()).repository(repositoryName); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state().metadata().getProject(projectId)) + .repository(repositoryName); if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) { listener.onResponse(null); return; @@ -441,11 +461,11 @@ public static void updateRepositoryUuidInMetadata( submitUnbatchedTask( clusterService, - "update repository UUID [" + repositoryName + "] to [" + repositoryUuid + "]", + "update repository UUID " + projectRepoString(projectId, repositoryName) + " to [" + repositoryUuid + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(projectId); final RepositoriesMetadata currentReposMetadata = RepositoriesMetadata.get(project); final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName); @@ -477,24 +497,32 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) *

* This method can be only called on the master node. It removes repository information from cluster metadata. * + * @param projectId project to look for the repository * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { - submitUnbatchedTask("delete_repository [" + request.name() + "]", new UnregisterRepositoryTask(request, listener) { - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - if (deletedRepositories.isEmpty() == false) { - logger.info("deleted repositories [{}]", deletedRepositories); + public void unregisterRepository( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { + submitUnbatchedTask( + "delete_repository " + projectRepoString(projectId, request.name()), + new UnregisterRepositoryTask(projectId, request, listener) { + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + if (deletedRepositories.isEmpty() == false) { + logger.info("deleted repositories [{}] for project [{}]", deletedRepositories, projectId); + } } - } - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - // repository was created on both master and data nodes - return discoveryNode.isMasterNode() || discoveryNode.canContainData(); + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + // repository was created on both master and data nodes + return discoveryNode.isMasterNode() || discoveryNode.canContainData(); + } } - }); + ); } /** @@ -503,10 +531,16 @@ public boolean mustAck(DiscoveryNode discoveryNode) { */ public static class UnregisterRepositoryTask extends AckedClusterStateUpdateTask { protected final List deletedRepositories = new ArrayList<>(); + private final ProjectId projectId; private final DeleteRepositoryRequest request; - UnregisterRepositoryTask(final DeleteRepositoryRequest request, final ActionListener listener) { + UnregisterRepositoryTask( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { super(request, listener); + this.projectId = projectId; this.request = request; } @@ -514,20 +548,20 @@ public static class UnregisterRepositoryTask extends AckedClusterStateUpdateTask * Constructor used by {@link org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction} * @param name the repository name */ - public UnregisterRepositoryTask(TimeValue dummyTimeout, String name) { - this(new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); + public UnregisterRepositoryTask(TimeValue dummyTimeout, ProjectId projectId, String name) { + this(projectId, new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); if (repositories.repositories().size() > 0) { List repositoriesMetadata = new ArrayList<>(repositories.repositories().size()); boolean changed = false; for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) { - ensureRepositoryNotInUse(currentState, repositoryMetadata.name()); + ensureRepositoryNotInUse(projectState, repositoryMetadata.name()); ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata); deletedRepositories.add(repositoryMetadata.name()); changed = true; @@ -538,7 +572,9 @@ public ClusterState execute(ClusterState currentState) { if (changed) { repositories = new RepositoriesMetadata(repositoriesMetadata); return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata( + ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories) + ) .build(); } } @@ -549,8 +585,12 @@ public ClusterState execute(ClusterState currentState) { } } - public void verifyRepository(final String repositoryName, final ActionListener> listener) { - final Repository repository = repository(repositoryName); + public void verifyRepository( + final ProjectId projectId, + final String repositoryName, + final ActionListener> listener + ) { + final Repository repository = repository(projectId, repositoryName); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { @@ -566,7 +606,11 @@ protected void doRun() { try { repository.endVerification(verificationToken); } catch (Exception e) { - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", e); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + + " failed to finish repository verification", + e + ); delegatedListener.onFailure(e); return; } @@ -580,7 +624,10 @@ protected void doRun() { repository.endVerification(verificationToken); } catch (Exception inner) { inner.addSuppressed(e); - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", inner); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + " failed to finish repository verification", + inner + ); } listener.onFailure(e); }); @@ -608,78 +655,128 @@ public static boolean isDedicatedVotingOnlyNode(Set roles) { public void applyClusterState(ClusterChangedEvent event) { try { final ClusterState state = event.state(); - assert assertReadonlyRepositoriesNotInUseForWrites(state); - final RepositoriesMetadata oldMetadata = RepositoriesMetadata.get(event.previousState()); - final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state); - - // Check if repositories got changed - if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { - for (Repository repo : repositories.values()) { - repo.updateState(state); - } - return; + final ClusterState previousState = event.previousState(); + + for (var projectId : event.projectDelta().removed()) { // removed projects + applyProjectState(state.version(), null, previousState.projectState(projectId)); } - logger.trace("processing new index repositories for state version [{}]", event.state().version()); + for (var projectId : event.projectDelta().added()) { // added projects + applyProjectState(state.version(), state.projectState(projectId), null); + } - Map survivors = new HashMap<>(); - // First, remove repositories that are no longer there - for (Map.Entry entry : repositories.entrySet()) { - if (newMetadata.repository(entry.getKey()) == null) { - logger.debug("unregistering repository [{}]", entry.getKey()); - Repository repository = entry.getValue(); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - } else { - survivors.put(entry.getKey(), entry.getValue()); - } + for (var projectId : event.projectDelta().common()) { // existing projects + applyProjectState(state.version(), state.projectState(projectId), previousState.projectState(projectId)); + } + } catch (Exception ex) { + assert false : new AssertionError(ex); + logger.warn("failure updating cluster state ", ex); + } + } + + /** + * Apply changes for one project. The project can be either newly added, removed or an existing one. + * + * @param version The cluster state version of the change. + * @param state The current project state, or {@code null} if the project was removed. + * @param previousState the previous project state, or {@code null} if the project was newly added. + */ + private void applyProjectState(long version, @Nullable ProjectState state, @Nullable ProjectState previousState) { + assert state != null || previousState != null : "state and previousState cannot both be null"; + assert state == null || assertReadonlyRepositoriesNotInUseForWrites(state); + + final var projectId = state != null ? state.projectId() : previousState.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false || (state != null && previousState != null) + : "default project cannot be added or removed"; + assert previousState == null || projectId.equals(previousState.projectId()) + : "current and previous states must refer to the same project, but got " + projectId + " != " + previousState.projectId(); + + final RepositoriesMetadata newMetadata = state == null ? RepositoriesMetadata.EMPTY : RepositoriesMetadata.get(state.metadata()); + final RepositoriesMetadata oldMetadata = previousState == null + ? RepositoriesMetadata.EMPTY + : RepositoriesMetadata.get(previousState.metadata()); + + final Map projectRepositories = getProjectRepositories(projectId); + // Check if repositories got changed + if (state != null && oldMetadata.equalsIgnoreGenerations(newMetadata)) { + for (Repository repo : projectRepositories.values()) { + repo.updateState(state.cluster()); } + return; + } - Map builder = new HashMap<>(); - - // Now go through all repositories and update existing or create missing - for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { - Repository repository = survivors.get(repositoryMetadata.name()); - if (repository != null) { - // Found previous version of this repository - if (canUpdateInPlace(repositoryMetadata, repository) == false) { - // Previous version is different from the version in settings - logger.debug("updating repository [{}]", repositoryMetadata.name()); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - repository = null; - try { - repository = createRepository( - repositoryMetadata, - typesRegistry, - RepositoriesService::createUnknownTypeRepository - ); - } catch (RepositoryException ex) { - // TODO: this catch is bogus, it means the old repo is already closed, - // but we have nothing to replace it - logger.warn(() -> "failed to change repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); - } - } - } else { + logger.trace("processing new index repositories for project [{}] and state version [{}]", projectId, version); + + Map survivors = new HashMap<>(); + // First, remove repositories that are no longer there + for (Map.Entry entry : projectRepositories.entrySet()) { + if (newMetadata.repository(entry.getKey()) == null) { + logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey())); + Repository repository = entry.getValue(); + closeRepository(repository); + archiveRepositoryStats(repository, version); + } else { + survivors.put(entry.getKey(), entry.getValue()); + } + } + + if (state == null) { // removed project + assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet(); + repositories.remove(projectId); + return; + } + + Map builder = new HashMap<>(); + + // Now go through all repositories and update existing or create missing + for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { + Repository repository = survivors.get(repositoryMetadata.name()); + if (repository != null) { + // Found previous version of this repository + if (canUpdateInPlace(repositoryMetadata, repository) == false) { + // Previous version is different from the version in settings + logger.debug("updating repository {}", projectRepoString(projectId, repositoryMetadata.name())); + closeRepository(repository); + archiveRepositoryStats(repository, version); + repository = null; try { - repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository); + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); } catch (RepositoryException ex) { - logger.warn(() -> "failed to create repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); + // TODO: this catch is bogus, it means the old repo is already closed, + // but we have nothing to replace it + logger.warn(() -> "failed to change repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); } } - assert repository != null : "repository should not be null here"; - logger.debug("registering repository [{}]", repositoryMetadata.name()); - builder.put(repositoryMetadata.name(), repository); - } - for (Repository repo : builder.values()) { - repo.updateState(state); + } else { + try { + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); + } catch (RepositoryException ex) { + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); + } } - repositories = unmodifiableMap(builder); - } catch (Exception ex) { - assert false : new AssertionError(ex); - logger.warn("failure updating cluster state ", ex); + assert repository != null : "repository should not be null here"; + logger.debug("registering repository [{}]", projectRepoString(projectId, repositoryMetadata.name())); + builder.put(repositoryMetadata.name(), repository); + } + for (Repository repo : builder.values()) { + repo.updateState(state.cluster()); + } + if (builder.isEmpty() == false) { + repositories.put(projectId, unmodifiableMap(builder)); + } else { + repositories.remove(projectId); } } @@ -692,12 +789,13 @@ private static boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repo /** * Gets the {@link RepositoryData} for the given repository. * + * @param projectId project to look for the repository * @param repositoryName repository name * @param listener listener to pass {@link RepositoryData} to */ - public void getRepositoryData(final String repositoryName, final ActionListener listener) { + public void getRepositoryData(final ProjectId projectId, final String repositoryName, final ActionListener listener) { try { - Repository repository = repository(repositoryName); + Repository repository = repository(projectId, repositoryName); assert repository != null; // should only be called once we've validated the repository exists repository.getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? @@ -709,29 +807,62 @@ public void getRepositoryData(final String repositoryName, final ActionListener< } /** - * Returns registered repository + * Returns registered repository, either internal or external * * @param repositoryName repository name * @return registered repository * @throws RepositoryMissingException if repository with such name isn't registered */ + @FixForMultiProject + @Deprecated(forRemoval = true) public Repository repository(String repositoryName) { - Repository repository = repositories.get(repositoryName); + return repository(ProjectId.DEFAULT, repositoryName); + } + + /** + * Returns registered repository, either internal or external + * + * @param projectId the project to look for the repository + * @param repositoryName repository name + * @return registered repository + * @throws RepositoryMissingException if repository with such name isn't registered + */ + public Repository repository(ProjectId projectId, String repositoryName) { + Repository repository = repositoryOrNull(projectId, repositoryName); if (repository != null) { return repository; } - repository = internalRepositories.get(repositoryName); + throw new RepositoryMissingException(repositoryName); + } + + /** + * Similar to {@link #repository(ProjectId, String)}, but returns {@code null} instead of throw if the repository is not found. + */ + public Repository repositoryOrNull(ProjectId projectId, String repositoryName) { + Repository repository = repositories.getOrDefault(projectId, Map.of()).get(repositoryName); if (repository != null) { return repository; } - throw new RepositoryMissingException(repositoryName); + return internalRepositories.getOrDefault(projectId, Map.of()).get(repositoryName); + } + + /** + * @return the current collection of registered repositories from all projects. + */ + public List getRepositories() { + return repositories.values().stream().map(Map::values).flatMap(Collection::stream).toList(); } /** - * @return the current collection of registered repositories, keyed by name. + * @return the current collection of registered repositories for the given project, keyed by name. */ - public Map getRepositories() { - return unmodifiableMap(repositories); + public Map getProjectRepositories(ProjectId projectId) { + return repositories.getOrDefault(projectId, Map.of()); + } + + // Package private for testing + boolean hasRepositoryTrackingForProject(ProjectId projectId) { + return repositories.containsKey(projectId) || internalRepositories.containsKey(projectId); } public List repositoriesStats() { @@ -745,8 +876,7 @@ public List repositoriesStats() { public RepositoriesStats getRepositoriesThrottlingStats() { return new RepositoriesStats( - repositories.values() - .stream() + getRepositories().stream() .collect( Collectors.toMap( r -> r.getMetadata().name(), @@ -757,7 +887,10 @@ public RepositoriesStats getRepositoriesThrottlingStats() { } private List getRepositoryStatsForActiveRepositories() { - return Stream.concat(repositories.values().stream(), internalRepositories.values().stream()) + return Stream.concat( + repositories.values().stream().map(Map::values).flatMap(Collection::stream), + internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream) + ) .filter(r -> r instanceof MeteredBlobStoreRepository) .map(r -> (MeteredBlobStoreRepository) r) .map(MeteredBlobStoreRepository::statsSnapshot) @@ -768,12 +901,26 @@ public List clearRepositoriesStatsArchive(long maxVersi return repositoriesStatsArchive.clear(maxVersionToClear); } - public void registerInternalRepository(String name, String type) { + public void registerInternalRepository(ProjectId projectId, String name, String type) { RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY); - Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { - logger.debug("put internal repository [{}][{}]", name, type); - return createRepository(metadata, internalTypesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists); - }); + Repository repository = internalRepositories.compute(projectId, (ignored, existingRepos) -> { + if (existingRepos == null) { + existingRepos = Map.of(); + } + if (existingRepos.containsKey(name)) { + return existingRepos; + } + logger.debug("put internal repository [{}][{}]", projectRepoString(projectId, name), type); + final var repo = createRepository( + projectId, + metadata, + internalTypesRegistry, + RepositoriesService::throwRepositoryTypeDoesNotExists + ); + final var newRepos = new HashMap<>(existingRepos); + newRepos.put(name, repo); + return unmodifiableMap(newRepos); + }).get(name); if (type.equals(repository.getMetadata().type()) == false) { logger.warn( () -> format( @@ -785,7 +932,7 @@ public void registerInternalRepository(String name, String type) { type ) ); - } else if (repositories.containsKey(name)) { + } else if (getProjectRepositories(projectId).containsKey(name)) { logger.warn( () -> format( "non-internal repository [%s] already registered. this repository will block the " @@ -798,11 +945,24 @@ public void registerInternalRepository(String name, String type) { } } - public void unregisterInternalRepository(String name) { - Repository repository = internalRepositories.remove(name); + public void unregisterInternalRepository(ProjectId projectId, String name) { + final var repositoryRef = new AtomicReference(); + internalRepositories.computeIfPresent(projectId, (ignored, existingRepos) -> { + if (existingRepos.containsKey(name) == false) { + return existingRepos; + } + final var newRepos = new HashMap<>(existingRepos); + repositoryRef.set(newRepos.remove(name)); + if (newRepos.isEmpty()) { + return null; + } else { + return unmodifiableMap(newRepos); + } + }); + Repository repository = repositoryRef.get(); if (repository != null) { RepositoryMetadata metadata = repository.getMetadata(); - logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), name)); + logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), projectRepoString(projectId, name))); closeRepository(repository); } } @@ -811,7 +971,11 @@ public void unregisterInternalRepository(String name) { * Closes the given repository. */ private static void closeRepository(Repository repository) { - logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name()); + logger.debug( + "closing repository [{}]{}", + repository.getMetadata().type(), + projectRepoString(repository.getProjectId(), repository.getMetadata().name()) + ); repository.close(); } @@ -824,19 +988,6 @@ private void archiveRepositoryStats(Repository repository, long clusterStateVers } } - /** - * Creates repository holder. This method starts the repository - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - private static Repository createRepository( - RepositoryMetadata repositoryMetadata, - Map factories, - BiFunction defaultFactory - ) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory); - } - /** * Creates repository holder. This method starts the repository */ @@ -863,27 +1014,6 @@ private static Repository createRepository( } } - /** - * Creates a repository holder. - * - *

WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:

- * - *
    - *
  • This method does not register the repository (e.g., in the cluster state).
  • - *
  • This method starts the repository. The repository should be closed after use.
  • - *
  • The repository metadata should be associated to an already registered non-internal repository type and factory pair.
  • - *
- * - * @param repositoryMetadata the repository metadata - * @return the started repository - * @throws RepositoryException if repository type is not registered - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - public Repository createRepository(RepositoryMetadata repositoryMetadata) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata); - } - /** * Creates a repository holder. * @@ -947,25 +1077,26 @@ public static void validateRepositoryName(final String repositoryName) { } } - private static void ensureRepositoryNotInUseForWrites(ClusterState clusterState, String repository) { - if (SnapshotsInProgress.get(clusterState).forRepo(repository).isEmpty() == false) { + private static void ensureRepositoryNotInUseForWrites(ProjectState projectState, String repository) { + final ProjectId projectId = projectState.projectId(); + if (SnapshotsInProgress.get(projectState.cluster()).forRepo(projectId, repository).isEmpty() == false) { throw newRepositoryConflictException(repository, "snapshot is in progress"); } - for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(clusterState).getEntries()) { - if (entry.repository().equals(repository)) { + for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(projectState.cluster()).getEntries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "snapshot deletion is in progress"); } } - for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(clusterState).entries()) { - if (entry.repository().equals(repository)) { + for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(projectState.cluster()).entries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "repository clean up is in progress"); } } } - private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { - ensureRepositoryNotInUseForWrites(clusterState, repository); - for (RestoreInProgress.Entry entry : RestoreInProgress.get(clusterState)) { + private static void ensureRepositoryNotInUse(ProjectState projectState, String repository) { + ensureRepositoryNotInUseForWrites(projectState, repository); + for (RestoreInProgress.Entry entry : RestoreInProgress.get(projectState.cluster())) { if (repository.equals(entry.snapshot().getRepository())) { throw newRepositoryConflictException(repository, "snapshot restore is in progress"); } @@ -979,11 +1110,11 @@ public static boolean isReadOnly(Settings repositorySettings) { /** * Test-only check for the invariant that read-only repositories never have any write activities. */ - private static boolean assertReadonlyRepositoriesNotInUseForWrites(ClusterState clusterState) { - for (final var repositoryMetadata : RepositoriesMetadata.get(clusterState).repositories()) { + private static boolean assertReadonlyRepositoriesNotInUseForWrites(ProjectState projectState) { + for (final var repositoryMetadata : RepositoriesMetadata.get(projectState.metadata()).repositories()) { if (isReadOnly(repositoryMetadata.settings())) { try { - ensureRepositoryNotInUseForWrites(clusterState, repositoryMetadata.name()); + ensureRepositoryNotInUseForWrites(projectState, repositoryMetadata.name()); } catch (Exception e) { throw new AssertionError("repository [" + repositoryMetadata + "] is readonly but still in use", e); } @@ -1071,7 +1202,7 @@ public RepositoryUsageStats getUsageStats() { return RepositoryUsageStats.EMPTY; } final var statsByType = new HashMap>(); - for (final var repository : repositories.values()) { + for (final var repository : getRepositories()) { final var repositoryType = repository.getMetadata().type(); final var typeStats = statsByType.computeIfAbsent(repositoryType, ignored -> new HashMap<>()); typeStats.compute(COUNT_USAGE_STATS_NAME, (k, count) -> (count == null ? 0L : count) + 1); @@ -1096,8 +1227,8 @@ protected void doStop() {} protected void doClose() throws IOException { clusterService.removeApplier(this); final Collection repos = new ArrayList<>(); - repos.addAll(internalRepositories.values()); - repos.addAll(repositories.values()); + repos.addAll(internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream).toList()); + repos.addAll(getRepositories()); IOUtils.close(repos); for (Repository repo : repos) { repo.awaitIdle(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 5357218b03f45..03015941f619c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.FixForMultiProject; import java.io.IOException; @@ -41,6 +42,7 @@ public interface RepositoryOperation { * @param projectId The project that the repository belongs to * @param name Name of the repository */ + @FixForMultiProject(description = "move it to its own file") record ProjectRepo(ProjectId projectId, String name) implements Writeable { public ProjectRepo(StreamInput in) throws IOException { @@ -52,6 +54,19 @@ public void writeTo(StreamOutput out) throws IOException { projectId.writeTo(out); out.writeString(name); } + + @Override + public String toString() { + return projectRepoString(projectId, name); + } + } + + static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) { + return new ProjectRepo(projectId, repositoryName); + } + + static String projectRepoString(ProjectId projectId, String repositoryName) { + return "[" + projectId + "][" + repositoryName + "]"; } DiffableUtils.KeySerializer PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() { diff --git a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java index ea147f336cb7b..91b9e348bf34c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java +++ b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java @@ -9,7 +9,7 @@ package org.elasticsearch.repositories; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -21,7 +21,7 @@ import java.util.Set; /** - * The result of calling {@link #resolve(ClusterState, String[])} to resolve a description of some snapshot repositories (from a path + * The result of calling {@link #resolve(ProjectMetadata, String[])} to resolve a description of some snapshot repositories (from a path * component of a request to the get-repositories or get-snapshots APIs) against the known repositories in the cluster state: the * {@link RepositoryMetadata} for the extant repositories that match the description, together with a list of the parts of the description * that failed to match any known repository. @@ -38,8 +38,8 @@ public static boolean isMatchAll(String[] patterns) { || (patterns.length == 1 && (ALL_PATTERN.equalsIgnoreCase(patterns[0]) || Regex.isMatchAllPattern(patterns[0]))); } - public static ResolvedRepositories resolve(ClusterState state, String[] patterns) { - final var repositories = RepositoriesMetadata.get(state); + public static ResolvedRepositories resolve(ProjectMetadata projectMetadata, String[] patterns) { + final var repositories = RepositoriesMetadata.get(projectMetadata); if (isMatchAll(patterns)) { return new ResolvedRepositories(repositories.repositories(), List.of()); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9611f42437765..cb7428b30ae73 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2476,6 +2476,7 @@ private void doGetRepositoryData(ActionListener listener) { // someone switched the repo contents out from under us RepositoriesService.updateRepositoryUuidInMetadata( clusterService, + getProjectId(), metadata.name(), loaded, new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded)) @@ -3115,7 +3116,8 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state, } private RepositoryMetadata getRepoMetadata(ClusterState state) { - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state).repository(metadata.name()); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId())) + .repository(metadata.name()); assert repositoryMetadata != null || lifecycle.stoppedOrClosed() : "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]"; return repositoryMetadata; diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index bf6b078571bfc..f46fb1311d2ae 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -602,7 +602,9 @@ static void refreshRepositoryUuids( return; } - for (Repository repository : repositoriesService.getRepositories().values()) { + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + for (Repository repository : repositoriesService.getProjectRepositories(projectId).values()) { // We only care about BlobStoreRepositories because they're the only ones that can contain a searchable snapshot, and we // only care about ones with missing UUIDs. It's possible to have the UUID change from under us if, e.g., the repository was // wiped by an external force, but in this case any searchable snapshots are lost anyway so it doesn't really matter. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 68a935599ed64..d49b670451f5b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -605,7 +605,7 @@ private void snapshot( throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); } - final Repository repository = repositoriesService.repository(snapshot.getRepository()); + final Repository repository = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); SnapshotIndexCommit snapshotIndexCommit = null; try { snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index abbf2ab223436..8a1f1ad79a17f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1377,7 +1377,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu final String repoName = snapshot.getRepository(); if (tryEnterRepoLoop(repoName)) { if (repositoryData == null) { - repositoriesService.repository(repoName) + repositoriesService.repository(snapshot.getProjectId(), repoName) .getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? new ActionListener<>() { @@ -1486,7 +1486,7 @@ protected void doRun() { } final String repository = snapshot.getRepository(); final ListenableFuture metadataListener = new ListenableFuture<>(); - final Repository repo = repositoriesService.repository(snapshot.getRepository()); + final Repository repo = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); if (entry.isClone()) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source()); @@ -2441,7 +2441,7 @@ public static boolean includeFileInfoWriterUUID(IndexVersion repositoryMetaVersi */ private void deleteSnapshotsFromRepository(SnapshotDeletionsInProgress.Entry deleteEntry, IndexVersion minNodeVersion) { final long expectedRepoGen = deleteEntry.repositoryStateId(); - repositoriesService.getRepositoryData(deleteEntry.repository(), new ActionListener<>() { + repositoriesService.getRepositoryData(deleteEntry.projectId(), deleteEntry.repository(), new ActionListener<>() { @Override public void onResponse(RepositoryData repositoryData) { assert repositoryData.getGenId() == expectedRepoGen @@ -2564,7 +2564,7 @@ private void deleteSnapshotsFromRepository( return; } final SubscribableListener doneFuture = new SubscribableListener<>(); - repositoriesService.repository(deleteEntry.repository()) + repositoriesService.repository(deleteEntry.projectId(), deleteEntry.repository()) .deleteSnapshots(snapshotIds, repositoryData.getGenId(), minNodeVersion, new ActionListener<>() { @Override public void onResponse(RepositoryData updatedRepoData) { @@ -3788,7 +3788,7 @@ private void startExecutableClones(List entries) { entry.source(), clone.getValue(), clone.getKey(), - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.projectId(), entry.repository()) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java index d62d99aa1c434..cee2c5fd8d41d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java @@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -147,12 +148,12 @@ public Repository create(ProjectId projectId, RepositoryMetadata metadata) { ); doAnswer(invocation -> { - var request = (PutRepositoryRequest) invocation.getArguments()[0]; + var request = (PutRepositoryRequest) invocation.getArguments()[1]; if (request.type().equals("inter_planetary")) { throw new RepositoryException(request.name(), "repository type [" + request.type() + "] does not exist"); } return null; - }).when(repositoriesService).validateRepositoryCanBeCreated(any()); + }).when(repositoriesService).validateRepositoryCanBeCreated(eq(ProjectId.DEFAULT), any()); return repositoriesService; } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index ea83c92bfcdd6..88e31884af3e9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -530,7 +530,7 @@ public void testProjectsDelta() { .metadata(Metadata.builder(state0.metadata()).put(ReservedStateMetadata.builder("test").build())) .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", state1, state0); - assertTrue(event.projectDelta().isEmpty()); + assertTrue(event.projectDelta().hasNoChange()); // Add projects final List projectIds = randomList(1, 5, ESTestCase::randomUniqueProjectId); diff --git a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java index 561c05165eab5..e565971cef942 100644 --- a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java @@ -22,7 +22,6 @@ import org.junit.Before; import java.util.List; -import java.util.Map; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -44,7 +43,7 @@ public void setUp() throws Exception { } public void testGetHealthNoRepos() { - when(repositoriesService.getRepositories()).thenReturn(Map.of()); + when(repositoriesService.getRepositories()).thenReturn(List.of()); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -58,7 +57,7 @@ public void testGetHealthCorrectRepo() { when(metadata.generation()).thenReturn(randomNonNegativeLong()); var repo = mock(Repository.class); when(repo.getMetadata()).thenReturn(metadata); - when(repositoriesService.getRepositories()).thenReturn(Map.of(randomAlphaOfLength(10), repo)); + when(repositoriesService.getRepositories()).thenReturn(List.of(repo)); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -68,9 +67,7 @@ public void testGetHealthCorrectRepo() { public void testGetHealthUnknownType() { var repo = createRepositoryMetadata(); - when(repositoriesService.getRepositories()).thenReturn( - Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(randomProjectIdOrDefault(), repo)) - ); + when(repositoriesService.getRepositories()).thenReturn(List.of(new UnknownTypeRepository(randomProjectIdOrDefault(), repo))); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -82,7 +79,7 @@ public void testGetHealthUnknownType() { public void testGetHealthInvalid() { var repo = createRepositoryMetadata(); when(repositoriesService.getRepositories()).thenReturn( - Map.of(repo.name(), new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) + List.of(new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) ); var health = repositoriesHealthTracker.determineCurrentHealth(); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 8c4fcd28c9883..96d601f9091ff 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -23,12 +23,14 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.project.TestProjectResolvers; +import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -68,8 +70,16 @@ import java.util.concurrent.Executor; import java.util.function.BooleanSupplier; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; public class RepositoriesServiceTests extends ESTestCase { @@ -77,6 +87,7 @@ public class RepositoriesServiceTests extends ESTestCase { private ClusterService clusterService; private RepositoriesService repositoriesService; private ThreadPool threadPool; + private ProjectId projectId; @Override public void setUp() throws Exception { @@ -94,6 +105,13 @@ public void setUp() throws Exception { Collections.emptySet() ); clusterService = ClusterServiceUtils.createClusterService(threadPool); + projectId = randomProjectIdOrDefault(); + if (ProjectId.DEFAULT.equals(projectId) == false) { + ClusterServiceUtils.setState( + clusterService, + ClusterState.builder(clusterService.state()).putProjectMetadata(ProjectMetadata.builder(projectId)).build() + ); + } DiscoveryNode localNode = DiscoveryNodeUtils.builder("local").name("local").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build(); NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()); @@ -157,9 +175,9 @@ public void tearDown() throws Exception { public void testRegisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertEquals(repoName, repository.getMetadata().name()); assertEquals(TestRepository.TYPE, repository.getMetadata().type()); assertEquals(Settings.EMPTY, repository.getMetadata().settings()); @@ -168,24 +186,24 @@ public void testRegisterInternalRepository() { public void testUnregisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.unregisterInternalRepository(repoName); - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.unregisterInternalRepository(projectId, repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); assertTrue(((TestRepository) repository).isClosed); } public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); assertFalse(((TestRepository) repository).isClosed); - Repository repository2 = repositoriesService.repository(repoName); + Repository repository2 = repositoriesService.repository(projectId, repoName); assertSame(repository, repository2); } @@ -203,11 +221,11 @@ public void testPutRepositoryVerificationFails() { .type(VerificationFailRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); // also make sure that cluster state does not include failed repo - assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); }); + assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(projectId, repoName); }); } public void testPutRepositoryVerificationFailsOnExisting() { @@ -216,7 +234,7 @@ public void testPutRepositoryVerificationFailsOnExisting() { .type(TestRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); @@ -225,10 +243,10 @@ public void testPutRepositoryVerificationFailsOnExisting() { .type(VerificationFailRepository.TYPE) .verify(true); resultListener = new SubscribableListener<>(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); - var repository = repositoriesService.repository(repoName); + var repository = repositoriesService.repository(projectId, repoName); assertEquals(repository.getMetadata().type(), TestRepository.TYPE); } @@ -238,14 +256,14 @@ public void testPutRepositorySkipVerification() { .type(VerificationFailRepository.TYPE) .verify(false); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); } public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE); @@ -276,7 +294,7 @@ public void testHandlesUnknownRepositoryTypeWhenApplyingClusterState() { var clusterState = createClusterStateWithRepo(repoName, "unknown"); repositoriesService.applyClusterState(new ClusterChangedEvent("starting", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(UnknownTypeRepository.class)); } @@ -288,7 +306,7 @@ public void testRemoveUnknownRepositoryTypeWhenApplyingClusterState() { repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -297,7 +315,7 @@ public void testRegisterRepositoryFailsForUnknownType() { var repoName = randomAlphaOfLengthBetween(10, 25); var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type("unknown"); - repositoriesService.registerRepository(request, new ActionListener<>() { + repositoriesService.registerRepository(projectId, request, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { fail("Should not register unknown repository type"); @@ -306,7 +324,10 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { @Override public void onFailure(Exception e) { assertThat(e, isA(RepositoryException.class)); - assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist for project [default]")); + assertThat( + e.getMessage(), + equalTo("[" + repoName + "] repository type [unknown] does not exist for project [" + projectId + "]") + ); } }); } @@ -318,7 +339,7 @@ public void testHandlesCreationFailureWhenApplyingClusterState() { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); } @@ -329,12 +350,12 @@ public void testReplaceInvalidRepositoryWhenCreationSuccess() { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); clusterState = createClusterStateWithRepo(repoName, TestRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put test repository", clusterState, emptyState())); - repo = repositoriesService.repository(repoName); + repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(TestRepository.class)); } @@ -346,7 +367,7 @@ public void testRemoveInvalidRepositoryTypeWhenApplyingClusterState() { repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -370,17 +391,17 @@ public void testRegisterRepositorySuccessAfterCreationFailed() { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); // 2. repository creation successfully when current node become master node and repository is put again var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type(TestRepository.TYPE); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var response = safeAwait(resultListener); assertTrue(response.isAcknowledged()); - assertThat(repositoriesService.repository(repoName), isA(TestRepository.class)); + assertThat(repositoriesService.repository(projectId, repoName), isA(TestRepository.class)); } public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { @@ -393,19 +414,21 @@ public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { .newForked( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE), l.map(ignored -> null) ) ) .andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), ActionTestUtils.assertNoSuccessListener(e -> { @@ -425,20 +448,21 @@ public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { ) ) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(newGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), l.map(ignored -> null) @@ -446,7 +470,7 @@ public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { ) .andThenAccept( ignored -> assertTrue( - repositoriesService.repository(repoName) + repositoriesService.repository(projectId, repoName) .getMetadata() .settings() .getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null) @@ -455,17 +479,120 @@ public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { ); } + public void testRepositoryUpdatesForMultipleProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + // 1. Initial project + final var repoName = "repo"; + final var state0 = createClusterStateWithRepo(repoName, TestRepository.TYPE); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state0, emptyState())); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(1)); + final var initialProjectRepo = (TestRepository) repositoriesService.getProjectRepositories(projectId).values().iterator().next(); + assertThat(repositoriesService.getRepositories(), contains(initialProjectRepo)); + if (ProjectId.DEFAULT.equals(projectId) == false) { + assertFalse(repositoriesService.hasRepositoryTrackingForProject(ProjectId.DEFAULT)); + } + + // 2. Add a new project + final var anotherProjectId = randomUniqueProjectId(); + final var anotherRepoName = "another-repo"; + final var state1 = ClusterState.builder(state0) + .putProjectMetadata( + ProjectMetadata.builder(anotherProjectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.EMPTY), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state1, state0)); + assertThat(repositoriesService.getProjectRepositories(anotherProjectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(3)); + assertThat(repositoriesService.getRepositories(), hasItem(initialProjectRepo)); + final Collection anotherProjectRepos = repositoriesService.getProjectRepositories(anotherProjectId).values(); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 3. Update existing project + assertFalse(initialProjectRepo.isClosed); + final var state2 = ClusterState.builder(state1) + .putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.builder().put("foo", "bar").build()), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state2, state1)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(4)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 4. Remove another project + anotherProjectRepos.forEach(repo -> assertFalse(((TestRepository) repo).isClosed)); + final var state3 = ClusterState.builder(state2) + .metadata(Metadata.builder(state2.metadata()).removeProject(anotherProjectId)) + .routingTable(GlobalRoutingTable.builder(state2.globalRoutingTable()).removeProject(anotherProjectId).build()) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state3, state2)); + anotherProjectRepos.forEach(repo -> assertTrue(((TestRepository) repo).isClosed)); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(anotherProjectId)); + assertThat(repositoriesService.getRepositories(), hasSize(2)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + } + + public void testInternalRepositoryForMultiProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + String repoName = "name"; + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + final TestRepository initialProjectRepo = (TestRepository) repositoriesService.repository(projectId, repoName); + + // Repo of the same name but different project is a different repository instance + final var anotherProjectId = randomUniqueProjectId(); + repositoriesService.registerInternalRepository(anotherProjectId, repoName, TestRepository.TYPE); + final TestRepository anotherProjectRepo = (TestRepository) repositoriesService.repository(anotherProjectId, repoName); + assertThat(initialProjectRepo, not(sameInstance(anotherProjectRepo))); + + // Remove the project repository, the repo should be closed and the project is removed from tracking + repositoriesService.unregisterInternalRepository(projectId, repoName); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(projectId)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.repository(anotherProjectId, repoName), sameInstance(anotherProjectRepo)); + assertTrue(anotherProjectRepo.isStarted); + } + private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener listener) { clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - return new ClusterState.Builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .putCustom( - RepositoriesMetadata.TYPE, - RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) - ) - ).build(); + final ProjectMetadata projectMetadata = currentState.getMetadata().getProject(projectId); + return ClusterState.builder(currentState) + .putProjectMetadata( + ProjectMetadata.builder(projectMetadata) + .putCustom( + RepositoriesMetadata.TYPE, + RepositoriesMetadata.get(projectMetadata) + .withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) + ) + ) + .build(); } @Override @@ -482,12 +609,13 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta private ClusterState createClusterStateWithRepo(String repoName, String repoType) { ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - Metadata.Builder mdBuilder = Metadata.builder(); - mdBuilder.putCustom( - RepositoriesMetadata.TYPE, - new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + state.putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + ) ); - state.metadata(mdBuilder); return state.build(); } @@ -500,6 +628,7 @@ private void assertThrowsOnRegister(String repoName) { expectThrows( RepositoryException.class, () -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName), null ) @@ -625,7 +754,7 @@ public IndexShardSnapshotStatus.Copy getShardSnapshotStatus(SnapshotId snapshotI @Override public void updateState(final ClusterState state) { - metadata = RepositoriesMetadata.get(state).repository(metadata.name()); + metadata = RepositoriesMetadata.get(state.metadata().getProject(getProjectId())).repository(metadata.name()); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java index 69521529cd039..4df62531230b5 100644 --- a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java @@ -10,6 +10,8 @@ package org.elasticsearch.repositories; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -23,25 +25,36 @@ public class ResolvedRepositoriesTests extends ESTestCase { + private ProjectId projectId; + + @Override + public void setUp() throws Exception { + super.setUp(); + projectId = randomProjectIdOrDefault(); + } + public void testAll() { runMatchAllTest(); runMatchAllTest("*"); runMatchAllTest("_all"); } - private static void runMatchAllTest(String... patterns) { + private void runMatchAllTest(String... patterns) { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var result = getRepositories(state, patterns); - assertEquals(RepositoriesMetadata.get(state).repositories(), result.repositoryMetadata()); + assertEquals(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories(), result.repositoryMetadata()); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } public void testMatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); - final var name = randomFrom(RepositoriesMetadata.get(state).repositories()).name(); + final var name = randomFrom(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories()).name(); final var result = getRepositories(state, name); - assertEquals(List.of(RepositoriesMetadata.get(state).repository(name)), result.repositoryMetadata()); + assertEquals( + List.of(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repository(name)), + result.repositoryMetadata() + ); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } @@ -49,7 +62,7 @@ public void testMatchingName() { public void testMismatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var notAName = randomValueOtherThanMany( - n -> RepositoriesMetadata.get(state).repositories().stream().anyMatch(m -> n.equals(m.name())), + n -> RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories().stream().anyMatch(m -> n.equals(m.name())), ESTestCase::randomIdentifier ); final var result = getRepositories(state, notAName); @@ -70,25 +83,29 @@ public void testWildcards() { runWildcardTest(state, List.of("other-repo", "test-match-1", "test-match-2"), "other-repo", "test-*", "-*-exclude"); } - private static void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { + private void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { final var result = getRepositories(clusterState, patterns); final var description = Strings.format("%s should yield %s", Arrays.toString(patterns), expectedNames); assertFalse(description, result.hasMissingRepositories()); assertEquals(description, expectedNames, result.repositoryMetadata().stream().map(RepositoryMetadata::name).toList()); } - private static ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { - return ResolvedRepositories.resolve(clusterState, patterns); + private ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { + return ResolvedRepositories.resolve(clusterState.metadata().getProject(projectId), patterns); } - private static ClusterState clusterStateWithRepositories(String... repoNames) { + private ClusterState clusterStateWithRepositories(String... repoNames) { final var repositories = new ArrayList(repoNames.length); for (final var repoName : repoNames) { repositories.add(new RepositoryMetadata(repoName, "test", Settings.EMPTY)); } - return ClusterState.EMPTY_STATE.copyAndUpdateMetadata( - b -> b.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories)) - ); + return ClusterState.EMPTY_STATE.copyAndUpdateMetadata(b -> { + ProjectMetadata.Builder projectBuilder = b.getProject(projectId); + if (projectBuilder == null) { + projectBuilder = ProjectMetadata.builder(projectId); + } + b.put(projectBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories))); + }); } } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 044b054ad873a..6c20831969738 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.util.TestUtil; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -21,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -219,11 +221,13 @@ public void testSnapshotWithConflictingName() throws Exception { /** Create a {@link Repository} with a random name **/ private Repository createRepository() { + @FixForMultiProject(description = "randomize when snapshot and restore support multiple projects, see also ES-10225, ES-10228") + final ProjectId projectId = ProjectId.DEFAULT; Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 076be2cd27e84..eb0b545712982 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -520,11 +521,12 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo } public void testEnsureUploadListenerIsResolvedWhenAFileSnapshotTaskFails() throws Exception { + final ProjectId projectId = randomProjectIdOrDefault(); Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java index 259fc6fea6935..e45dc4e3d8cd5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -259,7 +260,7 @@ public void testRefreshRepositoryUuidsRefreshesAsNeeded() { } final RepositoriesService repositoriesService = mock(RepositoriesService.class); - when(repositoriesService.getRepositories()).thenReturn(repositories); + when(repositoriesService.getProjectRepositories(eq(ProjectId.DEFAULT))).thenReturn(repositories); final AtomicBoolean completed = new AtomicBoolean(); RestoreService.refreshRepositoryUuids( true, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 572c7110beaee..be3d04be54c5a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2767,7 +2767,14 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { ); actions.put( TransportPutRepositoryAction.TYPE, - new TransportPutRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters) + new TransportPutRepositoryAction( + transportService, + clusterService, + repositoriesService, + threadPool, + actionFilters, + TestProjectResolvers.DEFAULT_PROJECT_ONLY + ) ); actions.put( TransportCleanupRepositoryAction.TYPE, diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index e9a50c12eae0a..0a16a7f23e382 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -398,14 +398,14 @@ public static ClusterService mockClusterService() { * @param repositoryMetadata RepositoryMetadata to initialize the cluster state with * @return Mock ClusterService */ - public static ClusterService mockClusterService(RepositoryMetadata repositoryMetadata) { + public static ClusterService mockClusterService(ProjectId projectId, RepositoryMetadata repositoryMetadata) { return mockClusterService( ClusterState.builder(ClusterState.EMPTY_STATE) .metadata( - Metadata.builder() + Metadata.builder(ClusterState.EMPTY_STATE.metadata()) .clusterUUID(UUIDs.randomBase64UUID(random())) .put( - ProjectMetadata.builder(ProjectId.DEFAULT) + ProjectMetadata.builder(projectId) .putCustom( RepositoriesMetadata.TYPE, new RepositoriesMetadata(Collections.singletonList(repositoryMetadata)) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index 61922ee0b8878..433a85eee8293 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public TransportDeleteInternalRepositoryAction( @Override protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, ActionListener listener) { - repositoriesService.unregisterInternalRepository(request.getName()); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + repositoriesService.unregisterInternalRepository(projectId, request.getName()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index ee44b43037603..581bf8486337b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public TransportPutInternalRepositoryAction( @Override protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, ActionListener listener) { - repositoriesService.registerInternalRepository(request.getName(), request.getType()); + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; + repositoriesService.registerInternalRepository(projectId, request.getName(), request.getType()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 207ac4f38aed6..12a464fc683e6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -571,7 +571,7 @@ private Environment createEnvironment() { private Repository createRepository(ProjectId projectId) { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final Repository repository = new FsRepository( projectId, repositoryMetadata, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java index 63522ce2309a1..8777851a99315 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -46,14 +48,16 @@ public BlobStoreRepository get() { } private Repository getRepository() { + @FixForMultiProject + final var projectId = ProjectId.DEFAULT; if (repositoryUuid == null) { // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name - final Repository repository = repositoriesService.repository(repositoryName); + final Repository repository = repositoriesService.repository(projectId, repositoryName); assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName; return repository; } - final Map repositoriesByName = repositoriesService.getRepositories(); + final Map repositoriesByName = repositoriesService.getProjectRepositories(projectId); final String currentRepositoryNameHint = repositoryNameHint; final Repository repositoryByLastKnownName = repositoriesByName.get(currentRepositoryNameHint); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java index c090c715cafd8..35d71ba23e283 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.blobcache.shared.SharedBlobCacheService; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.UUIDs; @@ -592,8 +593,9 @@ private void testDirectories( repositorySettings.build() ); + final ProjectId projectId = randomProjectIdOrDefault(); final BlobStoreRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, new Environment( Settings.builder() @@ -604,7 +606,7 @@ private void testDirectories( null ), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService(repositoryMetadata), + BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ); From d2f4f3d5bdcbd44b8fd88ee983ab475aee7d8b1e Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 23 Jun 2025 13:26:38 +1000 Subject: [PATCH 2/9] more test --- .../elasticsearch/cluster/ClusterChangedEventTests.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 88e31884af3e9..9a7e94cee4774 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; @@ -531,6 +532,7 @@ public void testProjectsDelta() { .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", state1, state0); assertTrue(event.projectDelta().hasNoChange()); + assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); // Add projects final List projectIds = randomList(1, 5, ESTestCase::randomUniqueProjectId); @@ -542,6 +544,7 @@ public void testProjectsDelta() { event = new ClusterChangedEvent("test", state2, state1); assertThat(event.projectDelta().added(), containsInAnyOrder(projectIds.toArray())); assertThat(event.projectDelta().removed(), empty()); + assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); // Add more projects and delete one final var removedProjectIds = randomNonEmptySubsetOf(projectIds); @@ -561,6 +564,10 @@ public void testProjectsDelta() { event = new ClusterChangedEvent("test", state3, state2); assertThat(event.projectDelta().added(), containsInAnyOrder(moreProjectIds.toArray())); assertThat(event.projectDelta().removed(), containsInAnyOrder(removedProjectIds.toArray())); + assertThat( + event.projectDelta().common(), + equalTo(Sets.union(Sets.difference(Set.copyOf(projectIds), Set.copyOf(removedProjectIds)), Set.of(ProjectId.DEFAULT))) + ); // Remove all projects final List remainingProjects = state3.metadata() @@ -579,6 +586,7 @@ public void testProjectsDelta() { event = new ClusterChangedEvent("test", state4, state3); assertThat(event.projectDelta().added(), empty()); assertThat(event.projectDelta().removed(), containsInAnyOrder(remainingProjects.toArray())); + assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); } private static class CustomClusterMetadata2 extends TestClusterCustomMetadata { From 2e32d6c99384ef36319c758f4eb5ab04f373bb5b Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 23 Jun 2025 16:36:56 +1000 Subject: [PATCH 3/9] fix and mute tests --- .../repositories/RepositoriesService.java | 2 ++ .../repositories/VerifyNodeRepositoryAction.java | 8 ++++++-- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 3 ++- .../sourceonly/SourceOnlySnapshotShardTests.java | 10 ++++++---- .../build.gradle | 4 +++- 6 files changed, 20 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index ebb085fb00b6f..a421ef5bf9fcf 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -229,6 +230,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) .>newForked(verifyRepositoryStep -> { if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) { + final ThreadContext threadContext = threadPool.getThreadContext(); verifyRepository(projectId, request.name(), verifyRepositoryStep); } else { verifyRepositoryStep.onResponse(null); diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index a2d4f2921555c..f84adbe407b7a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,6 +40,7 @@ public static class TransportAction extends HandledTransportAction listener) { DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); try { - Repository repository = repositoriesService.repository(request.repository); + Repository repository = repositoriesService.repository(projectResolver.getProjectId(), request.repository); repository.verify(request.verificationToken, localNode); listener.onResponse(ActionResponse.Empty.INSTANCE); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index cb7428b30ae73..996c9cda4deab 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2407,7 +2407,7 @@ private ClusterState getClusterStateWithUpdatedRepositoryGeneration(ClusterState if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) { throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]"); } - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(getProjectId()); return ClusterState.builder(currentState) .putProjectMetadata( ProjectMetadata.builder(project) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index be3d04be54c5a..66f2712580f71 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2520,7 +2520,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { actionFilters, threadPool, clusterService, - repositoriesService + repositoriesService, + TestProjectResolvers.DEFAULT_PROJECT_ONLY ) ); actions.put( diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 12a464fc683e6..fbb7ddfb6024c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -95,6 +96,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.hamcrest.Matchers.equalTo; +@FixForMultiProject(description = "Randomizing projectId once snapshot and restore support multiple projects, ES-10225, ES-10228") public class SourceOnlySnapshotShardTests extends IndexShardTestCase { public void testSourceIncomplete() throws IOException { @@ -123,7 +125,7 @@ public void testSourceIncomplete() throws IOException { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -174,7 +176,7 @@ public void testSourceIncompleteSyntheticSourceNoDoc() throws IOException { recoverShardFromStore(shard); SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -215,7 +217,7 @@ public void testIncrementalSnapshot() throws IOException { } IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -344,7 +346,7 @@ public void testRestoreMinimal() throws IOException { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 39115556d290f..960c79d6348f9 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -47,8 +47,10 @@ tasks.named("yamlRestTest").configure { ArrayList blacklist = [ /* These tests don't work on multi-project yet - we need to go through each of them and make them work */ '^cat.recovery/*/*', + '^cat.repositories/*/*', '^cat.snapshots/*/*', '^cluster.desired_balance/10_basic/*', + '^cluster.stats/10_basic/snapshot stats reported in get cluster stats', '^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API '^health/10_basic/*', '^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security @@ -61,7 +63,7 @@ tasks.named("yamlRestTest").configure { '^snapshot.create/*/*', '^snapshot.delete/*/*', '^snapshot.get/*/*', - '^snapshot.get_repository/20_repository_uuid/*', + '^snapshot.get_repository/*/*', '^snapshot.restore/*/*', '^snapshot.status/*/*', '^synonyms/*/*', From 272266dd5810cb83f27deb7fe4259ca79fd50fac Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 23 Jun 2025 17:13:34 +1000 Subject: [PATCH 4/9] tweak --- .../cluster/ClusterChangedEvent.java | 44 ++++++++++++++----- .../repositories/RepositoriesService.java | 6 +-- .../cluster/ClusterChangedEventTests.java | 40 ++++++++++------- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index e572fa8335dfe..065229b586363 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; @@ -242,11 +243,16 @@ public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } - /** - * Returns the {@link ProjectsDelta} between the previous cluster state and the new cluster state. - */ - public ProjectsDelta projectDelta() { - return projectsDelta; + public Set addedProjects() { + return projectsDelta.added(); + } + + public Set removedProjects() { + return projectsDelta.removed(); + } + + public Set commonProjects() { + return projectsDelta.common(state); } /** @@ -354,7 +360,7 @@ private static ProjectsDelta calculateProjectDelta(Metadata previousMetadata, Me && previousMetadata.hasProject(ProjectId.DEFAULT) && currentMetadata.projects().size() == 1 && currentMetadata.hasProject(ProjectId.DEFAULT))) { - return ProjectsDelta.NO_CHANGE_DEFAULT_PROJECT; + return ProjectsDelta.NO_CHANGE; } final Set currentProjectIds = currentMetadata.projects().keySet(); @@ -376,7 +382,7 @@ private static ProjectsDelta calculateProjectDelta(Metadata previousMetadata, Me // assert removed.contains(ProjectId.DEFAULT) == false; if (added.isEmpty() && removed.isEmpty()) { - return new ProjectsDelta(Set.of(), Set.of(), currentProjectIds); + return ProjectsDelta.NO_CHANGE; } else { return new ProjectsDelta( Collections.unmodifiableSet(added), @@ -386,12 +392,30 @@ private static ProjectsDelta calculateProjectDelta(Metadata previousMetadata, Me } } - public record ProjectsDelta(Set added, Set removed, Set common) { + private record ProjectsDelta( + Set added, + Set removed, + @Nullable Set common // null if all projects are common + ) { - private static final ProjectsDelta NO_CHANGE_DEFAULT_PROJECT = new ProjectsDelta(Set.of(), Set.of(), Set.of(ProjectId.DEFAULT)); + private static final ProjectsDelta NO_CHANGE = new ProjectsDelta(Set.of(), Set.of(), null); - public boolean hasNoChange() { + private boolean hasNoChange() { return added.isEmpty() && removed.isEmpty(); } + + @Override + public Set common() { + throw new UnsupportedOperationException("Use common(ClusterState state) instead"); + } + + Set common(ClusterState state) { + if (common == null) { + assert hasNoChange() : this; + return state.metadata().projects().keySet(); + } else { + return common; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index a421ef5bf9fcf..6dd2e01e0430d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -659,15 +659,15 @@ public void applyClusterState(ClusterChangedEvent event) { final ClusterState state = event.state(); final ClusterState previousState = event.previousState(); - for (var projectId : event.projectDelta().removed()) { // removed projects + for (var projectId : event.removedProjects()) { // removed projects applyProjectState(state.version(), null, previousState.projectState(projectId)); } - for (var projectId : event.projectDelta().added()) { // added projects + for (var projectId : event.addedProjects()) { // added projects applyProjectState(state.version(), state.projectState(projectId), null); } - for (var projectId : event.projectDelta().common()) { // existing projects + for (var projectId : event.commonProjects()) { // existing projects applyProjectState(state.version(), state.projectState(projectId), previousState.projectState(projectId)); } } catch (Exception ex) { diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 9a7e94cee4774..9fb649a153cf8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -531,8 +531,9 @@ public void testProjectsDelta() { .metadata(Metadata.builder(state0.metadata()).put(ReservedStateMetadata.builder("test").build())) .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", state1, state0); - assertTrue(event.projectDelta().hasNoChange()); - assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); + assertThat(event.addedProjects(), empty()); + assertThat(event.removedProjects(), empty()); + assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); // Add projects final List projectIds = randomList(1, 5, ESTestCase::randomUniqueProjectId); @@ -542,9 +543,9 @@ public void testProjectsDelta() { } final var state2 = ClusterState.builder(state1).metadata(metadataBuilder.build()).build(); event = new ClusterChangedEvent("test", state2, state1); - assertThat(event.projectDelta().added(), containsInAnyOrder(projectIds.toArray())); - assertThat(event.projectDelta().removed(), empty()); - assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); + assertThat(event.addedProjects(), containsInAnyOrder(projectIds.toArray())); + assertThat(event.removedProjects(), empty()); + assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); // Add more projects and delete one final var removedProjectIds = randomNonEmptySubsetOf(projectIds); @@ -562,31 +563,38 @@ public void testProjectsDelta() { final var state3 = ClusterState.builder(state2).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); event = new ClusterChangedEvent("test", state3, state2); - assertThat(event.projectDelta().added(), containsInAnyOrder(moreProjectIds.toArray())); - assertThat(event.projectDelta().removed(), containsInAnyOrder(removedProjectIds.toArray())); + assertThat(event.addedProjects(), containsInAnyOrder(moreProjectIds.toArray())); + assertThat(event.removedProjects(), containsInAnyOrder(removedProjectIds.toArray())); assertThat( - event.projectDelta().common(), + event.commonProjects(), equalTo(Sets.union(Sets.difference(Set.copyOf(projectIds), Set.copyOf(removedProjectIds)), Set.of(ProjectId.DEFAULT))) ); + // An update without project membership changes + final var state4 = ClusterState.builder(state3).version(state3.version() + 1).build(); + event = new ClusterChangedEvent("test", state4, state3); + assertThat(event.addedProjects(), empty()); + assertThat(event.removedProjects(), empty()); + assertThat(event.commonProjects(), equalTo(state4.metadata().projects().keySet())); + // Remove all projects - final List remainingProjects = state3.metadata() + final List remainingProjects = state4.metadata() .projects() .keySet() .stream() .filter(projectId -> ProjectId.DEFAULT.equals(projectId) == false) .toList(); - metadataBuilder = Metadata.builder(state3.metadata()); - routingTableBuilder = GlobalRoutingTable.builder(state3.globalRoutingTable()); + metadataBuilder = Metadata.builder(state4.metadata()); + routingTableBuilder = GlobalRoutingTable.builder(state4.globalRoutingTable()); for (ProjectId projectId : remainingProjects) { metadataBuilder.removeProject(projectId); routingTableBuilder.removeProject(projectId); } - final var state4 = ClusterState.builder(state3).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); - event = new ClusterChangedEvent("test", state4, state3); - assertThat(event.projectDelta().added(), empty()); - assertThat(event.projectDelta().removed(), containsInAnyOrder(remainingProjects.toArray())); - assertThat(event.projectDelta().common(), equalTo(Set.of(ProjectId.DEFAULT))); + final var state5 = ClusterState.builder(state4).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); + event = new ClusterChangedEvent("test", state5, state4); + assertThat(event.addedProjects(), empty()); + assertThat(event.removedProjects(), containsInAnyOrder(remainingProjects.toArray())); + assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); } private static class CustomClusterMetadata2 extends TestClusterCustomMetadata { From c317b60583aa9d933d562e4e2b653330f69ae984 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 23 Jun 2025 18:36:02 +1000 Subject: [PATCH 5/9] mute --- .../core-rest-tests-with-multiple-projects/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 960c79d6348f9..f064c97ff296e 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -59,6 +59,7 @@ tasks.named("yamlRestTest").configure { '^indices.resolve_cluster/*/*/*', '^indices.shard_stores/*/*', '^migration/*/*', + '^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)', '^snapshot.clone/*/*', '^snapshot.create/*/*', '^snapshot.delete/*/*', From ca0ea7a10cb676deccd184059427a66140d1a2b8 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 24 Jun 2025 10:32:05 +1000 Subject: [PATCH 6/9] revert changes to ClusterChangedEvent --- .../cluster/ClusterChangedEvent.java | 74 ++++--------------- .../repositories/RepositoriesService.java | 11 ++- .../cluster/ClusterChangedEventTests.java | 40 +++------- 3 files changed, 36 insertions(+), 89 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 065229b586363..8285900f0b00f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.Nullable; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; @@ -243,16 +242,11 @@ public boolean nodesChanged() { return nodesRemoved() || nodesAdded(); } - public Set addedProjects() { - return projectsDelta.added(); - } - - public Set removedProjects() { - return projectsDelta.removed(); - } - - public Set commonProjects() { - return projectsDelta.common(state); + /** + * Returns the {@link ProjectsDelta} between the previous cluster state and the new cluster state. + */ + public ProjectsDelta projectDelta() { + return projectsDelta; } /** @@ -360,62 +354,26 @@ private static ProjectsDelta calculateProjectDelta(Metadata previousMetadata, Me && previousMetadata.hasProject(ProjectId.DEFAULT) && currentMetadata.projects().size() == 1 && currentMetadata.hasProject(ProjectId.DEFAULT))) { - return ProjectsDelta.NO_CHANGE; - } - - final Set currentProjectIds = currentMetadata.projects().keySet(); - final Set previousProjectIds = previousMetadata.projects().keySet(); - - final var added = new HashSet(); - final var common = new HashSet(); - for (var projectId : currentProjectIds) { - if (previousProjectIds.contains(projectId)) { - common.add(projectId); - } else { - added.add(projectId); - } + return ProjectsDelta.EMPTY; } - final Set removed = Sets.difference(previousProjectIds, currentProjectIds); + final Set added = Collections.unmodifiableSet( + Sets.difference(currentMetadata.projects().keySet(), previousMetadata.projects().keySet()) + ); + final Set removed = Collections.unmodifiableSet( + Sets.difference(previousMetadata.projects().keySet(), currentMetadata.projects().keySet()) + ); // TODO: Enable the following assertions once tests no longer add or remove default projects // assert added.contains(ProjectId.DEFAULT) == false; // assert removed.contains(ProjectId.DEFAULT) == false; - - if (added.isEmpty() && removed.isEmpty()) { - return ProjectsDelta.NO_CHANGE; - } else { - return new ProjectsDelta( - Collections.unmodifiableSet(added), - Collections.unmodifiableSet(removed), - Collections.unmodifiableSet(common) - ); - } + return new ProjectsDelta(added, removed); } - private record ProjectsDelta( - Set added, - Set removed, - @Nullable Set common // null if all projects are common - ) { - - private static final ProjectsDelta NO_CHANGE = new ProjectsDelta(Set.of(), Set.of(), null); + public record ProjectsDelta(Set added, Set removed) { + private static final ProjectsDelta EMPTY = new ProjectsDelta(Set.of(), Set.of()); - private boolean hasNoChange() { + public boolean isEmpty() { return added.isEmpty() && removed.isEmpty(); } - - @Override - public Set common() { - throw new UnsupportedOperationException("Use common(ClusterState state) instead"); - } - - Set common(ClusterState state) { - if (common == null) { - assert hasNoChange() : this; - return state.metadata().projects().keySet(); - } else { - return common; - } - } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 6dd2e01e0430d..40b89539d8b78 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -659,15 +660,19 @@ public void applyClusterState(ClusterChangedEvent event) { final ClusterState state = event.state(); final ClusterState previousState = event.previousState(); - for (var projectId : event.removedProjects()) { // removed projects + for (var projectId : event.projectDelta().removed()) { // removed projects applyProjectState(state.version(), null, previousState.projectState(projectId)); } - for (var projectId : event.addedProjects()) { // added projects + for (var projectId : event.projectDelta().added()) { // added projects applyProjectState(state.version(), state.projectState(projectId), null); } - for (var projectId : event.commonProjects()) { // existing projects + // existing projects + final var common = event.projectDelta().added().isEmpty() + ? state.metadata().projects().keySet() + : Sets.difference(state.metadata().projects().keySet(), event.projectDelta().added()); + for (var projectId : common) { applyProjectState(state.version(), state.projectState(projectId), previousState.projectState(projectId)); } } catch (Exception ex) { diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 9fb649a153cf8..ea83c92bfcdd6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; @@ -531,9 +530,7 @@ public void testProjectsDelta() { .metadata(Metadata.builder(state0.metadata()).put(ReservedStateMetadata.builder("test").build())) .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", state1, state0); - assertThat(event.addedProjects(), empty()); - assertThat(event.removedProjects(), empty()); - assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); + assertTrue(event.projectDelta().isEmpty()); // Add projects final List projectIds = randomList(1, 5, ESTestCase::randomUniqueProjectId); @@ -543,9 +540,8 @@ public void testProjectsDelta() { } final var state2 = ClusterState.builder(state1).metadata(metadataBuilder.build()).build(); event = new ClusterChangedEvent("test", state2, state1); - assertThat(event.addedProjects(), containsInAnyOrder(projectIds.toArray())); - assertThat(event.removedProjects(), empty()); - assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); + assertThat(event.projectDelta().added(), containsInAnyOrder(projectIds.toArray())); + assertThat(event.projectDelta().removed(), empty()); // Add more projects and delete one final var removedProjectIds = randomNonEmptySubsetOf(projectIds); @@ -563,38 +559,26 @@ public void testProjectsDelta() { final var state3 = ClusterState.builder(state2).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); event = new ClusterChangedEvent("test", state3, state2); - assertThat(event.addedProjects(), containsInAnyOrder(moreProjectIds.toArray())); - assertThat(event.removedProjects(), containsInAnyOrder(removedProjectIds.toArray())); - assertThat( - event.commonProjects(), - equalTo(Sets.union(Sets.difference(Set.copyOf(projectIds), Set.copyOf(removedProjectIds)), Set.of(ProjectId.DEFAULT))) - ); - - // An update without project membership changes - final var state4 = ClusterState.builder(state3).version(state3.version() + 1).build(); - event = new ClusterChangedEvent("test", state4, state3); - assertThat(event.addedProjects(), empty()); - assertThat(event.removedProjects(), empty()); - assertThat(event.commonProjects(), equalTo(state4.metadata().projects().keySet())); + assertThat(event.projectDelta().added(), containsInAnyOrder(moreProjectIds.toArray())); + assertThat(event.projectDelta().removed(), containsInAnyOrder(removedProjectIds.toArray())); // Remove all projects - final List remainingProjects = state4.metadata() + final List remainingProjects = state3.metadata() .projects() .keySet() .stream() .filter(projectId -> ProjectId.DEFAULT.equals(projectId) == false) .toList(); - metadataBuilder = Metadata.builder(state4.metadata()); - routingTableBuilder = GlobalRoutingTable.builder(state4.globalRoutingTable()); + metadataBuilder = Metadata.builder(state3.metadata()); + routingTableBuilder = GlobalRoutingTable.builder(state3.globalRoutingTable()); for (ProjectId projectId : remainingProjects) { metadataBuilder.removeProject(projectId); routingTableBuilder.removeProject(projectId); } - final var state5 = ClusterState.builder(state4).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); - event = new ClusterChangedEvent("test", state5, state4); - assertThat(event.addedProjects(), empty()); - assertThat(event.removedProjects(), containsInAnyOrder(remainingProjects.toArray())); - assertThat(event.commonProjects(), equalTo(Set.of(ProjectId.DEFAULT))); + final var state4 = ClusterState.builder(state3).metadata(metadataBuilder.build()).routingTable(routingTableBuilder.build()).build(); + event = new ClusterChangedEvent("test", state4, state3); + assertThat(event.projectDelta().added(), empty()); + assertThat(event.projectDelta().removed(), containsInAnyOrder(remainingProjects.toArray())); } private static class CustomClusterMetadata2 extends TestClusterCustomMetadata { From 75f2b754a2191766ccc65c1523d0d18aba3da8e9 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 24 Jun 2025 11:05:52 +1000 Subject: [PATCH 7/9] expand annotations --- .../repositories/reservedstate/ReservedRepositoryAction.java | 4 ++-- .../snapshots/status/TransportSnapshotsStatusAction.java | 2 +- .../org/elasticsearch/repositories/IndexSnapshotsService.java | 2 +- .../org/elasticsearch/repositories/RepositoryOperation.java | 1 - .../main/java/org/elasticsearch/snapshots/RestoreService.java | 2 +- .../repositories/DeleteInternalCcrRepositoryAction.java | 2 +- .../action/repositories/PutInternalCcrRepositoryAction.java | 2 +- .../xpack/searchablesnapshots/store/RepositorySupplier.java | 2 +- 8 files changed, 8 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java index 3a307c9712ceb..35ee668347169 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java @@ -62,7 +62,7 @@ public Collection prepare(Object input) { for (var repositoryRequest : repositories) { validate(repositoryRequest); RepositoriesService.validateRepositoryName(repositoryRequest.name()); - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-10479") final var projectId = ProjectId.DEFAULT; repositoriesService.validateRepositoryCanBeCreated(projectId, repositoryRequest); } @@ -76,7 +76,7 @@ public TransformState transform(List source, TransformStat ClusterState state = prevState.state(); - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-10479") final var projectId = ProjectId.DEFAULT; for (var request : requests) { RepositoriesService.RegisterRepositoryTask task = new RepositoriesService.RegisterRepositoryTask( diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index bba666dce0a62..fbb3815eab079 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -299,7 +299,7 @@ private void loadRepositoryData( ) { final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-10166") final var projectId = ProjectId.DEFAULT; repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener); final Collection snapshotIdsToLoad = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java index 01c7f521c3306..c46012e8a418c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -137,7 +137,7 @@ public void getLatestSuccessfulSnapshotForShard( } private Repository getRepository(String repositoryName) { - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-12176") final var projectId = ProjectId.DEFAULT; return repositoriesService.repositoryOrNull(projectId, repositoryName); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 03015941f619c..1ebf32db4593d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -42,7 +42,6 @@ public interface RepositoryOperation { * @param projectId The project that the repository belongs to * @param name Name of the repository */ - @FixForMultiProject(description = "move it to its own file") record ProjectRepo(ProjectId projectId, String name) implements Writeable { public ProjectRepo(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index f46fb1311d2ae..f57db79d4625d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -602,7 +602,7 @@ static void refreshRepositoryUuids( return; } - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-10228") final var projectId = ProjectId.DEFAULT; for (Repository repository : repositoriesService.getProjectRepositories(projectId).values()) { // We only care about BlobStoreRepositories because they're the only ones that can contain a searchable snapshot, and we diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index 433a85eee8293..4e101e6b0d24e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -47,7 +47,7 @@ public TransportDeleteInternalRepositoryAction( @Override protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, ActionListener listener) { - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") final var projectId = ProjectId.DEFAULT; repositoriesService.unregisterInternalRepository(projectId, request.getName()); listener.onResponse(ActionResponse.Empty.INSTANCE); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index 581bf8486337b..a96771d29b14d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -47,7 +47,7 @@ public TransportPutInternalRepositoryAction( @Override protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, ActionListener listener) { - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") final var projectId = ProjectId.DEFAULT; repositoriesService.registerInternalRepository(projectId, request.getName(), request.getType()); listener.onResponse(ActionResponse.Empty.INSTANCE); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java index 8777851a99315..9994b90c59ba2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java @@ -48,7 +48,7 @@ public BlobStoreRepository get() { } private Repository getRepository() { - @FixForMultiProject + @FixForMultiProject(description = "resolve the actual projectId, ES-12138") final var projectId = ProjectId.DEFAULT; if (repositoryUuid == null) { // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name From bf1b67d4bf0c32e10b70150eea48f30e8d723c09 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 24 Jun 2025 01:13:57 +0000 Subject: [PATCH 8/9] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/repositories/RepositoryOperation.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 1ebf32db4593d..e6bdf5bc5efc8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.FixForMultiProject; import java.io.IOException; From f62e3fe169c6a75823b3b3a100510a0c5d0ac781 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 24 Jun 2025 11:28:20 +1000 Subject: [PATCH 9/9] refactor --- .../repositories/RepositoriesService.java | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 40b89539d8b78..6248e74bee109 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -661,11 +661,11 @@ public void applyClusterState(ClusterChangedEvent event) { final ClusterState previousState = event.previousState(); for (var projectId : event.projectDelta().removed()) { // removed projects - applyProjectState(state.version(), null, previousState.projectState(projectId)); + applyProjectStateForRemovedProject(state.version(), previousState.projectState(projectId)); } for (var projectId : event.projectDelta().added()) { // added projects - applyProjectState(state.version(), state.projectState(projectId), null); + applyProjectStateForAddedOrExistingProject(state.version(), state.projectState(projectId), null); } // existing projects @@ -673,7 +673,11 @@ public void applyClusterState(ClusterChangedEvent event) { ? state.metadata().projects().keySet() : Sets.difference(state.metadata().projects().keySet(), event.projectDelta().added()); for (var projectId : common) { - applyProjectState(state.version(), state.projectState(projectId), previousState.projectState(projectId)); + applyProjectStateForAddedOrExistingProject( + state.version(), + state.projectState(projectId), + previousState.projectState(projectId) + ); } } catch (Exception ex) { assert false : new AssertionError(ex); @@ -682,30 +686,41 @@ public void applyClusterState(ClusterChangedEvent event) { } /** - * Apply changes for one project. The project can be either newly added, removed or an existing one. + * Apply changes for one removed project. * * @param version The cluster state version of the change. - * @param state The current project state, or {@code null} if the project was removed. - * @param previousState the previous project state, or {@code null} if the project was newly added. + * @param previousState The previous project state for the removed project. */ - private void applyProjectState(long version, @Nullable ProjectState state, @Nullable ProjectState previousState) { - assert state != null || previousState != null : "state and previousState cannot both be null"; - assert state == null || assertReadonlyRepositoriesNotInUseForWrites(state); + private void applyProjectStateForRemovedProject(long version, ProjectState previousState) { + final var projectId = previousState.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false : "default project cannot be removed"; + final var survivors = closeRemovedRepositories(version, projectId, getProjectRepositories(projectId), RepositoriesMetadata.EMPTY); + assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet(); + repositories.remove(projectId); + } - final var projectId = state != null ? state.projectId() : previousState.projectId(); - assert ProjectId.DEFAULT.equals(projectId) == false || (state != null && previousState != null) - : "default project cannot be added or removed"; + /** + * Apply changes for one project. The project can be either newly added or an existing one. + * + * @param version The cluster state version of the change. + * @param state The current project state + * @param previousState The previous project state, or {@code null} if the project was newly added. + */ + private void applyProjectStateForAddedOrExistingProject(long version, ProjectState state, @Nullable ProjectState previousState) { + assert assertReadonlyRepositoriesNotInUseForWrites(state); + final var projectId = state.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false || previousState != null : "default project cannot be added"; assert previousState == null || projectId.equals(previousState.projectId()) : "current and previous states must refer to the same project, but got " + projectId + " != " + previousState.projectId(); - final RepositoriesMetadata newMetadata = state == null ? RepositoriesMetadata.EMPTY : RepositoriesMetadata.get(state.metadata()); + final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state.metadata()); final RepositoriesMetadata oldMetadata = previousState == null ? RepositoriesMetadata.EMPTY : RepositoriesMetadata.get(previousState.metadata()); final Map projectRepositories = getProjectRepositories(projectId); // Check if repositories got changed - if (state != null && oldMetadata.equalsIgnoreGenerations(newMetadata)) { + if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { for (Repository repo : projectRepositories.values()) { repo.updateState(state.cluster()); } @@ -714,24 +729,8 @@ private void applyProjectState(long version, @Nullable ProjectState state, @Null logger.trace("processing new index repositories for project [{}] and state version [{}]", projectId, version); - Map survivors = new HashMap<>(); // First, remove repositories that are no longer there - for (Map.Entry entry : projectRepositories.entrySet()) { - if (newMetadata.repository(entry.getKey()) == null) { - logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey())); - Repository repository = entry.getValue(); - closeRepository(repository); - archiveRepositoryStats(repository, version); - } else { - survivors.put(entry.getKey(), entry.getValue()); - } - } - - if (state == null) { // removed project - assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet(); - repositories.remove(projectId); - return; - } + final var survivors = closeRemovedRepositories(version, projectId, projectRepositories, newMetadata); Map builder = new HashMap<>(); @@ -787,6 +786,26 @@ private void applyProjectState(long version, @Nullable ProjectState state, @Null } } + private Map closeRemovedRepositories( + long version, + ProjectId projectId, + Map projectRepositories, + RepositoriesMetadata newMetadata + ) { + Map survivors = new HashMap<>(); + for (Map.Entry entry : projectRepositories.entrySet()) { + if (newMetadata.repository(entry.getKey()) == null) { + logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey())); + Repository repository = entry.getValue(); + closeRepository(repository); + archiveRepositoryStats(repository, version); + } else { + survivors.put(entry.getKey(), entry.getValue()); + } + } + return survivors; + } + private static boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repository repository) { assert updatedMetadata.name().equals(repository.getMetadata().name()); return repository.getMetadata().type().equals(updatedMetadata.type()) @@ -1118,6 +1137,7 @@ public static boolean isReadOnly(Settings repositorySettings) { * Test-only check for the invariant that read-only repositories never have any write activities. */ private static boolean assertReadonlyRepositoriesNotInUseForWrites(ProjectState projectState) { + assert projectState != null; for (final var repositoryMetadata : RepositoriesMetadata.get(projectState.metadata()).repositories()) { if (isReadOnly(repositoryMetadata.settings())) { try {