1515import org .elasticsearch .cluster .ClusterStateApplier ;
1616import org .elasticsearch .cluster .metadata .ProjectId ;
1717import org .elasticsearch .cluster .metadata .ProjectMetadata ;
18+ import org .elasticsearch .cluster .metadata .RepositoryMetadata ;
1819import org .elasticsearch .common .settings .ProjectSecrets ;
1920import org .elasticsearch .common .settings .Settings ;
2021import org .elasticsearch .common .util .Maps ;
@@ -47,7 +48,7 @@ public class S3PerProjectClientManager implements ClusterStateApplier {
4748 private final Function <S3ClientSettings , AmazonS3Reference > clientBuilder ;
4849 private final Executor executor ;
4950 // A map of projectId to clients holder. Adding to and removing from the map happen only in the applier thread.
50- private final Map <ProjectId , ClientsHolder > projectClientsHolders ;
51+ private final Map <ProjectId , ClientsHolder <?> > projectClientsHolders ;
5152 // Listener for tracking ongoing async closing of obsolete clients. Updated only in the applier thread.
5253 private volatile SubscribableListener <Void > clientsCloseListener = null ;
5354
@@ -62,24 +63,25 @@ public class S3PerProjectClientManager implements ClusterStateApplier {
6263 }
6364
6465 // visible for tests
65- Map <ProjectId , ClientsHolder > getProjectClientsHolders () {
66+ Map <ProjectId , ClientsHolder <?> > getProjectClientsHolders () {
6667 return Map .copyOf (projectClientsHolders );
6768 }
6869
6970 @ Override
7071 public void applyClusterState (ClusterChangedEvent event ) {
7172 final Map <ProjectId , ProjectMetadata > currentProjects = event .state ().metadata ().projects ();
7273
73- final var updatedPerProjectClients = new HashMap <ProjectId , ClientsHolder >();
74- final List <ClientsHolder > clientsHoldersToClose = new ArrayList <>();
74+ final var updatedPerProjectClients = new HashMap <ProjectId , PerProjectClientsHolder >();
75+ final List <PerProjectClientsHolder > clientsHoldersToClose = new ArrayList <>();
7576 for (var project : currentProjects .values ()) {
7677 final ProjectSecrets projectSecrets = project .custom (ProjectSecrets .TYPE );
7778 // Project secrets can be null when node restarts. It may not have any s3 credentials if s3 is not in use.
7879 if (projectSecrets == null || projectSecrets .getSettingNames ().stream ().noneMatch (key -> key .startsWith ("s3." ))) {
7980 // Most likely there won't be any existing client, but attempt to remove it anyway just in case
80- final ClientsHolder removed = projectClientsHolders .remove (project .id ());
81+ final var removed = projectClientsHolders .remove (project .id ());
8182 if (removed != null ) {
82- clientsHoldersToClose .add (removed );
83+ assert removed instanceof PerProjectClientsHolder ;
84+ clientsHoldersToClose .add ((PerProjectClientsHolder ) removed );
8385 }
8486 continue ;
8587 }
@@ -109,23 +111,24 @@ public void applyClusterState(ClusterChangedEvent event) {
109111 // TODO: If performance is an issue, we may consider comparing just the relevant project secrets for new or updated clients
110112 // and avoid building the clientSettings
111113 if (newOrUpdated (project .id (), clientSettings )) {
112- updatedPerProjectClients .put (project .id (), new ClientsHolder (project .id (), clientSettings ));
114+ updatedPerProjectClients .put (project .id (), new PerProjectClientsHolder (project .id (), clientSettings ));
113115 }
114116 }
115117
116118 // Updated projects
117119 for (var projectId : updatedPerProjectClients .keySet ()) {
118120 final var old = projectClientsHolders .put (projectId , updatedPerProjectClients .get (projectId ));
119121 if (old != null ) {
120- clientsHoldersToClose .add (old );
122+ assert old instanceof PerProjectClientsHolder ;
123+ clientsHoldersToClose .add ((PerProjectClientsHolder ) old );
121124 }
122125 }
123126 // removed projects
124127 for (var projectId : projectClientsHolders .keySet ()) {
125128 if (currentProjects .containsKey (projectId ) == false ) {
126129 final var removed = projectClientsHolders .remove (projectId );
127- assert removed != null ;
128- clientsHoldersToClose .add (removed );
130+ assert removed instanceof PerProjectClientsHolder ;
131+ clientsHoldersToClose .add (( PerProjectClientsHolder ) removed );
129132 }
130133 }
131134 // Close stale clients asynchronously without blocking the applier thread
@@ -143,7 +146,7 @@ public void applyClusterState(ClusterChangedEvent event) {
143146 }
144147 }
145148
146- private void closeClientsAsync (List <ClientsHolder > clientsHoldersToClose , ActionListener <Void > listener ) {
149+ private void closeClientsAsync (List <PerProjectClientsHolder > clientsHoldersToClose , ActionListener <Void > listener ) {
147150 executor .execute (new AbstractRunnable () {
148151 @ Override
149152 protected void doRun () throws Exception {
@@ -165,7 +168,9 @@ public AmazonS3Reference client(ProjectId projectId, String clientName) {
165168 if (clientsHolder == null ) {
166169 throw new IllegalArgumentException ("no s3 client is configured for project [" + projectId + "]" );
167170 }
168- return clientsHolder .client (clientName );
171+ return clientsHolder .client (
172+ new RepositoryMetadata ("repo" , S3Repository .TYPE , Settings .builder ().put (S3Repository .CLIENT_NAME .getKey (), clientName ).build ())
173+ );
169174 }
170175
171176 /**
@@ -212,46 +217,44 @@ private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings>
212217 if (old == null ) {
213218 return true ;
214219 }
215- return currentClientSettings .equals (old .clientSettings ()) == false ;
220+ return currentClientSettings .equals (old .allClientSettings ()) == false ;
216221 }
217222
218- /**
219- * Holder class of s3 clients for a single project. It is instantiated in the cluster state thread with client
220- * settings. The clients are created and cached lazily when the {@link #client(String)} method is called.
221- * Cached clients are closed and cleared out when the {@link #clearCache()} method is called. Subsequent calls to
222- * {@link #client(String)} will recreate them. The call to {@link #close()} method clears the cache as well but
223- * also flags the holder to be closed so that no new clients can be created.
224- */
225- final class ClientsHolder implements Closeable {
226- private final AtomicBoolean closed = new AtomicBoolean (false );
227- private final ProjectId projectId ;
228- private final Map <String , S3ClientSettings > clientSettings ;
229- // Client name -> client reference
230- private volatile Map <String , AmazonS3Reference > clientsCache = Collections .emptyMap ();
223+ abstract class ClientsHolder <K > implements Closeable {
224+ protected final ProjectId projectId ;
225+ protected final AtomicBoolean closed = new AtomicBoolean (false );
226+ protected volatile Map <K , AmazonS3Reference > clientsCache = Collections .emptyMap ();
231227
232- ClientsHolder (ProjectId projectId , Map < String , S3ClientSettings > clientSettings ) {
228+ ClientsHolder (ProjectId projectId ) {
233229 this .projectId = projectId ;
234- this .clientSettings = clientSettings ;
235230 }
236231
237- Map <String , S3ClientSettings > clientSettings () {
238- return clientSettings ;
239- }
232+ abstract K clientKey (RepositoryMetadata repositoryMetadata );
240233
241- AmazonS3Reference client (String clientName ) {
242- final var clientReference = clientsCache .get (clientName );
234+ abstract String clientName (K clientKey );
235+
236+ abstract S3ClientSettings singleClientSettings (K clientKey );
237+
238+ abstract Map <String , S3ClientSettings > allClientSettings ();
239+
240+ AmazonS3Reference client (RepositoryMetadata repositoryMetadata ) {
241+ final var clientKey = clientKey (repositoryMetadata );
242+
243+ final var clientReference = clientsCache .get (clientKey );
243244 // It is ok to retrieve an existing client when the cache is being cleared or the holder is closing.
244245 // As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released
245246 // by either the caller of this method or the clearCache() method.
246247 if (clientReference != null && clientReference .tryIncRef ()) {
247248 return clientReference ;
248249 }
249- final var settings = clientSettings . get ( clientName );
250+ final var settings = singleClientSettings ( clientKey );
250251 if (settings == null ) {
251- throw new IllegalArgumentException ("s3 client [" + clientName + "] does not exist for project [" + projectId + "]" );
252+ throw new IllegalArgumentException (
253+ "s3 client [" + clientName (clientKey ) + "] does not exist for project [" + projectId + "]"
254+ );
252255 }
253256 synchronized (this ) {
254- final var existing = clientsCache .get (clientName );
257+ final var existing = clientsCache .get (clientKey );
255258 if (existing != null && existing .tryIncRef ()) {
256259 return existing ;
257260 }
@@ -262,13 +265,13 @@ AmazonS3Reference client(String clientName) {
262265 // The close() method maybe called after we checked it, it is ok since we are already inside the synchronized block.
263266 // The clearCache() will clear the newly added client.
264267 final var newClientReference = clientBuilder .apply (settings );
265- clientsCache = Maps .copyMapWithAddedEntry (clientsCache , clientName , newClientReference );
268+ clientsCache = Maps .copyMapWithAddedEntry (clientsCache , clientKey , newClientReference );
266269 return newClientReference ;
267270 }
268271 }
269272
270273 /**
271- * Clear the cache by closing and clear out all clients. Subsequent {@link #client(String )} calls will recreate
274+ * Clear the cache by closing and clear out all clients. Subsequent {@link #client(RepositoryMetadata )} calls will recreate
272275 * the clients and populate the cache again.
273276 */
274277 synchronized void clearCache () {
@@ -290,4 +293,50 @@ boolean isClosed() {
290293 return closed .get ();
291294 }
292295 }
296+
297+ /**
298+ * Holder class of s3 clients for a single project. It is instantiated in the cluster state thread with client
299+ * settings. The clients are created and cached lazily when the {@link #client(String)} method is called.
300+ * Cached clients are closed and cleared out when the {@link #clearCache()} method is called. Subsequent calls to
301+ * {@link #client(String)} will recreate them. The call to {@link #close()} method clears the cache as well but
302+ * also flags the holder to be closed so that no new clients can be created.
303+ */
304+ final class PerProjectClientsHolder extends ClientsHolder <String > {
305+ private final Map <String , S3ClientSettings > clientSettings ;
306+
307+ PerProjectClientsHolder (ProjectId projectId , Map <String , S3ClientSettings > clientSettings ) {
308+ super (projectId );
309+ this .clientSettings = clientSettings ;
310+ }
311+
312+ @ Override
313+ Map <String , S3ClientSettings > allClientSettings () {
314+ return clientSettings ;
315+ }
316+
317+ @ Override
318+ String clientKey (RepositoryMetadata repositoryMetadata ) {
319+ return S3Repository .CLIENT_NAME .get (repositoryMetadata .settings ());
320+ }
321+
322+ @ Override
323+ String clientName (String clientKey ) {
324+ return clientKey ;
325+ }
326+
327+ @ Override
328+ S3ClientSettings singleClientSettings (String clientKey ) {
329+ return clientSettings .get (clientKey );
330+ }
331+
332+ AmazonS3Reference client (String clientName ) {
333+ return client (
334+ new RepositoryMetadata (
335+ "repo" ,
336+ S3Repository .TYPE ,
337+ Settings .builder ().put (S3Repository .CLIENT_NAME .getKey (), clientName ).build ()
338+ )
339+ );
340+ }
341+ }
293342}
0 commit comments