1212import com .amazonaws .http .IdleConnectionReaper ;
1313import com .amazonaws .services .s3 .AmazonS3 ;
1414
15+ import org .elasticsearch .action .ActionListener ;
16+ import org .elasticsearch .action .support .SubscribableListener ;
1517import org .elasticsearch .cluster .ClusterChangedEvent ;
1618import org .elasticsearch .cluster .ClusterStateListener ;
1719import org .elasticsearch .cluster .metadata .ProjectId ;
2931import java .util .List ;
3032import java .util .Map ;
3133import java .util .concurrent .ConcurrentHashMap ;
34+ import java .util .concurrent .CountDownLatch ;
3235import java .util .concurrent .Executor ;
36+ import java .util .concurrent .TimeUnit ;
3337import java .util .concurrent .atomic .AtomicBoolean ;
3438import java .util .function .Function ;
3539
@@ -38,8 +42,10 @@ public class S3PerProjectClientManager implements ClusterStateListener {
3842 private final Settings settings ;
3943 private final Function <S3ClientSettings , AmazonS3 > clientBuilder ;
4044 private final Executor executor ;
41- // A map of projectId to clients holder. Adding to and removing from the map happen only with the cluster state listener thread.
45+ // A map of projectId to clients holder. Adding to and removing from the map happen only in the cluster state listener thread.
4246 private final Map <ProjectId , ClientsHolder > perProjectClientsCache ;
47+ // Listener for tracking ongoing async closing of obsolete clients. Updated only in the cluster state listener thread.
48+ private volatile SubscribableListener <Void > clientsCloseListener = null ;
4349
4450 public S3PerProjectClientManager (Settings settings , Function <S3ClientSettings , AmazonS3 > clientBuilder , Executor executor ) {
4551 this .settings = settings ;
@@ -92,10 +98,26 @@ public void clusterChanged(ClusterChangedEvent event) {
9298 }
9399 }
94100 if (clientsHoldersToClose .isEmpty () == false ) {
95- executor .execute (() -> IOUtils .closeWhileHandlingException (clientsHoldersToClose ));
101+ final var currentClientsCloseListener = new SubscribableListener <Void >();
102+ final var previousClientsCloseListener = clientsCloseListener ;
103+ clientsCloseListener = currentClientsCloseListener ;
104+ if (previousClientsCloseListener != null && previousClientsCloseListener .isDone () == false ) {
105+ previousClientsCloseListener .addListener (
106+ ActionListener .running (() -> closeClientsAsync (clientsHoldersToClose , currentClientsCloseListener ))
107+ );
108+ } else {
109+ closeClientsAsync (clientsHoldersToClose , currentClientsCloseListener );
110+ }
96111 }
97112 }
98113
114+ private void closeClientsAsync (List <ClientsHolder > clientsHoldersToClose , ActionListener <Void > listener ) {
115+ executor .execute (() -> {
116+ IOUtils .closeWhileHandlingException (clientsHoldersToClose );
117+ listener .onResponse (null );
118+ });
119+ }
120+
99121 public AmazonS3Reference client (ProjectId projectId , RepositoryMetadata repositoryMetadata ) {
100122 final var clientsHolder = perProjectClientsCache .get (projectId );
101123 if (clientsHolder == null ) {
@@ -125,6 +147,19 @@ public void clearCacheForProject(ProjectId projectId) {
125147 */
126148 public void close () {
127149 IOUtils .closeWhileHandlingException (perProjectClientsCache .values ());
150+ final var currentClientsCloseListener = clientsCloseListener ;
151+ if (currentClientsCloseListener != null && currentClientsCloseListener .isDone () == false ) {
152+ // Wait for async clients closing to be completed
153+ final CountDownLatch latch = new CountDownLatch (1 );
154+ currentClientsCloseListener .addListener (ActionListener .running (latch ::countDown ));
155+ try {
156+ if (latch .await (1 , TimeUnit .MINUTES ) == false ) {
157+ // TODO: log warning
158+ }
159+ } catch (InterruptedException e ) {
160+ Thread .currentThread ().interrupt ();
161+ }
162+ }
128163 }
129164
130165 private boolean newOrUpdated (ProjectId projectId , Map <String , S3ClientSettings > currentClientSettings ) {
0 commit comments