2525import org .apache .logging .log4j .LogManager ;
2626import org .apache .logging .log4j .Logger ;
2727import org .elasticsearch .common .Strings ;
28+ import org .elasticsearch .common .blobstore .OperationPurpose ;
2829import org .elasticsearch .common .util .Maps ;
2930import org .elasticsearch .core .Nullable ;
3031import org .elasticsearch .core .SuppressForbidden ;
4041import java .net .URL ;
4142import java .security .KeyStore ;
4243import java .util .Map ;
44+ import java .util .stream .Collectors ;
4345
4446import static java .nio .charset .StandardCharsets .UTF_8 ;
4547import static java .util .Collections .emptyMap ;
@@ -51,12 +53,15 @@ public class GoogleCloudStorageService {
5153
5254 private volatile Map <String , GoogleCloudStorageClientSettings > clientSettings = emptyMap ();
5355
56+ private record ClientKey (OperationPurpose purpose , String repositoryName ) {}
57+
5458 /**
5559 * Dictionary of client instances. Client instances are built lazily from the
56- * latest settings. Each repository has its own client instance identified by
57- * the repository name.
60+ * latest settings. Clients are cached by a composite OperationPurpose/repositoryName
61+ * key.
62+ * @see ClientKey
5863 */
59- private volatile Map <String , Storage > clientCache = emptyMap ();
64+ private volatile Map <ClientKey , Storage > clientCache = emptyMap ();
6065
6166 /**
6267 * Refreshes the client settings and clears the client cache. Subsequent calls to
@@ -79,20 +84,26 @@ public synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClie
7984 *
8085 * @param clientName name of the client settings used to create the client
8186 * @param repositoryName name of the repository that would use the client
87+ * @param operationPurpose the purpose for which the client will be used
8288 * @param stats the stats collector used to gather information about the underlying SKD API calls.
8389 * @return a cached client storage instance that can be used to manage objects
8490 * (blobs)
8591 */
86- public Storage client (final String clientName , final String repositoryName , final GoogleCloudStorageOperationsStats stats )
87- throws IOException {
92+ public Storage client (
93+ final String clientName ,
94+ final String repositoryName ,
95+ final OperationPurpose operationPurpose ,
96+ final GoogleCloudStorageOperationsStats stats
97+ ) throws IOException {
98+ ClientKey clientKey = new ClientKey (operationPurpose , repositoryName );
8899 {
89- final Storage storage = clientCache .get (repositoryName );
100+ final Storage storage = clientCache .get (clientKey );
90101 if (storage != null ) {
91102 return storage ;
92103 }
93104 }
94105 synchronized (this ) {
95- final Storage existing = clientCache .get (repositoryName );
106+ final Storage existing = clientCache .get (clientKey );
96107
97108 if (existing != null ) {
98109 return existing ;
@@ -110,26 +121,33 @@ public Storage client(final String clientName, final String repositoryName, fina
110121 }
111122
112123 logger .debug (() -> format ("creating GCS client with client_name [%s], endpoint [%s]" , clientName , settings .getHost ()));
113- final Storage storage = createClient (settings , stats );
114- clientCache = Maps .copyMapWithAddedEntry (clientCache , repositoryName , storage );
124+ final Storage storage = createClient (settings , stats , operationPurpose );
125+ clientCache = Maps .copyMapWithAddedEntry (clientCache , clientKey , storage );
115126 return storage ;
116127 }
117128 }
118129
119- synchronized void closeRepositoryClient (String repositoryName ) {
120- clientCache = Maps .copyMapWithRemovedEntry (clientCache , repositoryName );
130+ synchronized void closeRepositoryClients (String repositoryName ) {
131+ clientCache = clientCache .entrySet ()
132+ .stream ()
133+ .filter (entry -> entry .getKey ().repositoryName ().equals (repositoryName ) == false )
134+ .collect (Collectors .toUnmodifiableMap (Map .Entry ::getKey , Map .Entry ::getValue ));
121135 }
122136
123137 /**
124138 * Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
125139 *
126140 * @param gcsClientSettings client settings to use, including secure settings
127141 * @param stats the stats collector to use by the underlying SDK
142+ * @param operationPurpose the purpose this client will be used for
128143 * @return a new client storage instance that can be used to manage objects
129144 * (blobs)
130145 */
131- private Storage createClient (GoogleCloudStorageClientSettings gcsClientSettings , GoogleCloudStorageOperationsStats stats )
132- throws IOException {
146+ private Storage createClient (
147+ GoogleCloudStorageClientSettings gcsClientSettings ,
148+ GoogleCloudStorageOperationsStats stats ,
149+ OperationPurpose operationPurpose
150+ ) throws IOException {
133151 final HttpTransport httpTransport = SocketAccess .doPrivilegedIOException (() -> {
134152 final NetHttpTransport .Builder builder = new NetHttpTransport .Builder ();
135153 // requires java.lang.RuntimePermission "setFactory"
@@ -149,7 +167,7 @@ private Storage createClient(GoogleCloudStorageClientSettings gcsClientSettings,
149167 return builder .build ();
150168 });
151169
152- final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector (stats );
170+ final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector (stats , operationPurpose );
153171
154172 final HttpTransportOptions httpTransportOptions = new HttpTransportOptions (
155173 HttpTransportOptions .newBuilder ()
0 commit comments