Skip to content

Commit ff631de

Browse files
committed
separate field for cluster clients
1 parent dc14ef7 commit ff631de

File tree

3 files changed

+87
-69
lines changed

3 files changed

+87
-69
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientsManager.java

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Objects;
33+
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.Executor;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.function.Function;
36-
import java.util.function.UnaryOperator;
3737
import java.util.stream.Collectors;
3838

3939
import static java.util.Collections.emptyMap;
@@ -55,42 +55,45 @@ public class S3ClientsManager implements ClusterStateApplier {
5555
private final Executor executor;
5656
private final AtomicBoolean managerClosed = new AtomicBoolean(false);
5757
// A map of projectId to clients holder. Adding to and removing from the map happen only in the applier thread.
58-
private final Map<ProjectId, ClientsHolder<?>> clientsHolders;
58+
private final Map<ProjectId, PerProjectClientsHolder> perProjectClientsHolders;
59+
private final ClusterClientsHolder clusterClientsHolder;
5960

6061
S3ClientsManager(
6162
Settings nodeSettings,
6263
Function<S3ClientSettings, AmazonS3Reference> clientBuilder,
63-
UnaryOperator<Map<ProjectId, ClientsHolder<?>>> clientsHoldersWrapper,
64-
Executor executor
64+
Executor executor,
65+
boolean supportsMultipleProjects
6566
) {
6667
this.nodeS3Settings = Settings.builder()
6768
.put(nodeSettings.getByPrefix(S3_SETTING_PREFIX), false) // not rely on any cluster scoped secrets
6869
.normalizePrefix(S3_SETTING_PREFIX)
6970
.build();
7071
this.clientBuilder = clientBuilder;
7172
this.executor = executor;
72-
this.clientsHolders = clientsHoldersWrapper.apply(Map.of(ProjectId.DEFAULT, new ClusterClientsHolder()));
73+
this.clusterClientsHolder = new ClusterClientsHolder();
74+
this.perProjectClientsHolders = supportsMultipleProjects ? new ConcurrentHashMap<>() : null;
7375
}
7476

7577
@Override
7678
public void applyClusterState(ClusterChangedEvent event) {
79+
assert perProjectClientsHolders != null : "expect per-project clients holders to be non-null";
7780
final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();
7881

7982
final var updatedPerProjectClients = new HashMap<ProjectId, PerProjectClientsHolder>();
8083
final List<PerProjectClientsHolder> clientsHoldersToClose = new ArrayList<>();
8184
for (var project : currentProjects.values()) {
82-
// Skip the default project, it is handled differently with the ReloadablePlugin interface
85+
// Skip the default project, it is tracked separately with clusterClientsHolder and
86+
// updated differently with the ReloadablePlugin interface
8387
if (ProjectId.DEFAULT.equals(project.id())) {
8488
continue;
8589
}
8690
final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
8791
// Project secrets can be null when node restarts. It may not have any s3 credentials if s3 is not in use.
8892
if (projectSecrets == null || projectSecrets.getSettingNames().stream().noneMatch(key -> key.startsWith("s3."))) {
8993
// Most likely there won't be any existing client, but attempt to remove it anyway just in case
90-
final var removed = clientsHolders.remove(project.id());
94+
final var removed = perProjectClientsHolders.remove(project.id());
9195
if (removed != null) {
92-
assert removed instanceof PerProjectClientsHolder;
93-
clientsHoldersToClose.add((PerProjectClientsHolder) removed);
96+
clientsHoldersToClose.add(removed);
9497
}
9598
continue;
9699
}
@@ -126,28 +129,27 @@ public void applyClusterState(ClusterChangedEvent event) {
126129

127130
// Updated projects
128131
for (var projectId : updatedPerProjectClients.keySet()) {
129-
final var old = clientsHolders.put(projectId, updatedPerProjectClients.get(projectId));
132+
assert ProjectId.DEFAULT.equals(projectId) == false;
133+
final var old = perProjectClientsHolders.put(projectId, updatedPerProjectClients.get(projectId));
130134
if (old != null) {
131-
assert old instanceof PerProjectClientsHolder;
132-
clientsHoldersToClose.add((PerProjectClientsHolder) old);
135+
clientsHoldersToClose.add(old);
133136
}
134137
}
135-
// removed projects
136-
for (var projectId : clientsHolders.keySet()) {
138+
// Removed projects
139+
for (var projectId : perProjectClientsHolders.keySet()) {
140+
assert ProjectId.DEFAULT.equals(projectId) == false;
137141
if (currentProjects.containsKey(projectId) == false) {
138-
assert ProjectId.DEFAULT.equals(projectId) == false;
139-
final var removed = clientsHolders.remove(projectId);
140-
assert removed instanceof PerProjectClientsHolder;
141-
clientsHoldersToClose.add((PerProjectClientsHolder) removed);
142+
final var removed = perProjectClientsHolders.remove(projectId);
143+
clientsHoldersToClose.add(removed);
142144
}
143145
}
144146
// Close stale clients asynchronously without blocking the applier thread
145147
if (clientsHoldersToClose.isEmpty() == false) {
146-
closeClientsAsync(clientsHoldersToClose);
148+
closePerProjectClientsAsync(clientsHoldersToClose);
147149
}
148150
}
149151

150-
private void closeClientsAsync(List<PerProjectClientsHolder> clientsHoldersToClose) {
152+
private void closePerProjectClientsAsync(List<PerProjectClientsHolder> clientsHoldersToClose) {
151153
executor.execute(new AbstractRunnable() {
152154
@Override
153155
protected void doRun() throws Exception {
@@ -162,34 +164,45 @@ public void onFailure(Exception e) {
162164
}
163165

164166
// visible for tests
165-
Map<ProjectId, ClientsHolder<?>> getClientsHolders() {
166-
return Map.copyOf(clientsHolders);
167+
ClusterClientsHolder getClusterClientsHolder() {
168+
return clusterClientsHolder;
169+
}
170+
171+
// visible for tests
172+
Map<ProjectId, PerProjectClientsHolder> getPerProjectClientsHolders() {
173+
return perProjectClientsHolders == null ? null : Map.copyOf(perProjectClientsHolders);
174+
}
175+
176+
// visible for tests
177+
boolean isManagerClosed() {
178+
return managerClosed.get();
167179
}
168180

169181
void refreshAndClearCacheForClusterClients(Map<String, S3ClientSettings> clientsSettings) {
170-
final var clientsHolder = clientsHolders.get(ProjectId.DEFAULT);
171-
if (clientsHolder instanceof ClusterClientsHolder clusterClientsHolder) {
172-
clusterClientsHolder.refreshAndClearCache(clientsSettings);
173-
} else {
174-
final String message = "expect cluster clients holder, got " + clientsHolder;
175-
assert false : message;
176-
throw new IllegalStateException(message);
177-
}
182+
clusterClientsHolder.refreshAndClearCache(clientsSettings);
178183
}
179184

180185
S3ClientSettings settingsForClient(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
181-
final var clientsHolder = clientsHolders.get(Objects.requireNonNull(projectId));
186+
if (ProjectId.DEFAULT.equals(Objects.requireNonNull(projectId))) {
187+
return clusterClientsHolder.singleClientSettings(repositoryMetadata);
188+
}
189+
190+
assert perProjectClientsHolders != null : "expect per-project clients holders to be non-null";
191+
final var clientsHolder = perProjectClientsHolders.get(projectId);
182192
if (clientsHolder == null) {
183-
assert ProjectId.DEFAULT.equals(projectId) == false;
184193
throw new IllegalArgumentException("no s3 client is configured for project [" + projectId + "]");
185194
}
186195
return clientsHolder.singleClientSettings(repositoryMetadata);
187196
}
188197

189198
AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
190-
final var clientsHolder = clientsHolders.get(Objects.requireNonNull(projectId));
199+
if (ProjectId.DEFAULT.equals(Objects.requireNonNull(projectId))) {
200+
return clusterClientsHolder.client(repositoryMetadata);
201+
}
202+
203+
assert perProjectClientsHolders != null : "expect per-project clients holders to be non-null";
204+
final var clientsHolder = perProjectClientsHolders.get(projectId);
191205
if (clientsHolder == null) {
192-
assert ProjectId.DEFAULT.equals(projectId) == false;
193206
throw new IllegalArgumentException("no s3 client is configured for project [" + projectId + "]");
194207
}
195208
return clientsHolder.client(repositoryMetadata);
@@ -200,11 +213,15 @@ AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetad
200213
* All clients for the project are closed and will be recreated on next access.
201214
*/
202215
void releaseCachedClients(ProjectId projectId) {
203-
final var old = clientsHolders.get(Objects.requireNonNull(projectId));
216+
if (ProjectId.DEFAULT.equals(Objects.requireNonNull(projectId))) {
217+
clusterClientsHolder.clearCache();
218+
return;
219+
}
220+
221+
assert perProjectClientsHolders != null : "expect per-project clients holders to be non-null";
222+
final var old = perProjectClientsHolders.get(projectId);
204223
if (old != null) {
205224
old.clearCache();
206-
} else {
207-
assert ProjectId.DEFAULT.equals(projectId) == false;
208225
}
209226
}
210227

@@ -216,12 +233,15 @@ void close() {
216233
// Close all clients holders, they will close their cached clients.
217234
// It's OK if a new clients holder is added concurrently or after this point because
218235
// no new client will be created once the manager is closed, i.e. nothing to release.
219-
IOUtils.closeWhileHandlingException(clientsHolders.values());
236+
if (perProjectClientsHolders != null) {
237+
IOUtils.closeWhileHandlingException(perProjectClientsHolders.values());
238+
}
239+
IOUtils.closeWhileHandlingException(clusterClientsHolder);
220240
}
221241
}
222242

223243
private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings> currentClientSettings) {
224-
final var old = clientsHolders.get(projectId);
244+
final var old = perProjectClientsHolders.get(projectId);
225245
if (old == null) {
226246
return true;
227247
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,8 @@
7171
import java.util.Objects;
7272
import java.util.Optional;
7373
import java.util.concurrent.CompletableFuture;
74-
import java.util.concurrent.ConcurrentHashMap;
7574
import java.util.function.Consumer;
7675
import java.util.function.Supplier;
77-
import java.util.function.UnaryOperator;
7876

7977
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_ARN;
8078
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_SESSION_NAME;
@@ -136,16 +134,16 @@ class S3Service extends AbstractLifecycleComponent {
136134
s3ClientsManager = new S3ClientsManager(
137135
nodeSettings,
138136
this::buildClientReference,
139-
projectResolver.supportsMultipleProjects() ? ConcurrentHashMap::new : UnaryOperator.identity(),
140-
clusterService.threadPool().generic()
137+
clusterService.threadPool().generic(),
138+
projectResolver.supportsMultipleProjects()
141139
);
142140
if (projectResolver.supportsMultipleProjects()) {
143141
clusterService.addHighPriorityApplier(s3ClientsManager);
144142
}
145143
}
146144

147145
// visible to tests
148-
S3ClientsManager getS3PerProjectClientManager() {
146+
S3ClientsManager getS3ClientsManager() {
149147
return s3ClientsManager;
150148
}
151149

@@ -428,6 +426,7 @@ public void onBlobStoreClose() {
428426

429427
/**
430428
* Release clients for the specified project.
429+
* @param projectId The project associated with the client, or null if the client is cluster level
431430
*/
432431
public void onBlobStoreClose(@Nullable ProjectId projectId) {
433432
s3ClientsManager.releaseCachedClients(effectiveProjectId(projectId));

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientsManagerTests.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,14 @@ public void setUp() throws Exception {
9797
s3Service = new S3Service(
9898
mock(Environment.class),
9999
clusterService,
100-
TestProjectResolvers.allProjects(),
100+
TestProjectResolvers.allProjects(), // with multiple projects support
101101
mock(ResourceWatcherService.class),
102102
() -> Region.of("es-test-region")
103103
);
104104
s3Service.refreshAndClearCache(S3ClientSettings.load(settings));
105-
s3ClientsManager = s3Service.getS3PerProjectClientManager();
106-
assertNotNull(s3ClientsManager);
105+
s3ClientsManager = s3Service.getS3ClientsManager();
106+
assertThat(s3ClientsManager.getClusterClientsHolder().allClientSettings(), equalTo(clusterClientsSettings));
107+
assertNotNull(s3ClientsManager.getPerProjectClientsHolders());
107108
s3Service.start();
108109
}
109110

@@ -113,19 +114,21 @@ public void tearDown() throws Exception {
113114
s3Service.close();
114115
clusterService.close();
115116
threadPool.close();
116-
s3ClientsManager.getClientsHolders().forEach((projectId, clientsHolder) -> assertTrue(clientsHolder.isClosed()));
117+
assertTrue(s3ClientsManager.isManagerClosed());
118+
s3ClientsManager.getPerProjectClientsHolders().forEach((projectId, clientsHolder) -> assertTrue(clientsHolder.isClosed()));
119+
assertTrue(s3ClientsManager.getClusterClientsHolder().isClosed());
117120
}
118121

119122
public void testDoesNotCreateClientWhenSecretsAreNotConfigured() {
120-
assertThat(getClientsHoldersExcludeDefaultProject(), anEmptyMap());
123+
assertThat(s3ClientsManager.getPerProjectClientsHolders(), anEmptyMap());
121124
final ProjectId projectId = randomUniqueProjectId();
122125

123126
// No project secrets at all
124127
ClusterServiceUtils.setState(
125128
clusterService,
126129
ClusterState.builder(clusterService.state()).putProjectMetadata(ProjectMetadata.builder(projectId)).build()
127130
);
128-
assertThat(getClientsHoldersExcludeDefaultProject(), anEmptyMap());
131+
assertThat(s3ClientsManager.getPerProjectClientsHolders(), anEmptyMap());
129132

130133
// Project secrets but no s3 credentials
131134
final var mockSecureSettings = new MockSecureSettings();
@@ -142,7 +145,7 @@ public void testDoesNotCreateClientWhenSecretsAreNotConfigured() {
142145
)
143146
.build()
144147
);
145-
assertThat(getClientsHoldersExcludeDefaultProject(), anEmptyMap());
148+
assertThat(s3ClientsManager.getPerProjectClientsHolders(), anEmptyMap());
146149
}
147150

148151
public void testClientsLifeCycleForSingleProject() throws Exception {
@@ -190,9 +193,9 @@ public void testClientsLifeCycleForSingleProject() throws Exception {
190193
antherClient.decRef();
191194
}
192195

193-
final var clientsHolder = s3ClientsManager.getClientsHolders().get(projectId);
196+
final var clientsHolder = s3ClientsManager.getPerProjectClientsHolders().get(projectId);
194197

195-
// Remove project secrets
198+
// Remove project secrets or the entire project
196199
if (randomBoolean()) {
197200
updateProjectInClusterState(projectId, Map.of());
198201
} else {
@@ -242,7 +245,7 @@ public void testClientsForMultipleProjects() throws InterruptedException {
242245
} else {
243246
removeProjectFromClusterState(projectId);
244247
}
245-
assertThat(getClientsHoldersExcludeDefaultProject(), not(hasKey(projectId)));
248+
assertThat(s3ClientsManager.getPerProjectClientsHolders(), not(hasKey(projectId)));
246249
clientNames.forEach(clientName -> assertClientNotFound(projectId, clientName));
247250
}
248251
}
@@ -268,9 +271,9 @@ public void testClusterAndProjectClients() {
268271
Settings.builder().put("client", clientName).build()
269272
);
270273

271-
final AmazonS3Reference clusterClient = s3Service.client(null, repositoryMetadata);
274+
final AmazonS3Reference clusterClient = s3Service.client(projectIdForClusterClient(), repositoryMetadata);
272275
if (configureProjectClientsFirst == false) {
273-
assertThat(getClientsHoldersExcludeDefaultProject(), anEmptyMap());
276+
assertThat(s3ClientsManager.getPerProjectClientsHolders(), anEmptyMap());
274277
}
275278
clusterClient.decRef();
276279

@@ -281,10 +284,12 @@ public void testClusterAndProjectClients() {
281284
assertThat(projectClient, not(sameInstance(clusterClient)));
282285
projectClient.decRef();
283286

284-
s3Service.onBlobStoreClose(null);
287+
// Release the cluster client
288+
s3Service.onBlobStoreClose(projectIdForClusterClient());
285289
assertFalse(clusterClient.hasReferences());
286290
assertTrue(projectClient.hasReferences());
287291

292+
// Release the project client
288293
s3Service.onBlobStoreClose(projectId);
289294
assertFalse(projectClient.hasReferences());
290295
}
@@ -296,7 +301,7 @@ public void testClientsHolderAfterManagerClosed() {
296301
s3ClientsManager.close();
297302
// New holder can be added after the manager is closed, but no actual client can be created
298303
updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, clientName));
299-
try (var clientsHolder = s3ClientsManager.getClientsHolders().get(projectId)) {
304+
try (var clientsHolder = s3ClientsManager.getPerProjectClientsHolders().get(projectId)) {
300305
assertNotNull(clientsHolder);
301306
assertFalse(clientsHolder.isClosed());
302307

@@ -319,8 +324,8 @@ public void testProjectClientsDisabled() {
319324
);
320325
s3ServiceWithNoProjectSupport.refreshAndClearCache(S3ClientSettings.load(clusterService.getSettings()));
321326
s3ServiceWithNoProjectSupport.start();
322-
assertNotNull(s3ServiceWithNoProjectSupport.getS3PerProjectClientManager());
323327
verify(clusterService, never()).addHighPriorityApplier(any());
328+
assertNull(s3ServiceWithNoProjectSupport.getS3ClientsManager().getPerProjectClientsHolders());
324329

325330
// Cluster client still works
326331
final String clientName = randomFrom(clientNames);
@@ -329,24 +334,18 @@ public void testProjectClientsDisabled() {
329334
"s3",
330335
Settings.builder().put("client", clientName).build()
331336
);
332-
final AmazonS3Reference clientRef = s3ServiceWithNoProjectSupport.client(ProjectId.DEFAULT, repositoryMetadata);
337+
final AmazonS3Reference clientRef = s3ServiceWithNoProjectSupport.client(projectIdForClusterClient(), repositoryMetadata);
333338
clientRef.decRef();
334339
s3ServiceWithNoProjectSupport.close();
335340
assertFalse(clientRef.hasReferences());
336341
}
337342

338-
private Map<ProjectId, S3ClientsManager.ClientsHolder<?>> getClientsHoldersExcludeDefaultProject() {
339-
final var holders = s3ClientsManager.getClientsHolders();
340-
// Clients holder for the default project always exists
341-
assertThat(holders, hasKey(ProjectId.DEFAULT));
342-
return holders.entrySet()
343-
.stream()
344-
.filter(entry -> entry.getKey() != ProjectId.DEFAULT)
345-
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
343+
private ProjectId projectIdForClusterClient() {
344+
return randomBoolean() ? ProjectId.DEFAULT : null;
346345
}
347346

348347
private void assertProjectClientSettings(ProjectId projectId, String... clientNames) {
349-
final var clientsHolder = s3ClientsManager.getClientsHolders().get(projectId);
348+
final var clientsHolder = s3ClientsManager.getPerProjectClientsHolders().get(projectId);
350349
assertNotNull(clientsHolder);
351350
final Map<String, S3ClientSettings> s3ClientSettingsMap = clientsHolder.allClientSettings();
352351
assertThat(s3ClientSettingsMap.keySet(), containsInAnyOrder(clientNames));

0 commit comments

Comments
 (0)