Skip to content

Commit 28c5da4

Browse files
committed
WIP
1 parent 6edc76d commit 28c5da4

File tree

11 files changed

+676
-45
lines changed

11 files changed

+676
-45
lines changed

modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.apache.logging.log4j.LogManager;
1818
import org.apache.logging.log4j.Logger;
1919
import org.elasticsearch.client.internal.node.NodeClient;
20+
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.service.ClusterService;
2022
import org.elasticsearch.common.settings.MockSecureSettings;
2123
import org.elasticsearch.common.settings.Settings;
2224
import org.elasticsearch.common.settings.SettingsFilter;
@@ -256,7 +258,12 @@ public ProxyS3RepositoryPlugin(Settings settings) {
256258
}
257259

258260
@Override
259-
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
261+
S3Service s3Service(
262+
Environment environment,
263+
ClusterService clusterService,
264+
ProjectResolver projectResolver,
265+
ResourceWatcherService resourceWatcherService
266+
) {
260267
return new ProxyS3Service(environment, nodeSettings, resourceWatcherService);
261268
}
262269

@@ -293,8 +300,13 @@ public static final class ProxyS3Service extends S3Service {
293300

294301
private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);
295302

296-
ProxyS3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
297-
super(environment, nodeSettings, resourceWatcherService, () -> null);
303+
/**
 * Creates the proxy service by delegating every argument to {@link S3Service}.
 * The trailing supplier always yields {@code null}.
 * NOTE(review): presumably this stands in for the default-region supplier used in production - confirm against S3Service.
 */
ProxyS3Service(
    Environment environment,
    ClusterService clusterService,
    ProjectResolver projectResolver,
    ResourceWatcherService resourceWatcherService
) {
    super(environment, clusterService, projectResolver, resourceWatcherService, () -> null);
}
299311

300312
@Override
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.s3;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.SubscribableListener;
14+
import org.elasticsearch.cluster.ClusterChangedEvent;
15+
import org.elasticsearch.cluster.ClusterStateListener;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
17+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
18+
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
19+
import org.elasticsearch.common.settings.ProjectSecrets;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.util.Maps;
22+
import org.elasticsearch.core.IOUtils;
23+
import org.elasticsearch.core.Nullable;
24+
import org.elasticsearch.logging.LogManager;
25+
import org.elasticsearch.logging.Logger;
26+
27+
import java.io.Closeable;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.Executor;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.Function;
39+
import java.util.stream.Collectors;
40+
41+
public class S3PerProjectClientManager implements ClusterStateListener {
42+
43+
private static final Logger logger = LogManager.getLogger(S3PerProjectClientManager.class);
44+
45+
private final Settings settings;
46+
private final Function<S3ClientSettings, AmazonS3Reference> clientBuilder;
47+
private final Executor executor;
48+
// A map of projectId to clients holder. Adding to and removing from the map happen only in the cluster state listener thread.
49+
private final Map<ProjectId, ClientsHolder> projectClientsHolders;
50+
// Listener for tracking ongoing async closing of obsolete clients. Updated only in the cluster state listener thread.
51+
private volatile SubscribableListener<Void> clientsCloseListener = null;
52+
53+
/**
 * Creates a manager that initially tracks no project.
 *
 * @param settings      settings merged (without their secure settings) with each project's secrets when resolving clients
 * @param clientBuilder builds a client reference from a resolved {@link S3ClientSettings}
 * @param executor      executor on which obsolete client holders are closed
 */
S3PerProjectClientManager(Settings settings, Function<S3ClientSettings, AmazonS3Reference> clientBuilder, Executor executor) {
    this.projectClientsHolders = new ConcurrentHashMap<>();
    this.executor = executor;
    this.clientBuilder = clientBuilder;
    this.settings = settings;
}
59+
60+
// visible for tests
61+
Map<ProjectId, ClientsHolder> getProjectClientsHolders() {
62+
return Map.copyOf(projectClientsHolders);
63+
}
64+
65+
public void clusterChanged(ClusterChangedEvent event) {
66+
final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();
67+
68+
final var updatedPerProjectClients = new HashMap<ProjectId, ClientsHolder>();
69+
for (var project : currentProjects.values()) {
70+
final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
71+
if (projectSecrets == null) {
72+
// This can only happen when a node restarts, it will be processed again when file settings are loaded
73+
continue;
74+
}
75+
final Settings currentSettings = Settings.builder()
76+
// merge with static settings such as max retries etc, exclude secure settings
77+
// TODO: We may need to update this if per-project settings decide to support hierarchical overrides
78+
.put(settings, false) // do not fallback to cluster scoped secrets
79+
.setSecureSettings(projectSecrets.getSettings())
80+
.build();
81+
final Map<String, S3ClientSettings> clientSettings = S3ClientSettings.load(currentSettings)
82+
.entrySet()
83+
.stream()
84+
// Skip project clients that have no credentials configured. This should not happen in serverless since all clients should
85+
// have credentials configured. But it is safer to skip them.
86+
.filter(entry -> entry.getValue().credentials != null)
87+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
88+
89+
// TODO: clientSettings should not be empty, i.e. there should be at least one client configured
90+
// Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok.
91+
92+
// TODO: Building and comparing the whole S3ClientSettings may be inefficient, we could just compare the relevant secrets
93+
if (newOrUpdated(project.id(), clientSettings)) {
94+
updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings));
95+
}
96+
}
97+
98+
final List<ClientsHolder> clientsHoldersToClose = new ArrayList<>();
99+
// Updated projects
100+
for (var projectId : updatedPerProjectClients.keySet()) {
101+
final var old = projectClientsHolders.put(projectId, updatedPerProjectClients.get(projectId));
102+
if (old != null) {
103+
clientsHoldersToClose.add(old);
104+
}
105+
}
106+
// removed projects
107+
for (var projectId : projectClientsHolders.keySet()) {
108+
if (currentProjects.containsKey(projectId) == false) {
109+
final var removed = projectClientsHolders.remove(projectId);
110+
assert removed != null;
111+
clientsHoldersToClose.add(removed);
112+
}
113+
}
114+
// Close stale clients asynchronously without blocking the cluster state thread
115+
if (clientsHoldersToClose.isEmpty() == false) {
116+
final var currentClientsCloseListener = new SubscribableListener<Void>();
117+
final var previousClientsCloseListener = clientsCloseListener;
118+
clientsCloseListener = currentClientsCloseListener;
119+
if (previousClientsCloseListener != null && previousClientsCloseListener.isDone() == false) {
120+
previousClientsCloseListener.addListener(
121+
ActionListener.running(() -> closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener))
122+
);
123+
} else {
124+
closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener);
125+
}
126+
}
127+
}
128+
129+
private void closeClientsAsync(List<ClientsHolder> clientsHoldersToClose, ActionListener<Void> listener) {
130+
executor.execute(() -> {
131+
IOUtils.closeWhileHandlingException(clientsHoldersToClose);
132+
listener.onResponse(null);
133+
});
134+
}
135+
136+
public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
137+
final var clientsHolder = projectClientsHolders.get(projectId);
138+
if (clientsHolder == null) {
139+
throw new IllegalArgumentException("project [" + projectId + "] does not exist");
140+
}
141+
final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetadata.settings());
142+
return clientsHolder.client(clientName);
143+
}
144+
145+
/**
146+
* Similar to S3Service#releaseCachedClients but only clears the cache for the given project.
147+
* All clients for the project are closed and will be recreated on next access, also similar to S3Service#releaseCachedClients
148+
*/
149+
public void releaseProjectClients(ProjectId projectId) {
150+
final var old = projectClientsHolders.get(projectId);
151+
if (old != null) {
152+
old.clearCache();
153+
}
154+
}
155+
156+
/**
157+
* Shutdown the manager by closing all clients holders. This is called when the node is shutting down.
158+
* It attempts to wait (1 min) for any async client closing to complete.
159+
*/
160+
public void close() {
161+
IOUtils.closeWhileHandlingException(projectClientsHolders.values());
162+
final var currentClientsCloseListener = clientsCloseListener;
163+
if (currentClientsCloseListener != null && currentClientsCloseListener.isDone() == false) {
164+
// Wait for async clients closing to be completed
165+
final CountDownLatch latch = new CountDownLatch(1);
166+
currentClientsCloseListener.addListener(ActionListener.running(latch::countDown));
167+
try {
168+
if (latch.await(1, TimeUnit.MINUTES) == false) {
169+
logger.warn("async closing of s3 clients timed out");
170+
}
171+
} catch (InterruptedException e) {
172+
Thread.currentThread().interrupt();
173+
}
174+
}
175+
}
176+
177+
// visible for tests
178+
@Nullable
179+
SubscribableListener<Void> getClientsCloseListener() {
180+
return clientsCloseListener;
181+
}
182+
183+
private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings> currentClientSettings) {
184+
final var old = projectClientsHolders.get(projectId);
185+
if (old == null) {
186+
return true;
187+
}
188+
return currentClientSettings.equals(old.clientSettings()) == false;
189+
}
190+
191+
/**
192+
* Holder class of s3 clients for a single project. It is instantiated in the cluster state thread with client
193+
* settings. The clients are created and cached lazily when the {@link #client(String)} method is called.
194+
* Cached clients are closed and cleared out when the {@link #clearCache()} method is called. Subsequent calls to
195+
* {@link #client(String)} will recreate them. The call to {@link #close()} method clears the cache as well but
196+
* also flags the holder to be closed so that no new clients can be created.
197+
*/
198+
final class ClientsHolder implements Closeable {
199+
private final AtomicBoolean closed = new AtomicBoolean(false);
200+
private final Map<String, S3ClientSettings> clientSettings;
201+
// Client name -> client reference
202+
private volatile Map<String, AmazonS3Reference> clientsCache = Collections.emptyMap();
203+
204+
ClientsHolder(Map<String, S3ClientSettings> clientSettings) {
205+
this.clientSettings = clientSettings;
206+
}
207+
208+
Map<String, S3ClientSettings> clientSettings() {
209+
return clientSettings;
210+
}
211+
212+
AmazonS3Reference client(String clientName) {
213+
final var clientReference = clientsCache.get(clientName);
214+
// It is ok to retrieve an existing client when the cache is being cleared or the holder is closing.
215+
// As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released
216+
// by either the caller of this method or the clearCache() method.
217+
if (clientReference != null && clientReference.tryIncRef()) {
218+
return clientReference;
219+
}
220+
final var settings = clientSettings.get(clientName);
221+
if (settings == null) {
222+
throw new IllegalArgumentException("client [" + clientName + "] does not exist");
223+
}
224+
synchronized (this) {
225+
final var existing = clientsCache.get(clientName);
226+
if (existing != null && existing.tryIncRef()) {
227+
return existing;
228+
}
229+
if (closed.get()) {
230+
// Not adding a new client once the manager is closed since there won't be anything to close it
231+
throw new IllegalStateException("client manager is closed");
232+
}
233+
// The close() method maybe called after we checked it, it is ok since we are already inside the synchronized block.
234+
// The clearCache() will clear the newly added client.
235+
final var newClientReference = clientBuilder.apply(settings);
236+
clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientName, newClientReference);
237+
return newClientReference;
238+
}
239+
}
240+
241+
/**
242+
* Clear the cache by closing and clear out all clients. Subsequent {@link #client(String)} calls will recreate
243+
* the clients and populate the cache again.
244+
*/
245+
synchronized void clearCache() {
246+
IOUtils.closeWhileHandlingException(clientsCache.values());
247+
clientsCache = Collections.emptyMap();
248+
}
249+
250+
/**
251+
* Similar to {@link #clearCache()} but also flag the holder to be closed so that no new client can be created.
252+
*/
253+
public void close() {
254+
if (closed.compareAndSet(false, true)) {
255+
clearCache();
256+
}
257+
}
258+
259+
// visible for tests
260+
boolean isClosed() {
261+
return closed.get();
262+
}
263+
}
264+
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.util.SetOnce;
1616
import org.elasticsearch.SpecialPermission;
1717
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.settings.Setting;
2021
import org.elasticsearch.common.settings.Settings;
@@ -87,13 +88,20 @@ protected S3Repository createRepository(
8788

8889
@Override
8990
public Collection<?> createComponents(PluginServices services) {
90-
service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService()));
91+
service.set(
92+
s3Service(services.environment(), services.clusterService(), services.projectResolver(), services.resourceWatcherService())
93+
);
9194
this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
9295
return List.of(service.get());
9396
}
9497

95-
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
96-
return new S3Service(environment, nodeSettings, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
98+
S3Service s3Service(
99+
Environment environment,
100+
ClusterService clusterService,
101+
ProjectResolver projectResolver,
102+
ResourceWatcherService resourceWatcherService
103+
) {
104+
return new S3Service(environment, clusterService, projectResolver, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
97105
}
98106

99107
private static Region getDefaultRegion() {

0 commit comments

Comments
 (0)