Skip to content

Commit a712a3a

Browse files
authored
Merge pull request #21 from SWAT-engineering/improved-overflow-support/event-handlers-with-active-watch
Improved overflow support: Internal event handlers can dispatch new events
2 parents 8d4dd06 + d52cfeb commit a712a3a

File tree

5 files changed

+26
-22
lines changed

5 files changed

+26
-22
lines changed

src/main/java/engineering/swat/watch/Watcher.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import java.nio.file.Path;
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.Executor;
35+
import java.util.function.BiConsumer;
3536
import java.util.function.Consumer;
3637

3738
import org.apache.logging.log4j.LogManager;
3839
import org.apache.logging.log4j.Logger;
3940

41+
import engineering.swat.watch.impl.EventHandlingWatch;
4042
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
4143
import engineering.swat.watch.impl.jdk.JDKFileWatch;
4244
import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch;
@@ -54,8 +56,8 @@ public class Watcher {
5456
private final Path path;
5557
private volatile Executor executor = CompletableFuture::runAsync;
5658

57-
private static final Consumer<WatchEvent> EMPTY_HANDLER = p -> {};
58-
private volatile Consumer<WatchEvent> eventHandler = EMPTY_HANDLER;
59+
private static final BiConsumer<EventHandlingWatch, WatchEvent> EMPTY_HANDLER = (w, e) -> {};
60+
private volatile BiConsumer<EventHandlingWatch, WatchEvent> eventHandler = EMPTY_HANDLER;
5961

6062

6163
private Watcher(WatchScope scope, Path path) {
@@ -103,7 +105,7 @@ public Watcher on(Consumer<WatchEvent> eventHandler) {
103105
if (this.eventHandler != EMPTY_HANDLER) {
104106
throw new IllegalArgumentException("on handler cannot be set more than once");
105107
}
106-
this.eventHandler = eventHandler;
108+
this.eventHandler = (w, e) -> eventHandler.accept(e);
107109
return this;
108110
}
109111

@@ -114,7 +116,7 @@ public Watcher on(WatchEventListener listener) {
114116
if (this.eventHandler != EMPTY_HANDLER) {
115117
throw new IllegalArgumentException("on handler cannot be set more than once");
116118
}
117-
this.eventHandler = ev -> {
119+
this.eventHandler = (w, ev) -> {
118120
switch (ev.getKind()) {
119121
case CREATED:
120122
listener.onCreated(ev);

src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.nio.file.StandardWatchEventKinds;
3232
import java.util.concurrent.Executor;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34-
import java.util.function.Consumer;
34+
import java.util.function.BiConsumer;
3535

3636
import org.apache.logging.log4j.LogManager;
3737
import org.apache.logging.log4j.Logger;
@@ -45,10 +45,10 @@ public abstract class JDKBaseWatch implements EventHandlingWatch {
4545

4646
protected final Path path;
4747
protected final Executor exec;
48-
protected final Consumer<WatchEvent> eventHandler;
48+
protected final BiConsumer<EventHandlingWatch, WatchEvent> eventHandler;
4949
protected final AtomicBoolean started = new AtomicBoolean();
5050

51-
protected JDKBaseWatch(Path path, Executor exec, Consumer<WatchEvent> eventHandler) {
51+
protected JDKBaseWatch(Path path, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
5252
this.path = path;
5353
this.exec = exec;
5454
this.eventHandler = eventHandler;
@@ -120,7 +120,7 @@ private WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind<?> jdkKind) {
120120

121121
@Override
122122
public void handleEvent(WatchEvent e) {
123-
eventHandler.accept(e);
123+
eventHandler.accept(this, e);
124124
}
125125

126126
@Override

src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@
3131
import java.nio.file.Path;
3232
import java.util.List;
3333
import java.util.concurrent.Executor;
34-
import java.util.function.Consumer;
34+
import java.util.function.BiConsumer;
3535

3636
import org.apache.logging.log4j.LogManager;
3737
import org.apache.logging.log4j.Logger;
3838
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3939

4040
import engineering.swat.watch.WatchEvent;
41+
import engineering.swat.watch.impl.EventHandlingWatch;
4142
import engineering.swat.watch.impl.util.BundledSubscription;
4243
import engineering.swat.watch.impl.util.SubscriptionKey;
4344

@@ -49,11 +50,11 @@ public class JDKDirectoryWatch extends JDKBaseWatch {
4950
private static final BundledSubscription<SubscriptionKey, List<java.nio.file.WatchEvent<?>>>
5051
BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);
5152

52-
public JDKDirectoryWatch(Path directory, Executor exec, Consumer<WatchEvent> eventHandler) {
53+
public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
5354
this(directory, exec, eventHandler, false);
5455
}
5556

56-
public JDKDirectoryWatch(Path directory, Executor exec, Consumer<WatchEvent> eventHandler, boolean nativeRecursive) {
57+
public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler, boolean nativeRecursive) {
5758
super(directory, exec, eventHandler);
5859
this.nativeRecursive = nativeRecursive;
5960
}

src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@
2929
import java.io.IOException;
3030
import java.nio.file.Path;
3131
import java.util.concurrent.Executor;
32-
import java.util.function.Consumer;
32+
import java.util.function.BiConsumer;
3333

3434
import org.apache.logging.log4j.LogManager;
3535
import org.apache.logging.log4j.Logger;
3636
import org.checkerframework.checker.nullness.qual.Nullable;
3737

3838
import engineering.swat.watch.WatchEvent;
39+
import engineering.swat.watch.impl.EventHandlingWatch;
3940

4041
/**
4142
* It's not possible to monitor a single file (or directory), so we have to find a directory watcher, and connect to that
@@ -46,17 +47,17 @@ public class JDKFileWatch extends JDKBaseWatch {
4647
private final Logger logger = LogManager.getLogger();
4748
private final JDKBaseWatch internal;
4849

49-
public JDKFileWatch(Path file, Executor exec, Consumer<WatchEvent> eventHandler) {
50+
public JDKFileWatch(Path file, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
5051
super(file, exec, eventHandler);
5152

5253
var message = "The root path is not a valid path for a file watch";
5354
var parent = requireNonNull(file.getParent(), message);
5455
var fileName = requireNonNull(file.getFileName(), message);
5556
assert !parent.equals(file);
5657

57-
this.internal = new JDKDirectoryWatch(parent, exec, e -> {
58+
this.internal = new JDKDirectoryWatch(parent, exec, (w, e) -> {
5859
if (fileName.equals(e.getRelativePath())) {
59-
eventHandler.accept(e);
60+
eventHandler.accept(w, e);
6061
}
6162
});
6263

src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,25 @@
4242
import java.util.concurrent.ConcurrentHashMap;
4343
import java.util.concurrent.ConcurrentMap;
4444
import java.util.concurrent.Executor;
45-
import java.util.function.Consumer;
45+
import java.util.function.BiConsumer;
4646

4747
import org.apache.logging.log4j.LogManager;
4848
import org.apache.logging.log4j.Logger;
4949

5050
import engineering.swat.watch.WatchEvent;
51+
import engineering.swat.watch.impl.EventHandlingWatch;
5152

5253
public class JDKRecursiveDirectoryWatch extends JDKBaseWatch {
5354
private final Logger logger = LogManager.getLogger();
5455
private final ConcurrentMap<Path, JDKDirectoryWatch> activeWatches = new ConcurrentHashMap<>();
5556

56-
public JDKRecursiveDirectoryWatch(Path directory, Executor exec, Consumer<WatchEvent> eventHandler) {
57+
public JDKRecursiveDirectoryWatch(Path directory, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
5758
super(directory, exec, eventHandler);
5859
}
5960

6061
private void processEvents(WatchEvent ev) {
6162
logger.trace("Forwarding event: {}", ev);
62-
eventHandler.accept(ev);
63+
eventHandler.accept(this, ev);
6364
logger.trace("Unwrapping event: {}", ev);
6465
switch (ev.getKind()) {
6566
case CREATED: handleCreate(ev); break;
@@ -71,10 +72,9 @@ private void processEvents(WatchEvent ev) {
7172

7273
private void publishExtraEvents(List<WatchEvent> ev) {
7374
logger.trace("Reporting new nested directories & files: {}", ev);
74-
ev.forEach(eventHandler);
75+
ev.forEach(e -> eventHandler.accept(this, e));
7576
}
7677

77-
7878
private void handleCreate(WatchEvent ev) {
7979
// between the event and the current state of the file system
8080
// we might have some nested directories we missed
@@ -161,9 +161,9 @@ private void addNewDirectory(Path dir) throws IOException {
161161
}
162162

163163
/** Make sure that the events are relative to the actual root of the recursive watch */
164-
private Consumer<WatchEvent> relocater(Path subRoot) {
164+
private BiConsumer<EventHandlingWatch, WatchEvent> relocater(Path subRoot) {
165165
final Path newRelative = path.relativize(subRoot);
166-
return ev -> {
166+
return (w, ev) -> {
167167
var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath()));
168168
processEvents(rewritten);
169169
};

0 commit comments

Comments
 (0)