Skip to content

Commit f18f0ac

Browse files
authored
GH-1277: SafeNotifyService threads leak in CuratorFrameworkImpl (#1278)
CURATOR-495 introduced a new runSafeService field in the CuratorFrameworkImpl class. This field is either initialized with an external ExecutorService supplied via the builder, or it is created internally within the class. However, the CuratorFrameworkImpl#close method never shuts this executor down, so by default the threads started by each instance linger until the VM exits. Worse, if the thread factory supplied to the framework implementation via the builder produces non-daemon threads, the VM never exits because the single-thread executor is never stopped. Fixes #1277.
1 parent 3bc3ea1 commit f18f0ac

File tree

3 files changed

+86
-7
lines changed

3 files changed

+86
-7
lines changed

curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,9 @@ public Builder schemaSet(SchemaSet schemaSet) {
509509
* By default, an executor is allocated internally using the provided (or default)
510510
* {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method
511511
* to set a custom executor.
512+
* Whenever a custom executor is set, it is the caller's responsibility to close the
513+
* executor after the CuratorFramework closure.
514+
* The internally created executor is closed when CuratorFramework is closed.
512515
*
513516
* @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc
514517
* @return this

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.Collection;
2828
import java.util.List;
29+
import java.util.Optional;
2930
import java.util.concurrent.BlockingQueue;
3031
import java.util.concurrent.Callable;
3132
import java.util.concurrent.CompletableFuture;
@@ -100,6 +101,7 @@ public final class CuratorFrameworkImpl extends CuratorFrameworkBase {
100101
private final EnsembleTracker ensembleTracker;
101102
private final SchemaSet schemaSet;
102103
private final Executor runSafeService;
104+
private boolean isExternalRunSafeService = false;
103105
private final ZookeeperCompatibility zookeeperCompatibility;
104106

105107
private volatile ExecutorService executorService;
@@ -194,6 +196,7 @@ public void process(WatchedEvent watchedEvent) {
194196

195197
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) {
196198
if (builder.getRunSafeService() != null) {
199+
isExternalRunSafeService = true;
197200
return builder.getRunSafeService();
198201
}
199202
ThreadFactory threadFactory = builder.getThreadFactory();
@@ -373,16 +376,19 @@ public void close() {
373376
}
374377
});
375378

379+
Optional<CompletableFuture<Void>> executorServiceClosure = Optional.empty();
376380
if (executorService != null) {
377-
executorService.shutdownNow();
378-
try {
379-
executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
380-
} catch (InterruptedException e) {
381-
// Interrupted while interrupting; I give up.
382-
Thread.currentThread().interrupt();
383-
}
381+
executorServiceClosure = Optional.of(shutdownAndAwaitTerminationAsync(executorService));
384382
}
385383

384+
Optional<CompletableFuture<Void>> runSafeServiceClosure = Optional.empty();
385+
if (!isExternalRunSafeService && runSafeService != null) {
386+
runSafeServiceClosure =
387+
Optional.of(shutdownAndAwaitTerminationAsync(((ExecutorService) runSafeService)));
388+
}
389+
executorServiceClosure.ifPresent(CompletableFuture::join);
390+
runSafeServiceClosure.ifPresent(CompletableFuture::join);
391+
386392
if (ensembleTracker != null) {
387393
ensembleTracker.close();
388394
}
@@ -400,6 +406,26 @@ public void close() {
400406
}
401407
}
402408

409+
/**
410+
* Utility method to run the executor service shutdown in a background thread.
411+
* This is in order to ensure we don't extend the wait time above maxCloseWaitMs by waiting on multiple
412+
* executors to terminate.
413+
*
414+
* @param service the ExecutorService to shut down.
415+
* @return the future represents the job closing the executor service and waits on its termination.
416+
*/
417+
private CompletableFuture<Void> shutdownAndAwaitTerminationAsync(final ExecutorService service) {
418+
return CompletableFuture.runAsync(() -> {
419+
service.shutdownNow();
420+
try {
421+
service.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
422+
} catch (InterruptedException e) {
423+
// Interrupted while interrupting; I give up.
424+
Thread.currentThread().interrupt();
425+
}
426+
});
427+
}
428+
403429
NamespaceImpl getNamespaceImpl() {
404430
return namespace;
405431
}

curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import com.google.common.collect.Lists;
2626
import com.google.common.collect.Queues;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.concurrent.BlockingQueue;
3031
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336
import java.util.concurrent.atomic.AtomicLong;
3437
import java.util.concurrent.atomic.AtomicReference;
38+
import java.util.stream.Stream;
3539
import org.apache.curator.framework.CuratorFramework;
3640
import org.apache.curator.framework.CuratorFrameworkFactory;
3741
import org.apache.curator.framework.api.ACLProvider;
@@ -45,6 +49,7 @@
4549
import org.apache.curator.test.BaseClassForTests;
4650
import org.apache.curator.test.Timing;
4751
import org.apache.curator.utils.CloseableUtils;
52+
import org.apache.curator.utils.ThreadUtils;
4853
import org.apache.zookeeper.KeeperException.Code;
4954
import org.apache.zookeeper.data.ACL;
5055
import org.junit.jupiter.api.Test;
@@ -306,4 +311,49 @@ public void listen(OperationAndData<?> data) {
306311
CloseableUtils.closeQuietly(client);
307312
}
308313
}
314+
315+
@Test
316+
public void testCloseShutsDownInternalRunSafeService() {
317+
Timing timing = new Timing();
318+
CuratorFramework client = CuratorFrameworkFactory.newClient(
319+
server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
320+
client.start();
321+
client.runSafe(() -> {});
322+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("SafeNotifyService")));
323+
324+
client.close();
325+
326+
assertTrue(enumerateThreads().noneMatch(t -> t.getName().contains("SafeNotifyService")));
327+
}
328+
329+
@Test
330+
public void testCloseLeavesExternalRunSafeServiceRunning() throws Exception {
331+
Timing timing = new Timing();
332+
ExecutorService externalRunner =
333+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService"));
334+
CuratorFramework client = CuratorFrameworkFactory.builder()
335+
.connectString(server.getConnectString())
336+
.sessionTimeoutMs(timing.session())
337+
.connectionTimeoutMs(timing.connection())
338+
.retryPolicy(new RetryOneTime(1))
339+
.maxCloseWaitMs(timing.forWaiting().milliseconds())
340+
.runSafeService(externalRunner)
341+
.build();
342+
client.start();
343+
client.runSafe(() -> {});
344+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
345+
346+
client.close();
347+
348+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
349+
350+
externalRunner.shutdownNow();
351+
assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
352+
}
353+
354+
private static Stream<Thread> enumerateThreads() {
355+
Thread[] threads = new Thread[Thread.activeCount()];
356+
Thread.enumerate(threads);
357+
return Arrays.stream(threads);
358+
}
309359
}

0 commit comments

Comments
 (0)