R traceExit(Message message, R result);
+ void warn(Marker marker, Message message);
+ void warn(Marker marker, Message message, @Nullable Throwable throwable);
+ void warn(Marker marker, MessageSupplier messageSupplier);
+ void warn(Marker marker, MessageSupplier messageSupplier, @Nullable Throwable throwable);
+ void warn(Marker marker, CharSequence message);
+ void warn(Marker marker, CharSequence message, @Nullable Throwable throwable);
+ void warn(Marker marker, Object message);
+ void warn(Marker marker, Object message, @Nullable Throwable throwable);
+ void warn(Marker marker, String message);
+ void warn(Marker marker, String message, @Nullable Object... params);
+ @SuppressWarnings("deprecation")
+ void warn(Marker marker, String message, Supplier>... paramSuppliers);
+ void warn(Marker marker, String message, @Nullable Throwable throwable);
+ @SuppressWarnings("deprecation")
+ void warn(Marker marker, Supplier> messageSupplier);
+ @SuppressWarnings("deprecation")
+ void warn(Marker marker, Supplier> messageSupplier, @Nullable Throwable throwable);
+ void warn(Message message);
+ void warn(Message message, @Nullable Throwable throwable);
+ void warn(MessageSupplier messageSupplier);
+ void warn(MessageSupplier messageSupplier, @Nullable Throwable throwable);
+ void warn(CharSequence message);
+ void warn(CharSequence message, @Nullable Throwable throwable);
+ void warn(Object message);
+ void warn(Object message, @Nullable Throwable throwable);
+ void warn(String message);
+ void warn(String message, @Nullable Object... params);
+ @SuppressWarnings("deprecation")
+ void warn(String message, Supplier>... paramSuppliers);
+ void warn(String message, @Nullable Throwable throwable);
+ @SuppressWarnings("deprecation")
+ void warn(Supplier> messageSupplier);
+ @SuppressWarnings("deprecation")
+ void warn(Supplier> messageSupplier, @Nullable Throwable throwable);
+ void warn(Marker marker, String message, @Nullable Object p0);
+ void warn(Marker marker, String message, @Nullable Object p0, @Nullable Object p1);
+ void warn(Marker marker, String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2);
+ void warn(Marker marker, String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3);
+ void warn(Marker marker, String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4);
+ void warn(Marker marker, String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4, @Nullable Object p5);
+ void warn(
+ Marker marker, String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4, @Nullable Object p5, @Nullable Object p6);
+ void warn(
+ Marker marker,
+ String message,
+ @Nullable Object p0,
+ @Nullable Object p1,
+ @Nullable Object p2,
+ @Nullable Object p3,
+ @Nullable Object p4,
+ @Nullable Object p5,
+ @Nullable Object p6,
+ @Nullable Object p7);
+ void warn(
+ Marker marker,
+ String message,
+ @Nullable Object p0,
+ @Nullable Object p1,
+ @Nullable Object p2,
+ @Nullable Object p3,
+ @Nullable Object p4,
+ @Nullable Object p5,
+ @Nullable Object p6,
+ @Nullable Object p7,
+ @Nullable Object p8);
+ void warn(
+ Marker marker,
+ String message,
+ @Nullable Object p0,
+ @Nullable Object p1,
+ @Nullable Object p2,
+ @Nullable Object p3,
+ @Nullable Object p4,
+ @Nullable Object p5,
+ @Nullable Object p6,
+ @Nullable Object p7,
+ @Nullable Object p8,
+ @Nullable Object p9);
+ void warn(String message, @Nullable Object p0);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4, @Nullable Object p5);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4, @Nullable Object p5, @Nullable Object p6);
+ void warn(String message, @Nullable Object p0, @Nullable Object p1, @Nullable Object p2, @Nullable Object p3, @Nullable Object p4, @Nullable Object p5, @Nullable Object p6, @Nullable Object p7);
+ void warn(
+ String message,
+ @Nullable Object p0,
+ @Nullable Object p1,
+ @Nullable Object p2,
+ @Nullable Object p3,
+ @Nullable Object p4,
+ @Nullable Object p5,
+ @Nullable Object p6,
+ @Nullable Object p7,
+ @Nullable Object p8);
+ void warn(
+ String message,
+ @Nullable Object p0,
+ @Nullable Object p1,
+ @Nullable Object p2,
+ @Nullable Object p3,
+ @Nullable Object p4,
+ @Nullable Object p5,
+ @Nullable Object p6,
+ @Nullable Object p7,
+ @Nullable Object p8,
+ @Nullable Object p9);
+ default void logMessage(
+ Level level, Marker marker, String fqcn, StackTraceElement location, Message message, @Nullable Throwable throwable) {
+ // noop
+ }
+ default LogBuilder atTrace() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atDebug() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atInfo() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atWarn() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atError() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atFatal() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder always() {
+ return LogBuilder.NOOP;
+ }
+ default LogBuilder atLevel(Level level) {
+ return LogBuilder.NOOP;
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/ActiveWatch.java b/src/main/java/engineering/swat/watch/ActiveWatch.java
new file mode 100644
index 00000000..a3a3dc10
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/ActiveWatch.java
@@ -0,0 +1,12 @@
+package engineering.swat.watch;
+
+import java.io.Closeable;
+
+/**
+ * Marker interface for an active watch, in the future might get properties you can inspect.
+ *
+ * For now, make sure to close the watch when not interested in new events
+ */
+public interface ActiveWatch extends Closeable {
+
+}
diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java
new file mode 100644
index 00000000..27c34534
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/WatchEvent.java
@@ -0,0 +1,83 @@
+package engineering.swat.watch;
+
+import java.nio.file.Path;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * The library publishes these events to all subscribers, they are immutable and safe to share around.
+ */
+public class WatchEvent {
+
+ /**
+ * What happened with the file or directory
+ */
+ public enum Kind {
+ /**
+ * A path entry was created. Be careful not to assume that when the event arrives, the path still exists.
+ **/
+ CREATED,
+ /**
+ * The path entry was saved. It is platform specific if this relates to flushes or other events.
+ * a single user action can generate multiple of these events.
+ */
+ MODIFIED,
+ /**
+ * The path entry was deleted.
+ * Note that if the path entry was the watched item (aka the root of the watch),
+ * there is no guarantee if you will receive this event (depending on the level and on the platform).
+ * The watch will be invalid after that, even if a new item is created afterwards with the same name.
+ * In some cases this can be fixed/detected by also watching the parent, but that is only valid if they are on the same mountpoint.
+ */
+ DELETED,
+ /**
+ * Rare event where there were so many file events, that the kernel lost a few.
+ * In that case you'll have to consider the whole directory (and its sub directories) as modified.
+ * The library will try and send events for new and deleted files, but it won't be able to detect modified files.
+ */
+ OVERFLOW
+ }
+
+ private final Kind kind;
+ private final Path rootPath;
+ private final Path relativePath;
+
+ public WatchEvent(Kind kind, Path rootPath, @Nullable Path relativePath) {
+ this.kind = kind;
+ this.rootPath = rootPath;
+ this.relativePath = relativePath == null ? Path.of("") : relativePath;
+ }
+
+ public Kind getKind() {
+ return this.kind;
+ }
+
+ /**
+ *
+ * @return the path relative to the monitored root, it can be empty path if it's the root.
+ */
+ public Path getRelativePath() {
+ return relativePath;
+ }
+
+ /**
+ *
+ * @return A copy of the root path that this event belongs to.
+ */
+ public Path getRootPath() {
+ return rootPath;
+ }
+
+ /**
+ * @return utility function that resolves the relative path to the full path.
+ */
+ public Path calculateFullPath() {
+ return rootPath.resolve(relativePath);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("WatchEvent[%s, %s, %s]", this.rootPath, this.kind, this.relativePath);
+ }
+
+}
diff --git a/src/main/java/engineering/swat/watch/WatchScope.java b/src/main/java/engineering/swat/watch/WatchScope.java
new file mode 100644
index 00000000..ec0e11b7
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/WatchScope.java
@@ -0,0 +1,32 @@
+package engineering.swat.watch;
+
+/**
+ * Configure the depth of the events you want to receive for a given path
+ */
+public enum WatchScope {
+ /**
+ * Watch changes to a single file or (metadata of) a single directory.
+ *
+ * Note, depending on the platform you can receive events for a directory
+ * in case of these events:
+ *
+ * - a MODIFIED caused by the creation of a nested file/directory
+ * - a MODIFIED caused by the deletion of a nested file/directory
+ * - a MODIFIED of its own metadata
+ *
+ *
+ * In most cases when Path is a Directory you're interested in which nested entries changes, in that case use {@link #PATH_AND_CHILDREN} or {@link #PATH_AND_ALL_DESCENDANTS}.
+ */
+ PATH_ONLY,
+ /**
+ * Watch changes to (metadata of) a directory and its content,
+ * non-recursively. That is, changes to the content of nested directories
+ * are not watched.
+ */
+ PATH_AND_CHILDREN,
+ /**
+ * Watch changes to (metadata of) a directory and its content, recursively.
+ * That is, changes to the content of nested directories are also watched.
+ */
+ PATH_AND_ALL_DESCENDANTS
+}
diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java
new file mode 100644
index 00000000..91c1e8db
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/Watcher.java
@@ -0,0 +1,133 @@
+package engineering.swat.watch;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import engineering.swat.watch.impl.jdk.JDKDirectoryWatcher;
+import engineering.swat.watch.impl.jdk.JDKFileWatcher;
+import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatcher;
+
+/**
+ * Watch a path for changes.
+ *
+ *
+ * It will avoid common errors using the raw apis, and will try to use the most native api where possible.
+ * Note, there are differences per platform that cannot be avoided, please review the readme of the library.
+ */
+public class Watcher {
+ private final Logger logger = LogManager.getLogger();
+ private final WatchScope scope;
+ private final Path path;
+ private volatile Executor executor = CompletableFuture::runAsync;
+
+ private static final Consumer NULL_HANDLER = p -> {};
+ private volatile Consumer eventHandler = NULL_HANDLER;
+
+
+ private Watcher(WatchScope scope, Path path) {
+ this.scope = scope;
+ this.path = path;
+ }
+
+ /**
+ * Watch a path for updates, optionally also get events for its children/descendants
+ * @param path which absolute path to monitor, can be a file or a directory, but has to be absolute
+ * @param scope for directories you can also choose to monitor it's direct children or all it's descendants
+ * @throws IllegalArgumentException in case a path is not supported (in relation to the scope)
+ */
+ public static Watcher watch(Path path, WatchScope scope) {
+ if (!path.isAbsolute()) {
+ throw new IllegalArgumentException("We can only watch absolute paths");
+ }
+ switch (scope) {
+ case PATH_AND_CHILDREN: // intended fallthrough
+ case PATH_AND_ALL_DESCENDANTS:
+ if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
+ throw new IllegalArgumentException("Only directories are supported for this scope: " + scope);
+ }
+ break;
+ case PATH_ONLY:
+ if (Files.isSymbolicLink(path)) {
+ throw new IllegalArgumentException("Symlinks are not supported");
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported scope: " + scope);
+
+ }
+ return new Watcher(scope, path);
+ }
+
+ /**
+ * Callback that gets executed for every event. Can get called quite a bit, so be careful what happens here.
+ * Use the {@link #withExecutor(Executor)} function to influence the sequencing of these events.
+ * By default they can arrive in parallel.
+ * @param eventHandler a callback that handles the watch event, will be called once per event.
+ * @return this for optional method chaining
+ */
+ public Watcher onEvent(Consumer eventHandler) {
+ this.eventHandler = eventHandler;
+ return this;
+ }
+
+ /**
+ * Optionally configure the executor in which the {@link #onEvent(Consumer)} callbacks are scheduled.
+ * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ * @param callbackHandler worker pool to use
+ * @return this for optional method chaining
+ */
+ public Watcher withExecutor(Executor callbackHandler) {
+ this.executor = callbackHandler;
+ return this;
+ }
+
+ /**
+ * Start watch the path for events.
+ * @return a subscription for the watch, when closed, new events will stop being registered to the worker pool.
+ * @throws IOException in case the starting of the watcher caused an underlying IO exception
+ * @throws IllegalStateException the watchers is not configured correctly (for example, missing {@link #onEvent(Consumer)}, or a watcher is started twice)
+ */
+ public ActiveWatch start() throws IOException {
+ if (this.eventHandler == NULL_HANDLER) {
+ throw new IllegalStateException("There is no onEvent handler defined");
+ }
+ switch (scope) {
+ case PATH_AND_CHILDREN: {
+ var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, false);
+ result.start();
+ return result;
+ }
+ case PATH_AND_ALL_DESCENDANTS: {
+ try {
+ var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, true);
+ result.start();
+ return result;
+ } catch (Throwable ex) {
+ // no native support, use the simulation
+ logger.debug("Not possible to register the native watcher, using fallback for {}", path);
+ logger.trace(ex);
+ var result = new JDKRecursiveDirectoryWatcher(path, executor, this.eventHandler);
+ result.start();
+ return result;
+ }
+ }
+ case PATH_ONLY: {
+ var result = new JDKFileWatcher(path, executor, this.eventHandler);
+ result.start();
+ return result;
+ }
+
+ default:
+ throw new IllegalStateException("Not supported yet");
+ }
+ }
+
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java
new file mode 100644
index 00000000..a5c94dfd
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatcher.java
@@ -0,0 +1,105 @@
+package engineering.swat.watch.impl.jdk;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import engineering.swat.watch.ActiveWatch;
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.impl.util.BundledSubscription;
+import engineering.swat.watch.impl.util.SubscriptionKey;
+
+public class JDKDirectoryWatcher implements ActiveWatch {
+ private final Logger logger = LogManager.getLogger();
+ private final Path directory;
+ private final Executor exec;
+ private final Consumer eventHandler;
+ private volatile @MonotonicNonNull Closeable activeWatch;
+ private final boolean nativeRecursive;
+
+ private static final BundledSubscription>>
+ BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);
+
+ public JDKDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler) {
+ this(directory, exec, eventHandler, false);
+ }
+
+ public JDKDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler, boolean nativeRecursive) {
+ this.directory = directory;
+ this.exec = exec;
+ this.eventHandler = eventHandler;
+ this.nativeRecursive = nativeRecursive;
+ }
+
+
+ synchronized boolean safeStart() throws IOException {
+ if (activeWatch != null) {
+ return false;
+ }
+ activeWatch = BUNDLED_JDK_WATCHERS.subscribe(new SubscriptionKey(directory, nativeRecursive), this::handleChanges);
+ return true;
+ }
+
+ public void start() throws IOException {
+ try {
+ if (!safeStart()) {
+ throw new IllegalStateException("Cannot start a watcher twice");
+ }
+ logger.debug("Started watch for: {}", directory);
+ } catch (IOException e) {
+ throw new IOException("Could not register directory watcher for: " + directory, e);
+ }
+ }
+
+ private void handleChanges(List> events) {
+ exec.execute(() -> {
+ for (var ev : events) {
+ try {
+ eventHandler.accept(translate(ev));
+ }
+ catch (Throwable ignored) {
+ logger.error("Ignoring downstream exception:", ignored);
+ }
+ }
+ });
+ }
+
+ private WatchEvent translate(java.nio.file.WatchEvent> ev) {
+ WatchEvent.Kind kind;
+ if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
+ kind = WatchEvent.Kind.CREATED;
+ }
+ else if (ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
+ kind = WatchEvent.Kind.MODIFIED;
+ }
+ else if (ev.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
+ kind = WatchEvent.Kind.DELETED;
+ }
+ else if (ev.kind() == StandardWatchEventKinds.OVERFLOW) {
+ kind = WatchEvent.Kind.OVERFLOW;
+ }
+ else {
+ throw new IllegalArgumentException("Unexpected watch event: " + ev);
+ }
+ var path = kind == WatchEvent.Kind.OVERFLOW ? this.directory : (@Nullable Path)ev.context();
+ logger.trace("Translated: {} to {} at {}", ev, kind, path);
+ return new WatchEvent(kind, directory, path);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (activeWatch != null) {
+ logger.trace("Closing watch for: {}", this.directory);
+ activeWatch.close();
+ }
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java
new file mode 100644
index 00000000..535401a3
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatcher.java
@@ -0,0 +1,79 @@
+package engineering.swat.watch.impl.jdk;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+import engineering.swat.watch.ActiveWatch;
+import engineering.swat.watch.WatchEvent;
+
+/**
+ * It's not possible to monitor a single file (or directory), so we have to find a directory watcher, and connect to that
+ *
+ * Note that you should take care to call start only once.
+ */
+public class JDKFileWatcher implements ActiveWatch {
+ private final Logger logger = LogManager.getLogger();
+ private final Path file;
+ private final Path fileName;
+ private final Executor exec;
+ private final Consumer eventHandler;
+ private volatile @MonotonicNonNull Closeable activeWatch;
+
+ public JDKFileWatcher(Path file, Executor exec, Consumer eventHandler) {
+ this.file = file;
+ Path filename= file.getFileName();
+ if (filename == null) {
+ throw new IllegalArgumentException("Cannot pass in a root path");
+ }
+ this.fileName = filename;
+ this.exec = exec;
+ this.eventHandler = eventHandler;
+ }
+
+ /**
+ * Start the file watcher, but only do it once
+ * @throws IOException
+ */
+ public void start() throws IOException {
+ try {
+ var dir = file.getParent();
+ if (dir == null) {
+ throw new IllegalArgumentException("cannot watch a single entry that is on the root");
+
+ }
+ assert !dir.equals(file);
+ JDKDirectoryWatcher parentWatch;
+ synchronized(this) {
+ if (activeWatch != null) {
+ throw new IOException("Cannot start an already started watch");
+ }
+ activeWatch = parentWatch = new JDKDirectoryWatcher(dir, exec, this::filter);
+ parentWatch.start();
+ }
+ logger.debug("Started file watch for {} (in reality a watch on {}): {}", file, dir, parentWatch);
+
+ } catch (IOException e) {
+ throw new IOException("Could not register file watcher for: " + file, e);
+ }
+ }
+
+ private void filter(WatchEvent event) {
+ if (fileName.equals(event.getRelativePath())) {
+ eventHandler.accept(event);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (activeWatch != null) {
+ activeWatch.close();
+ }
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java
new file mode 100644
index 00000000..2f0c9992
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java
@@ -0,0 +1,133 @@
+package engineering.swat.watch.impl.jdk;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.sun.nio.file.ExtendedWatchEventModifier;
+
+import engineering.swat.watch.impl.util.SubscriptionKey;
+
+/**
+ * This class is a wrapper around the JDK WatchService, it takes care to poll the service for new events, and then distributes them to the right parties
+ */
+class JDKPoller {
+ private JDKPoller() {}
+
+ private static final Logger logger = LogManager.getLogger();
+ private static final Map>>> watchers = new ConcurrentHashMap<>();
+ private static final WatchService service;
+ private static final int nCores = Runtime.getRuntime().availableProcessors();
+ /**
+ * We have to be a bit careful with registering too many paths in parallel
+ * Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time.
+ */
+ private static final ExecutorService registerPool = Executors.newFixedThreadPool(nCores);
+
+ static {
+ try {
+ service = FileSystems.getDefault().newWatchService();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not start watcher", e);
+ }
+ // kick off the poll loop
+ poll();
+ }
+
+ private static void poll() {
+ try {
+ WatchKey hit;
+ while ((hit = service.poll()) != null) {
+ logger.trace("Got hit: {}", hit);
+ try {
+ var watchHandler = watchers.get(hit);
+ if (watchHandler != null) {
+ var events = hit.pollEvents();
+ logger.trace("Found watcher for hit: {}, sending: {} (size: {})", watchHandler, events, events.size());
+ watchHandler.accept(events);
+ }
+ }
+ catch (Throwable t) {
+ logger.catching(Level.INFO, t);
+ // one exception shouldn't stop all the processing
+ }
+ finally{
+ hit.reset();
+ }
+ }
+ }
+ finally {
+ // schedule next run
+ // note we don't want to have multiple polls running in parallel
+ // so that is why we only schedule the next one after we're done
+ // processing all messages
+ CompletableFuture
+ .delayedExecutor(1, TimeUnit.MILLISECONDS)
+ .execute(JDKPoller::poll);
+ }
+ }
+
+
+ public static Closeable register(SubscriptionKey path, Consumer>> changesHandler) throws IOException {
+ logger.debug("Register watch for: {}", path);
+
+ try {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ WatchEvent.Kind>[] kinds = new WatchEvent.Kind[]{ ENTRY_CREATE, ENTRY_MODIFY, OVERFLOW, ENTRY_DELETE };
+ if (path.isRecursive()) {
+ return path.getPath().register(service, kinds, ExtendedWatchEventModifier.FILE_TREE);
+ }
+ else {
+ return path.getPath().register(service, kinds);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, registerPool) // read registerPool why we have to add a limiter here
+ .thenApplyAsync(key -> {
+ watchers.put(key, changesHandler);
+ return new Closeable() {
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing watch for: {}", path);
+ if (watchers.remove(key, changesHandler)) {
+ key.cancel();
+ }
+ }
+ };
+ })
+ .get(); // we have to do a get here, to make sure the `register` function blocks
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException && e.getCause().getCause() instanceof IOException) {
+ throw (IOException)e.getCause().getCause();
+ }
+ throw new IOException("Could not register path", e.getCause());
+ } catch (InterruptedException e) {
+ // the pool was closing, forward it
+ Thread.currentThread().interrupt();
+ throw new IOException("The registration was canceled");
+ }
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java
new file mode 100644
index 00000000..64e15c6f
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatcher.java
@@ -0,0 +1,318 @@
+package engineering.swat.watch.impl.jdk;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import engineering.swat.watch.ActiveWatch;
+import engineering.swat.watch.WatchEvent;
+
+public class JDKRecursiveDirectoryWatcher implements ActiveWatch {
+ private final Logger logger = LogManager.getLogger();
+ private final Path root;
+ private final Executor exec;
+ private final Consumer eventHandler;
+ private final ConcurrentMap activeWatches = new ConcurrentHashMap<>();
+
+ public JDKRecursiveDirectoryWatcher(Path directory, Executor exec, Consumer eventHandler) {
+ this.root = directory;
+ this.exec = exec;
+ this.eventHandler = eventHandler;
+ }
+
+ public void start() throws IOException {
+ try {
+ logger.debug("Starting recursive watch for: {}", root);
+ registerInitialWatches(root);
+ } catch (IOException e) {
+ throw new IOException("Could not register directory watcher for: " + root, e);
+ }
+ }
+
+ private void processEvents(WatchEvent ev) {
+ logger.trace("Forwarding event: {}", ev);
+ eventHandler.accept(ev);
+ logger.trace("Unwrapping event: {}", ev);
+ try {
+ switch (ev.getKind()) {
+ case CREATED: handleCreate(ev); break;
+ case DELETED: handleDeleteDirectory(ev); break;
+ case OVERFLOW: handleOverflow(ev); break;
+ case MODIFIED: break;
+ }
+ } finally {
+ }
+ }
+
+ private void publishExtraEvents(List ev) {
+ logger.trace("Reporting new nested directories & files: {}", ev);
+ ev.forEach(eventHandler);
+ }
+
+
+ private void handleCreate(WatchEvent ev) {
+ // between the event and the current state of the file system
+ // we might have some nested directories we missed
+ // so if we have a new directory, we have to go in and iterate over it
+ // we also have to report all nested files & dirs as created paths
+ // but we don't want to delay the publication of this
+ // create till after the processing is done, so we schedule it in the background
+ var fullPath = ev.calculateFullPath();
+ if (!activeWatches.containsKey(fullPath)) {
+ CompletableFuture
+ .completedFuture(fullPath)
+ .thenApplyAsync(this::registerForNewDirectory, exec)
+ .thenAcceptAsync(this::publishExtraEvents, exec)
+ .exceptionally(ex -> {
+ logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex);
+ return null;
+ });
+ }
+ }
+
+ private void handleOverflow(WatchEvent ev) {
+ logger.info("Overflow detected, rescanning to find missed entries in {}", root);
+ CompletableFuture
+ .completedFuture(ev.calculateFullPath())
+ .thenApplyAsync(this::syncAfterOverflow, exec)
+ .thenAcceptAsync(this::publishExtraEvents, exec)
+ .exceptionally(ex -> {
+ logger.error("Could not register new watch for: {} ({})", ev.calculateFullPath(), ex);
+ return null;
+ });
+ }
+
+ private void handleDeleteDirectory(WatchEvent ev) {
+ var removedPath = ev.calculateFullPath();
+ try {
+ var existingWatch = activeWatches.remove(removedPath);
+ if (existingWatch != null) {
+ logger.debug("Clearing watch on removed directory: {}", removedPath);
+ existingWatch.close();
+ }
+ } catch (IOException ex) {
+ logger.error("Error clearing: {} {}", removedPath, ex);
+ }
+ }
+
+ /** Only register a watch for every sub directory */
+ private class InitialDirectoryScan extends SimpleFileVisitor {
+ protected final Path subRoot;
+
+ public InitialDirectoryScan(Path root) {
+ this.subRoot = root;
+ }
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+ logger.error("We could not visit {} to schedule recursive file watches: {}", file, exc);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
+ addNewDirectory(subdir);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
+ if (exc != null) {
+ logger.error("Error during directory iteration: {} = {}", subdir, exc);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ private void addNewDirectory(Path dir) throws IOException {
+ var watcher = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatcher(d, exec, relocater(dir)));
+ try {
+ if (!watcher.safeStart()) {
+ logger.debug("We lost the race on starting a nested watcher, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir);
+ }
+ } catch (IOException ex) {
+ activeWatches.remove(dir);
+ logger.error("Could not register a watch for: {} ({})", dir, ex);
+ throw ex;
+ }
+ }
+
+ /** Make sure that the events are relative to the actual root of the recursive watcher */
+ private Consumer relocater(Path subRoot) {
+ final Path newRelative = root.relativize(subRoot);
+ return ev -> {
+ var rewritten = new WatchEvent(ev.getKind(), root, newRelative.resolve(ev.getRelativePath()));
+ processEvents(rewritten);
+ };
+ }
+ }
+
+ /** register watch for new sub-dir, but also simulate event for every file & subdir found */
+ private class NewDirectoryScan extends InitialDirectoryScan {
+ protected final List events;
+ protected final Set seenFiles;
+ protected final Set seenDirs;
+ private boolean hasFiles = false;
+ public NewDirectoryScan(Path subRoot, List events, Set seenFiles, Set seenDirs) {
+ super(subRoot);
+ this.events = events;
+ this.seenFiles = seenFiles;
+ this.seenDirs = seenDirs;
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
+ try {
+ hasFiles = false;
+ if (!seenDirs.contains(subdir)) {
+ if (!subdir.equals(subRoot)) {
+ events.add(new WatchEvent(WatchEvent.Kind.CREATED, root, root.relativize(subdir)));
+ }
+ return super.preVisitDirectory(subdir, attrs);
+ }
+ // our children might have newer results
+ return FileVisitResult.CONTINUE;
+ } finally {
+ seenDirs.add(subdir);
+ }
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
+ if (hasFiles) {
+ events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, root, root.relativize(subdir)));
+ }
+ return super.postVisitDirectory(subdir, exc);
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ if (!seenFiles.contains(file)) {
+ hasFiles = true;
+
+ var relative = root.relativize(file);
+ events.add(new WatchEvent(WatchEvent.Kind.CREATED, root, relative));
+ if (attrs.size() > 0) {
+ events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, root, relative));
+ }
+ seenFiles.add(file);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ /** detect directories that aren't tracked yet, and generate events only for new entries */
+ private class OverflowSyncScan extends NewDirectoryScan {
+ private final Deque isNewDirectory = new ArrayDeque<>();
+ public OverflowSyncScan(Path subRoot, List events, Set seenFiles, Set seenDirs) {
+ super(subRoot, events, seenFiles, seenDirs);
+ }
+ @Override
+ public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
+ if (!activeWatches.containsKey(subdir)) {
+ isNewDirectory.addLast(true);
+ return super.preVisitDirectory(subdir, attrs);
+ }
+ isNewDirectory.addLast(false);
+ return FileVisitResult.CONTINUE;
+ }
+ @Override
+ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
+ isNewDirectory.removeLast();
+ return super.postVisitDirectory(subdir, exc);
+ }
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ if (isNewDirectory.peekLast() == Boolean.TRUE || !seenFiles.contains(file)) {
+ return super.visitFile(file, attrs);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ private void registerInitialWatches(Path dir) throws IOException {
+ Files.walkFileTree(dir, new InitialDirectoryScan(dir));
+ }
+
+ private List registerForNewDirectory(Path dir) {
+ var events = new ArrayList();
+ var seenFiles = new HashSet();
+ var seenDirectories = new HashSet();
+ try {
+ Files.walkFileTree(dir, new NewDirectoryScan(dir, events, seenFiles, seenDirectories));
+ detectedMissingEntries(dir, events, seenFiles, seenDirectories);
+ return events;
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+
+ private List syncAfterOverflow(Path dir) {
+ var events = new ArrayList();
+ var seenFiles = new HashSet();
+ var seenDirectories = new HashSet();
+ try {
+ Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
+ detectedMissingEntries(dir, events, seenFiles, seenDirectories);
+ return events;
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private void detectedMissingEntries(Path dir, ArrayList events, HashSet seenFiles, HashSet seenDirectories) throws IOException {
+ // why a second round? well there is a race, between iterating the directory (and sending events)
+ // and when the watches are active. so after we know all the new watches have been registered
+ // we do a second scan and make sure to find paths that weren't visible the first time
+ // and emulate events for them (and register new watches)
+ // In essence this is the same as when an Overflow happened, so we can reuse that handler.
+ int directoryCount = seenDirectories.size() - 1;
+ while (directoryCount != seenDirectories.size()) {
+ Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
+ directoryCount = seenDirectories.size();
+ }
+ }
+
+
+
+ @Override
+ public void close() throws IOException {
+ IOException firstFail = null;
+ for (var e : activeWatches.entrySet()) {
+ try {
+ e.getValue().close();
+ } catch (IOException ex) {
+ logger.error("Could not close watch", ex);
+ if (firstFail == null) {
+ firstFail = ex;
+ }
+ }
+ catch (Exception ex) {
+ logger.error("Could not close watch", ex);
+ if (firstFail == null) {
+ firstFail = new IOException("Unexpected exception when closing", ex);
+ }
+ }
+ }
+ if (firstFail != null) {
+ throw firstFail;
+ }
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/util/BundledSubscription.java b/src/main/java/engineering/swat/watch/impl/util/BundledSubscription.java
new file mode 100644
index 00000000..ac352996
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/util/BundledSubscription.java
@@ -0,0 +1,112 @@
+package engineering.swat.watch.impl.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * This is an internal class where we can join multiple subscriptions to the same target by only taking 1 actual subscription but forwarding them to all the interested parties.
+ * This is used (for example) to avoid multiple JDKPoller registries for the same path
+ */
+public class BundledSubscription implements ISubscribable {
+ private static final Logger logger = LogManager.getLogger();
+ private final ISubscribable wrapped;
+ private final ConcurrentMap> subscriptions = new ConcurrentHashMap<>();
+
+ public BundledSubscription(ISubscribable wrapped) {
+ this.wrapped = wrapped;
+
+ }
+
+ private static class Subscription implements Consumer {
+ private final List> consumers = new CopyOnWriteArrayList<>();
+ private volatile @MonotonicNonNull Closeable toBeClosed;
+ private volatile boolean closed = false;
+ Subscription() {
+ }
+
+ void add(Consumer newConsumer) {
+ consumers.add(newConsumer);
+ }
+
+ void remove(Consumer existingConsumer) {
+ consumers.remove(existingConsumer);
+ }
+
+ @Override
+ public void accept(R t) {
+ for (var child: consumers) {
+ child.accept(t);
+ }
+ }
+
+ boolean hasActiveConsumers() {
+ return !consumers.isEmpty();
+ }
+ }
+
+ @Override
+ public Closeable subscribe(Key target, Consumer eventListener) throws IOException {
+ while (true) {
+ Subscription active = this.subscriptions.computeIfAbsent(target, t -> new Subscription<>());
+ // after this, there will only be 1 instance of active subscription in the map.
+ // but we might have a race with remove, which can close the subscript between our get and our addition
+ // since this code is very hard to get right without locks, and shouldn't be run too often
+ // we take a big lock around the subscription management
+ synchronized(active) {
+ if (active.closed) {
+ // we lost the race with closing the subscription, so we retry
+ continue;
+ }
+ active.add(eventListener);
+ if (active.toBeClosed == null) {
+ // the watch is not active yet, and we were the first to get the lock
+ active.toBeClosed = wrapped.subscribe(target, active);
+ }
+ }
+ return () -> {
+ boolean scheduleClose = false;
+ synchronized(active) {
+ active.remove(eventListener);
+ scheduleClose = !active.hasActiveConsumers() && !active.closed;
+ }
+ if (scheduleClose) {
+ // to avoid hammering the system with closes & registers in a short periode
+ // we schedule the cleanup of watches in the background, when even after a small delay
+ // nobody is interested in a certain file anymore
+ CompletableFuture
+ .delayedExecutor(100, TimeUnit.MILLISECONDS)
+ .execute(() -> {
+ synchronized(active) {
+ if (!active.hasActiveConsumers() && !active.closed) {
+ // still ready to be closed
+ active.closed = true;
+ this.subscriptions.remove(target, active);
+ if (active.toBeClosed != null) {
+ try {
+ active.toBeClosed.close();
+ } catch (IOException e) {
+ logger.error("Unhandled exception while closing the watcher for {} in the background", target, e);
+ }
+ }
+ }
+ }
+ });
+ }
+ };
+ }
+ }
+
+
+}
diff --git a/src/main/java/engineering/swat/watch/impl/util/ISubscribable.java b/src/main/java/engineering/swat/watch/impl/util/ISubscribable.java
new file mode 100644
index 00000000..5803c96a
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/util/ISubscribable.java
@@ -0,0 +1,10 @@
+package engineering.swat.watch.impl.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+@FunctionalInterface
+public interface ISubscribable {
+ Closeable subscribe(Key target, Consumer eventListener) throws IOException;
+}
diff --git a/src/main/java/engineering/swat/watch/impl/util/SubscriptionKey.java b/src/main/java/engineering/swat/watch/impl/util/SubscriptionKey.java
new file mode 100644
index 00000000..4afa2ede
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/util/SubscriptionKey.java
@@ -0,0 +1,44 @@
+package engineering.swat.watch.impl.util;
+
+import java.nio.file.Path;
+import java.util.Objects;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SubscriptionKey {
+ private final Path path;
+ private final boolean recursive;
+
+ public SubscriptionKey(Path path, boolean recursive) {
+ this.path = path;
+ this.recursive = recursive;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public boolean isRecursive() {
+ return recursive;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object obj) {
+ if (obj instanceof SubscriptionKey) {
+ var other = (SubscriptionKey)obj;
+ return (other.recursive == recursive)
+ && other.path.equals(path);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, recursive);
+ }
+
+ @Override
+ public String toString() {
+ return path.toString() + (recursive ? "[recursive]" : "");
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/DeleteLockTests.java b/src/test/java/engineering/swat/watch/DeleteLockTests.java
new file mode 100644
index 00000000..44861bf0
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/DeleteLockTests.java
@@ -0,0 +1,85 @@
+package engineering.swat.watch;
+
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class DeleteLockTests {
+
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+
+ @FunctionalInterface
+ private interface Deleter {
+ void run(Path target) throws IOException;
+ }
+
+ private static void recursiveDelete(Path target) throws IOException {
+ try (var paths = Files.walk(target)) {
+ paths.sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+ }
+
+ private void deleteAndVerify(Path target, WatchScope scope) throws IOException {
+ try (var watch = Watcher.watch(target, scope).onEvent(ev -> {}).start()) {
+ recursiveDelete(target);
+ assertFalse(Files.exists(target), "The file/directory shouldn't exist anymore");
+ }
+ }
+
+ @Test
+ void watchedFileCanBeDeleted() throws IOException {
+ deleteAndVerify(
+ testDir.getTestFiles().get(0),
+ WatchScope.PATH_ONLY
+ );
+ }
+
+
+ @Test
+ void watchedDirectoryCanBeDeleted() throws IOException {
+ deleteAndVerify(
+ testDir.getTestDirectory(),
+ WatchScope.PATH_AND_CHILDREN
+ );
+ }
+
+
+ @Test
+ void watchedRecursiveDirectoryCanBeDeleted() throws IOException {
+ deleteAndVerify(
+ testDir.getTestDirectory(),
+ WatchScope.PATH_AND_ALL_DESCENDANTS
+ );
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java
new file mode 100644
index 00000000..7c744fbd
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java
@@ -0,0 +1,120 @@
+package engineering.swat.watch;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import engineering.swat.watch.WatchEvent.Kind;
+
+class RecursiveWatchTests {
+ private final Logger logger = LogManager.getLogger();
+
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() throws IOException {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+
+ @Test
+ void newDirectoryWithFilesChangesDetected() throws IOException {
+ var target = new AtomicReference();
+ var created = new AtomicBoolean(false);
+ var changed = new AtomicBoolean(false);
+ var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .onEvent(ev -> {
+ logger.debug("Event received: {}", ev);
+ if (ev.calculateFullPath().equals(target.get())) {
+ switch (ev.getKind()) {
+ case CREATED:
+ created.set(true);
+ break;
+ case MODIFIED:
+ changed.set(true);
+ break;
+ default:
+ break;
+ }
+ }
+ });
+
+ try (var activeWatch = watchConfig.start() ) {
+ var freshFile = Files.createTempDirectory(testDir.getTestDirectory(), "new-dir").resolve("test-file.txt");
+ target.set(freshFile);
+ logger.debug("Interested in: {}", freshFile);
+ Files.writeString(freshFile, "Hello world");
+ await("New files should have been seen").untilTrue(created);
+ Files.writeString(freshFile, "Hello world 2");
+ await("Fresh file change have been detected").untilTrue(changed);
+ }
+ }
+
+ @Test
+ void correctRelativePathIsReported() throws IOException {
+ Path relative = Path.of("a","b", "c", "d.txt");
+ var seen = new AtomicBoolean(false);
+ var watcher = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .onEvent(ev -> {
+ logger.debug("Seen event: {}", ev);
+ if (ev.getRelativePath().equals(relative)) {
+ seen.set(true);
+ }
+ });
+
+ try (var w = watcher.start()) {
+ var targetFile = testDir.getTestDirectory().resolve(relative);
+ Files.createDirectories(targetFile.getParent());
+ Files.writeString(targetFile, "Hello World");
+ await("Nested path is seen").untilTrue(seen);
+ }
+
+ }
+
+ @Test
+ void deleteOfFileInDirectoryShouldBeVisible() throws IOException, InterruptedException {
+ var target = testDir.getTestFiles()
+ .stream()
+ .filter(p -> !p.getParent().equals(testDir.getTestDirectory()))
+ .findAny()
+ .orElseThrow();
+ var seen = new AtomicBoolean(false);
+ var watchConfig = Watcher.watch(target.getParent(), WatchScope.PATH_AND_CHILDREN)
+ .onEvent(ev -> {
+ if (ev.getKind() == Kind.DELETED && ev.calculateFullPath().equals(target)) {
+ seen.set(true);
+ }
+ });
+ try (var watch = watchConfig.start()) {
+ Files.delete(target);
+ await("File deletion should generate delete event")
+ .untilTrue(seen);
+ }
+ }
+
+}
diff --git a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java
new file mode 100644
index 00000000..9628b900
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java
@@ -0,0 +1,66 @@
+package engineering.swat.watch;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import engineering.swat.watch.WatchEvent.Kind;
+
+public class SingleDirectoryTests {
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+ @Test
+ void deleteOfFileInDirectoryShouldBeVisible() throws IOException, InterruptedException {
+ var target = testDir.getTestFiles().get(0);
+ var seenDelete = new AtomicBoolean(false);
+ var seenCreate = new AtomicBoolean(false);
+ var watchConfig = Watcher.watch(target.getParent(), WatchScope.PATH_AND_CHILDREN)
+ .onEvent(ev -> {
+ if (ev.getKind() == Kind.DELETED && ev.calculateFullPath().equals(target)) {
+ seenDelete.set(true);
+ }
+ if (ev.getKind() == Kind.CREATED && ev.calculateFullPath().equals(target)) {
+ seenCreate.set(true);
+ }
+ });
+ try (var watch = watchConfig.start()) {
+
+ // Delete the file
+ Files.delete(target);
+ await("File deletion should generate delete event")
+ .untilTrue(seenDelete);
+
+ // Re-create it again
+ Files.writeString(target, "Hello World");
+ await("File creation should generate create event")
+ .untilTrue(seenCreate);
+ }
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/SingleFileTests.java b/src/test/java/engineering/swat/watch/SingleFileTests.java
new file mode 100644
index 00000000..328c7c32
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/SingleFileTests.java
@@ -0,0 +1,94 @@
+package engineering.swat.watch;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SingleFileTests {
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+ @Test
+ void singleFileShouldNotTriggerOnOtherFilesInSameDir() throws IOException, InterruptedException {
+ var target = testDir.getTestFiles().get(0);
+ var seen = new AtomicBoolean(false);
+ var others = new AtomicBoolean(false);
+ var watchConfig = Watcher.watch(target, WatchScope.PATH_ONLY)
+ .onEvent(ev -> {
+ if (ev.calculateFullPath().equals(target)) {
+ seen.set(true);
+ }
+ else {
+ others.set(true);
+ }
+ });
+ try (var watch = watchConfig.start()) {
+ for (var f : testDir.getTestFiles()) {
+ if (!f.equals(target)) {
+ Files.writeString(f, "Hello");
+ }
+ }
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+ Files.writeString(target, "Hello world");
+ await("Single file does trigger")
+ .pollDelay(TestHelper.NORMAL_WAIT.minusMillis(10))
+ .failFast("No others should be notified", others::get)
+ .untilTrue(seen);
+ }
+ }
+
+ @Test
+ void singleFileThatMonitorsOnlyADirectory() throws IOException, InterruptedException {
+ var target = testDir.getTestDirectory();
+ var seen = new AtomicBoolean(false);
+ var others = new AtomicBoolean(false);
+ var watchConfig = Watcher.watch(target, WatchScope.PATH_ONLY)
+ .onEvent(ev -> {
+ if (ev.calculateFullPath().equals(target)) {
+ seen.set(true);
+ }
+ else {
+ others.set(true);
+ }
+ });
+ try (var watch = watchConfig.start()) {
+ for (var f : testDir.getTestFiles()) {
+ if (!f.equals(target)) {
+ Files.writeString(f, "Hello");
+ }
+ }
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+ Files.setLastModifiedTime(target, FileTime.from(Instant.now()));
+ await("Single directory does trigger")
+ .pollDelay(TestHelper.NORMAL_WAIT.minusMillis(10))
+ .failFast("No others should be notified", others::get)
+ .untilTrue(seen);
+ }
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/SmokeTests.java b/src/test/java/engineering/swat/watch/SmokeTests.java
new file mode 100644
index 00000000..3f38e484
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/SmokeTests.java
@@ -0,0 +1,92 @@
+package engineering.swat.watch;
+
+
+import static engineering.swat.watch.WatchEvent.Kind.*;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+
+class SmokeTests {
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+ @Test
+ void watchDirectory() throws IOException, InterruptedException {
+ var changed = new AtomicBoolean(false);
+ var target = testDir.getTestFiles().get(0);
+ var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
+ .onEvent(ev -> {if (ev.getKind() == MODIFIED && ev.calculateFullPath().equals(target)) { changed.set(true); }})
+ ;
+
+ try (var activeWatch = watchConfig.start() ) {
+ Files.writeString(target, "Hello world");
+ await("Target file change").untilTrue(changed);
+ }
+ }
+
+ @Test
+ void watchRecursiveDirectory() throws IOException, InterruptedException {
+ var changed = new AtomicBoolean(false);
+ var target = testDir.getTestFiles().stream()
+ .filter(p -> !p.getParent().equals(testDir.getTestDirectory()))
+ .findFirst()
+ .orElseThrow();
+ var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .onEvent(ev -> { if (ev.getKind() == MODIFIED && ev.calculateFullPath().equals(target)) { changed.set(true);}})
+ ;
+
+ try (var activeWatch = watchConfig.start() ) {
+ Files.writeString(target, "Hello world");
+ await("Nested file change").untilTrue(changed);
+ }
+ }
+
+ @Test
+ void watchSingleFile() throws IOException {
+ var changed = new AtomicBoolean(false);
+ var target = testDir.getTestFiles().stream()
+ .filter(p -> p.getParent().equals(testDir.getTestDirectory()))
+ .findFirst()
+ .orElseThrow();
+
+ var watchConfig = Watcher.watch(target, WatchScope.PATH_ONLY)
+ .onEvent(ev -> {
+ if (ev.calculateFullPath().equals(target)) {
+ changed.set(true);
+ }
+ });
+
+ try (var watch = watchConfig.start()) {
+ Files.writeString(target, "Hello world");
+ await("Single file change").untilTrue(changed);
+ }
+ }
+
+
+}
diff --git a/src/test/java/engineering/swat/watch/TestDirectory.java b/src/test/java/engineering/swat/watch/TestDirectory.java
new file mode 100644
index 00000000..ebd8beb8
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/TestDirectory.java
@@ -0,0 +1,58 @@
+package engineering.swat.watch;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+class TestDirectory implements Closeable {
+ private final Path testDirectory;
+ private final List testFiles;
+
+
+ TestDirectory() throws IOException {
+ testDirectory = Files.createTempDirectory("java-watch-test");
+ List testFiles = new ArrayList<>();
+ add3Files(testFiles, testDirectory);
+ for (var d: Arrays.asList("d1", "d2", "d3")) {
+ Files.createDirectories(testDirectory.resolve(d));
+ add3Files(testFiles, testDirectory.resolve(d));
+ }
+ this.testFiles = Collections.unmodifiableList(testFiles);
+ }
+
+ private static void add3Files(List testFiles, Path root) throws IOException {
+ for (var f : Arrays.asList("a.txt", "b.txt", "c.txt")) {
+ testFiles.add(Files.createFile(root.resolve(f)));
+ }
+ }
+
+ public void deleteAllFiles() throws IOException {
+ try (var files = Files.walk(testDirectory)) {
+ files.sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ deleteAllFiles();
+ } catch (IOException _ignored) { }
+ }
+
+ public Path getTestDirectory() {
+ return testDirectory;
+ }
+
+ public List getTestFiles() {
+ return testFiles;
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java
new file mode 100644
index 00000000..717d5818
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/TestHelper.java
@@ -0,0 +1,30 @@
+package engineering.swat.watch;
+
+import java.time.Duration;
+
+public class TestHelper {
+
+ public static final Duration SHORT_WAIT;
+ public static final Duration NORMAL_WAIT;
+ public static final Duration LONG_WAIT;
+
+ static {
+ var delayFactorConfig = System.getenv("DELAY_FACTOR");
+ int delayFactor = delayFactorConfig == null ? 1 : Integer.parseInt(delayFactorConfig);
+ var os = System.getProperty("os", "?").toLowerCase();
+ if (os.contains("mac")) {
+ // OSX is SLOW on it's watches
+ delayFactor *= 2;
+ }
+ else if (os.contains("win")) {
+ // windows watches can be slow to get everything
+ // published
+ // especially on small core systems
+ delayFactor *= 4;
+ }
+ SHORT_WAIT = Duration.ofSeconds(1 * delayFactor);
+ NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor);
+ LONG_WAIT = Duration.ofSeconds(8 * delayFactor);
+ }
+
+}
diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java
new file mode 100644
index 00000000..c2812895
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/TortureTests.java
@@ -0,0 +1,387 @@
+package engineering.swat.watch;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.util.Random;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+
+import engineering.swat.watch.WatchEvent.Kind;
+
+class TortureTests {
+
+ private final Logger logger = LogManager.getLogger();
+
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() throws IOException {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ private final class IOGenerator {
+ private final Set pathsWritten = ConcurrentHashMap.newKeySet();
+ private final Semaphore startRunning = new Semaphore(0);
+ private final Semaphore stopRunning = new Semaphore(0);
+ private final Semaphore done = new Semaphore(0);
+ private final int jobs;
+
+ IOGenerator(int jobs, Path root, Executor exec) {
+ this.jobs = jobs;
+ for (int j = 0; j < jobs; j++) {
+ startJob(root.resolve("run" + j), new Random(j), exec);
+ }
+ }
+
+ private final static int BURST_SIZE = 1000;
+
+ private void startJob(final Path root, Random r, Executor exec) {
+ exec.execute(() -> {
+ try {
+ startRunning.acquire();
+ var end = LocalTime.now().plus(TestHelper.NORMAL_WAIT.multipliedBy(2));
+ while (!stopRunning.tryAcquire(100, TimeUnit.MICROSECONDS)) {
+ if (LocalTime.now().isAfter(end)) {
+ break;
+ }
+ try {
+ // burst a bunch of creates and then sleep a bit
+ for (int i = 0; i< BURST_SIZE; i++) {
+ var file = root.resolve("l1-" + r.nextInt(1000))
+ .resolve("l2-" + r.nextInt(100))
+ .resolve("l3-" + r.nextInt() + ".txt");
+ Files.createDirectories(file.getParent());
+ Files.writeString(file, "Hello world");
+ pathsWritten.add(file);
+ }
+ } catch (IOException e) {
+ }
+ Thread.yield();
+ }
+ } catch (InterruptedException e) {
+ }
+ finally {
+ done.release();
+ }
+ });
+ }
+
+ void start() {
+ startRunning.release(jobs);
+ }
+
+ Set stop() throws InterruptedException {
+ stopRunning.release(jobs);
+ startRunning.release(jobs);
+ assertTrue(done.tryAcquire(jobs, TestHelper.NORMAL_WAIT.toMillis() * 2, TimeUnit.MILLISECONDS), "IO workers should stop in a reasonable time");
+ return pathsWritten;
+ }
+ }
+
+ private static final int THREADS = 4;
+
+ @Test
+ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException {
+ final var root = testDir.getTestDirectory();
+ var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4);
+
+ var io = new IOGenerator(THREADS, root, pool);
+
+
+ var seenCreates = ConcurrentHashMap.newKeySet();
+ var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .withExecutor(pool)
+ .onEvent(ev -> {
+ var fullPath = ev.calculateFullPath();
+ switch (ev.getKind()) {
+ case CREATED:
+ seenCreates.add(fullPath);
+ break;
+ case MODIFIED:
+ // platform specific if this comes by or not
+ break;
+ default:
+ logger.error("Unexpected event: {}", ev);
+ break;
+ }
+ });
+
+ Set pathsWritten;
+
+ try (var activeWatch = watchConfig.start() ) {
+ logger.info("Starting {} jobs", THREADS);
+ io.start();
+ // now we generate a whole bunch of events
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+ logger.info("Stopping jobs");
+ pathsWritten = io.stop();
+ logger.info("Generated: {} files", pathsWritten.size());
+
+ await("After a while we should have seen all the create events")
+ .timeout(TestHelper.LONG_WAIT.multipliedBy(50))
+ .pollInterval(Duration.ofMillis(500))
+ .until(() -> seenCreates.containsAll(pathsWritten));
+ }
+ catch (Exception ex) {
+ logger.catching(ex);
+ throw ex;
+
+ }
+ finally {
+ try {
+ logger.info("stopping IOGenerator");
+ io.stop();
+ }
+ catch (Throwable _ignored) {}
+ logger.info("Shutting down pool");
+ // shutdown the pool (so no new events are registered)
+ pool.shutdown();
+ }
+ }
+
+ private final int TORTURE_REGISTRATION_THREADS = THREADS * 500;
+
+ @RepeatedTest(failureThreshold=1, value = 20)
+ void manyRegistrationsForSamePath() throws InterruptedException, IOException {
+ var startRegistering = new Semaphore(0);
+ var startedWatching = new Semaphore(0);
+ var startDeregistring = new Semaphore(0);
+ var done = new Semaphore(0);
+ var seen = ConcurrentHashMap.newKeySet();
+ var exceptions = new LinkedBlockingDeque();
+
+ for (int t = 0; t < TORTURE_REGISTRATION_THREADS; t++) {
+ var r = new Thread(() -> {
+ try {
+ var watcher = Watcher
+ .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
+ .onEvent(e -> seen.add(e.calculateFullPath()));
+ startRegistering.acquire();
+ try (var c = watcher.start()) {
+ startedWatching.release();
+ startDeregistring.acquire();
+ }
+ catch(Exception e) {
+ startedWatching.release();
+ exceptions.push(e);
+ }
+ } catch (InterruptedException e1) {
+ }
+ finally {
+ done.release();
+ }
+ });
+ r.setDaemon(true);
+ r.start();
+ }
+
+ try {
+ startRegistering.release(TORTURE_REGISTRATION_THREADS);
+ startDeregistring.release(TORTURE_REGISTRATION_THREADS - 1);
+ startedWatching.acquire(TORTURE_REGISTRATION_THREADS); // make sure they are all started
+ done.acquire(TORTURE_REGISTRATION_THREADS - 1);
+ assertTrue(seen.isEmpty(), "No events should have been sent");
+ var target = testDir.getTestDirectory().resolve("test124.txt");
+ //logger.info("Writing: {}", target);
+ Files.writeString(target, "Hello World");
+ var expected = Collections.singleton(target);
+ await("We should see only one event")
+ .failFast(() -> !exceptions.isEmpty())
+ .timeout(TestHelper.LONG_WAIT)
+ .pollInterval(Duration.ofMillis(10))
+ .until(() -> seen, expected::equals);
+ if (!exceptions.isEmpty()) {
+ fail(exceptions.pop());
+ }
+ }
+ finally {
+ startDeregistring.release(TORTURE_REGISTRATION_THREADS);
+ }
+ }
+
+ @RepeatedTest(failureThreshold=1, value = 20)
+ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException {
+ var startRegistering = new Semaphore(0);
+ var startedWatching = new Semaphore(0);
+ var stopAll = new Semaphore(0);
+ var done = new Semaphore(0);
+ var seen = ConcurrentHashMap.newKeySet();
+ var exceptions = new LinkedBlockingDeque();
+ var target = testDir.getTestDirectory().resolve("test124.txt");
+ int amountOfWatchersActive = 0;
+ try {
+ for (int t = 0; t < THREADS; t++) {
+ final boolean finishWatching = t % 2 == 0;
+ if (finishWatching) {
+ amountOfWatchersActive++;
+ }
+ var r = new Thread(() -> {
+ try {
+ var id = Thread.currentThread().getId();
+ startRegistering.acquire();
+ for (int k = 0; k < 1000; k++) {
+ var watcher = Watcher
+ .watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
+ .onEvent(e -> {
+ if (e.calculateFullPath().equals(target)) {
+ seen.add(id);
+ }
+ });
+ try (var c = watcher.start()) {
+ if (finishWatching && k + 1 == 1000) {
+ startedWatching.release();
+ stopAll.acquire();
+ }
+ }
+ catch(Exception e) {
+ exceptions.push(e);
+ }
+ }
+ } catch (InterruptedException e1) {
+ }
+ finally {
+ done.release();
+ }
+ });
+ r.setDaemon(true);
+ r.start();
+ }
+
+ startRegistering.release(THREADS);
+ done.acquire(THREADS - amountOfWatchersActive);
+ startedWatching.acquire(amountOfWatchersActive);
+ assertTrue(seen.isEmpty(), "No events should have been sent");
+ Files.writeString(target, "Hello World");
+ await("We should see only exactly the " + amountOfWatchersActive + " events we expect")
+ .failFast(() -> !exceptions.isEmpty())
+ .pollDelay(TestHelper.NORMAL_WAIT.minusMillis(100))
+ .until(seen::size, Predicate.isEqual(amountOfWatchersActive))
+ ;
+ if (!exceptions.isEmpty()) {
+ fail(exceptions.pop());
+ }
+ }
+ finally {
+ stopAll.release(amountOfWatchersActive);
+ }
+
+ }
+
+
+
+ @Test
+ //Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted
+ @EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true")
+ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException {
+ final var root = testDir.getTestDirectory();
+ var pool = Executors.newCachedThreadPool();
+
+ Set pathsWritten;
+ var seenDeletes = ConcurrentHashMap.newKeySet();
+ var io = new IOGenerator(THREADS, root, pool);
+ try {
+ io.start();
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+ pathsWritten = io.stop();
+
+ final var events = new AtomicInteger(0);
+ final var happened = new Semaphore(0);
+ var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .withExecutor(pool)
+ .onEvent(ev -> {
+ events.getAndIncrement();
+ happened.release();
+ var fullPath = ev.calculateFullPath();
+ switch (ev.getKind()) {
+ case DELETED:
+ seenDeletes.add(fullPath);
+ break;
+ case MODIFIED:
+ // happens on dir level, as the files are getting removed
+ break;
+ default:
+ logger.error("Unexpected event: {}", ev);
+ break;
+ }
+ });
+
+ try (var activeWatch = watchConfig.start() ) {
+ logger.info("Deleting files now", THREADS);
+ testDir.deleteAllFiles();
+ logger.info("Waiting for the events processing to stabilize");
+ waitForStable(events, happened);
+ }
+ }
+ finally {
+ try {
+ io.stop();
+ }
+ catch (Throwable _ignored) {}
+ // shutdown the pool (so no new events are registered)
+ pool.shutdown();
+ }
+
+ // but wait till all scheduled tasks have been completed
+ pool.awaitTermination(10, TimeUnit.SECONDS);
+
+ logger.info("Comparing events and files seen");
+ // now make sure that the two sets are the same
+ for (var f : pathsWritten) {
+ assertTrue(seenDeletes.contains(f), () -> "Missing delete event for: " + f);
+ }
+ }
+
+
+
+ private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException {
+ int lastEventCount = events.get();
+ int stableCount = 0;
+ do {
+ Thread.yield();
+ while (happened.tryAcquire(TestHelper.SHORT_WAIT.toMillis() * 2, TimeUnit.MILLISECONDS)) {
+ happened.drainPermits();
+ }
+
+ int currentEventCounts = events.get();
+ if (currentEventCounts == lastEventCount) {
+ stableCount++;
+ }
+ else {
+ lastEventCount = currentEventCounts;
+ stableCount = 0;
+ }
+ } while (stableCount < 60);
+ logger.info("Stable after: {} events", lastEventCount);
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/impl/BundlingTests.java b/src/test/java/engineering/swat/watch/impl/BundlingTests.java
new file mode 100644
index 00000000..916a6401
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/impl/BundlingTests.java
@@ -0,0 +1,149 @@
+package engineering.swat.watch.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
+import org.hamcrest.core.IsEqual;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import engineering.swat.watch.TestHelper;
+import engineering.swat.watch.impl.util.BundledSubscription;
+import engineering.swat.watch.impl.util.ISubscribable;
+
+public class BundlingTests {
+
+ private final Logger logger = LogManager.getLogger();
+ private BundledSubscription target;
+ private FakeSubscribable fakeSubs;
+
+ private static class FakeSubscribable implements ISubscribable {
+ private final Map> subs = new ConcurrentHashMap<>();
+
+ @Override
+ public Closeable subscribe(Long target, Consumer eventListener) throws IOException {
+ subs.put(target, eventListener);
+ return () -> {
+ subs.remove(target, eventListener);
+ };
+ }
+
+ void publish(Long x) {
+ var s = subs.get(x);
+ if (s != null) {
+ s.accept(true);
+ }
+ }
+ };
+
+
+ @BeforeEach
+ void setup() {
+ fakeSubs = new FakeSubscribable();
+ target = new BundledSubscription<>(fakeSubs);
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.LONG_WAIT.getSeconds(), TimeUnit.SECONDS);
+ }
+
+ private static final int SUBs = 100;
+ private static final long MSGs = 100_000;
+
+ @Test
+ void manySubscriptions() throws IOException {
+ AtomicInteger hits = new AtomicInteger();
+ List closers = new ArrayList<>();
+
+ for (int i = 0; i < MSGs; i++) {
+ for (int j = 0; j < SUBs; j++) {
+ closers.add(target.subscribe(Long.valueOf(i), b -> hits.incrementAndGet()));
+ }
+ }
+
+ logger.info("Sending single message");
+ fakeSubs.publish(Long.valueOf(0));
+ assertEquals(SUBs, hits.get());
+ logger.info("Sending all messages");
+ hits.set(0);
+ for (int i = 0; i < MSGs; i++) {
+ fakeSubs.publish(Long.valueOf(i));
+ }
+ assertEquals(SUBs * MSGs, hits.get());
+
+ logger.info("Clearing subs in parallel");
+ for (var clos : closers) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ clos.close();
+ } catch (IOException e) {
+ logger.catching(e);
+ }
+ });
+ }
+
+ await("Closing should finish")
+ .until(fakeSubs.subs::isEmpty);
+ logger.info("Done clearing");
+
+
+ }
+
+ @RepeatedTest(failureThreshold = 1, value=50)
+ void parallelSubscriptions() throws IOException, InterruptedException {
+ var hits = new AtomicInteger();
+ var endPointReached = new Semaphore(0);
+ var waitingForClose = new Semaphore(0);
+ var done = new Semaphore(0);
+
+ int active = 0;
+ for (int j = 0; j < SUBs; j++) {
+ boolean keepAround = j % 2 == 0;
+ if (keepAround) {
+ active++;
+ }
+ var t = new Thread(() -> {
+ for (int k =0; k < 1000; k++) {
+ try (var c = target.subscribe(Long.valueOf(0), b -> hits.incrementAndGet())) {
+ if (keepAround && k + 1 == 1000) {
+ endPointReached.release();
+ waitingForClose.acquire();
+ }
+ } catch (Exception ignored) {
+ logger.catching(ignored);
+ }
+ }
+ done.release();
+ });
+ t.setDaemon(true);
+ t.start();
+ }
+
+ endPointReached.acquire(active);
+ done.acquire(SUBs - active);
+ fakeSubs.publish(Long.valueOf(0));
+
+ await("Subscriptions should have hit")
+ .untilAtomic(hits, IsEqual.equalTo(active));
+ waitingForClose.release(active);
+ }
+
+}
diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml
new file mode 100644
index 00000000..a6df566b
--- /dev/null
+++ b/src/test/resources/log4j2-test.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+