|
3 | 3 | import java.io.Closeable; |
4 | 4 | import java.io.IOException; |
5 | 5 | import java.util.List; |
| 6 | +import java.util.concurrent.CompletableFuture; |
6 | 7 | import java.util.concurrent.ConcurrentHashMap; |
7 | 8 | import java.util.concurrent.ConcurrentMap; |
8 | 9 | import java.util.concurrent.CopyOnWriteArrayList; |
| 10 | +import java.util.concurrent.TimeUnit; |
9 | 11 | import java.util.function.Consumer; |
10 | 12 |
|
| 13 | +import org.apache.logging.log4j.LogManager; |
| 14 | +import org.apache.logging.log4j.Logger; |
11 | 15 | import org.checkerframework.checker.nullness.qual.MonotonicNonNull; |
12 | 16 | import org.checkerframework.checker.nullness.qual.NonNull; |
13 | 17 |
|
|
16 | 20 | * This is used (for example) to avoid multiple JDKPoller registries for the same path |
17 | 21 | */ |
18 | 22 | public class BundledSubscription<Key extends @NonNull Object, Event extends @NonNull Object> implements ISubscribable<Key,Event> { |
| 23 | + private static final Logger logger = LogManager.getLogger(); |
19 | 24 | private final ISubscribable<Key, Event> wrapped; |
20 | 25 | private final ConcurrentMap<Key, Subscription<Event>> subscriptions = new ConcurrentHashMap<>(); |
21 | 26 |
|
@@ -71,15 +76,33 @@ public Closeable subscribe(Key target, Consumer<Event> eventListener) throws IOE |
71 | 76 | } |
72 | 77 | } |
73 | 78 | return () -> { |
| 79 | + boolean scheduleClose = false; |
74 | 80 | synchronized(active) { |
75 | 81 | active.remove(eventListener); |
76 | | - if (!active.hasActiveConsumers() && !active.closed) { |
77 | | - active.closed = true; |
78 | | - this.subscriptions.remove(target, active); |
79 | | - if (active.toBeClosed != null) { |
80 | | - active.toBeClosed.close(); |
81 | | - } |
82 | | - } |
| 82 | + scheduleClose = !active.hasActiveConsumers() && !active.closed; |
| 83 | + } |
| 84 | + if (scheduleClose) { |
| 85 | + // to avoid hammering the system with closes & registers in a short periode |
| 86 | + // we schedule the cleanup of watches in the background, when even after a small delay |
| 87 | + // nobody is interested in a certain file anymore |
| 88 | + CompletableFuture |
| 89 | + .delayedExecutor(100, TimeUnit.MILLISECONDS) |
| 90 | + .execute(() -> { |
| 91 | + synchronized(active) { |
| 92 | + if (!active.hasActiveConsumers() && !active.closed) { |
| 93 | + // still ready to be closed |
| 94 | + active.closed = true; |
| 95 | + this.subscriptions.remove(target, active); |
| 96 | + if (active.toBeClosed != null) { |
| 97 | + try { |
| 98 | + active.toBeClosed.close(); |
| 99 | + } catch (IOException e) { |
| 100 | + logger.error("Unhandled exception while closing the watcher for {} in the background", target, e); |
| 101 | + } |
| 102 | + } |
| 103 | + } |
| 104 | + } |
| 105 | + }); |
83 | 106 | } |
84 | 107 | }; |
85 | 108 | } |
|
0 commit comments