Skip to content

Commit b5e8fb1

Browse files
committed
fork client closing
1 parent 35cd05b commit b5e8fb1

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,28 @@
2323
import org.elasticsearch.core.IOUtils;
2424

2525
import java.io.Closeable;
26+
import java.util.ArrayList;
2627
import java.util.Collections;
2728
import java.util.HashMap;
29+
import java.util.List;
2830
import java.util.Map;
2931
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.Executor;
3033
import java.util.concurrent.atomic.AtomicBoolean;
3134
import java.util.function.Function;
3235

3336
public class S3PerProjectClientManager implements ClusterStateListener {
3437

3538
private final Settings settings;
3639
private final Function<S3ClientSettings, AmazonS3> clientBuilder;
40+
private final Executor executor;
3741
// A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread.
3842
private final Map<ProjectId, ClientsHolder> perProjectClientsCache;
3943

40-
public S3PerProjectClientManager(Settings settings, Function<S3ClientSettings, AmazonS3> clientBuilder) {
44+
public S3PerProjectClientManager(Settings settings, Function<S3ClientSettings, AmazonS3> clientBuilder, Executor executor) {
4145
this.settings = settings;
4246
this.clientBuilder = clientBuilder;
47+
this.executor = executor;
4348
this.perProjectClientsCache = new ConcurrentHashMap<>();
4449
}
4550

@@ -70,22 +75,25 @@ public void clusterChanged(ClusterChangedEvent event) {
7075
}
7176
}
7277

78+
final List<ClientsHolder> clientsHoldersToClose = new ArrayList<>();
7379
// Updated projects
7480
for (var projectId : updatedPerProjectClients.keySet()) {
7581
final var old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId));
7682
if (old != null) {
77-
old.close();
83+
clientsHoldersToClose.add(old);
7884
}
7985
}
80-
8186
// removed projects
8287
for (var projectId : perProjectClientsCache.keySet()) {
8388
if (currentProjects.containsKey(projectId) == false) {
8489
final var removed = perProjectClientsCache.remove(projectId);
8590
assert removed != null;
86-
removed.close();
91+
clientsHoldersToClose.add(removed);
8792
}
8893
}
94+
if (clientsHoldersToClose.isEmpty() == false) {
95+
executor.execute(() -> IOUtils.closeWhileHandlingException(clientsHoldersToClose));
96+
}
8997
}
9098

9199
public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) {

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.plugins.RepositoryPlugin;
2828
import org.elasticsearch.repositories.RepositoriesMetrics;
2929
import org.elasticsearch.repositories.Repository;
30+
import org.elasticsearch.threadpool.ThreadPool;
3031
import org.elasticsearch.watcher.ResourceWatcherService;
3132
import org.elasticsearch.xcontent.NamedXContentRegistry;
3233

@@ -91,7 +92,8 @@ public Collection<?> createComponents(PluginServices services) {
9192
if (services.projectResolver().supportsMultipleProjects()) {
9293
s3PerProjectClientManager = new S3PerProjectClientManager(
9394
settings,
94-
s3ClientSettings -> this.service.get().buildClient(s3ClientSettings)
95+
s3ClientSettings -> this.service.get().buildClient(s3ClientSettings),
96+
services.threadPool().executor(ThreadPool.Names.GENERIC)
9597
);
9698
services.clusterService().addListener(s3PerProjectClientManager);
9799
}

0 commit comments

Comments
 (0)