Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -235,8 +236,8 @@ public TestGoogleCloudStoragePlugin(Settings settings) {
}

@Override
protected GoogleCloudStorageService createStorageService(boolean isServerless) {
return new GoogleCloudStorageService() {
protected GoogleCloudStorageService createStorageService(ClusterService clusterService, ProjectResolver projectResolver) {
return new GoogleCloudStorageService(clusterService, projectResolver) {
@Override
StorageOptions createStorageOptions(
final GoogleCloudStorageClientSettings gcsClientSettings,
Expand Down Expand Up @@ -280,7 +281,7 @@ public Map<String, Repository.Factory> getRepositories(
projectId,
metadata,
registry,
this.storageService,
this.storageService.get(),
clusterService,
bigArrays,
recoverySettings,
Expand All @@ -289,10 +290,11 @@ public Map<String, Repository.Factory> getRepositories(
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(
getProjectId(),
metadata.settings().get("bucket"),
"test",
metadata.name(),
storageService,
storageService.get(),
bigArrays,
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -104,6 +106,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}
}

@Nullable // for cluster level object store in MP
private final ProjectId projectId;
private final String bucketName;
private final String clientName;
private final String repositoryName;
Expand All @@ -114,6 +118,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final BackoffPolicy casBackoffPolicy;

GoogleCloudStorageBlobStore(
ProjectId projectId,
String bucketName,
String clientName,
String repositoryName,
Expand All @@ -123,6 +128,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
BackoffPolicy casBackoffPolicy,
GcsRepositoryStatsCollector statsCollector
) {
this.projectId = projectId;
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
Expand All @@ -134,7 +140,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}

private MeteredStorage client() throws IOException {
return storageService.client(clientName, repositoryName, statsCollector);
return storageService.client(projectId, clientName, repositoryName, statsCollector);
}

@Override
Expand All @@ -144,7 +150,7 @@ public BlobContainer blobContainer(BlobPath path) {

@Override
public void close() {
storageService.closeRepositoryClients(repositoryName);
storageService.closeRepositoryClients(projectId, repositoryName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.gcs;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ProjectSecrets;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.core.Strings.format;

public class GoogleCloudStorageClientsManager implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageClientsManager.class);
private static final String GCS_SETTING_PREFIX = "gcs.";

private final Settings nodeGcsSettings;
private final CheckedBiFunction<
GoogleCloudStorageClientSettings,
GcsRepositoryStatsCollector,
MeteredStorage,
IOException> clientBuilder;
// A map of projectId to clients holder. Adding to and removing from the map happen only in the applier thread.
private final Map<ProjectId, ClientsHolder> perProjectClientsHolders;
private final ClusterClientsHolder clusterClientsHolder;

public GoogleCloudStorageClientsManager(
Settings nodeSettings,
CheckedBiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage, IOException> clientBuilder,
boolean supportsMultipleProjects
) {
this.nodeGcsSettings = Settings.builder()
.put(nodeSettings.getByPrefix(GCS_SETTING_PREFIX), false) // not rely on any cluster scoped secrets
.normalizePrefix(GCS_SETTING_PREFIX)
.build();
this.clientBuilder = clientBuilder;
if (supportsMultipleProjects) {
this.perProjectClientsHolders = ConcurrentCollections.newConcurrentMap();
} else {
this.perProjectClientsHolders = null;
}
this.clusterClientsHolder = new ClusterClientsHolder();
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
assert perProjectClientsHolders != null;
final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects();

final var updatedPerProjectClients = new HashMap<ProjectId, ClientsHolder>();
for (var project : currentProjects.values()) {
// Skip the default project, it is tracked separately with clusterClientsHolder and
// updated differently with the ReloadablePlugin interface
if (ProjectId.DEFAULT.equals(project.id())) {
continue;
}
final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE);
// Project secrets can be null when node restarts. It may not have any GCS credentials if GCS is not in use.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy/pasted code 🤦 pushed fbdd313

if (projectSecrets == null || projectSecrets.getSettingNames().stream().noneMatch(key -> key.startsWith(GCS_SETTING_PREFIX))) {
// Most likely there won't be any existing client, but attempt to remove it anyway just in case
perProjectClientsHolders.remove(project.id());
continue;
}

final Settings currentSettings = Settings.builder()
// merge with static settings such as max retries etc
// TODO: https://elasticco.atlassian.net/browse/ES-11716 Consider changing this to use per-project settings
.put(nodeGcsSettings)
.setSecureSettings(projectSecrets.getSettings())
.build();
final Map<String, GoogleCloudStorageClientSettings> clientSettings = GoogleCloudStorageClientSettings.load(currentSettings)
.entrySet()
.stream()
// Skip project clients that have no credentials configured. This should not happen in serverless.
// But it is safer to skip them and is also a more consistent behaviour with the cases when
// project secrets are not present.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this supposed to be a transient state? Does it worth a warn log?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a configuration problem on the CP side. Yes, we should log it. There is one for empty clients settings map after this. But I think we can combine them and continue the processing. Pushed 57de151 which also adds a test. I'll update it similarly for s3 in a separate PR.

.filter(entry -> entry.getValue().getCredential() != null)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

if (clientSettings.isEmpty()) {
// clientSettings should not be empty, i.e. there should be at least one client configured.
// But if it does somehow happen, log a warning and continue. The project will not have usable client but that is ok.
logger.warn("Skipping project [{}] with no client settings", project.id());
continue;
}

// TODO: If performance is an issue, we may consider comparing just the relevant project secrets for new or updated clients
// and avoid building the clientSettings
if (newOrUpdated(project.id(), clientSettings)) {
updatedPerProjectClients.put(project.id(), new PerProjectClientsHolder(clientSettings));
}
}

// Updated projects
for (var projectId : updatedPerProjectClients.keySet()) {
assert ProjectId.DEFAULT.equals(projectId) == false;
perProjectClientsHolders.put(projectId, updatedPerProjectClients.get(projectId));
}
// Removed projects
for (var projectId : perProjectClientsHolders.keySet()) {
if (currentProjects.containsKey(projectId) == false) {
assert ProjectId.DEFAULT.equals(projectId) == false;
perProjectClientsHolders.remove(projectId);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach for managing per-project clients is similar to what we did for s3 but simpler. It is simpler because there is no ref-counting for the clients and we don't actively close it when it is not needed. Therefore, we simply remove and replace it when needed.

}
}
}

/**
 * Replaces the client settings of the cluster level clients holder and clears its cached clients
 * by delegating to {@code ClusterClientsHolder#refreshAndClearCache}. Per-project holders are not touched.
 *
 * @param clientsSettings the new client settings, keyed by client name
 */
void refreshAndClearCacheForClusterClients(Map<String, GoogleCloudStorageClientSettings> clientsSettings) {
clusterClientsHolder.refreshAndClearCache(clientsSettings);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for stateful or cluster level clients to keep the behaviour identical to existing code.


/**
 * Returns the storage client to use for a repository of the given project.
 * A {@code null} or default project ID is served by the cluster level holder,
 * any other project ID is served by the holder registered for that project.
 *
 * @param projectId      the project owning the repository, may be {@code null}
 * @param clientName     name of the client settings used to create the client
 * @param repositoryName name of the repository that would use the client
 * @param statsCollector stats collector handed to the client builder when a new client is created
 * @return the (possibly cached) storage client
 * @throws IOException              if building a new client fails
 * @throws IllegalArgumentException if the project has no clients holder or the client name is unknown
 */
MeteredStorage client(ProjectId projectId, String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector)
    throws IOException {
    final boolean useClusterClients = projectId == null || ProjectId.DEFAULT.equals(projectId);
    final ClientsHolder holder = useClusterClients ? clusterClientsHolder : getClientsHolderSafe(projectId);
    return holder.client(clientName, repositoryName, statsCollector);
}

/**
 * Drops the cached client of the given repository from the holder responsible for the project.
 * A {@code null} or default project ID targets the cluster level holder. For any other project
 * this is a no-op when the project has no holder.
 *
 * @param projectId      the project owning the repository, may be {@code null}
 * @param repositoryName name of the repository whose cached client should be removed
 */
void closeRepositoryClients(ProjectId projectId, String repositoryName) {
    if (projectId == null || ProjectId.DEFAULT.equals(projectId)) {
        clusterClientsHolder.closeRepositoryClients(repositoryName);
        return;
    }
    final ClientsHolder projectHolder = perProjectClientsHolders.get(projectId);
    if (projectHolder == null) {
        // No holder is registered for this project, nothing to remove
        return;
    }
    projectHolder.closeRepositoryClients(repositoryName);
}

// package private for tests
// Returns the holder that serves clients for a null or default project ID.
ClusterClientsHolder getClusterClientsHolder() {
return clusterClientsHolder;
}

// package private for tests
// Returns an immutable snapshot of the per-project holders, or null when the per-project map was never created.
Map<ProjectId, ClientsHolder> getPerProjectClientsHolders() {
    if (perProjectClientsHolders == null) {
        return null;
    }
    return Map.copyOf(perProjectClientsHolders);
}

/**
 * Tells whether the project needs a fresh clients holder.
 *
 * @param projectId             the project to check
 * @param currentClientSettings the client settings computed from the current cluster state
 * @return {@code true} if the project has no holder yet or its holder was built from different client settings
 */
private boolean newOrUpdated(ProjectId projectId, Map<String, GoogleCloudStorageClientSettings> currentClientSettings) {
    final var existingHolder = perProjectClientsHolders.get(projectId);
    return existingHolder == null || currentClientSettings.equals(existingHolder.allClientSettings()) == false;
}

/**
 * Looks up the clients holder of a non-default project.
 *
 * @param projectId the project to look up, must not be the default project
 * @return the holder registered for the project
 * @throws IllegalArgumentException if no holder is registered for the project
 */
private ClientsHolder getClientsHolderSafe(ProjectId projectId) {
    assert ProjectId.DEFAULT.equals(projectId) == false;
    final ClientsHolder holder = perProjectClientsHolders.get(projectId);
    if (holder != null) {
        return holder;
    }
    throw new IllegalArgumentException("No GCS client is configured for project [" + projectId + "]");
}

abstract class ClientsHolder {

/**
* Dictionary of client instances. Client instances are built lazily from the
* latest settings. Clients are cached by repository name.
*/
protected volatile Map<String, MeteredStorage> clientCache = emptyMap();

/**
* Get the current client settings for all clients in this holder.
*/
protected abstract Map<String, GoogleCloudStorageClientSettings> allClientSettings();

/**
 * Returns the client cached for the repository, building and caching a new one
 * from the latest settings when none is cached yet. Callers should not hold on
 * to the returned instance; they should call this method for each use so that
 * they pick up a (possibly updated) client.
 *
 * @param clientName     name of the client settings used to create the client
 * @param repositoryName name of the repository that would use the client; this is the cache key
 * @param statsCollector stats collector handed to the client builder when a new client is created
 * @return a cached client storage instance that can be used to manage objects
 *         (blobs)
 * @throws IOException              if the client builder fails
 * @throws IllegalArgumentException if no client settings exist under {@code clientName}
 */
MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector)
    throws IOException {
    // First look at the volatile cache without taking the lock
    MeteredStorage cached = clientCache.get(repositoryName);
    if (cached != null) {
        return cached;
    }
    synchronized (this) {
        // Look again under the lock, another thread may have populated the cache in the meantime
        cached = clientCache.get(repositoryName);
        if (cached != null) {
            return cached;
        }

        final Map<String, GoogleCloudStorageClientSettings> knownSettings = allClientSettings();
        final GoogleCloudStorageClientSettings settings = knownSettings.get(clientName);
        if (settings == null) {
            final String knownClientNames = Strings.collectionToDelimitedString(knownSettings.keySet(), ",");
            throw new IllegalArgumentException("Unknown client name [" + clientName + "]. Existing client configs: " + knownClientNames);
        }

        logger.debug(() -> format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost()));
        final MeteredStorage created = clientBuilder.apply(settings, statsCollector);
        // The cache map is replaced with a copy that includes the new entry rather than modified in place
        clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, created);
        return created;
    }
}

synchronized void closeRepositoryClients(String repositoryName) {
clientCache = clientCache.entrySet()
.stream()
.filter(entry -> entry.getKey().equals(repositoryName) == false)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, java.util.Map.Entry::getValue));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can probably drop the java.util.

}

// package private for tests
final boolean hasCachedClientForRepository(String repositoryName) {
return clientCache.containsKey(repositoryName);
}
}

/**
 * Clients holder whose client settings can be replaced after construction
 * via {@link #refreshAndClearCache(Map)}.
 */
final class ClusterClientsHolder extends ClientsHolder {

// Latest client settings keyed by client name; replaced as a whole on refresh
private volatile Map<String, GoogleCloudStorageClientSettings> clientSettings = emptyMap();

@Override
protected Map<String, GoogleCloudStorageClientSettings> allClientSettings() {
return clientSettings;
}

/**
* Refreshes the client settings and clears the client cache. Subsequent calls to
* {@code GoogleCloudStorageService#client} will return new clients constructed
* using the parameter settings.
* <p>
* The method synchronizes on this holder, which is the same monitor that
* {@code ClientsHolder#client} takes before it populates the cache.
*
* @param clientsSettings the new settings used for building clients for subsequent requests
*/
synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) {
// Drop the cached clients first, then publish the new settings
this.clientCache = emptyMap();
this.clientSettings = Maps.ofEntries(clientsSettings.entrySet());
}
}

/**
 * Clients holder for a single project. Its client settings are fixed at construction.
 */
final class PerProjectClientsHolder extends ClientsHolder {

// Client settings keyed by client name, as passed to the constructor
private final Map<String, GoogleCloudStorageClientSettings> clientSettings;

/**
* @param clientSettings the client settings of the project, keyed by client name; stored as-is without copying
*/
PerProjectClientsHolder(Map<String, GoogleCloudStorageClientSettings> clientSettings) {
this.clientSettings = clientSettings;
}

@Override
protected Map<String, GoogleCloudStorageClientSettings> allClientSettings() {
return clientSettings;
}
}
}
Loading