Skip to content

Commit e75e94a

Browse files
ywangdmridula-s109
authored andcommitted
Snapshots support multi-project (elastic#130000)
This PR makes snapshot service code and APIs multi-project compatible. Resolves: ES-10225 Resolves: ES-10226
1 parent d112013 commit e75e94a

File tree

57 files changed

+805
-505
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+805
-505
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportDeleteDataStreamActionTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.metadata.DataStream;
1616
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.metadata.ProjectId;
1819
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.common.collect.ImmutableOpenMap;
2021
import org.elasticsearch.common.settings.Settings;
@@ -138,8 +139,9 @@ public void testDeleteSnapshottingDataStream() {
138139
List.of(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
139140
otherIndices
140141
);
141-
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(dataStreamName, "repo1", false))
142-
.withAddedEntry(createEntry(dataStreamName2, "repo2", true));
142+
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(
143+
createEntry(dataStreamName, projectId, "repo1", false)
144+
).withAddedEntry(createEntry(dataStreamName2, projectId, "repo2", true));
143145
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
144146

145147
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
@@ -157,9 +159,9 @@ public void testDeleteSnapshottingDataStream() {
157159
);
158160
}
159161

160-
private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
162+
private SnapshotsInProgress.Entry createEntry(String dataStreamName, ProjectId projectId, String repo, boolean partial) {
161163
return SnapshotsInProgress.Entry.snapshot(
162-
new Snapshot(repo, new SnapshotId("", "")),
164+
new Snapshot(projectId, repo, new SnapshotId("", "")),
163165
false,
164166
partial,
165167
SnapshotsInProgress.State.SUCCESS,

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
391391
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
392392
final ListenableFuture<Void> metadataDone = new ListenableFuture<>();
393393
wrappedFinalizeContext = new FinalizeSnapshotContext(
394+
finalizeSnapshotContext.serializeProjectMetadata(),
394395
finalizeSnapshotContext.updatedShardGenerations(),
395396
finalizeSnapshotContext.repositoryStateId(),
396397
finalizeSnapshotContext.clusterMetadata(),

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(
890890
ShardGeneration generation
891891
) throws IOException {
892892
return BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(
893-
repository.getMetadata().name(),
893+
repository.getProjectRepo(),
894894
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
895895
generation.getGenerationUUID(),
896896
NamedXContentRegistry.EMPTY

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -820,15 +820,16 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
820820
"fallback message",
821821
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
822822
Level.ERROR,
823-
"index [test-idx-1/*] shard generation [*] in [test-repo][*] not found - falling back to reading all shard snapshots"
823+
"index [test-idx-1/*] shard generation [*] in [default/test-repo][*] not found "
824+
+ "- falling back to reading all shard snapshots"
824825
)
825826
);
826827
mockLog.addExpectation(
827828
new MockLog.SeenEventExpectation(
828829
"shard blobs list",
829830
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
830831
Level.ERROR,
831-
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [test-repo][*]"
832+
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [default/test-repo][*]"
832833
)
833834
);
834835
if (repairWithDelete) {

server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,9 @@ public CountingMockRepository(
189189
}
190190

191191
@Override
192-
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
192+
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
193193
globalMetadata.computeIfAbsent(snapshotId.getName(), (s) -> new AtomicInteger(0)).incrementAndGet();
194-
return super.getSnapshotGlobalMetadata(snapshotId);
194+
return super.getSnapshotGlobalMetadata(snapshotId, fromProjectMetadata);
195195
}
196196

197197
@Override

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotThrottlingIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void testWarningSpeedOverRecovery() throws Exception {
141141
"snapshot speed over recovery speed",
142142
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
143143
Level.WARN,
144-
"repository [test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
144+
"repository [default/test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
145145
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
146146
+ "rate limit will be superseded by the recovery rate limit"
147147
);
@@ -152,7 +152,7 @@ public void testWarningSpeedOverRecovery() throws Exception {
152152
"snapshot restore speed over recovery speed",
153153
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
154154
Level.WARN,
155-
"repository [test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
155+
"repository [default/test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
156156
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
157157
+ "rate limit will be superseded by the recovery rate limit"
158158
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
1818
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.ProjectId;
1920
import org.elasticsearch.cluster.service.MasterService;
2021
import org.elasticsearch.common.settings.Settings;
2122
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -43,7 +44,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
4344
"[does-not-exist]",
4445
SnapshotsService.class.getName(),
4546
Level.INFO,
46-
"deleting snapshots [does-not-exist] from repository [test-repo]"
47+
"deleting snapshots [does-not-exist] from repository [default/test-repo]"
4748
)
4849
);
4950

@@ -52,7 +53,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
5253
"[deleting test-snapshot]",
5354
SnapshotsService.class.getName(),
5455
Level.INFO,
55-
"deleting snapshots [test-snapshot] from repository [test-repo]"
56+
"deleting snapshots [test-snapshot] from repository [default/test-repo]"
5657
)
5758
);
5859

@@ -61,7 +62,7 @@ public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws E
6162
"[test-snapshot deleted]",
6263
SnapshotsService.class.getName(),
6364
Level.INFO,
64-
"snapshots [test-snapshot/*] deleted"
65+
"snapshots [test-snapshot/*] deleted in repository [default/test-repo]"
6566
)
6667
);
6768

@@ -90,7 +91,7 @@ public void testSnapshotDeletionFailureShouldBeLogged() throws Exception {
9091
"[test-snapshot]",
9192
SnapshotsService.class.getName(),
9293
Level.WARN,
93-
"failed to complete snapshot deletion for [test-snapshot] from repository [test-repo]"
94+
"failed to complete snapshot deletion for [test-snapshot] from repository [default/test-repo]"
9495
)
9596
);
9697

@@ -176,10 +177,10 @@ private SubscribableListener<Void> createSnapshotDeletionListener(String reposit
176177
return false;
177178
}
178179
if (deleteHasStarted.get() == false) {
179-
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
180+
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName));
180181
return false;
181182
} else {
182-
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
183+
return deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName) == false;
183184
}
184185
});
185186
}

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
import org.elasticsearch.cluster.block.ClusterBlockException;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2424
import org.elasticsearch.cluster.metadata.ProjectId;
25+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.cluster.project.ProjectResolver;
2628
import org.elasticsearch.cluster.service.ClusterService;
2729
import org.elasticsearch.common.blobstore.DeleteResult;
2830
import org.elasticsearch.common.util.concurrent.EsExecutors;
2931
import org.elasticsearch.common.util.concurrent.ListenableFuture;
30-
import org.elasticsearch.core.FixForMultiProject;
3132
import org.elasticsearch.core.Nullable;
3233
import org.elasticsearch.core.SuppressForbidden;
3334
import org.elasticsearch.injection.guice.Inject;
@@ -66,14 +67,16 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
6667
private static final Logger logger = LogManager.getLogger(TransportCleanupRepositoryAction.class);
6768

6869
private final RepositoriesService repositoriesService;
70+
private final ProjectResolver projectResolver;
6971

7072
@Inject
7173
public TransportCleanupRepositoryAction(
7274
TransportService transportService,
7375
ClusterService clusterService,
7476
RepositoriesService repositoriesService,
7577
ThreadPool threadPool,
76-
ActionFilters actionFilters
78+
ActionFilters actionFilters,
79+
ProjectResolver projectResolver
7780
) {
7881
super(
7982
TYPE.name(),
@@ -86,6 +89,7 @@ public TransportCleanupRepositoryAction(
8689
EsExecutors.DIRECT_EXECUTOR_SERVICE
8790
);
8891
this.repositoriesService = repositoriesService;
92+
this.projectResolver = projectResolver;
8993
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
9094
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
9195
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@@ -134,22 +138,23 @@ protected void masterOperation(
134138
ClusterState state,
135139
ActionListener<CleanupRepositoryResponse> listener
136140
) {
137-
cleanupRepo(request.name(), listener.map(CleanupRepositoryResponse::new));
141+
cleanupRepo(projectResolver.getProjectId(), request.name(), listener.map(CleanupRepositoryResponse::new));
138142
}
139143

140144
@Override
141145
protected ClusterBlockException checkBlock(CleanupRepositoryRequest request, ClusterState state) {
142146
// Cluster is not affected but we look up repositories in metadata
143-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
147+
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
144148
}
145149

146150
/**
147151
* Runs cleanup operations on the given repository.
152+
* @param projectId Project for the repository
148153
* @param repositoryName Repository to clean up
149154
* @param listener Listener for cleanup result
150155
*/
151-
private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
152-
final Repository repository = repositoriesService.repository(repositoryName);
156+
private void cleanupRepo(ProjectId projectId, String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
157+
final Repository repository = repositoriesService.repository(projectId, repositoryName);
153158
if (repository instanceof BlobStoreRepository == false) {
154159
listener.onFailure(new IllegalArgumentException("Repository [" + repositoryName + "] does not support repository cleanup"));
155160
return;
@@ -172,8 +177,10 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
172177

173178
@Override
174179
public ClusterState execute(ClusterState currentState) {
175-
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
176-
SnapshotsService.ensureNotReadOnly(currentState, repositoryName);
180+
final ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId);
181+
SnapshotsService.ensureRepositoryExists(repositoryName, projectMetadata);
182+
SnapshotsService.ensureNotReadOnly(projectMetadata, repositoryName);
183+
// Repository cleanup is intentionally cluster wide exclusive
177184
final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
178185
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
179186
throw new IllegalStateException(
@@ -200,8 +207,6 @@ public ClusterState execute(ClusterState currentState) {
200207
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
201208
);
202209
}
203-
@FixForMultiProject
204-
final var projectId = ProjectId.DEFAULT;
205210
return ClusterState.builder(currentState)
206211
.putCustom(
207212
RepositoryCleanupInProgress.TYPE,

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.block.ClusterBlockException;
1919
import org.elasticsearch.cluster.block.ClusterBlockLevel;
20+
import org.elasticsearch.cluster.project.ProjectResolver;
2021
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.util.concurrent.EsExecutors;
2223
import org.elasticsearch.injection.guice.Inject;
@@ -32,14 +33,16 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas
3233

3334
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/snapshot/clone");
3435
private final SnapshotsService snapshotsService;
36+
private final ProjectResolver projectResolver;
3537

3638
@Inject
3739
public TransportCloneSnapshotAction(
3840
TransportService transportService,
3941
ClusterService clusterService,
4042
ThreadPool threadPool,
4143
SnapshotsService snapshotsService,
42-
ActionFilters actionFilters
44+
ActionFilters actionFilters,
45+
ProjectResolver projectResolver
4346
) {
4447
super(
4548
TYPE.name(),
@@ -51,12 +54,13 @@ public TransportCloneSnapshotAction(
5154
EsExecutors.DIRECT_EXECUTOR_SERVICE
5255
);
5356
this.snapshotsService = snapshotsService;
57+
this.projectResolver = projectResolver;
5458
}
5559

5660
@Override
5761
protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
5862
// Cluster is not affected but we look up repositories in metadata
59-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
63+
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
6064
}
6165

6266
@Override
@@ -66,6 +70,6 @@ protected void masterOperation(
6670
ClusterState state,
6771
final ActionListener<AcknowledgedResponse> listener
6872
) {
69-
snapshotsService.cloneSnapshot(request, listener.map(v -> AcknowledgedResponse.TRUE));
73+
snapshotsService.cloneSnapshot(projectResolver.getProjectId(), request, listener.map(v -> AcknowledgedResponse.TRUE));
7074
}
7175
}

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.block.ClusterBlockException;
1818
import org.elasticsearch.cluster.block.ClusterBlockLevel;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
1920
import org.elasticsearch.cluster.service.ClusterService;
2021
import org.elasticsearch.common.util.concurrent.EsExecutors;
2122
import org.elasticsearch.injection.guice.Inject;
@@ -31,14 +32,16 @@
3132
public class TransportCreateSnapshotAction extends TransportMasterNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
3233
public static final ActionType<CreateSnapshotResponse> TYPE = new ActionType<>("cluster:admin/snapshot/create");
3334
private final SnapshotsService snapshotsService;
35+
private final ProjectResolver projectResolver;
3436

3537
@Inject
3638
public TransportCreateSnapshotAction(
3739
TransportService transportService,
3840
ClusterService clusterService,
3941
ThreadPool threadPool,
4042
SnapshotsService snapshotsService,
41-
ActionFilters actionFilters
43+
ActionFilters actionFilters,
44+
ProjectResolver projectResolver
4245
) {
4346
super(
4447
TYPE.name(),
@@ -51,12 +54,13 @@ public TransportCreateSnapshotAction(
5154
EsExecutors.DIRECT_EXECUTOR_SERVICE
5255
);
5356
this.snapshotsService = snapshotsService;
57+
this.projectResolver = projectResolver;
5458
}
5559

5660
@Override
5761
protected ClusterBlockException checkBlock(CreateSnapshotRequest request, ClusterState state) {
5862
// We only check metadata block, as we want to snapshot closed indices (which have a read block)
59-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
63+
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
6064
}
6165

6266
@Override
@@ -67,9 +71,13 @@ protected void masterOperation(
6771
final ActionListener<CreateSnapshotResponse> listener
6872
) {
6973
if (request.waitForCompletion()) {
70-
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
74+
snapshotsService.executeSnapshot(projectResolver.getProjectId(), request, listener.map(CreateSnapshotResponse::new));
7175
} else {
72-
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null)));
76+
snapshotsService.createSnapshot(
77+
projectResolver.getProjectId(),
78+
request,
79+
listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null))
80+
);
7381
}
7482
}
7583
}

0 commit comments

Comments
 (0)