Skip to content

Commit 4530015

Browse files
committed
Make s3 repository project aware
Pass project-id explicitly to repository factory and make it part of the repository interface. Relates: ES-11839
1 parent 043cd71 commit 4530015

File tree

12 files changed

+165
-9
lines changed

12 files changed

+165
-9
lines changed

modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
*/
99
package org.elasticsearch.repositories.s3;
1010

11+
import org.elasticsearch.cluster.metadata.ProjectId;
12+
1113
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
1214
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
1315
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
@@ -147,6 +149,7 @@ public long absoluteTimeInMillis() {
147149
// construct our own repo instance so we can inject a threadpool that allows to control the passage of time
148150
try (
149151
var repository = new S3Repository(
152+
ProjectId.DEFAULT,
150153
node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(),
151154
xContentRegistry(),
152155
node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).findFirst().get().getService(),

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.elasticsearch.repositories.s3;
1010

1111
import fixture.s3.S3HttpHandler;
12+
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
1215
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
1316
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;
1417
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -605,7 +608,7 @@ protected S3Repository createRepository(
605608
RecoverySettings recoverySettings,
606609
S3RepositoriesMetrics s3RepositoriesMetrics
607610
) {
608-
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {
611+
return new S3Repository(ProjectId.DEFAULT, metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {
609612

610613
@Override
611614
public BlobStore blobStore() {

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99

1010
package org.elasticsearch.repositories.s3;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
14+
import org.elasticsearch.core.Nullable;
15+
1216
import software.amazon.awssdk.awscore.AwsRequest;
1317
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
1418
import software.amazon.awssdk.core.exception.SdkException;
@@ -74,6 +78,8 @@ class S3BlobStore implements BlobStore {
7478

7579
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
7680

81+
@Nullable // if the blobstore is at the cluster level
82+
private final ProjectId projectId;
7783
private final S3Service service;
7884

7985
private final BigArrays bigArrays;
@@ -106,6 +112,7 @@ class S3BlobStore implements BlobStore {
106112
private final boolean addPurposeCustomQueryParameter;
107113

108114
S3BlobStore(
115+
@Nullable ProjectId projectId,
109116
S3Service service,
110117
String bucket,
111118
boolean serverSideEncryption,
@@ -119,6 +126,7 @@ class S3BlobStore implements BlobStore {
119126
S3RepositoriesMetrics s3RepositoriesMetrics,
120127
BackoffPolicy retryThrottledDeleteBackoffPolicy
121128
) {
129+
this.projectId = projectId;
122130
this.service = service;
123131
this.bigArrays = bigArrays;
124132
this.bucket = bucket;
@@ -257,6 +265,7 @@ public String toString() {
257265
}
258266

259267
public AmazonS3Reference clientReference() {
268+
// TODO: use service.client(ProjectId, RepositoryMetadata), see https://github.com/elastic/elasticsearch/pull/127631
260269
return service.client(repositoryMetadata);
261270
}
262271

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.ActionRunnable;
1616
import org.elasticsearch.action.support.RefCountingRunnable;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
1718
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.BackoffPolicy;
@@ -275,6 +276,7 @@ class S3Repository extends MeteredBlobStoreRepository {
275276
* Constructs an s3 backed repository
276277
*/
277278
S3Repository(
279+
final ProjectId projectId,
278280
final RepositoryMetadata metadata,
279281
final NamedXContentRegistry namedXContentRegistry,
280282
final S3Service service,
@@ -284,6 +286,7 @@ class S3Repository extends MeteredBlobStoreRepository {
284286
final S3RepositoriesMetrics s3RepositoriesMetrics
285287
) {
286288
super(
289+
projectId,
287290
metadata,
288291
namedXContentRegistry,
289292
clusterService,
@@ -468,6 +471,7 @@ private static BlobPath buildBasePath(RepositoryMetadata metadata) {
468471
@Override
469472
protected S3BlobStore createBlobStore() {
470473
return new S3BlobStore(
474+
getProjectId(),
471475
service,
472476
bucket,
473477
serverSideEncryption,

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.repositories.s3;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
1214
import software.amazon.awssdk.regions.Region;
1315
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
1416

@@ -36,6 +38,7 @@
3638
import java.util.Collections;
3739
import java.util.List;
3840
import java.util.Map;
41+
import java.util.function.Function;
3942

4043
/**
4144
* A plugin to add a repository type that writes to and from the AWS S3.
@@ -57,14 +60,24 @@ S3Service getService() {
5760

5861
// proxy method for testing
5962
protected S3Repository createRepository(
63+
final ProjectId projectId,
6064
final RepositoryMetadata metadata,
6165
final NamedXContentRegistry registry,
6266
final ClusterService clusterService,
6367
final BigArrays bigArrays,
6468
final RecoverySettings recoverySettings,
6569
final S3RepositoriesMetrics s3RepositoriesMetrics
6670
) {
67-
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics);
71+
return new S3Repository(
72+
projectId,
73+
metadata,
74+
registry,
75+
service.get(),
76+
clusterService,
77+
bigArrays,
78+
recoverySettings,
79+
s3RepositoriesMetrics
80+
);
6881
}
6982

7083
@Override
@@ -97,10 +110,18 @@ public Map<String, Repository.Factory> getRepositories(
97110
final RepositoriesMetrics repositoriesMetrics
98111
) {
99112
final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics);
100-
return Collections.singletonMap(
101-
S3Repository.TYPE,
102-
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics)
103-
);
113+
return Collections.singletonMap(S3Repository.TYPE, new Repository.Factory() {
114+
@Override
115+
public Repository create(RepositoryMetadata metadata) throws Exception {
116+
throw new IllegalStateException("Must use the create(ProjectId, RepositoryMetadata) method");
117+
}
118+
119+
@Override
120+
public Repository create(ProjectId projectId, RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup)
121+
throws Exception {
122+
return createRepository(projectId, metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics);
123+
}
124+
});
104125
}
105126

106127
@Override

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.elasticsearch.repositories.s3;
1010

1111
import fixture.s3.S3HttpHandler;
12+
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
1215
import software.amazon.awssdk.core.exception.SdkClientException;
1316
import software.amazon.awssdk.core.exception.SdkException;
1417
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -231,6 +234,7 @@ protected BlobContainer createBlobContainer(
231234
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
232235

233236
final S3BlobStore s3BlobStore = new S3BlobStore(
237+
ProjectId.DEFAULT,
234238
service,
235239
"bucket",
236240
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.repositories.s3;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
1214
import software.amazon.awssdk.http.SdkHttpClient;
1315
import software.amazon.awssdk.services.s3.S3Client;
1416

@@ -153,6 +155,7 @@ public void testEmptyBucketName() {
153155

154156
private S3Repository createS3Repo(RepositoryMetadata metadata) {
155157
return new S3Repository(
158+
ProjectId.DEFAULT,
156159
metadata,
157160
NamedXContentRegistry.EMPTY,
158161
new DummyS3Service(mock(Environment.class), mock(ResourceWatcherService.class)),

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.ClusterState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
1516
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.common.component.Lifecycle;
@@ -44,6 +45,11 @@ public Repository getDelegate() {
4445
return in;
4546
}
4647

48+
@Override
49+
public ProjectId getProjectId() {
50+
return in.getProjectId();
51+
}
52+
4753
@Override
4854
public RepositoryMetadata getMetadata() {
4955
return in.getMetadata();

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.SnapshotsInProgress;
3131
import org.elasticsearch.cluster.metadata.IndexMetadata;
3232
import org.elasticsearch.cluster.metadata.Metadata;
33+
import org.elasticsearch.cluster.metadata.ProjectId;
3334
import org.elasticsearch.cluster.metadata.ProjectMetadata;
3435
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
3536
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -44,7 +45,9 @@
4445
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4546
import org.elasticsearch.common.util.concurrent.EsExecutors;
4647
import org.elasticsearch.common.util.concurrent.ListenableFuture;
48+
import org.elasticsearch.core.FixForMultiProject;
4749
import org.elasticsearch.core.IOUtils;
50+
import org.elasticsearch.core.Nullable;
4851
import org.elasticsearch.core.SuppressForbidden;
4952
import org.elasticsearch.core.TimeValue;
5053
import org.elasticsearch.index.Index;
@@ -822,19 +825,34 @@ private void archiveRepositoryStats(Repository repository, long clusterStateVers
822825
/**
823826
* Creates repository holder. This method starts the repository
824827
*/
828+
@FixForMultiProject
829+
@Deprecated(forRemoval = true)
825830
private static Repository createRepository(
826831
RepositoryMetadata repositoryMetadata,
827832
Map<String, Repository.Factory> factories,
828833
Function<RepositoryMetadata, Repository> defaultFactory
834+
) {
835+
return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory);
836+
}
837+
838+
/**
839+
* Creates repository holder. This method starts the repository
840+
*/
841+
private static Repository createRepository(
842+
@Nullable ProjectId projectId,
843+
RepositoryMetadata repositoryMetadata,
844+
Map<String, Repository.Factory> factories,
845+
Function<RepositoryMetadata, Repository> defaultFactory
829846
) {
830847
logger.debug("creating repository [{}][{}]", repositoryMetadata.type(), repositoryMetadata.name());
848+
@FixForMultiProject(description = "should defaultFactory handle projectId?")
831849
Repository.Factory factory = factories.get(repositoryMetadata.type());
832850
if (factory == null) {
833851
return defaultFactory.apply(repositoryMetadata);
834852
}
835853
Repository repository = null;
836854
try {
837-
repository = factory.create(repositoryMetadata, factories::get);
855+
repository = factory.create(projectId, repositoryMetadata, factories::get);
838856
repository.start();
839857
return repository;
840858
} catch (Exception e) {
@@ -859,8 +877,30 @@ private static Repository createRepository(
859877
* @return the started repository
860878
* @throws RepositoryException if repository type is not registered
861879
*/
880+
@FixForMultiProject
881+
@Deprecated(forRemoval = true)
862882
public Repository createRepository(RepositoryMetadata repositoryMetadata) {
863-
return createRepository(repositoryMetadata, typesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists);
883+
return createRepository(ProjectId.DEFAULT, repositoryMetadata);
884+
}
885+
886+
/**
887+
* Creates a repository holder.
888+
*
889+
* <p>WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:</p>
890+
*
891+
* <ul>
892+
* <li>This method does not register the repository (e.g., in the cluster state).</li>
893+
* <li>This method starts the repository. The repository should be closed after use.</li>
894+
* <li>The repository metadata should be associated to an already registered non-internal repository type and factory pair.</li>
895+
* </ul>
896+
*
897+
* @param projectId the project that the repository is associated with. May be null if the repository is at cluster level
898+
* @param repositoryMetadata the repository metadata
899+
* @return the started repository
900+
* @throws RepositoryException if repository type is not registered
901+
*/
902+
public Repository createRepository(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
903+
return createRepository(projectId, repositoryMetadata, typesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists);
864904
}
865905

866906
private static Repository throwRepositoryTypeDoesNotExists(RepositoryMetadata repositoryMetadata) {

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.ClusterState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
1516
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.common.component.LifecycleComponent;
@@ -61,11 +62,33 @@ interface Factory {
6162
*/
6263
Repository create(RepositoryMetadata metadata) throws Exception;
6364

65+
/**
66+
* Constructs a repository.
67+
* @param projectId the project-id for the repository
68+
* @param metadata metadata for the repository including name and settings
69+
* @param typeLookup a function that returns the repository factory for the given repository type.
70+
*/
71+
default Repository create(ProjectId projectId, RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup)
72+
throws Exception {
73+
return create(metadata, typeLookup);
74+
}
75+
6476
default Repository create(RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup) throws Exception {
6577
return create(metadata);
6678
}
6779
}
6880

81+
/**
82+
* Get the project-id for the repository.
83+
*
84+
* @return the project-id, or null if the repository is at the cluster level.
85+
* @throws UnsupportedOperationException if the project-id is not supported by the repository.
86+
*/
87+
@Nullable
88+
default ProjectId getProjectId() {
89+
throw new UnsupportedOperationException("getProjectId() not supported");
90+
}
91+
6992
/**
7093
* Returns metadata about this repository.
7194
*/

0 commit comments

Comments
 (0)