|
11 | 11 |
|
12 | 12 | import org.elasticsearch.cluster.ClusterChangedEvent; |
13 | 13 | import org.elasticsearch.cluster.ClusterStateApplier; |
| 14 | +import org.elasticsearch.common.CheckedBiFunction; |
| 15 | +import org.elasticsearch.common.Strings; |
| 16 | +import org.elasticsearch.common.util.Maps; |
14 | 17 | import org.elasticsearch.logging.LogManager; |
15 | 18 | import org.elasticsearch.logging.Logger; |
16 | 19 |
|
| 20 | +import java.io.IOException; |
| 21 | +import java.util.Map; |
| 22 | +import java.util.stream.Collectors; |
| 23 | + |
| 24 | +import static java.util.Collections.emptyMap; |
| 25 | +import static org.elasticsearch.core.Strings.format; |
| 26 | + |
17 | 27 | public class GoogleCloudStorageClientsManager implements ClusterStateApplier { |
18 | 28 |
|
19 | 29 | private static final Logger logger = LogManager.getLogger(GoogleCloudStorageClientsManager.class); |
20 | 30 |
|
| 31 | + private final CheckedBiFunction< |
| 32 | + GoogleCloudStorageClientSettings, |
| 33 | + GcsRepositoryStatsCollector, |
| 34 | + MeteredStorage, |
| 35 | + IOException> clientBuilder; |
| 36 | + private final ClientsHolder clusterClientsHolder; |
| 37 | + |
| 38 | + public GoogleCloudStorageClientsManager( |
| 39 | + CheckedBiFunction<GoogleCloudStorageClientSettings, GcsRepositoryStatsCollector, MeteredStorage, IOException> clientBuilder |
| 40 | + ) { |
| 41 | + this.clientBuilder = clientBuilder; |
| 42 | + this.clusterClientsHolder = new ClientsHolder(); |
| 43 | + } |
| 44 | + |
21 | 45 | @Override |
22 | 46 | public void applyClusterState(ClusterChangedEvent event) { |
23 | 47 |
|
24 | 48 | } |
| 49 | + |
| 50 | + void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) { |
| 51 | + clusterClientsHolder.refreshAndClearCache(clientsSettings); |
| 52 | + } |
| 53 | + |
| 54 | + MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) |
| 55 | + throws IOException { |
| 56 | + return clusterClientsHolder.client(clientName, repositoryName, statsCollector); |
| 57 | + } |
| 58 | + |
| 59 | + void closeRepositoryClients(String repositoryName) { |
| 60 | + clusterClientsHolder.closeRepositoryClients(repositoryName); |
| 61 | + } |
| 62 | + |
| 63 | + class ClientsHolder { |
| 64 | + |
| 65 | + private volatile Map<String, GoogleCloudStorageClientSettings> clientSettings = emptyMap(); |
| 66 | + /** |
| 67 | + * Dictionary of client instances. Client instances are built lazily from the |
| 68 | + * latest settings. Clients are cached by a composite repositoryName key. |
| 69 | + */ |
| 70 | + private volatile Map<String, MeteredStorage> clientCache = emptyMap(); |
| 71 | + |
| 72 | + /** |
| 73 | + * Attempts to retrieve a client from the cache. If the client does not exist it |
| 74 | + * will be created from the latest settings and will populate the cache. The |
| 75 | + * returned instance should not be cached by the calling code. Instead, for each |
| 76 | + * use, the (possibly updated) instance should be requested by calling this |
| 77 | + * method. |
| 78 | + * |
| 79 | + * @param clientName name of the client settings used to create the client |
| 80 | + * @param repositoryName name of the repository that would use the client |
| 81 | + * @return a cached client storage instance that can be used to manage objects |
| 82 | + * (blobs) |
| 83 | + */ |
| 84 | + MeteredStorage client(final String clientName, final String repositoryName, final GcsRepositoryStatsCollector statsCollector) |
| 85 | + throws IOException { |
| 86 | + { |
| 87 | + final MeteredStorage storage = clientCache.get(repositoryName); |
| 88 | + if (storage != null) { |
| 89 | + return storage; |
| 90 | + } |
| 91 | + } |
| 92 | + synchronized (this) { |
| 93 | + final MeteredStorage existing = clientCache.get(repositoryName); |
| 94 | + |
| 95 | + if (existing != null) { |
| 96 | + return existing; |
| 97 | + } |
| 98 | + |
| 99 | + final GoogleCloudStorageClientSettings settings = clientSettings.get(clientName); |
| 100 | + |
| 101 | + if (settings == null) { |
| 102 | + throw new IllegalArgumentException( |
| 103 | + "Unknown client name [" |
| 104 | + + clientName |
| 105 | + + "]. Existing client configs: " |
| 106 | + + Strings.collectionToDelimitedString(clientSettings.keySet(), ",") |
| 107 | + ); |
| 108 | + } |
| 109 | + |
| 110 | + logger.debug(() -> format("creating GCS client with client_name [%s], endpoint [%s]", clientName, settings.getHost())); |
| 111 | + final MeteredStorage storage = clientBuilder.apply(settings, statsCollector); |
| 112 | + clientCache = Maps.copyMapWithAddedEntry(clientCache, repositoryName, storage); |
| 113 | + return storage; |
| 114 | + } |
| 115 | + } |
| 116 | + |
| 117 | + /** |
| 118 | + * Refreshes the client settings and clears the client cache. Subsequent calls to |
| 119 | + * {@code GoogleCloudStorageService#client} will return new clients constructed |
| 120 | + * using the parameter settings. |
| 121 | + * |
| 122 | + * @param clientsSettings the new settings used for building clients for subsequent requests |
| 123 | + */ |
| 124 | + synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) { |
| 125 | + this.clientCache = emptyMap(); |
| 126 | + this.clientSettings = Maps.ofEntries(clientsSettings.entrySet()); |
| 127 | + } |
| 128 | + |
| 129 | + synchronized void closeRepositoryClients(String repositoryName) { |
| 130 | + clientCache = clientCache.entrySet() |
| 131 | + .stream() |
| 132 | + .filter(entry -> entry.getKey().equals(repositoryName) == false) |
| 133 | + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 134 | + } |
| 135 | + } |
25 | 136 | } |
0 commit comments