Skip to content

Commit 4d7bd69

Browse files
committed
no wait for async close
1 parent 833e085 commit 4d7bd69

File tree

2 files changed

+3
-103
lines changed

2 files changed

+3
-103
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientsManager.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
package org.elasticsearch.repositories.s3;
1111

12-
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.support.SubscribableListener;
1412
import org.elasticsearch.cluster.ClusterChangedEvent;
1513
import org.elasticsearch.cluster.ClusterStateApplier;
1614
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -22,7 +20,6 @@
2220
import org.elasticsearch.common.util.Maps;
2321
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2422
import org.elasticsearch.core.IOUtils;
25-
import org.elasticsearch.core.Nullable;
2623
import org.elasticsearch.logging.LogManager;
2724
import org.elasticsearch.logging.Logger;
2825

@@ -33,9 +30,7 @@
3330
import java.util.List;
3431
import java.util.Map;
3532
import java.util.Objects;
36-
import java.util.concurrent.CountDownLatch;
3733
import java.util.concurrent.Executor;
38-
import java.util.concurrent.TimeUnit;
3934
import java.util.concurrent.atomic.AtomicBoolean;
4035
import java.util.function.Function;
4136
import java.util.function.UnaryOperator;
@@ -53,8 +48,6 @@ public class S3ClientsManager implements ClusterStateApplier {
5348
private final Executor executor;
5449
// A map of projectId to clients holder. Adding to and removing from the map happen only in the applier thread.
5550
private final Map<ProjectId, ClientsHolder<?>> clientsHolders;
56-
// Listener for tracking ongoing async closing of obsolete clients. Updated only in the applier thread.
57-
private volatile SubscribableListener<Void> clientsCloseListener = null;
5851

5952
S3ClientsManager(
6053
Settings nodeSettings,
@@ -142,31 +135,20 @@ public void applyClusterState(ClusterChangedEvent event) {
142135
}
143136
// Close stale clients asynchronously without blocking the applier thread
144137
if (clientsHoldersToClose.isEmpty() == false) {
145-
final var currentClientsCloseListener = new SubscribableListener<Void>();
146-
final var previousClientsCloseListener = clientsCloseListener;
147-
clientsCloseListener = currentClientsCloseListener;
148-
if (previousClientsCloseListener != null && previousClientsCloseListener.isDone() == false) {
149-
previousClientsCloseListener.addListener(
150-
ActionListener.running(() -> closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener))
151-
);
152-
} else {
153-
closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener);
154-
}
138+
closeClientsAsync(clientsHoldersToClose);
155139
}
156140
}
157141

158-
private void closeClientsAsync(List<PerProjectClientsHolder> clientsHoldersToClose, ActionListener<Void> listener) {
142+
private void closeClientsAsync(List<PerProjectClientsHolder> clientsHoldersToClose) {
159143
executor.execute(new AbstractRunnable() {
160144
@Override
161145
protected void doRun() throws Exception {
162146
IOUtils.closeWhileHandlingException(clientsHoldersToClose);
163-
listener.onResponse(null);
164147
}
165148

166149
@Override
167150
public void onFailure(Exception e) {
168151
logger.warn("Failed to close s3 clients", e);
169-
listener.onFailure(e);
170152
}
171153
});
172154
}
@@ -224,25 +206,6 @@ void releaseCachedClients(ProjectId projectId) {
224206
*/
225207
void close() {
226208
IOUtils.closeWhileHandlingException(clientsHolders.values());
227-
final var currentClientsCloseListener = clientsCloseListener;
228-
if (currentClientsCloseListener != null && currentClientsCloseListener.isDone() == false) {
229-
// Wait for async clients closing to be completed
230-
final CountDownLatch latch = new CountDownLatch(1);
231-
currentClientsCloseListener.addListener(ActionListener.running(latch::countDown));
232-
try {
233-
if (latch.await(1, TimeUnit.MINUTES) == false) {
234-
logger.warn("Waiting for async closing of s3 clients timed out");
235-
}
236-
} catch (InterruptedException e) {
237-
Thread.currentThread().interrupt();
238-
}
239-
}
240-
}
241-
242-
// visible for tests
243-
@Nullable
244-
SubscribableListener<Void> getClientsCloseListener() {
245-
return clientsCloseListener;
246209
}
247210

248211
private boolean newOrUpdated(ProjectId projectId, Map<String, S3ClientSettings> currentClientSettings) {

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientsManagerTests.java

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,12 @@
3838

3939
import java.nio.charset.StandardCharsets;
4040
import java.time.Duration;
41-
import java.util.ArrayList;
4241
import java.util.Arrays;
4342
import java.util.HashMap;
4443
import java.util.List;
4544
import java.util.Map;
4645
import java.util.concurrent.CountDownLatch;
4746
import java.util.concurrent.ExecutionException;
48-
import java.util.concurrent.atomic.AtomicBoolean;
4947
import java.util.concurrent.atomic.AtomicInteger;
5048
import java.util.concurrent.atomic.AtomicReference;
5149
import java.util.stream.Collectors;
@@ -75,14 +73,12 @@ public class S3ClientsManagerTests extends ESTestCase {
7573
private S3Service s3Service;
7674
private S3ClientsManager s3ClientsManager;
7775
private final AtomicReference<CountDownLatch> clientRefsCloseLatchRef = new AtomicReference<>();
78-
private final AtomicBoolean closeInternalInvoked = new AtomicBoolean(false);
7976

8077
@Override
8178
public void setUp() throws Exception {
8279
super.setUp();
8380
s3SecretsIdGenerators = ConcurrentCollections.newConcurrentMap();
8481
clientRefsCloseLatchRef.set(null);
85-
closeInternalInvoked.set(false);
8682
clientNames = IntStream.range(0, between(2, 5)).mapToObj(i -> randomIdentifier() + "_" + i).toList();
8783

8884
final Settings.Builder builder = Settings.builder();
@@ -111,28 +107,7 @@ public void setUp() throws Exception {
111107
TestProjectResolvers.allProjects(),
112108
mock(ResourceWatcherService.class),
113109
() -> Region.of("es-test-region")
114-
) {
115-
@Override
116-
protected AmazonS3Reference buildClientReference(S3ClientSettings clientSettings) {
117-
final var original = super.buildClientReference(clientSettings);
118-
final var closeLatch = clientRefsCloseLatchRef.get();
119-
if (closeLatch == null) {
120-
return original;
121-
}
122-
123-
original.decRef();
124-
final AmazonS3Reference proxy = new AmazonS3Reference(original.client(), DummySdkHttpClient.INSTANCE) {
125-
@Override
126-
protected void closeInternal() {
127-
closeInternalInvoked.set(true);
128-
safeAwait(closeLatch);
129-
original.close();
130-
}
131-
};
132-
proxy.mustIncRef();
133-
return proxy;
134-
}
135-
};
110+
);
136111
s3Service.refreshAndClearCache(S3ClientSettings.load(settings));
137112
s3ClientsManager = s3Service.getS3PerProjectClientManager();
138113
assertNotNull(s3ClientsManager);
@@ -145,8 +120,6 @@ public void tearDown() throws Exception {
145120
s3Service.close();
146121
clusterService.close();
147122
threadPool.close();
148-
final var clientsCloseListener = s3ClientsManager.getClientsCloseListener();
149-
assertTrue(clientsCloseListener == null || clientsCloseListener.isDone());
150123
s3ClientsManager.getClientsHolders().forEach((projectId, clientsHolder) -> assertTrue(clientsHolder.isClosed()));
151124
}
152125

@@ -279,42 +252,6 @@ public void testClientsForMultipleProjects() throws InterruptedException {
279252
}
280253
}
281254

282-
public void testWaitForAsyncClientClose() throws Exception {
283-
final CountDownLatch closeLatch = new CountDownLatch(1);
284-
clientRefsCloseLatchRef.set(closeLatch);
285-
286-
final List<ProjectId> projectIds = randomList(1, 3, ESTestCase::randomUniqueProjectId);
287-
final int iterations = between(3, 8);
288-
289-
final List<AmazonS3Reference> clientRefs = new ArrayList<>();
290-
for (int i = 0; i < iterations; i++) {
291-
for (var projectId : projectIds) {
292-
final List<String> subsetOfClientNames = randomNonEmptySubsetOf(clientNames);
293-
updateProjectInClusterState(projectId, newProjectClientsSecrets(projectId, subsetOfClientNames.toArray(String[]::new)));
294-
subsetOfClientNames.forEach(clientName -> {
295-
final var newClient = s3ClientsManager.client(projectId, createRepositoryMetadata(clientName));
296-
clientRefs.add(newClient);
297-
newClient.decRef();
298-
});
299-
if (randomBoolean() && randomBoolean()) {
300-
removeProjectFromClusterState(projectId);
301-
}
302-
}
303-
}
304-
305-
final Thread thread = new Thread(() -> s3Service.close());
306-
thread.start();
307-
308-
assertBusy(() -> assertTrue(closeInternalInvoked.get()));
309-
Thread.sleep(between(0, 100));
310-
assertFalse(s3ClientsManager.getClientsCloseListener().isDone());
311-
312-
closeLatch.countDown();
313-
assertTrue(thread.join(Duration.ofSeconds(10)));
314-
assertTrue(s3ClientsManager.getClientsCloseListener().isDone());
315-
clientRefs.forEach(clientRef -> assertFalse(clientRef.hasReferences()));
316-
}
317-
318255
public void testClusterAndProjectClients() {
319256
final ProjectId projectId = randomUniqueProjectId();
320257
final String clientName = randomFrom(clientNames);

0 commit comments

Comments
 (0)