|
11 | 11 |
|
12 | 12 | import org.elasticsearch.cluster.ClusterChangedEvent; |
13 | 13 | import org.elasticsearch.cluster.ClusterStateApplier; |
| 14 | +import org.elasticsearch.cluster.metadata.ProjectId; |
| 15 | +import org.elasticsearch.cluster.metadata.ProjectMetadata; |
14 | 16 | import org.elasticsearch.common.CheckedBiFunction; |
15 | 17 | import org.elasticsearch.common.Strings; |
| 18 | +import org.elasticsearch.common.settings.ProjectSecrets; |
| 19 | +import org.elasticsearch.common.settings.Settings; |
16 | 20 | import org.elasticsearch.common.util.Maps; |
17 | 21 | import org.elasticsearch.logging.LogManager; |
18 | 22 | import org.elasticsearch.logging.Logger; |
19 | 23 |
|
20 | 24 | import java.io.IOException; |
| 25 | +import java.util.HashMap; |
21 | 26 | import java.util.Map; |
| 27 | +import java.util.concurrent.ConcurrentHashMap; |
22 | 28 | import java.util.stream.Collectors; |
23 | 29 |
|
24 | 30 | import static java.util.Collections.emptyMap; |
|
27 | 33 | public class GoogleCloudStorageClientsManager implements ClusterStateApplier { |
28 | 34 |
|
29 | 35 | private static final Logger logger = LogManager.getLogger(GoogleCloudStorageClientsManager.class); |
| 36 | + private static final String GCS_SETTING_PREFIX = "gcs."; |
30 | 37 |
|
| 38 | + private final Settings nodeGcsSettings; |
31 | 39 | private final CheckedBiFunction< |
32 | 40 | GoogleCloudStorageClientSettings, |
33 | 41 | GcsRepositoryStatsCollector, |
34 | 42 | MeteredStorage, |
35 | 43 | IOException> clientBuilder; |
36 | | - private final ClientsHolder clusterClientsHolder; |
| 44 | + private final Map<ProjectId, ClientsHolder> clientHolders; |
37 | 45 |
|
38 | 46 | public GoogleCloudStorageClientsManager( |
39 | | - CheckedBiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage, IOException> clientBuilder |
| 47 | + Settings nodeSettings, |
| 48 | + CheckedBiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage, IOException> clientBuilder, |
| 49 | + boolean supportsMultipleProjects |
40 | 50 | ) { |
| 51 | + this.nodeGcsSettings = Settings.builder() |
| 52 | + .put(nodeSettings.getByPrefix(GCS_SETTING_PREFIX), false) // not rely on any cluster scoped secrets |
| 53 | + .normalizePrefix(GCS_SETTING_PREFIX) |
| 54 | + .build(); |
41 | 55 | this.clientBuilder = clientBuilder; |
42 | | - this.clusterClientsHolder = new ClientsHolder(); |
| 56 | + if (supportsMultipleProjects) { |
| 57 | + // If multiple projects are supported, we need to track per-project clients |
| 58 | + this.clientHolders = new ConcurrentHashMap<>(Map.of(ProjectId.DEFAULT, new ClientsHolder())); |
| 59 | + } else { |
| 60 | + // If only a single project is supported, we use a single holder for the default project |
| 61 | + this.clientHolders = Map.of(ProjectId.DEFAULT, new ClientsHolder()); |
| 62 | + } |
43 | 63 | } |
44 | 64 |
|
45 | 65 | @Override |
46 | 66 | public void applyClusterState(ClusterChangedEvent event) { |
| 67 | + final Map<ProjectId, ProjectMetadata> currentProjects = event.state().metadata().projects(); |
| 68 | + |
| 69 | + final var updatedPerProjectClients = new HashMap<ProjectId, ClientsHolder>(); |
| 70 | + for (var project : currentProjects.values()) { |
| 71 | + // Skip the default project, it is tracked separately with clusterClientsHolder and |
| 72 | + // updated differently with the ReloadablePlugin interface |
| 73 | + if (ProjectId.DEFAULT.equals(project.id())) { |
| 74 | + continue; |
| 75 | + } |
| 76 | + final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); |
| 77 | + // Project secrets can be null when node restarts. It may not have any s3 credentials if s3 is not in use. |
| 78 | + if (projectSecrets == null || projectSecrets.getSettingNames().stream().noneMatch(key -> key.startsWith(GCS_SETTING_PREFIX))) { |
| 79 | + // Most likely there won't be any existing client, but attempt to remove it anyway just in case |
| 80 | + clientHolders.remove(project.id()); |
| 81 | + continue; |
| 82 | + } |
| 83 | + |
| 84 | + final Settings currentSettings = Settings.builder() |
| 85 | + // merge with static settings such as max retries etc |
| 86 | + // TODO: https://elasticco.atlassian.net/browse/ES-11716 Consider change this to use per-project settings |
| 87 | + .put(nodeGcsSettings) |
| 88 | + .setSecureSettings(projectSecrets.getSettings()) |
| 89 | + .build(); |
| 90 | + final Map<String, GoogleCloudStorageClientSettings> clientSettings = GoogleCloudStorageClientSettings.load(currentSettings) |
| 91 | + .entrySet() |
| 92 | + .stream() |
| 93 | + // Skip project clients that have no credentials configured. This should not happen in serverless. |
| 94 | + // But it is safer to skip them and is also a more consistent behaviour with the cases when |
| 95 | + // project secrets are not present. |
| 96 | + .filter(entry -> entry.getValue().getCredential() != null) |
| 97 | + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 98 | + |
| 99 | + if (clientSettings.isEmpty()) { |
| 100 | + // clientSettings should not be empty, i.e. there should be at least one client configured. |
| 101 | + // But if it does somehow happen, log a warning and continue. The project will not have usable client but that is ok. |
| 102 | + logger.warn("Skipping project [{}] with no client settings", project.id()); |
| 103 | + continue; |
| 104 | + } |
| 105 | + |
| 106 | + // TODO: If performance is an issue, we may consider comparing just the relevant project secrets for new or updated clients |
| 107 | + // and avoid building the clientSettings |
| 108 | + if (newOrUpdated(project.id(), clientSettings)) { |
| 109 | + updatedPerProjectClients.put(project.id(), new ClientsHolder()); |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + // Updated projects |
| 114 | + for (var projectId : updatedPerProjectClients.keySet()) { |
| 115 | + assert ProjectId.DEFAULT.equals(projectId) == false; |
| 116 | + clientHolders.put(projectId, updatedPerProjectClients.get(projectId)); |
| 117 | + } |
| 118 | + // Removed projects |
| 119 | + for (var projectId : clientHolders.keySet()) { |
| 120 | + assert ProjectId.DEFAULT.equals(projectId) == false; |
| 121 | + if (currentProjects.containsKey(projectId) == false) { |
| 122 | + clientHolders.remove(projectId); |
| 123 | + } |
| 124 | + } |
| 125 | + } |
47 | 126 |
|
| 127 | + @Deprecated(forRemoval = true) |
| 128 | + MeteredStorage client(String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector) throws IOException { |
| 129 | + return client(ProjectId.DEFAULT, clientName, repositoryName, statsCollector); |
48 | 130 | } |
49 | 131 |
|
| 132 | + @Deprecated(forRemoval = true) |
50 | 133 | void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) { |
51 | | - clusterClientsHolder.refreshAndClearCache(clientsSettings); |
| 134 | + refreshAndClearCache(ProjectId.DEFAULT, clientsSettings); |
52 | 135 | } |
53 | 136 |
|
54 | | - MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) |
| 137 | + @Deprecated(forRemoval = true) |
| 138 | + void closeRepositoryClients(String repositoryName) { |
| 139 | + closeRepositoryClients(ProjectId.DEFAULT, repositoryName); |
| 140 | + } |
| 141 | + |
| 142 | + MeteredStorage client(ProjectId projectId, String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector) |
55 | 143 | throws IOException { |
56 | | - return clusterClientsHolder.client(clientName, repositoryName, statsCollector); |
| 144 | + return getClientsHolderSafe(projectId).client(clientName, repositoryName, statsCollector); |
57 | 145 | } |
58 | 146 |
|
59 | | - void closeRepositoryClients(String repositoryName) { |
60 | | - clusterClientsHolder.closeRepositoryClients(repositoryName); |
| 147 | + void refreshAndClearCache(ProjectId projectId, Map<String, GoogleCloudStorageClientSettings> clientsSettings) { |
| 148 | + getClientsHolderSafe(projectId).refreshAndClearCache(clientsSettings); |
| 149 | + } |
| 150 | + |
| 151 | + void closeRepositoryClients(ProjectId projectId, String repositoryName) { |
| 152 | + getClientsHolderSafe(projectId).closeRepositoryClients(repositoryName); |
| 153 | + } |
| 154 | + |
| 155 | + private boolean newOrUpdated(ProjectId projectId, Map<String, GoogleCloudStorageClientSettings> currentClientSettings) { |
| 156 | + final var old = clientHolders.get(projectId); |
| 157 | + if (old == null) { |
| 158 | + return true; |
| 159 | + } |
| 160 | + return currentClientSettings.equals(old.clientSettings) == false; |
| 161 | + } |
| 162 | + |
| 163 | + private ClientsHolder getClientsHolderSafe(ProjectId projectId) { |
| 164 | + final var clientsHolder = clientHolders.get(projectId); |
| 165 | + if (clientsHolder == null) { |
| 166 | + assert ProjectId.DEFAULT.equals(projectId) == false; |
| 167 | + throw new IllegalArgumentException("No GCS client is configured for project [" + projectId + "]"); |
| 168 | + } |
| 169 | + return clientsHolder; |
61 | 170 | } |
62 | 171 |
|
63 | 172 | class ClientsHolder { |
|
0 commit comments