Merged (32 commits)
33dc439
WIP
ywangd May 2, 2025
a8dc278
add more tests
ywangd May 3, 2025
a318fc8
tweaks
ywangd May 3, 2025
c3b36e7
add jira issue
ywangd May 5, 2025
fdc9c1d
applier
ywangd May 9, 2025
acf6da4
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 20, 2025
0f1bdd5
abstract runnable
ywangd May 20, 2025
9bc5d09
nodeSettings
ywangd May 20, 2025
d665303
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 21, 2025
55c697c
generic
ywangd May 21, 2025
833e085
consolidate
ywangd May 21, 2025
4d7bd69
no wait for async close
ywangd May 21, 2025
6ce0828
update
ywangd May 21, 2025
616a770
tweak
ywangd May 21, 2025
39fa184
tweak
ywangd May 21, 2025
922327b
more assertion
ywangd May 22, 2025
f4537ef
no client creation after manager closing
ywangd May 28, 2025
cf7cf7f
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 28, 2025
208a540
Merge branch 'main' into per-project-s3-clients
ywangd May 28, 2025
ab5f911
remove dead code
ywangd May 28, 2025
dc14ef7
[CI] Auto commit changes from spotless
May 28, 2025
ff631de
separate field for cluster clients
ywangd May 29, 2025
3962106
assert no double close
ywangd May 29, 2025
6cc05c6
use lightweight thread pool
ywangd May 29, 2025
f17e3e9
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 29, 2025
ef5631b
tweak
ywangd May 29, 2025
9066af3
comments
ywangd May 29, 2025
4048064
fix test
ywangd May 29, 2025
38ee3b0
Merge branch 'main' into per-project-s3-clients
elasticmachine May 29, 2025
e1862dd
comment
ywangd May 29, 2025
9ab977b
Merge branch 'main' into per-project-s3-clients
elasticmachine May 30, 2025
d3c0dbb
Merge branch 'main' into per-project-s3-clients
elasticmachine May 30, 2025
@@ -17,6 +17,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
@@ -256,8 +258,13 @@ public ProxyS3RepositoryPlugin(Settings settings) {
}

@Override
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
return new ProxyS3Service(environment, nodeSettings, resourceWatcherService);
S3Service s3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
return new ProxyS3Service(environment, clusterService, projectResolver, resourceWatcherService);
}

/**
@@ -293,8 +300,13 @@ public static final class ProxyS3Service extends S3Service {

private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);

ProxyS3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
super(environment, nodeSettings, resourceWatcherService, () -> null);
ProxyS3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
super(environment, clusterService, projectResolver, resourceWatcherService, () -> null);
}

@Override
@@ -0,0 +1,283 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.s3;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.ProjectSecrets;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

public class S3PerProjectClientManager implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(S3PerProjectClientManager.class);
private static final String S3_SETTING_PREFIX = "s3.";

private final Settings nodeS3Settings;
private final Function<S3ClientSettings, AmazonS3Reference> clientBuilder;
private final Executor executor;
// A map of projectId to clients holder. Adding to and removing from the map happen only in the applier thread.
private final Map<ProjectId, ClientsHolder> projectClientsHolders;
// Listener for tracking ongoing async closing of obsolete clients. Updated only in the applier thread.
private volatile SubscribableListener<Void> clientsCloseListener = null;

S3PerProjectClientManager(Settings settings, Function<S3ClientSettings, AmazonS3Reference> clientBuilder, Executor executor) {
this.nodeS3Settings = Settings.builder()
.put(settings.getByPrefix(S3_SETTING_PREFIX), false) // do not rely on any cluster-scoped secrets
.normalizePrefix(S3_SETTING_PREFIX)
.build();
this.clientBuilder = clientBuilder;
this.executor = executor;
this.projectClientsHolders = new ConcurrentHashMap<>();
}
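// A hedged illustration of the prefix handling above, assuming the usual
// Settings semantics (getByPrefix strips the prefix, normalizePrefix re-adds
// it, and the `false` flag skips copying cluster-scoped secure settings):
//
//   given node settings: s3.client.default.max_retries=3, cluster.name=prod
//   nodeS3Settings:      s3.client.default.max_retries=3
//
// i.e. only keys under "s3." survive, and no secure settings are carried over.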

// visible for tests
Map<ProjectId, ClientsHolder> getProjectClientsHolders() {
return Map.copyOf(projectClientsHolders);
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
ywangd (Member, Author):
Change this to be an applier instead of a listener so that the repository (also created in an applier) is fully functional in any listeners of the same cluster state. This is necessary so that the repository can be used for bootstrapping a new project.
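For context, a minimal sketch of the registration difference being described, assuming the standard ClusterService API (the exact wiring in the PR may differ):

// An applier participates in applying the new cluster state, so anything it
// sets up (here, the per-project clients) is already in place when listeners
// for that same state run.
clusterService.addStateApplier(manager);
// By contrast, a listener runs only after the state is applied, which would be
// too late for other listeners of the same state to use the clients:
// clusterService.addListener(manager);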

final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();

final var updatedPerProjectClients = new HashMap<ProjectId, ClientsHolder>();
final List<ClientsHolder> clientsHoldersToClose = new ArrayList<>();
for (var project : currentProjects.values()) {
final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
// Project secrets can be null when the node restarts. They may not contain any s3 credentials if s3 is not in use.
if (projectSecrets == null || projectSecrets.getSettingNames().stream().noneMatch(key -> key.startsWith("s3."))) {
// Most likely there won't be any existing client, but attempt to remove it anyway just in case
final ClientsHolder removed = projectClientsHolders.remove(project.id());
if (removed != null) {
clientsHoldersToClose.add(removed);
}
continue;
}

final Settings currentSettings = Settings.builder()
// merge with static settings such as max retries etc
// TODO: https://elasticco.atlassian.net/browse/ES-11716 Consider changing this to use per-project settings
.put(nodeS3Settings)
.setSecureSettings(projectSecrets.getSettings())
.build();
final Map<String, S3ClientSettings> clientSettings = S3ClientSettings.load(currentSettings)
.entrySet()
.stream()
// Skip project clients that have no credentials configured. This should not happen in serverless.
// But it is safer to skip them, and doing so is consistent with the behaviour when
// project secrets are not present.
.filter(entry -> entry.getValue().credentials != null)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

if (clientSettings.isEmpty()) {
// clientSettings should not be empty, i.e. there should be at least one client configured.
// But if it somehow is, log a warning and continue. The project will not have a usable client, but that is ok.
logger.warn("Skipping project [{}] with no client settings", project.id());
continue;
}

// TODO: If performance is an issue, we may consider comparing just the relevant project secrets for new or updated clients
// and avoid building the clientSettings
if (newOrUpdated(project.id(), clientSettings)) {
updatedPerProjectClients.put(project.id(), new ClientsHolder(project.id(), clientSettings));
}
}

// Updated projects
for (var projectId : updatedPerProjectClients.keySet()) {
final var old = projectClientsHolders.put(projectId, updatedPerProjectClients.get(projectId));
if (old != null) {
clientsHoldersToClose.add(old);
}
}
// Removed projects
for (var projectId : projectClientsHolders.keySet()) {
if (currentProjects.containsKey(projectId) == false) {
final var removed = projectClientsHolders.remove(projectId);
assert removed != null;
clientsHoldersToClose.add(removed);
}
}
// Close stale clients asynchronously without blocking the applier thread
if (clientsHoldersToClose.isEmpty() == false) {
final var currentClientsCloseListener = new SubscribableListener<Void>();
final var previousClientsCloseListener = clientsCloseListener;
clientsCloseListener = currentClientsCloseListener;
if (previousClientsCloseListener != null && previousClientsCloseListener.isDone() == false) {
previousClientsCloseListener.addListener(
ActionListener.running(() -> closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener))
);
} else {
closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener);
}
}
}

private void closeClientsAsync(List<ClientsHolder> clientsHoldersToClose, ActionListener<Void> listener) {
executor.execute(() -> {
Contributor:
Nit: please use an AbstractRunnable (here ActionRunnable.run) rather than executing a bare Runnable, so that exceptions (e.g. a rejection) still complete the listener.

ywangd (Member, Author):
I should have learnt this by now. Thanks for spotting it. See 0f1bdd5
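A hedged sketch of the shape the fix in 0f1bdd5 presumably takes; on Elasticsearch executors an AbstractRunnable's onRejection/onFailure hooks complete the listener even if the task is rejected:

private void closeClientsAsync(List<ClientsHolder> clientsHoldersToClose, ActionListener<Void> listener) {
    // ActionRunnable.run completes the listener on success and routes any
    // exception, including a rejected execution, to listener.onFailure
    executor.execute(ActionRunnable.run(listener, () -> IOUtils.closeWhileHandlingException(clientsHoldersToClose)));
}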

IOUtils.closeWhileHandlingException(clientsHoldersToClose);
listener.onResponse(null);
});
}

public AmazonS3Reference client(ProjectId projectId, String clientName) {
assert projectId != null && ProjectId.DEFAULT.equals(projectId) == false : projectId;
final var clientsHolder = projectClientsHolders.get(projectId);
if (clientsHolder == null) {
throw new IllegalArgumentException("no s3 client is configured for project [" + projectId + "]");
}
return clientsHolder.client(clientName);
}

/**
* Similar to S3Service#releaseCachedClients but only clears the cache for the given project.
* All clients for the project are closed and will be recreated on next access, also similar to S3Service#releaseCachedClients
*/
public void releaseProjectClients(ProjectId projectId) {
assert projectId != null && ProjectId.DEFAULT.equals(projectId) == false : projectId;
final var old = projectClientsHolders.get(projectId);
if (old != null) {
old.clearCache();
}
}

/**
* Shut down the manager by closing all client holders. This is called when the node is shutting down.
* It waits up to one minute for any async client closing to complete.
*/
public void close() {
IOUtils.closeWhileHandlingException(projectClientsHolders.values());
final var currentClientsCloseListener = clientsCloseListener;
if (currentClientsCloseListener != null && currentClientsCloseListener.isDone() == false) {
// Wait for async clients closing to be completed
final CountDownLatch latch = new CountDownLatch(1);
currentClientsCloseListener.addListener(ActionListener.running(latch::countDown));
try {
if (latch.await(1, TimeUnit.MINUTES) == false) {
Contributor:
Are you sure we need to wait here? With S3Service today we just decref the clients and move on, allowing any ongoing uses to finish and shut down separately. Is that not ok here?

If so, why 1 minute? This needs to be configurable in case we need to change it. And if it's ok to proceed after a minute, why wait at all?

ywangd (Member, Author):
The wait here is to ensure S3Service does not close until all asynchronously closed clients (from the applier threads) have finished closing, per your comment. But maybe I misunderstood what you were suggesting?

The 1 minute is meant as a best effort to close things cleanly, similar to the 1-minute wait for IndicesService to close in Stateless.java. It's unlikely to happen. If a close call somehow does stall, it's not fatal and should not prevent progress (decRef can still trigger closeInternal and close both S3Client and SdkHttpClient inline, which might take a bit longer?). I can make it configurable if necessary.

Contributor:
Hm, I now wonder what I meant there. Though at the time I wrote that, these things had a kind of weird lifecycle, fixed in the upgrade to SDKv2, so maybe I was worried about that. As things stand now it should be safe to just decref each client and move on.

ywangd (Member, Author):
> it should be safe to just decref each client and move on

The async closing is to avoid having synchronized method calls in the applier thread. I think this point still stands. Do you mean we no longer need to track the close listeners, and can assume close calls complete fast enough that there is no need to wait for them to finish when the node shuts down?

Contributor:
Yes, +1 to async closing still; I just don't see a need to block the shutdown here. We're not assuming that close calls happen fast enough; we're relying on the refcounting and other lifecycle management to do the right thing even for clients that other subsystems happen to be using while we're shutting down.
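A sketch of the non-blocking shutdown suggested here (illustrative only, not the code as merged; it leans entirely on the refcounting described above):

public void close() {
    // Decref every holder and move on: in-flight users hold their own refs, so
    // the underlying S3Client/SdkHttpClient is closed only when the last
    // reference is released, and the shutting-down node never blocks on it.
    IOUtils.closeWhileHandlingException(projectClientsHolders.values());
}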

logger.warn("Waiting for async closing of s3 clients timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

// visible for tests
@Nullable
SubscribableListener<Void> getClientsCloseListener() {
return clientsCloseListener;
}

private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings> currentClientSettings) {
final var old = projectClientsHolders.get(projectId);
if (old == null) {
return true;
}
return currentClientSettings.equals(old.clientSettings()) == false;
}

/**
* Holder class of s3 clients for a single project. It is instantiated in the cluster state thread with client
* settings. The clients are created and cached lazily when the {@link #client(String)} method is called.
* Cached clients are closed and cleared out when the {@link #clearCache()} method is called. Subsequent calls to
* {@link #client(String)} will recreate them. A call to the {@link #close()} method clears the cache and
* also flags the holder as closed so that no new clients can be created.
*/
final class ClientsHolder implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ProjectId projectId;
private final Map<String, S3ClientSettings> clientSettings;
// Client name -> client reference
private volatile Map<String, AmazonS3Reference> clientsCache = Collections.emptyMap();

ClientsHolder(ProjectId projectId, Map<String, S3ClientSettings> clientSettings) {
this.projectId = projectId;
this.clientSettings = clientSettings;
}

Map<String, S3ClientSettings> clientSettings() {
return clientSettings;
}

AmazonS3Reference client(String clientName) {
final var clientReference = clientsCache.get(clientName);
// It is ok to retrieve an existing client when the cache is being cleared or the holder is closing.
// As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released
// by either the caller of this method or the clearCache() method.
if (clientReference != null && clientReference.tryIncRef()) {
return clientReference;
}
final var settings = clientSettings.get(clientName);
if (settings == null) {
throw new IllegalArgumentException("s3 client [" + clientName + "] does not exist for project [" + projectId + "]");
}
synchronized (this) {
final var existing = clientsCache.get(clientName);
if (existing != null && existing.tryIncRef()) {
return existing;
}
if (closed.get()) {
// Not adding a new client once the manager is closed since there won't be anything to close it
throw new IllegalStateException("client manager is closed");
}
// The close() method may be called after we checked it; that is ok since we are already inside the synchronized block.
// The clearCache() will clear the newly added client.
final var newClientReference = clientBuilder.apply(settings);
clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientName, newClientReference);
return newClientReference;
}
}

/**
* Clear the cache by closing and clearing out all clients. Subsequent {@link #client(String)} calls will recreate
* the clients and populate the cache again.
*/
synchronized void clearCache() {
IOUtils.closeWhileHandlingException(clientsCache.values());
clientsCache = Collections.emptyMap();
}

/**
* Similar to {@link #clearCache()}, but also flags the holder as closed so that no new clients can be created.
*/
public void close() {
if (closed.compareAndSet(false, true)) {
clearCache();
}
}

// visible for tests
boolean isClosed() {
return closed.get();
}
}
}
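Following the ClientsHolder contract above, a hypothetical call site might look like this (doS3Operation is illustrative, and this assumes AmazonS3Reference exposes the wrapped client via client() and releases its reference on close()):

try (AmazonS3Reference ref = manager.client(projectId, "default")) {
    // perform the S3 call while holding the reference
    doS3Operation(ref.client());
} // close() decrefs; the underlying client is torn down only when the last reference is released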
@@ -15,6 +15,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -87,13 +88,20 @@ protected S3Repository createRepository(

@Override
public Collection<?> createComponents(PluginServices services) {
service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService()));
service.set(
s3Service(services.environment(), services.clusterService(), services.projectResolver(), services.resourceWatcherService())
);
this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
return List.of(service.get());
}

S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
return new S3Service(environment, nodeSettings, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
S3Service s3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
return new S3Service(environment, clusterService, projectResolver, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
}

private static Region getDefaultRegion() {