Skip to content

Commit 6311bb3

Browse files
authored
Support per-project s3 clients (elastic#127631)
With project secrets in place, we can now create and manage per-project repository clients in addition to the cluster level repository clients. This PR adds a manager class for that. Note that the logic is not yet fully wired because it needs per-project repository/objec_store which will be added in separate PRs (as part of MP snapshots and MP objecStoreService). As such the tests are currently unit tests. Relates: elastic#126584 Resolves: ES-11713
1 parent b2e9eb2 commit 6311bb3

File tree

12 files changed

+1162
-133
lines changed

12 files changed

+1162
-133
lines changed

modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.apache.logging.log4j.LogManager;
1818
import org.apache.logging.log4j.Logger;
1919
import org.elasticsearch.client.internal.node.NodeClient;
20+
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.service.ClusterService;
2022
import org.elasticsearch.common.settings.MockSecureSettings;
2123
import org.elasticsearch.common.settings.Settings;
2224
import org.elasticsearch.common.settings.SettingsFilter;
@@ -256,8 +258,13 @@ public ProxyS3RepositoryPlugin(Settings settings) {
256258
}
257259

258260
@Override
259-
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
260-
return new ProxyS3Service(environment, nodeSettings, resourceWatcherService);
261+
S3Service s3Service(
262+
Environment environment,
263+
ClusterService clusterService,
264+
ProjectResolver projectResolver,
265+
ResourceWatcherService resourceWatcherService
266+
) {
267+
return new ProxyS3Service(environment, clusterService, projectResolver, resourceWatcherService);
261268
}
262269

263270
/**
@@ -293,8 +300,13 @@ public static final class ProxyS3Service extends S3Service {
293300

294301
private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);
295302

296-
ProxyS3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
297-
super(environment, nodeSettings, resourceWatcherService, () -> null);
303+
ProxyS3Service(
304+
Environment environment,
305+
ClusterService clusterService,
306+
ProjectResolver projectResolver,
307+
ResourceWatcherService resourceWatcherService
308+
) {
309+
super(environment, clusterService, projectResolver, resourceWatcherService, () -> null);
298310
}
299311

300312
@Override

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientsManager.java

Lines changed: 490 additions & 0 deletions
Large diffs are not rendered by default.

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.util.SetOnce;
1616
import org.elasticsearch.cluster.metadata.ProjectId;
1717
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.settings.Setting;
2021
import org.elasticsearch.common.settings.Settings;
@@ -80,13 +81,20 @@ protected S3Repository createRepository(
8081

8182
@Override
8283
public Collection<?> createComponents(PluginServices services) {
83-
service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService()));
84+
service.set(
85+
s3Service(services.environment(), services.clusterService(), services.projectResolver(), services.resourceWatcherService())
86+
);
8487
this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
8588
return List.of(service.get());
8689
}
8790

88-
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
89-
return new S3Service(environment, nodeSettings, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
91+
S3Service s3Service(
92+
Environment environment,
93+
ClusterService clusterService,
94+
ProjectResolver projectResolver,
95+
ResourceWatcherService resourceWatcherService
96+
) {
97+
return new S3Service(environment, clusterService, projectResolver, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
9098
}
9199

92100
private static Region getDefaultRegion() {

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 68 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,19 @@
3737
import org.apache.http.conn.DnsResolver;
3838
import org.apache.logging.log4j.LogManager;
3939
import org.apache.logging.log4j.Logger;
40-
import org.apache.lucene.store.AlreadyClosedException;
4140
import org.elasticsearch.ElasticsearchException;
4241
import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService;
42+
import org.elasticsearch.cluster.metadata.ProjectId;
4343
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
4444
import org.elasticsearch.cluster.node.DiscoveryNode;
45+
import org.elasticsearch.cluster.project.ProjectResolver;
46+
import org.elasticsearch.cluster.service.ClusterService;
4547
import org.elasticsearch.common.Strings;
4648
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4749
import org.elasticsearch.common.settings.Setting;
4850
import org.elasticsearch.common.settings.Settings;
49-
import org.elasticsearch.common.util.Maps;
5051
import org.elasticsearch.common.util.concurrent.RunOnce;
52+
import org.elasticsearch.core.FixForMultiProject;
5153
import org.elasticsearch.core.Nullable;
5254
import org.elasticsearch.core.Releasable;
5355
import org.elasticsearch.core.Releasables;
@@ -72,7 +74,6 @@
7274
import java.util.function.Consumer;
7375
import java.util.function.Supplier;
7476

75-
import static java.util.Collections.emptyMap;
7677
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_ARN;
7778
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_SESSION_NAME;
7879
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_WEB_IDENTITY_TOKEN_FILE;
@@ -93,21 +94,6 @@ class S3Service extends AbstractLifecycleComponent {
9394
TimeValue.timeValueHours(24),
9495
Setting.Property.NodeScope
9596
);
96-
private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();
97-
98-
/**
99-
* Client settings calculated from static configuration and settings in the keystore.
100-
*/
101-
private volatile Map<String, S3ClientSettings> staticClientSettings = Map.of(
102-
"default",
103-
S3ClientSettings.getClientSettings(Settings.EMPTY, "default")
104-
);
105-
106-
/**
107-
* Client settings derived from those in {@link #staticClientSettings} by combining them with settings
108-
* in the {@link RepositoryMetadata}.
109-
*/
110-
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = emptyMap();
11197

11298
private final Runnable defaultRegionSetter;
11399
private volatile Region defaultRegion;
@@ -124,13 +110,16 @@ class S3Service extends AbstractLifecycleComponent {
124110
final TimeValue compareAndExchangeTimeToLive;
125111
final TimeValue compareAndExchangeAntiContentionDelay;
126112
final boolean isStateless;
113+
private final S3ClientsManager s3ClientsManager;
127114

128115
S3Service(
129116
Environment environment,
130-
Settings nodeSettings,
117+
ClusterService clusterService,
118+
ProjectResolver projectResolver,
131119
ResourceWatcherService resourceWatcherService,
132120
Supplier<Region> defaultRegionSupplier
133121
) {
122+
final Settings nodeSettings = clusterService.getSettings();
134123
webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider(
135124
environment,
136125
System::getenv,
@@ -142,6 +131,20 @@ class S3Service extends AbstractLifecycleComponent {
142131
compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings);
143132
isStateless = DiscoveryNode.isStateless(nodeSettings);
144133
defaultRegionSetter = new RunOnce(() -> defaultRegion = defaultRegionSupplier.get());
134+
s3ClientsManager = new S3ClientsManager(
135+
nodeSettings,
136+
this::buildClientReference,
137+
clusterService.threadPool().generic(),
138+
projectResolver.supportsMultipleProjects()
139+
);
140+
if (projectResolver.supportsMultipleProjects()) {
141+
clusterService.addHighPriorityApplier(s3ClientsManager);
142+
}
143+
}
144+
145+
// visible to tests
146+
S3ClientsManager getS3ClientsManager() {
147+
return s3ClientsManager;
145148
}
146149

147150
/**
@@ -151,86 +154,55 @@ class S3Service extends AbstractLifecycleComponent {
151154
* of being returned to the cache.
152155
*/
153156
public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
154-
// shutdown all unused clients
155-
// others will shutdown on their respective release
156-
releaseCachedClients();
157-
this.staticClientSettings = Maps.ofEntries(clientsSettings.entrySet());
158-
derivedClientSettings = emptyMap();
159-
assert this.staticClientSettings.containsKey("default") : "always at least have 'default'";
160-
/* clients are built lazily by {@link #client} */
157+
s3ClientsManager.refreshAndClearCacheForClusterClients(clientsSettings);
161158
}
162159

163160
/**
164161
* Attempts to retrieve a client by its repository metadata and settings from the cache.
165162
* If the client does not exist it will be created.
166163
*/
164+
@FixForMultiProject(description = "can be removed once blobstore is project aware")
167165
public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
168-
final S3ClientSettings clientSettings = settings(repositoryMetadata);
169-
{
170-
final AmazonS3Reference clientReference = clientsCache.get(clientSettings);
171-
if (clientReference != null && clientReference.tryIncRef()) {
172-
return clientReference;
173-
}
174-
}
175-
synchronized (this) {
176-
final AmazonS3Reference existing = clientsCache.get(clientSettings);
177-
if (existing != null && existing.tryIncRef()) {
178-
return existing;
179-
}
180-
181-
if (lifecycle.started() == false) {
182-
// doClose() calls releaseCachedClients() which is also synchronized (this) so if we're STARTED here then the client we
183-
// create will definitely not leak on close.
184-
throw new AlreadyClosedException("S3Service is in state [" + lifecycle + "]");
185-
}
166+
return client(ProjectId.DEFAULT, repositoryMetadata);
167+
}
186168

187-
final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver());
188-
Releasable toRelease = httpClient::close;
189-
try {
190-
final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient);
191-
clientReference.mustIncRef();
192-
clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientSettings, clientReference);
193-
toRelease = null;
194-
return clientReference;
195-
} finally {
196-
Releasables.close(toRelease);
197-
}
198-
}
169+
/**
170+
* Attempts to retrieve either a cluster or project client from the client manager. Throws if project-id or
171+
* the client name does not exist. The client maybe initialized lazily.
172+
* @param projectId The project associated with the client, or null if the client is cluster level
173+
*/
174+
public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
175+
return s3ClientsManager.client(effectiveProjectId(projectId), repositoryMetadata);
199176
}
200177

201178
/**
202-
* Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them
203-
* by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata.
204-
* @param repositoryMetadata Repository Metadata
205-
* @return S3ClientSettings
179+
* We use the default project-id for cluster level clients.
206180
*/
207-
S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
208-
final Settings settings = repositoryMetadata.settings();
209-
{
210-
final S3ClientSettings existing = derivedClientSettings.get(settings);
211-
if (existing != null) {
212-
return existing;
213-
}
214-
}
215-
final String clientName = S3Repository.CLIENT_NAME.get(settings);
216-
final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
217-
if (staticSettings != null) {
218-
synchronized (this) {
219-
final S3ClientSettings existing = derivedClientSettings.get(settings);
220-
if (existing != null) {
221-
return existing;
222-
}
223-
final S3ClientSettings newSettings = staticSettings.refine(settings);
224-
derivedClientSettings = Maps.copyMapWithAddedOrReplacedEntry(derivedClientSettings, settings, newSettings);
225-
return newSettings;
226-
}
181+
ProjectId effectiveProjectId(@Nullable ProjectId projectId) {
182+
return projectId == null ? ProjectId.DEFAULT : projectId;
183+
}
184+
185+
// TODO: consider moving client building into S3ClientsManager
186+
private AmazonS3Reference buildClientReference(final S3ClientSettings clientSettings) {
187+
final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver());
188+
Releasable toRelease = httpClient::close;
189+
try {
190+
final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient);
191+
clientReference.mustIncRef();
192+
toRelease = null;
193+
return clientReference;
194+
} finally {
195+
Releasables.close(toRelease);
227196
}
228-
throw new IllegalArgumentException(
229-
"Unknown s3 client name ["
230-
+ clientName
231-
+ "]. Existing client configs: "
232-
+ Strings.collectionToDelimitedString(staticClientSettings.keySet(), ",")
233-
);
197+
}
198+
199+
@FixForMultiProject(description = "can be removed once blobstore is project aware")
200+
S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
201+
return settings(ProjectId.DEFAULT, repositoryMetadata);
202+
}
203+
204+
S3ClientSettings settings(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
205+
return s3ClientsManager.settingsForClient(effectiveProjectId(projectId), repositoryMetadata);
234206
}
235207

236208
// proxy for testing
@@ -448,18 +420,17 @@ static AwsCredentialsProvider buildCredentials(
448420
}
449421
}
450422

451-
private synchronized void releaseCachedClients() {
452-
// the clients will shutdown when they will not be used anymore
453-
for (final AmazonS3Reference clientReference : clientsCache.values()) {
454-
clientReference.decRef();
455-
}
456-
// clear previously cached clients, they will be build lazily
457-
clientsCache = emptyMap();
458-
derivedClientSettings = emptyMap();
423+
@FixForMultiProject(description = "can be removed once blobstore is project aware")
424+
public void onBlobStoreClose() {
425+
onBlobStoreClose(ProjectId.DEFAULT);
459426
}
460427

461-
public void onBlobStoreClose() {
462-
releaseCachedClients();
428+
/**
429+
* Release clients for the specified project.
430+
* @param projectId The project associated with the client, or null if the client is cluster level
431+
*/
432+
public void onBlobStoreClose(@Nullable ProjectId projectId) {
433+
s3ClientsManager.releaseCachedClients(effectiveProjectId(projectId));
463434
}
464435

465436
@Override
@@ -472,7 +443,7 @@ protected void doStop() {}
472443

473444
@Override
474445
public void doClose() throws IOException {
475-
releaseCachedClients();
446+
s3ClientsManager.close();
476447
webIdentityTokenCredentialsProvider.close();
477448
}
478449

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AwsS3ServiceImplTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.util.Supplier;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
25+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2526
import org.elasticsearch.common.settings.MockSecureSettings;
2627
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
2729
import org.elasticsearch.env.Environment;
30+
import org.elasticsearch.test.ClusterServiceUtils;
2831
import org.elasticsearch.test.ESTestCase;
2932
import org.elasticsearch.watcher.ResourceWatcherService;
3033
import org.mockito.ArgumentCaptor;
@@ -234,7 +237,8 @@ public void testEndPointAndRegionOverrides() throws IOException {
234237
try (
235238
S3Service s3Service = new S3Service(
236239
mock(Environment.class),
237-
Settings.EMPTY,
240+
ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
241+
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
238242
mock(ResourceWatcherService.class),
239243
() -> Region.of("es-test-region")
240244
)
@@ -248,7 +252,6 @@ public void testEndPointAndRegionOverrides() throws IOException {
248252
assertEquals("es-test-region", reference.client().serviceClientConfiguration().region().toString());
249253

250254
reference.close();
251-
s3Service.doClose();
252255
}
253256
}
254257

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Level;
2323
import org.elasticsearch.ExceptionsHelper;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
25+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2526
import org.elasticsearch.common.BackoffPolicy;
2627
import org.elasticsearch.common.Strings;
2728
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.telemetry.InstrumentType;
5556
import org.elasticsearch.telemetry.Measurement;
5657
import org.elasticsearch.telemetry.RecordingMeterRegistry;
58+
import org.elasticsearch.test.ClusterServiceUtils;
5759
import org.elasticsearch.test.ESTestCase;
5860
import org.elasticsearch.test.MockLog;
5961
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -126,7 +128,13 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
126128
@Before
127129
public void setUp() throws Exception {
128130
shouldErrorOnDns = false;
129-
service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY, Mockito.mock(ResourceWatcherService.class), () -> null) {
131+
service = new S3Service(
132+
Mockito.mock(Environment.class),
133+
ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
134+
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
135+
Mockito.mock(ResourceWatcherService.class),
136+
() -> null
137+
) {
130138
private InetAddress[] resolveHost(String host) throws UnknownHostException {
131139
assertEquals("127.0.0.1", host);
132140
if (shouldErrorOnDns && randomBoolean() && randomBoolean()) {
@@ -1308,7 +1316,11 @@ public void testRetryOn403InStateless() {
13081316

13091317
service = new S3Service(
13101318
Mockito.mock(Environment.class),
1311-
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(),
1319+
ClusterServiceUtils.createClusterService(
1320+
new DeterministicTaskQueue().getThreadPool(),
1321+
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build()
1322+
),
1323+
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
13121324
Mockito.mock(ResourceWatcherService.class),
13131325
() -> null
13141326
);

0 commit comments

Comments
 (0)