Skip to content

Commit 998d49a

Browse files
authored
Fix non-daemon-threads in the JDK poller and avoid polluting the commonPool (#59)
* Switch to an internal daemon threads based executor that also closes threads when not used that much * Do not use the common pool, but use our own fallback This way the default will make it harder to get users in a bad situation and provide a helper class that users might like to re-use.
1 parent 24af885 commit 998d49a

File tree

3 files changed

+87
-8
lines changed

3 files changed

+87
-8
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* BSD 2-Clause License
3+
*
4+
* Copyright (c) 2023, Swat.engineering
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following conditions are met:
8+
*
9+
* 1. Redistributions of source code must retain the above copyright notice, this
10+
* list of conditions and the following disclaimer.
11+
*
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
package engineering.swat.watch;
28+
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.concurrent.ThreadFactory;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
/**
37+
* Build thread pools that even when not properly shutdown, will still not prevent the termination of the JVM.
38+
*/
39+
public class DaemonThreadPool {
40+
private DaemonThreadPool() {}
41+
42+
/**
43+
* Generate a thread pool that will reuse threads, clear them after a while, but constrain the total amount of threads.
44+
* @param name name of the thread pool
45+
* @param maxThreads the maximum amount of threads to start in this pool, after this things will get queued.
46+
* @return an exectutor with deamon threads and constainted to a certain maximum
47+
*/
48+
public static ExecutorService buildConstrainedCached(String name, int maxThreads) {
49+
return new ThreadPoolExecutor(0, maxThreads,
50+
60, TimeUnit.SECONDS,
51+
new LinkedBlockingQueue<>(),
52+
buildFactory(name)
53+
);
54+
}
55+
56+
private static ThreadFactory buildFactory(String name) {
57+
return new ThreadFactory() {
58+
private final AtomicInteger id = new AtomicInteger(0);
59+
private final ThreadGroup group = new ThreadGroup(name);
60+
@Override
61+
public Thread newThread(Runnable r) {
62+
var t = new Thread(group, r, name + "-" + id.getAndIncrement());
63+
t.setDaemon(true);
64+
return t;
65+
}
66+
};
67+
}
68+
69+
70+
71+
}

src/main/java/engineering/swat/watch/Watch.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030
import java.nio.file.Files;
3131
import java.nio.file.LinkOption;
3232
import java.nio.file.Path;
33-
import java.util.concurrent.CompletableFuture;
3433
import java.util.concurrent.Executor;
3534
import java.util.function.BiConsumer;
3635
import java.util.function.Consumer;
3736
import java.util.function.Predicate;
3837

3938
import org.apache.logging.log4j.LogManager;
4039
import org.apache.logging.log4j.Logger;
40+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
4141

4242
import engineering.swat.watch.impl.EventHandlingWatch;
4343
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
@@ -58,7 +58,9 @@ public class Watch {
5858
private final Path path;
5959
private final WatchScope scope;
6060
private volatile Approximation approximateOnOverflow = Approximation.ALL;
61-
private volatile Executor executor = CompletableFuture::runAsync;
61+
62+
private static final Executor FALLBACK_EXECUTOR = DaemonThreadPool.buildConstrainedCached("JavaWatch-internal-handler",Runtime.getRuntime().availableProcessors());
63+
private volatile @MonotonicNonNull Executor executor = null;
6264

6365
private static final BiConsumer<EventHandlingWatch, WatchEvent> EMPTY_HANDLER = (w, e) -> {};
6466
private volatile BiConsumer<EventHandlingWatch, WatchEvent> eventHandler = EMPTY_HANDLER;
@@ -162,11 +164,14 @@ Watch filter(Predicate<WatchEvent> predicate) {
162164

163165
/**
164166
* Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
165-
* If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
167+
* Make sure to consider the termination of the threadpool, it should be after the close of the active watch.
166168
* @param callbackHandler worker pool to use
167169
* @return this for optional method chaining
168170
*/
169171
public Watch withExecutor(Executor callbackHandler) {
172+
if (callbackHandler == null) {
173+
throw new IllegalArgumentException("null is allowed");
174+
}
170175
this.executor = callbackHandler;
171176
return this;
172177
}
@@ -197,8 +202,12 @@ public ActiveWatch start() throws IOException {
197202
if (this.eventHandler == EMPTY_HANDLER) {
198203
throw new IllegalStateException("There is no onEvent handler defined");
199204
}
205+
var executor = this.executor;
206+
if (executor == null) {
207+
executor = FALLBACK_EXECUTOR;
208+
}
200209

201-
var h = applyApproximateOnOverflow();
210+
var h = applyApproximateOnOverflow(executor);
202211

203212
switch (scope) {
204213
case PATH_AND_CHILDREN: {
@@ -230,7 +239,7 @@ public ActiveWatch start() throws IOException {
230239
}
231240
}
232241

233-
private BiConsumer<EventHandlingWatch, WatchEvent> applyApproximateOnOverflow() {
242+
private BiConsumer<EventHandlingWatch, WatchEvent> applyApproximateOnOverflow(Executor executor) {
234243
switch (approximateOnOverflow) {
235244
case NONE:
236245
return eventHandler;

src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.ConcurrentHashMap;
4646
import java.util.concurrent.ExecutionException;
4747
import java.util.concurrent.ExecutorService;
48-
import java.util.concurrent.Executors;
4948
import java.util.concurrent.TimeUnit;
5049
import java.util.function.Consumer;
5150

@@ -55,6 +54,7 @@
5554

5655
import com.sun.nio.file.ExtendedWatchEventModifier;
5756

57+
import engineering.swat.watch.DaemonThreadPool;
5858
import engineering.swat.watch.impl.mac.MacWatchService;
5959
import engineering.swat.watch.impl.util.SubscriptionKey;
6060

@@ -67,12 +67,11 @@ private JDKPoller() {}
6767
private static final Logger logger = LogManager.getLogger();
6868
private static final Map<WatchKey, Consumer<List<WatchEvent<?>>>> watchers = new ConcurrentHashMap<>();
6969
private static final WatchService service;
70-
private static final int nCores = Runtime.getRuntime().availableProcessors();
7170
/**
7271
* We have to be a bit careful with registering too many paths in parallel
7372
* Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time.
7473
*/
75-
private static final ExecutorService registerPool = Executors.newFixedThreadPool(nCores);
74+
private static final ExecutorService registerPool = DaemonThreadPool.buildConstrainedCached("JavaWatch-rate-limit-registry", Runtime.getRuntime().availableProcessors());
7675

7776
static {
7877
try {

0 commit comments

Comments
 (0)