@@ -0,0 +1,169 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.repositories.s3;

import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.services.s3.AmazonS3;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ProjectSecrets;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CachedSupplier;
import org.elasticsearch.core.IOUtils;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PerProjectClientManager implements ClusterStateListener {

    // Original settings at node startup time
    private final Settings settings;
    private final Function<S3ClientSettings, AmazonS3> clientBuilder;

    // A map of per-project clients, where the key is the project ID and the value is a map of client name to client
    private final Map<ProjectId, Map<String, ClientHolder>> perProjectClientsCache;

    public PerProjectClientManager(Settings settings, Function<S3ClientSettings, AmazonS3> clientBuilder) {
        this.settings = settings;
        this.clientBuilder = clientBuilder;
        this.perProjectClientsCache = new ConcurrentHashMap<>();
    }

    public void clusterChanged(ClusterChangedEvent event) {
        final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();

        final var updatedPerProjectClients = new HashMap<ProjectId, Map<String, ClientHolder>>();
        for (var project : currentProjects.values()) {
            final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
            if (projectSecrets == null) {
                // This can only happen when a node restarts; it will be processed again when file settings are loaded
                continue;
            }
            final Settings currentSettings = Settings.builder()
                // merge with static settings such as max retries etc, exclude secure settings
                // TODO: We may need to update this if per-project settings decide to support hierarchical overrides
                .put(settings, false)
                .setSecureSettings(projectSecrets.getSettings())
                .build();
            final Map<String, S3ClientSettings> clientSettings = S3ClientSettings.load(currentSettings);

            // TODO: clientSettings should not be empty, i.e. there should be at least one client configured.
            // Maybe log a warning if it is empty and continue. The project will not have a usable client, but that is probably OK.

            // TODO: Building and comparing the whole S3ClientSettings may be inefficient; we could just compare the relevant secrets
            if (newOrUpdated(project.id(), clientSettings)) {
                updatedPerProjectClients.put(
                    project.id(),
                    clientSettings.entrySet()
                        .stream()
                        .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue())))
                );
            }
        }
        // Updated projects
        for (var projectId : updatedPerProjectClients.keySet()) {
            final Map<String, ClientHolder> old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId));
            if (old != null) {
                IOUtils.closeWhileHandlingException(old.values());
            }
        }

        // Removed projects
        for (var projectId : perProjectClientsCache.keySet()) {
            if (currentProjects.containsKey(projectId) == false) {
                final Map<String, ClientHolder> removed = perProjectClientsCache.remove(projectId);
                assert removed != null;
                IOUtils.closeWhileHandlingException(removed.values());
            }
        }
    }

    public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
        if (perProjectClientsCache.containsKey(projectId) == false) {
            throw new IllegalArgumentException("project [" + projectId + "] does not exist");
        }
        final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetadata.settings());
        final Map<String, ClientHolder> clientHolders = perProjectClientsCache.get(projectId);

        if (clientHolders.containsKey(clientName) == false) {
            throw new IllegalArgumentException("client [" + clientName + "] does not exist");
        }

        return clientHolders.get(clientName).client();
    }

    /**
     * Similar to S3Service#releaseCachedClients but only clears the cache for the given project.
     * All clients for the project are closed and will be recreated on next access. Also, similar to S3Service#releaseCachedClients,
     * the IdleConnectionReaper background thread is shut down and will be restarted on new client usage.
     */
    public void clearCacheForProject(ProjectId projectId) {
        final Map<String, ClientHolder> old = perProjectClientsCache.get(projectId);
        assert old != null : projectId;
        IOUtils.closeWhileHandlingException(old.values());
        perProjectClientsCache.put(
            projectId,
            old.entrySet()
                .stream()
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> new ClientHolder(entry.getValue().clientSettings())))
        );
        // TODO: do we need this?
        // shutdown IdleConnectionReaper background thread
        // it will be restarted on new client usage
        IdleConnectionReaper.shutdown();
    }

    public void close() {
        for (var clientHolders : perProjectClientsCache.values()) {
            IOUtils.closeWhileHandlingException(clientHolders.values());
        }
        perProjectClientsCache.clear();
    }

    private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings> currentClientSettings) {
        if (perProjectClientsCache.containsKey(projectId) == false) {
            return true;
        }
        final var previousClientSettings = perProjectClientsCache.get(projectId)
            .entrySet()
            .stream()
            .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> entry.getValue().clientSettings()));
        return currentClientSettings.equals(previousClientSettings) == false;
    }

    private final class ClientHolder implements Closeable {
        private final S3ClientSettings clientSettings;
        private final CachedSupplier<AmazonS3Reference> client;

        ClientHolder(S3ClientSettings clientSettings) {
            this.clientSettings = clientSettings;
            this.client = CachedSupplier.wrap(() -> new AmazonS3Reference(clientBuilder.apply(clientSettings)));
        }

        public S3ClientSettings clientSettings() {
            return clientSettings;
        }

        public AmazonS3Reference client() {
            return client.get();
        }

        public void close() {
            client.get().decRef();
        }
    }
}
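For orientation, a minimal wiring-and-usage sketch that mirrors the S3RepositoryPlugin changes further down; nodeSettings, s3Service, clusterService, projectId and repositoryMetadata are assumed to be in scope, and reference-count handling is elided.

// Sketch only, assuming the same package as the classes in this PR.
PerProjectClientManager manager = new PerProjectClientManager(
    nodeSettings,                                            // merged with each project's secure settings on every cluster state update
    clientSettings -> s3Service.buildClient(clientSettings)  // actual AmazonS3 construction stays in S3Service
);
clusterService.addListener(manager);                         // clusterChanged() (re)builds the per-project ClientHolder maps

// A project-scoped repository later resolves its client by project id and the client name in the repository metadata.
// Throws IllegalArgumentException if the project or the named client is unknown; the AmazonS3 client itself is built lazily.
AmazonS3Reference reference = manager.client(projectId, repositoryMetadata);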
@@ -115,6 +115,7 @@ class S3BlobStore implements BlobStore {
        S3RepositoriesMetrics s3RepositoriesMetrics,
        BackoffPolicy retryThrottledDeleteBackoffPolicy
    ) {
        // TODO: add a projectId field, maybe null for cluster level blobstore
        this.service = service;
        this.bigArrays = bigArrays;
        this.bucket = bucket;
@@ -310,6 +311,7 @@ public String toString() {
    }

    public AmazonS3Reference clientReference() {
        // TODO: change to service.client(projectId, repositoryMetadata)
        return service.client(repositoryMetadata);
    }

@@ -490,6 +492,7 @@ private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobS

    @Override
    public void close() throws IOException {
        // TODO: change to use service.onBlobStoreClose(projectId)
        service.onBlobStoreClose();
    }
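The TODOs in this hunk point at the intended follow-up. A hypothetical version of the two call sites, assuming S3BlobStore gains a @Nullable projectId field (null for the cluster level blobstore) as the first TODO suggests, could look like:

// Hypothetical follow-up sketch; projectId is an assumed field, not part of this change.
public AmazonS3Reference clientReference() {
    return service.client(projectId, repositoryMetadata);  // scoped overload added to S3Service in this PR
}

@Override
public void close() throws IOException {
    service.onBlobStoreClose(projectId);                   // clears only this project's cached clients when projectId is non-null
}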

@@ -292,6 +292,7 @@ class S3Repository extends MeteredBlobStoreRepository {
            buildBasePath(metadata),
            buildLocation(metadata)
        );
        // TODO: add a projectId field
        this.service = service;
        this.s3RepositoriesMetrics = s3RepositoriesMetrics;
        this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT);
@@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
@@ -86,15 +87,40 @@ protected S3Repository createRepository(

    @Override
    public Collection<?> createComponents(PluginServices services) {
        service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService()));
        PerProjectClientManager perProjectClientManager = null;
        if (services.projectResolver().supportsMultipleProjects()) {
            perProjectClientManager = new PerProjectClientManager(
                settings,
                s3ClientSettings -> this.service.get().buildClient(s3ClientSettings)
            );
            services.clusterService().addListener(perProjectClientManager);
        }
        service.set(
            s3Service(
                services.environment(),
                services.clusterService().getSettings(),
                services.resourceWatcherService(),
                perProjectClientManager
            )
        );
        this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
        return List.of(service);
    }

    @Deprecated(forRemoval = true)
    S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
        return new S3Service(environment, nodeSettings, resourceWatcherService);
    }

    S3Service s3Service(
        Environment environment,
        Settings nodeSettings,
        ResourceWatcherService resourceWatcherService,
        @Nullable PerProjectClientManager perProjectClientManager
    ) {
        return new S3Service(environment, nodeSettings, resourceWatcherService, perProjectClientManager);
    }

    @Override
    public Map<String, Repository.Factory> getRepositories(
        final Environment env,
@@ -34,13 +34,15 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestStatus;
@@ -99,8 +101,19 @@ class S3Service implements Closeable {
    final TimeValue compareAndExchangeTimeToLive;
    final TimeValue compareAndExchangeAntiContentionDelay;
    final boolean isStateless;
    private final PerProjectClientManager perProjectClientManager;

    @Deprecated(forRemoval = true)
    S3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
        this(environment, nodeSettings, resourceWatcherService, null);
    }

    S3Service(
        Environment environment,
        Settings nodeSettings,
        ResourceWatcherService resourceWatcherService,
        @Nullable PerProjectClientManager perProjectClientManager
    ) {
        webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider(
            environment,
            System::getenv,
@@ -111,6 +124,7 @@ class S3Service implements Closeable {
        compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings);
        compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings);
        isStateless = DiscoveryNode.isStateless(nodeSettings);
        this.perProjectClientManager = perProjectClientManager;
    }

/**
@@ -153,6 +167,26 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
        }
    }

    /**
     * Delegates to {@link #client(RepositoryMetadata)} when
     * 1. per-project clients are disabled, or
     * 2. the blobstore is cluster level (projectId == null).
     * Otherwise, attempts to retrieve a per-project client by the project-id and repository metadata from the
     * per-project client manager. Throws if the project-id or the client does not exist. The client may be initialized lazily.
     */
    public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
        if (perProjectClientManager == null) {
            // Multi-Project is disabled and we have a single default project
            assert ProjectId.DEFAULT.equals(projectId) : projectId;
            return client(repositoryMetadata);
        } else if (projectId == null) {
            // Multi-Project is enabled and we are retrieving a client for the cluster level blobstore
            return client(repositoryMetadata);
        } else {
            return perProjectClientManager.client(projectId, repositoryMetadata);
        }
    }
Member Author: This is the main interface for accessing the repo client at different scopes. Note the use of a null projectId for the cluster level blobstore, similar to how cluster level persistent tasks are handled.
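As a caller-side summary of the dispatch above (illustrative only; someProjectId is a placeholder and reference release is omitted):

service.client(ProjectId.DEFAULT, repositoryMetadata); // multi-project disabled: node-wide client cache (the assert expects DEFAULT)
service.client(null, repositoryMetadata);              // multi-project enabled, cluster level blobstore: node-wide cache as well
service.client(someProjectId, repositoryMetadata);     // multi-project enabled, project scope: PerProjectClientManager lookup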


    /**
     * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them
     * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata.
@@ -307,9 +341,25 @@ public void onBlobStoreClose() {
        releaseCachedClients();
    }

    public void onBlobStoreClose(@Nullable ProjectId projectId) {
        if (perProjectClientManager == null) {
            // Multi-Project is disabled and we have a single default project
            assert ProjectId.DEFAULT.equals(projectId) : projectId;
            onBlobStoreClose();
        } else if (projectId == null) {
            // Multi-Project is enabled and this is for the cluster level blobstore
            onBlobStoreClose();
        } else {
            perProjectClientManager.clearCacheForProject(projectId);
        }
    }

    @Override
    public void close() throws IOException {
        releaseCachedClients();
        if (perProjectClientManager != null) {
            perProjectClientManager.close();
        }
        webIdentityTokenCredentialsProvider.shutdown();
    }
