diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 21fa1ca7..01ee331f 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -8,6 +8,7 @@ on:
pull_request:
branches:
- main
+ - improved-overflow-support-main
jobs:
test:
diff --git a/README.md b/README.md
index a51c36d7..2b544e96 100644
--- a/README.md
+++ b/README.md
@@ -7,13 +7,14 @@ a java file watcher that works across platforms and supports recursion, single f
Features:
- monitor a single file (or directory) for changes
-- monitor a directory for changes to it's direct descendants
-- monitor a directory for changes for all it's descendants (aka recursive directory watch)
+- monitor a directory for changes to its direct descendants
+- monitor a directory for changes for all its descendants (aka recursive directory watch)
- edge cases dealt with:
- - in case of overflow we will still generate events for new descendants
- recursive watches will also continue in new directories
- multiple watches for the same directory are merged to avoid overloading the kernel
- events are processed in a configurable worker pool
+ - when an overflow happens, automatically approximate the events that were
+ missed using a configurable approximation policy
Planned features:
@@ -39,6 +40,7 @@ Start using java-watch:
var directory = Path.of("tmp", "test-dir");
var watcherSetup = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN)
.withExecutor(Executors.newCachedThreadPool()) // optionally configure a custom thread pool
+ .onOverflow(Approximation.DIFF) // optionally configure a handler for overflows
.on(watchEvent -> {
System.err.println(watchEvent);
});
diff --git a/src/main/java/engineering/swat/watch/ActiveWatch.java b/src/main/java/engineering/swat/watch/ActiveWatch.java
index 36f0a4b9..ba8899ed 100644
--- a/src/main/java/engineering/swat/watch/ActiveWatch.java
+++ b/src/main/java/engineering/swat/watch/ActiveWatch.java
@@ -27,12 +27,22 @@
package engineering.swat.watch;
import java.io.Closeable;
+import java.nio.file.Path;
/**
- *
Marker interface for an active watch, in the future might get properties you can inspect.
+ * Marker interface for an active watch, in the future might get more properties you can inspect.
*
- * For now, make sure to close the watch when not interested in new events
+ * For now, make sure to close the watch when not interested in new events.
*/
public interface ActiveWatch extends Closeable {
+ /**
+ * Gets the path watched by this watch.
+ */
+ Path getPath();
+
+ /**
+ * Gets the scope of this watch.
+ */
+ WatchScope getScope();
}
diff --git a/src/main/java/engineering/swat/watch/Approximation.java b/src/main/java/engineering/swat/watch/Approximation.java
new file mode 100644
index 00000000..1bf3fd6d
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/Approximation.java
@@ -0,0 +1,108 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch;
+
+/**
+ * Constants to indicate for which regular files/directories in the scope of the
+ * watch an approximation of synthetic events (of kinds
+ * {@link WatchEvent.Kind#CREATED}, {@link WatchEvent.Kind#MODIFIED}, and/or
+ * {@link WatchEvent.Kind#DELETED}) should be issued when an overflow event
+ * happens. These synthetic events, as well as the overflow event itself, are
+ * subsequently passed to the user-defined event handler of the watch.
+ * Typically, the user-defined event handler can ignore the original overflow
+ * event (i.e., handling the synthetic events is sufficient to address the
+ * overflow issue), but it doesn't have to (e.g., it may carry out additional
+ * overflow bookkeeping).
+ */
+public enum Approximation {
+
+ /**
+ * Synthetic events are issued for no regular files/directories in
+ * the scope of the watch. Thus, the user-defined event handler is fully
+ * responsible to handle overflow events.
+ */
+ NONE,
+
+ /**
+ *
+ * Synthetic events of kinds {@link WatchEvent.Kind#CREATED} and
+ * {@link WatchEvent.Kind#MODIFIED}, but not
+ * {@link WatchEvent.Kind#DELETED}, are issued for all regular
+ * files/directories in the scope of the watch. Specifically, when an
+ * overflow event happens:
+ *
+ *
+ * - CREATED events are issued for all regular files/directories
+ * (overapproximation).
+ *
- MODIFIED events are issued for all non-empty, regular files
+ * (overapproximation) but for no directories (underapproximation).
+ *
- DELETED events are issued for no regular files/directories
+ * (underapproximation).
+ *
+ *
+ *
+ * This approach is relatively cheap in terms of memory usage (cf.
+ * {@link #DIFF}), but it results in a large over/underapproximation of the
+ * actual events (cf. DIFF).
+ */
+ ALL,
+
+
+ /**
+ *
+ * Synthetic events of kinds {@link WatchEvent.Kind#CREATED},
+ * {@link WatchEvent.Kind#MODIFIED}, and {@link WatchEvent.Kind#DELETED} are
+ * issued for regular files/directories in the scope of the watch, when
+ * their current versions are different from their previous versions, as
+ * determined using last-modified-times. Specifically, when an
+ * overflow event happens:
+ *
+ *
+ * - CREATED events are issued for all regular files/directories when the
+ * previous last-modified-time is unknown, but the current
+ * last-modified-time is known (i.e., the file started existing).
+ *
- MODIFIED events are issued for all regular files/directories when the
+ * previous last-modified-time is before the current last-modified-time.
+ *
- DELETED events are issued for all regular files/directories when the
+ * previous last-modified-time is known, but the current
+ * last-modified-time is unknown (i.e., the file stopped existing).
+ *
+ *
+ *
+ * To keep track of last-modified-times, an internal index is
+ * populated with last-modified-times of all regular files/directories in
+ * the scope of the watch when the watch is started. Each time when any
+ * event happens, the index is updated accordingly, so when an overflow
+ * event happens, last-modified-times can be compared as described above.
+ *
+ *
+ * This approach results in a small overapproximation (cf. {@link #ALL}),
+ * but it is relatively expensive in terms of memory usage (cf. ALL), as the
+ * watch needs to keep track of last-modified-times.
+ */
+ DIFF
+}
diff --git a/src/main/java/engineering/swat/watch/WatchEvent.java b/src/main/java/engineering/swat/watch/WatchEvent.java
index 4d4874d5..bb879ef5 100644
--- a/src/main/java/engineering/swat/watch/WatchEvent.java
+++ b/src/main/java/engineering/swat/watch/WatchEvent.java
@@ -68,10 +68,16 @@ public enum Kind {
private final Path rootPath;
private final Path relativePath;
+ private static final Path EMPTY_PATH = Path.of("");
+
+ public WatchEvent(Kind kind, Path rootPath) {
+ this(kind, rootPath, null);
+ }
+
public WatchEvent(Kind kind, Path rootPath, @Nullable Path relativePath) {
this.kind = kind;
this.rootPath = rootPath;
- this.relativePath = relativePath == null ? Path.of("") : relativePath;
+ this.relativePath = relativePath == null ? EMPTY_PATH : relativePath;
}
public Kind getKind() {
@@ -101,6 +107,20 @@ public Path calculateFullPath() {
return rootPath.resolve(relativePath);
}
+ /**
+ * @return The file name of the full path of this event, or {@code null} if
+ * it has zero elements (cf. {@link Path#getFileName()}), but without
+ * calculating the full path. This method is equivalent to, but more
+ * efficient than, {@code calculateFullPath().getFileName()}.
+ */
+ public @Nullable Path getFileName() {
+ var fileName = relativePath.getFileName();
+ if (fileName == null || fileName.equals(EMPTY_PATH)) {
+ fileName = rootPath.getFileName();
+ }
+ return fileName;
+ }
+
@Override
public String toString() {
return String.format("WatchEvent[%s, %s, %s]", this.rootPath, this.kind, this.relativePath);
diff --git a/src/main/java/engineering/swat/watch/Watcher.java b/src/main/java/engineering/swat/watch/Watcher.java
index c176dfdb..c6a094de 100644
--- a/src/main/java/engineering/swat/watch/Watcher.java
+++ b/src/main/java/engineering/swat/watch/Watcher.java
@@ -32,14 +32,19 @@
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import engineering.swat.watch.impl.EventHandlingWatch;
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
+import engineering.swat.watch.impl.jdk.JDKFileTreeWatch;
import engineering.swat.watch.impl.jdk.JDKFileWatch;
-import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch;
+import engineering.swat.watch.impl.overflows.IndexingRescanner;
+import engineering.swat.watch.impl.overflows.MemorylessRescanner;
/**
*
Watch a path for changes.
@@ -50,17 +55,19 @@
*/
public class Watcher {
private final Logger logger = LogManager.getLogger();
- private final WatchScope scope;
private final Path path;
+ private final WatchScope scope;
+ private volatile Approximation approximateOnOverflow = Approximation.ALL;
private volatile Executor executor = CompletableFuture::runAsync;
- private static final Consumer EMPTY_HANDLER = p -> {};
- private volatile Consumer eventHandler = EMPTY_HANDLER;
-
+ private static final BiConsumer EMPTY_HANDLER = (w, e) -> {};
+ private volatile BiConsumer eventHandler = EMPTY_HANDLER;
+ private static final Predicate TRUE_FILTER = e -> true;
+ private volatile Predicate eventFilter = TRUE_FILTER;
- private Watcher(WatchScope scope, Path path) {
- this.scope = scope;
+ private Watcher(Path path, WatchScope scope) {
this.path = path;
+ this.scope = scope;
}
/**
@@ -87,9 +94,8 @@ public static Watcher watch(Path path, WatchScope scope) {
break;
default:
throw new IllegalArgumentException("Unsupported scope: " + scope);
-
}
- return new Watcher(scope, path);
+ return new Watcher(path, scope);
}
/**
@@ -103,7 +109,7 @@ public Watcher on(Consumer eventHandler) {
if (this.eventHandler != EMPTY_HANDLER) {
throw new IllegalArgumentException("on handler cannot be set more than once");
}
- this.eventHandler = eventHandler;
+ this.eventHandler = (w, e) -> eventHandler.accept(e);
return this;
}
@@ -114,7 +120,7 @@ public Watcher on(WatchEventListener listener) {
if (this.eventHandler != EMPTY_HANDLER) {
throw new IllegalArgumentException("on handler cannot be set more than once");
}
- this.eventHandler = ev -> {
+ this.eventHandler = (w, ev) -> {
switch (ev.getKind()) {
case CREATED:
listener.onCreated(ev);
@@ -135,6 +141,22 @@ public Watcher on(WatchEventListener listener) {
return this;
}
+ /**
+ * Configures the event filter to determine which events should be passed to
+ * the event handler. By default (without calling this method), all events
+ * are passed. This method must be called at most once.
+ * @param predicate The predicate to determine an event should be kept
+ * ({@code true}) or dropped ({@code false})
+ * @return {@code this} (to support method chaining)
+ */
+ Watcher filter(Predicate predicate) {
+ if (this.eventFilter != TRUE_FILTER) {
+ throw new IllegalArgumentException("filter cannot be set more than once");
+ }
+ this.eventFilter = predicate;
+ return this;
+ }
+
/**
* Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
* If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
@@ -146,6 +168,22 @@ public Watcher withExecutor(Executor callbackHandler) {
return this;
}
+ /**
+ * Optionally configure which regular files/directories in the scope of the
+ * watch an approximation of synthetic events (of kinds
+ * {@link WatchEvent.Kind#CREATED}, {@link WatchEvent.Kind#MODIFIED}, and/or
+ * {@link WatchEvent.Kind#DELETED}) should be issued when an overflow event
+ * happens. If not defined before this watcher is started, the
+ * {@link Approximation#ALL} approach will be used.
+ * @param whichFiles Constant to indicate for which regular
+ * files/directories to approximate
+ * @return This watcher for optional method chaining
+ */
+ public Watcher onOverflow(Approximation whichFiles) {
+ this.approximateOnOverflow = whichFiles;
+ return this;
+ }
+
/**
* Start watch the path for events.
* @return a subscription for the watch, when closed, new events will stop being registered to the worker pool.
@@ -157,28 +195,30 @@ public ActiveWatch start() throws IOException {
throw new IllegalStateException("There is no onEvent handler defined");
}
+ var h = applyApproximateOnOverflow();
+
switch (scope) {
case PATH_AND_CHILDREN: {
- var result = new JDKDirectoryWatch(path, executor, eventHandler, false);
+ var result = new JDKDirectoryWatch(path, executor, h, eventFilter);
result.open();
return result;
}
case PATH_AND_ALL_DESCENDANTS: {
try {
- var result = new JDKDirectoryWatch(path, executor, eventHandler, true);
+ var result = new JDKDirectoryWatch(path, executor, h, eventFilter, true);
result.open();
return result;
} catch (Throwable ex) {
// no native support, use the simulation
logger.debug("Not possible to register the native watcher, using fallback for {}", path);
logger.trace(ex);
- var result = new JDKRecursiveDirectoryWatch(path, executor, eventHandler);
+ var result = new JDKFileTreeWatch(path, executor, h, eventFilter);
result.open();
return result;
}
}
case PATH_ONLY: {
- var result = new JDKFileWatch(path, executor, eventHandler);
+ var result = new JDKFileWatch(path, executor, h, eventFilter);
result.open();
return result;
}
@@ -186,4 +226,17 @@ public ActiveWatch start() throws IOException {
throw new IllegalStateException("Not supported yet");
}
}
+
+ private BiConsumer applyApproximateOnOverflow() {
+ switch (approximateOnOverflow) {
+ case NONE:
+ return eventHandler;
+ case ALL:
+ return eventHandler.andThen(new MemorylessRescanner(executor));
+ case DIFF:
+ return eventHandler.andThen(new IndexingRescanner(executor, path, scope));
+ default:
+ throw new UnsupportedOperationException("No event handler has been defined yet for this overflow policy");
+ }
+ }
}
diff --git a/src/main/java/engineering/swat/watch/impl/EventHandlingWatch.java b/src/main/java/engineering/swat/watch/impl/EventHandlingWatch.java
new file mode 100644
index 00000000..525a09eb
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/EventHandlingWatch.java
@@ -0,0 +1,55 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl;
+
+import engineering.swat.watch.ActiveWatch;
+import engineering.swat.watch.WatchEvent;
+
+public interface EventHandlingWatch extends ActiveWatch {
+
+ /**
+ * Handles `event`. The purpose of this method is to trigger the event
+ * handler of this watch "from the outside" (in addition to having native
+ * file system libraries trigger the event handler "from the inside"). This
+ * is useful to report synthetic events (e.g., while handling overflows).
+ */
+ void handleEvent(WatchEvent event);
+
+ /**
+ * Relativizes the full path of `event` against the path watched by this
+ * watch (as per `getPath()`). Returns a new event whose root path and
+ * relative path are set in accordance with the relativization.
+ */
+ default WatchEvent relativize(WatchEvent event) {
+ var fullPath = event.calculateFullPath();
+
+ var kind = event.getKind();
+ var rootPath = getPath();
+ var relativePath = rootPath.relativize(fullPath);
+ return new WatchEvent(kind, rootPath, relativePath);
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java
index fb90755a..e4357012 100644
--- a/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java
@@ -31,27 +31,33 @@
import java.nio.file.StandardWatchEventKinds;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
-import engineering.swat.watch.ActiveWatch;
import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.impl.EventHandlingWatch;
-public abstract class JDKBaseWatch implements ActiveWatch {
+public abstract class JDKBaseWatch implements EventHandlingWatch {
private final Logger logger = LogManager.getLogger();
protected final Path path;
protected final Executor exec;
- protected final Consumer eventHandler;
+ protected final BiConsumer eventHandler;
+ protected final Predicate eventFilter;
protected final AtomicBoolean started = new AtomicBoolean();
- protected JDKBaseWatch(Path path, Executor exec, Consumer eventHandler) {
+ protected JDKBaseWatch(Path path, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter) {
+
this.path = path;
this.exec = exec;
this.eventHandler = eventHandler;
+ this.eventFilter = eventFilter;
}
public void open() throws IOException {
@@ -90,27 +96,43 @@ protected boolean startIfFirstTime() throws IOException {
}
protected WatchEvent translate(java.nio.file.WatchEvent> jdkEvent) {
- WatchEvent.Kind kind;
- if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
- kind = WatchEvent.Kind.CREATED;
- }
- else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
- kind = WatchEvent.Kind.MODIFIED;
- }
- else if (jdkEvent.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
- kind = WatchEvent.Kind.DELETED;
- }
- else if (jdkEvent.kind() == StandardWatchEventKinds.OVERFLOW) {
- kind = WatchEvent.Kind.OVERFLOW;
- }
- else {
- throw new IllegalArgumentException("Unexpected watch event: " + jdkEvent);
- }
+ var kind = translate(jdkEvent.kind());
var rootPath = path;
- var relativePath = kind == WatchEvent.Kind.OVERFLOW ? Path.of("") : (@Nullable Path)jdkEvent.context();
+ var relativePath = kind == WatchEvent.Kind.OVERFLOW ? null : (@Nullable Path) jdkEvent.context();
var event = new WatchEvent(kind, rootPath, relativePath);
logger.trace("Translated: {} to {}", jdkEvent, event);
return event;
}
+
+ protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind> jdkKind) {
+ if (jdkKind == StandardWatchEventKinds.ENTRY_CREATE) {
+ return WatchEvent.Kind.CREATED;
+ }
+ if (jdkKind == StandardWatchEventKinds.ENTRY_MODIFY) {
+ return WatchEvent.Kind.MODIFIED;
+ }
+ if (jdkKind == StandardWatchEventKinds.ENTRY_DELETE) {
+ return WatchEvent.Kind.DELETED;
+ }
+ if (jdkKind == StandardWatchEventKinds.OVERFLOW) {
+ return WatchEvent.Kind.OVERFLOW;
+ }
+
+ throw new IllegalArgumentException("Unexpected watch kind: " + jdkKind);
+ }
+
+ // -- EventHandlingWatch --
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public void handleEvent(WatchEvent e) {
+ if (eventFilter.test(e)) {
+ eventHandler.accept(this, e);
+ }
+ }
}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java
index fcb6004e..a7b5d936 100644
--- a/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java
@@ -31,13 +31,16 @@
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.Executor;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.impl.EventHandlingWatch;
import engineering.swat.watch.impl.util.BundledSubscription;
import engineering.swat.watch.impl.util.SubscriptionKey;
@@ -45,24 +48,35 @@ public class JDKDirectoryWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final boolean nativeRecursive;
private volatile @MonotonicNonNull Closeable bundledJDKWatcher;
+ private volatile boolean closed = false;
private static final BundledSubscription>>
BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);
- public JDKDirectoryWatch(Path directory, Executor exec, Consumer eventHandler) {
- this(directory, exec, eventHandler, false);
+ public JDKDirectoryWatch(Path directory, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter) {
+
+ this(directory, exec, eventHandler, eventFilter, false);
}
- public JDKDirectoryWatch(Path directory, Executor exec, Consumer eventHandler, boolean nativeRecursive) {
- super(directory, exec, eventHandler);
+ public JDKDirectoryWatch(Path directory, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter, boolean nativeRecursive) {
+
+ super(directory, exec, eventHandler, eventFilter);
this.nativeRecursive = nativeRecursive;
}
- private void handleChanges(List> events) {
+ public boolean isClosed() {
+ return closed;
+ }
+
+ private void handleJDKEvents(List> events) {
exec.execute(() -> {
for (var ev : events) {
try {
- eventHandler.accept(translate(ev));
+ handleEvent(translate(ev));
}
catch (Throwable ignored) {
logger.error("Ignoring downstream exception:", ignored);
@@ -73,10 +87,23 @@ private void handleChanges(List> events) {
// -- JDKBaseWatch --
+ @Override
+ public WatchScope getScope() {
+ return nativeRecursive ? WatchScope.PATH_AND_ALL_DESCENDANTS : WatchScope.PATH_AND_CHILDREN;
+ }
+
+ @Override
+ public void handleEvent(WatchEvent e) {
+ if (!closed) {
+ super.handleEvent(e);
+ }
+ }
+
@Override
public synchronized void close() throws IOException {
- if (bundledJDKWatcher != null) {
+ if (!closed && bundledJDKWatcher != null) {
logger.trace("Closing watch for: {}", this.path);
+ closed = true;
bundledJDKWatcher.close();
}
}
@@ -85,6 +112,6 @@ public synchronized void close() throws IOException {
protected synchronized void start() throws IOException {
assert bundledJDKWatcher == null;
var key = new SubscriptionKey(path, nativeRecursive);
- bundledJDKWatcher = BUNDLED_JDK_WATCHERS.subscribe(key, this::handleChanges);
+ bundledJDKWatcher = BUNDLED_JDK_WATCHERS.subscribe(key, this::handleJDKEvents);
}
}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java
new file mode 100644
index 00000000..587bb1f4
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java
@@ -0,0 +1,303 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl.jdk;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.impl.EventHandlingWatch;
+
+public class JDKFileTreeWatch extends JDKBaseWatch {
+ private final Logger logger = LogManager.getLogger();
+ private final Path rootPath;
+ private final Path relativePathParent;
+ private final Map childWatches = new ConcurrentHashMap<>();
+ private final JDKDirectoryWatch internal;
+
+ public JDKFileTreeWatch(Path fullPath, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter) {
+
+ this(fullPath, Path.of(""), exec, eventHandler, eventFilter);
+ }
+
+ public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter) {
+
+ super(rootPath.resolve(relativePathParent), exec, eventHandler, eventFilter);
+ this.rootPath = rootPath;
+ this.relativePathParent = relativePathParent;
+
+ var internalEventHandler = eventHandler.andThen(new AsyncChildWatchesUpdater());
+ this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler, eventFilter) {
+
+ // Override to ensure that this watch relativizes events wrt
+ // `rootPath` (instead of `path`, as is the default behavior)
+ @Override
+ public WatchEvent relativize(WatchEvent event) {
+ // Assumption: The parent of the full path of `event` and the
+ // path of this watch are the same, so we only need to append
+ // the file name of `event` to relativize.
+ assert Objects.equals(
+ event.calculateFullPath().getParent(),
+ rootPath.resolve(relativePathParent));
+
+ var fileName = event.getFileName();
+ return new WatchEvent(event.getKind(), rootPath,
+ fileName == null ? relativePathParent : relativePathParent.resolve(fileName));
+ }
+
+ // Override to ensure that this watch translates JDK events using
+ // `rootPath` (instead of `path`, as is the default behavior).
+ // Events returned by this method do not need to be relativized.
+ @Override
+ protected WatchEvent translate(java.nio.file.WatchEvent> jdkEvent) {
+ var kind = translate(jdkEvent.kind());
+
+ Path relativePath = null;
+ if (kind != WatchEvent.Kind.OVERFLOW) {
+ var child = (Path) jdkEvent.context();
+ if (child != null) {
+ relativePath = relativePathParent.resolve(child);
+ }
+ }
+
+ var event = new WatchEvent(kind, rootPath, relativePath);
+ logger.trace("Translated: {} to {}", jdkEvent, event);
+ return event;
+ }
+ };
+ }
+
+ /**
+ * Event handler that asynchronously (using {@link JDKBaseWatch#exec})
+ * updates the child watches according to the following rules: (a) when an
+ * overflow happens, the directory is rescanned, new child watches for
+ * created subdirectories are opened, existing child watches for deleted
+ * subdirectories are closed, and the overflow is propagated to each child
+ * watch; (b) when a subdirectory creation happens, a new child watch is
+ * opened for that subdirectory; (c) when a subdirectory deletion happens,
+ * an existing child watch is closed for that subdirectory.
+ */
+ private class AsyncChildWatchesUpdater implements BiConsumer {
+ @Override
+ public void accept(EventHandlingWatch watch, WatchEvent event) {
+ exec.execute(() -> {
+ switch (event.getKind()) {
+ case OVERFLOW: acceptOverflow(); break;
+ case CREATED: getFileNameAndThen(event, this::acceptCreated); break;
+ case DELETED: getFileNameAndThen(event, this::acceptDeleted); break;
+ case MODIFIED: break;
+ }
+ });
+ }
+
+ private void getFileNameAndThen(WatchEvent event, Consumer consumer) {
+ var child = event.getFileName();
+ if (child != null) {
+ consumer.accept(child);
+ } else {
+ logger.error("Could not get file name of event: {}", event);
+ }
+ }
+
+ private void acceptOverflow() {
+ syncChildWatchesWithFileSystem();
+ for (var childWatch : childWatches.values()) {
+ reportOverflowTo(childWatch);
+ }
+ }
+
+ private void acceptCreated(Path child) {
+ if (Files.isDirectory(path.resolve(child))) {
+ var childWatch = openChildWatch(child);
+ // Events in the newly created directory might have been missed
+ // between its creation and setting up its watch. So, generate
+ // an `OVERFLOW` event for the watch.
+ if (childWatch != null) {
+ reportOverflowTo(childWatch);
+ }
+ }
+ }
+
+ private void acceptDeleted(Path child) {
+ tryCloseChildWatch(child);
+ }
+
+ private void reportOverflowTo(JDKFileTreeWatch childWatch) {
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW,
+ childWatch.rootPath, childWatch.relativePathParent);
+ childWatch.handleEvent(overflow);
+ }
+ }
+
+ private void syncChildWatchesWithFileSystem() {
+ var toBeClosed = new HashSet<>(childWatches.keySet());
+
+ try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) {
+ children.forEach(p -> {
+ var child = p.getFileName();
+ if (child != null) {
+ toBeClosed.remove(child);
+ openChildWatch(child);
+ } else {
+ logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p);
+ }
+ });
+ } catch (IOException e) {
+ logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e);
+ }
+
+ for (var child : toBeClosed) {
+ tryCloseChildWatch(child);
+ }
+ }
+
+ /**
+ * @return A child watch for {@code child} when the parent watch is still
+ * open, or {@code null} when it is already closed.
+ */
+ private @Nullable JDKFileTreeWatch openChildWatch(Path child) {
+ assert !child.isAbsolute();
+
+ Function newChildWatch = p -> new JDKFileTreeWatch(
+ rootPath, relativePathParent.resolve(child), exec, eventHandler, eventFilter);
+ var childWatch = childWatches.computeIfAbsent(child, newChildWatch);
+
+ // The following may have happened at this point:
+ // 1. Thread A: Reads `closed` at the beginning of an event handler,
+ // sees it's `false`, runs the event handler, and reaches the
+ // beginning of this method (but doesn't execute it yet).
+ // 2. Thread B: Writes `true` to `closed`, gets all child watches from
+ // the map, and closes them.
+ // 3. Thread A: Creates a new child watch and puts it into the map.
+ //
+ // Without additional synchronization, which is costly, there will
+ // always be a small window between the end of (1) and the beginning of
+ // (2) that causes a "dangling" child watch to remain open when the
+ // parent watch is closed. To mitigate this, after optimistically
+ // putting a child watch into the map, we immediately close it when
+ // needed.
+ if (internal.isClosed()) {
+ tryClose(childWatch);
+ return null;
+ }
+
+ try {
+ childWatch.startIfFirstTime();
+ } catch (IOException e) {
+ logger.error("Could not open (nested) file tree watch for: {} ({})", child, e);
+ }
+ return childWatch;
+ }
+
+ private void tryCloseChildWatch(Path child) {
+ try {
+ closeChildWatch(child);
+ } catch (IOException e) {
+ logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e);
+ }
+ }
+
+ private void closeChildWatch(Path child) throws IOException {
+ assert !child.isAbsolute();
+
+ var childWatch = childWatches.remove(child);
+ if (childWatch != null) {
+ childWatch.close();
+ }
+ }
+
+ private @Nullable IOException tryClose(Closeable c) {
+ try {
+ c.close();
+ return null;
+ } catch (IOException ex) {
+ logger.error("Could not close watch", ex);
+ return ex;
+ } catch (Exception ex) {
+ logger.error("Could not close watch", ex);
+ return new IOException("Unexpected exception when closing", ex);
+ }
+ }
+
+ // -- JDKBaseWatch --
+
+ @Override
+ public WatchScope getScope() {
+ return WatchScope.PATH_AND_ALL_DESCENDANTS;
+ }
+
+ @Override
+ public void handleEvent(WatchEvent event) {
+ internal.handleEvent(event);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ var firstFail = tryClose(internal);
+ for (var c : childWatches.values()) {
+ var currentFail = tryClose(c);
+ if (currentFail != null && firstFail == null) {
+ firstFail = currentFail;
+ }
+ }
+ if (firstFail != null) {
+ throw firstFail;
+ }
+ }
+
+ @Override
+ protected synchronized void start() throws IOException {
+ internal.open();
+ syncChildWatchesWithFileSystem();
+ // There's no need to report an overflow event, because `internal` was
+ // opened *before* the file system was accessed to fetch children. Thus,
+ // if a new directory is created while this method is running, then at
+ // least one of the following is true: (a) the new directory is already
+ // visible by the time the file system is accessed; (b) its `CREATED`
+ // event is handled later, which starts a new child watch if needed.
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java
index bd27b83c..ab64116e 100644
--- a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java
+++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java
@@ -29,14 +29,16 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.Executor;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.impl.EventHandlingWatch;
/**
* It's not possible to monitor a single file (or directory), so we have to find a directory watcher, and connect to that
@@ -45,17 +47,30 @@
*/
public class JDKFileWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
- private final Path parent;
- private final Path fileName;
- private volatile @MonotonicNonNull JDKDirectoryWatch parentWatch;
+ private final JDKBaseWatch internal;
- public JDKFileWatch(Path file, Executor exec, Consumer eventHandler) {
- super(file, exec, eventHandler);
+ public JDKFileWatch(Path file, Executor exec,
+ BiConsumer eventHandler,
+ Predicate eventFilter) {
+
+ super(file, exec, eventHandler, eventFilter);
var message = "The root path is not a valid path for a file watch";
- this.parent = requireNonNull(path.getParent(), message);
- this.fileName = requireNonNull(path.getFileName(), message);
- assert !parent.equals(path);
+ var parent = requireNonNull(file.getParent(), message);
+ var fileName = requireNonNull(file.getFileName(), message);
+ assert !parent.equals(file);
+
+ this.internal = new JDKDirectoryWatch(parent, exec, (w, e) -> {
+ if (e.getKind() == WatchEvent.Kind.OVERFLOW) {
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, file);
+ eventHandler.accept(w, overflow);
+ }
+ if (fileName.equals(e.getRelativePath())) {
+ eventHandler.accept(w, e);
+ }
+ }, eventFilter);
+
+ logger.debug("File watch (for: {}) is in reality a directory watch (for: {}) with a filter (for: {})", file, parent, fileName);
}
private static Path requireNonNull(@Nullable Path p, String message) {
@@ -65,26 +80,25 @@ private static Path requireNonNull(@Nullable Path p, String message) {
return p;
}
- private void filter(WatchEvent event) {
- if (fileName.equals(event.getRelativePath())) {
- eventHandler.accept(event);
- }
+ // -- JDKBaseWatch --
+
+ @Override
+ public WatchScope getScope() {
+ return WatchScope.PATH_ONLY;
}
- // -- JDKBaseWatch --
+ @Override
+ public void handleEvent(WatchEvent event) {
+ internal.handleEvent(event);
+ }
@Override
public synchronized void close() throws IOException {
- if (parentWatch != null) {
- parentWatch.close();
- }
+ internal.close();
}
@Override
protected synchronized void start() throws IOException {
- assert parentWatch == null;
- parentWatch = new JDKDirectoryWatch(parent, exec, this::filter);
- parentWatch.open();
- logger.debug("File watch (for: {}) is in reality a directory watch (for: {}) with a filter (for: {})", path, parent, fileName);
+ internal.open();
}
}
diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java
deleted file mode 100644
index ec8b16b7..00000000
--- a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * BSD 2-Clause License
- *
- * Copyright (c) 2023, Swat.engineering
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice, this
- * list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
- * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
- * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-package engineering.swat.watch.impl.jdk;
-
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import engineering.swat.watch.WatchEvent;
-
-public class JDKRecursiveDirectoryWatch extends JDKBaseWatch {
- private final Logger logger = LogManager.getLogger();
- private final ConcurrentMap activeWatches = new ConcurrentHashMap<>();
-
- public JDKRecursiveDirectoryWatch(Path directory, Executor exec, Consumer eventHandler) {
- super(directory, exec, eventHandler);
- }
-
- private void processEvents(WatchEvent ev) {
- logger.trace("Forwarding event: {}", ev);
- eventHandler.accept(ev);
- logger.trace("Unwrapping event: {}", ev);
- switch (ev.getKind()) {
- case CREATED: handleCreate(ev); break;
- case DELETED: handleDeleteDirectory(ev); break;
- case OVERFLOW: handleOverflow(ev); break;
- case MODIFIED: break;
- }
- }
-
- private void publishExtraEvents(List ev) {
- logger.trace("Reporting new nested directories & files: {}", ev);
- ev.forEach(eventHandler);
- }
-
-
- private void handleCreate(WatchEvent ev) {
- // between the event and the current state of the file system
- // we might have some nested directories we missed
- // so if we have a new directory, we have to go in and iterate over it
- // we also have to report all nested files & dirs as created paths
- // but we don't want to delay the publication of this
- // create till after the processing is done, so we schedule it in the background
- var fullPath = ev.calculateFullPath();
- if (!activeWatches.containsKey(fullPath)) {
- CompletableFuture
- .completedFuture(fullPath)
- .thenApplyAsync(this::registerForNewDirectory, exec)
- .thenAcceptAsync(this::publishExtraEvents, exec)
- .exceptionally(ex -> {
- logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex);
- return null;
- });
- }
- }
-
- private void handleOverflow(WatchEvent ev) {
- logger.info("Overflow detected, rescanning to find missed entries in {}", path);
- CompletableFuture
- .completedFuture(ev.calculateFullPath())
- .thenApplyAsync(this::syncAfterOverflow, exec)
- .thenAcceptAsync(this::publishExtraEvents, exec)
- .exceptionally(ex -> {
- logger.error("Could not register new watch for: {} ({})", ev.calculateFullPath(), ex);
- return null;
- });
- }
-
- private void handleDeleteDirectory(WatchEvent ev) {
- var removedPath = ev.calculateFullPath();
- try {
- var existingWatch = activeWatches.remove(removedPath);
- if (existingWatch != null) {
- logger.debug("Clearing watch on removed directory: {}", removedPath);
- existingWatch.close();
- }
- } catch (IOException ex) {
- logger.error("Error clearing: {} {}", removedPath, ex);
- }
- }
-
- /** Only register a watch for every sub directory */
- private class InitialDirectoryScan extends SimpleFileVisitor {
- protected final Path subRoot;
-
- public InitialDirectoryScan(Path root) {
- this.subRoot = root;
- }
- @Override
- public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
- logger.error("We could not visit {} to schedule recursive file watches: {}", file, exc);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
- addNewDirectory(subdir);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
- if (exc != null) {
- logger.error("Error during directory iteration: {} = {}", subdir, exc);
- }
- return FileVisitResult.CONTINUE;
- }
-
- private void addNewDirectory(Path dir) throws IOException {
- var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir)));
- try {
- if (!watch.startIfFirstTime()) {
- logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir);
- }
- } catch (IOException ex) {
- activeWatches.remove(dir);
- logger.error("Could not register a watch for: {} ({})", dir, ex);
- throw ex;
- }
- }
-
- /** Make sure that the events are relative to the actual root of the recursive watch */
- private Consumer relocater(Path subRoot) {
- final Path newRelative = path.relativize(subRoot);
- return ev -> {
- var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath()));
- processEvents(rewritten);
- };
- }
- }
-
- /** register watch for new sub-dir, but also simulate event for every file & subdir found */
- private class NewDirectoryScan extends InitialDirectoryScan {
- protected final List events;
- protected final Set seenFiles;
- protected final Set seenDirs;
- private boolean hasFiles = false;
- public NewDirectoryScan(Path subRoot, List events, Set seenFiles, Set seenDirs) {
- super(subRoot);
- this.events = events;
- this.seenFiles = seenFiles;
- this.seenDirs = seenDirs;
- }
-
- @Override
- public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
- try {
- hasFiles = false;
- if (!seenDirs.contains(subdir)) {
- if (!subdir.equals(subRoot)) {
- events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, path.relativize(subdir)));
- }
- return super.preVisitDirectory(subdir, attrs);
- }
- // our children might have newer results
- return FileVisitResult.CONTINUE;
- } finally {
- seenDirs.add(subdir);
- }
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
- if (hasFiles) {
- events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, path.relativize(subdir)));
- }
- return super.postVisitDirectory(subdir, exc);
- }
-
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- if (!seenFiles.contains(file)) {
- hasFiles = true;
-
- var relative = path.relativize(file);
- events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, relative));
- if (attrs.size() > 0) {
- events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, relative));
- }
- seenFiles.add(file);
- }
- return FileVisitResult.CONTINUE;
- }
- }
-
- /** detect directories that aren't tracked yet, and generate events only for new entries */
- private class OverflowSyncScan extends NewDirectoryScan {
- private final Deque isNewDirectory = new ArrayDeque<>();
- public OverflowSyncScan(Path subRoot, List events, Set seenFiles, Set seenDirs) {
- super(subRoot, events, seenFiles, seenDirs);
- }
- @Override
- public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
- if (!activeWatches.containsKey(subdir)) {
- isNewDirectory.addLast(true);
- return super.preVisitDirectory(subdir, attrs);
- }
- isNewDirectory.addLast(false);
- return FileVisitResult.CONTINUE;
- }
- @Override
- public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
- isNewDirectory.removeLast();
- return super.postVisitDirectory(subdir, exc);
- }
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- if (isNewDirectory.peekLast() == Boolean.TRUE || !seenFiles.contains(file)) {
- return super.visitFile(file, attrs);
- }
- return FileVisitResult.CONTINUE;
- }
- }
-
- private void registerInitialWatches(Path dir) throws IOException {
- Files.walkFileTree(dir, new InitialDirectoryScan(dir));
- }
-
- private List registerForNewDirectory(Path dir) {
- var events = new ArrayList();
- var seenFiles = new HashSet();
- var seenDirectories = new HashSet();
- try {
- Files.walkFileTree(dir, new NewDirectoryScan(dir, events, seenFiles, seenDirectories));
- detectedMissingEntries(dir, events, seenFiles, seenDirectories);
- return events;
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
-
- private List syncAfterOverflow(Path dir) {
- var events = new ArrayList();
- var seenFiles = new HashSet();
- var seenDirectories = new HashSet();
- try {
- Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
- detectedMissingEntries(dir, events, seenFiles, seenDirectories);
- return events;
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- private void detectedMissingEntries(Path dir, ArrayList events, HashSet seenFiles, HashSet seenDirectories) throws IOException {
- // why a second round? well there is a race, between iterating the directory (and sending events)
- // and when the watches are active. so after we know all the new watches have been registered
- // we do a second scan and make sure to find paths that weren't visible the first time
- // and emulate events for them (and register new watches)
- // In essence this is the same as when an Overflow happened, so we can reuse that handler.
- int directoryCount = seenDirectories.size() - 1;
- while (directoryCount != seenDirectories.size()) {
- Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
- directoryCount = seenDirectories.size();
- }
- }
-
- // -- JDKBaseWatch --
-
- @Override
- public void close() throws IOException {
- IOException firstFail = null;
- for (var e : activeWatches.entrySet()) {
- try {
- e.getValue().close();
- } catch (IOException ex) {
- logger.error("Could not close watch", ex);
- if (firstFail == null) {
- firstFail = ex;
- }
- }
- catch (Exception ex) {
- logger.error("Could not close watch", ex);
- if (firstFail == null) {
- firstFail = new IOException("Unexpected exception when closing", ex);
- }
- }
- }
- if (firstFail != null) {
- throw firstFail;
- }
- }
-
- @Override
- protected void start() throws IOException {
- logger.debug("Running recursive watch for: {}", path);
- registerInitialWatches(path);
- }
-}
diff --git a/src/main/java/engineering/swat/watch/impl/overflows/BaseFileVisitor.java b/src/main/java/engineering/swat/watch/impl/overflows/BaseFileVisitor.java
new file mode 100644
index 00000000..e3233ebe
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/overflows/BaseFileVisitor.java
@@ -0,0 +1,87 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl.overflows;
+
+import java.io.IOException;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.util.EnumSet;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+
+/**
+ * Base extension of {@link SimpleFileVisitor}, intended to be further
+ * specialized by subclasses to auto-handle {@link WatchEvent.Kind#OVERFLOW}
+ * events. In particular, method {@link #walkFileTree} of this class internally
+ * calls {@link Files#walkFileTree} to visit the file tree that starts at
+ * {@link #path}, with a maximum depth inferred from {@link #scope}. Subclasses
+ * can be specialized, for instance, to generate synthetic events or index a
+ * file tree.
+ */
+public class BaseFileVisitor extends SimpleFileVisitor {
+ private final Logger logger = LogManager.getLogger();
+ protected final Path path;
+ protected final WatchScope scope;
+
+ public BaseFileVisitor(Path path, WatchScope scope) {
+ this.path = path;
+ this.scope = scope;
+ }
+
+ public void walkFileTree() {
+ var options = EnumSet.noneOf(FileVisitOption.class);
+ var maxDepth = scope == WatchScope.PATH_AND_ALL_DESCENDANTS ? Integer.MAX_VALUE : 1;
+ try {
+ Files.walkFileTree(path, options, maxDepth, this);
+ } catch (IOException e) {
+ logger.error("Could not walk: {} ({})", path, e);
+ }
+ }
+
+ // -- SimpleFileVisitor --
+
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+ logger.error("Could not walk regular file: {} ({})", file, exc);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ if (exc != null) {
+ logger.error("Could not walk directory: {} ({})", dir, exc);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java
new file mode 100644
index 00000000..4dcf2ac5
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java
@@ -0,0 +1,306 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl.overflows;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.impl.EventHandlingWatch;
+
+public class IndexingRescanner extends MemorylessRescanner {
+ private final Logger logger = LogManager.getLogger();
+ private final PathMap index = new PathMap<>();
+
+ public IndexingRescanner(Executor exec, Path path, WatchScope scope) {
+ super(exec);
+ new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index
+ }
+
+ private static class PathMap {
+ private final Map> values = new ConcurrentHashMap<>();
+ // ^^^^ ^^^^
+ // Parent File name (regular file or directory)
+
+ public @Nullable V put(Path p, V value) {
+ return apply(put(value), p);
+ }
+
+ public @Nullable V get(Path p) {
+ return apply(this::get, p);
+ }
+
+ public Set getParents() {
+ return (Set) values.keySet(); // Cast for Checker Framework
+ }
+
+ public Set getFileNames(Path parent) {
+ var inner = values.get(parent);
+ return inner == null ? Collections.emptySet() : (Set) inner.keySet(); // Cast for Checker Framework
+ }
+
+ public @Nullable V remove(Path p) {
+ return apply(this::remove, p);
+ }
+
+ private static @Nullable V apply(BiFunction action, Path p) {
+ var parent = p.getParent();
+ var fileName = p.getFileName();
+ if (parent != null && fileName != null) {
+ return action.apply(parent, fileName);
+ } else {
+ throw new IllegalArgumentException("The path should have both a parent and a file name");
+ }
+ }
+
+ private BiFunction put(V value) {
+ return (parent, fileName) -> put(parent, fileName, value);
+ }
+
+ private @Nullable V put(Path parent, Path fileName, V value) {
+ var inner = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>());
+
+ // This thread (henceforth: "here") optimistically puts a new entry
+ // in `inner`. However, another thread (henceforth: "there") may
+ // concurrently remove `inner` from `values`. Thus, the new entry
+ // may be lost. The comments below explain the countermeasures.
+ var previous = inner.put(fileName, value);
+
+ // <-- At this point "here", if `values.remove(parent)` happens
+ // "there", then `values.get(parent) != inner` becomes true
+ // "here", so the new entry will be re-put "here".
+ if (values.get(parent) != inner) {
+ previous = put(parent, fileName, value);
+ }
+ // <-- At this point "here", `!inner.isEmpty()` has become true
+ // "there", so if `values.remove(parent)` happens "there", then
+ // the new entry will be re-put "there".
+ return previous;
+ }
+
+ private @Nullable V get(Path parent, Path fileName) {
+ var inner = values.get(parent);
+ return inner == null ? null : inner.get(fileName);
+ }
+
+ private @Nullable V remove(Path parent, Path fileName) {
+ var inner = values.get(parent);
+ if (inner != null) {
+ var removed = inner.remove(fileName);
+
+ // This thread (henceforth: "here") optimistically removes
+ // `inner` from `values` when it has become empty. However,
+ // another thread (henceforth: "there") may concurrently put a
+ // new entry in `inner`. Thus, the new entry may be lost. The
+ // comments below explain the countermeasures.
+ if (inner.isEmpty() && values.remove(parent, inner)) {
+
+ // <-- At this point "here", if `inner.put(...)` happens
+ // "there", then `!inner.isEmpty()` becomes true "here",
+ // so the new entry is re-put "here".
+ if (!inner.isEmpty()) {
+ for (var e : inner.entrySet()) {
+ put(parent, e.getKey(), e.getValue());
+ }
+ }
+ // <-- At this point "here", `values.get(parent) != inner`
+ // has become true "there", so if `inner.put(...)`
+ // happens "there", then the new entry will be re-put
+ // "there".
+ }
+ return removed;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private class Indexer extends BaseFileVisitor {
+ public Indexer(Path path, WatchScope scope) {
+ super(path, scope);
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ if (!path.equals(dir)) {
+ index.put(dir, attrs.lastModifiedTime());
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ index.put(file, attrs.lastModifiedTime());
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ // -- MemorylessRescanner --
+
+ @Override
+ protected MemorylessRescanner.Generator newGenerator(Path path, WatchScope scope) {
+ return new Generator(path, scope);
+ }
+
+ protected class Generator extends MemorylessRescanner.Generator {
+ // Field to keep track of (a stack of) the paths that are visited during
+ // the current rescan (one frame for each nested subdirectory), to
+ // approximate `DELETED` events that happened since the previous rescan.
+ // Instances of this class are supposed to be used non-concurrently, so
+ // no synchronization to access this field is needed.
+ private final Deque> visited = new ArrayDeque<>();
+
+ public Generator(Path path, WatchScope scope) {
+ super(path, scope);
+ this.visited.push(new HashSet<>()); // Initial set for content of `path`
+ }
+
+ private void addToPeeked(Deque> deque, Path p) {
+ var peeked = deque.peek();
+ var fileName = p.getFileName();
+ if (peeked != null && fileName != null) {
+ peeked.add(fileName);
+ }
+ }
+
+ // -- MemorylessRescanner.Generator --
+
+ @Override
+ protected void generateEvents(Path path, BasicFileAttributes attrs) {
+ var lastModifiedTimeOld = index.get(path);
+ var lastModifiedTimeNew = attrs.lastModifiedTime();
+
+ // The path isn't indexed yet
+ if (lastModifiedTimeOld == null) {
+ super.generateEvents(path, attrs);
+ }
+
+ // The path is already indexed, and the previous last-modified-time
+ // is older than the current last-modified-time
+ else if (lastModifiedTimeOld.compareTo(lastModifiedTimeNew) < 0) {
+ events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path));
+ }
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ addToPeeked(visited, dir);
+ visited.push(new HashSet<>());
+ return super.preVisitDirectory(dir, attrs);
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ addToPeeked(visited, file);
+ return super.visitFile(file, attrs);
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ // Issue `DELETED` events based on the set of paths visited in `dir`
+ var visitedInDir = visited.pop();
+ if (visitedInDir != null) {
+ for (var p : index.getFileNames(dir)) {
+ if (!visitedInDir.contains(p)) {
+ var fullPath = dir.resolve(p);
+ // The index may have been updated during the visit, so
+ // even if `p` isn't contained in `visitedInDir`, by
+ // now, it may have come into existence.
+ if (!Files.exists(fullPath)) {
+ events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath));
+ }
+ }
+ }
+ }
+ return super.postVisitDirectory(dir, exc);
+ }
+ }
+
+ // -- MemorylessRescanner --
+
+ @Override
+ public void accept(EventHandlingWatch watch, WatchEvent event) {
+ // Auto-handle `OVERFLOW` events
+ super.accept(watch, event);
+
+ // Additional processing is needed to update the index when `CREATED`,
+ // `MODIFIED`, and `DELETED` events happen.
+ var kind = event.getKind();
+ var fullPath = event.calculateFullPath();
+ switch (kind) {
+ case CREATED:
+ case MODIFIED:
+ try {
+ var lastModifiedTimeNew = Files.getLastModifiedTime(fullPath);
+ var lastModifiedTimeOld = index.put(fullPath, lastModifiedTimeNew);
+
+ // If a `MODIFIED` event happens for a path that wasn't in
+ // the index yet, then a `CREATED` event has somehow been
+ // missed. Just in case, it's issued synthetically here.
+ if (lastModifiedTimeOld == null && kind == WatchEvent.Kind.MODIFIED) {
+ var created = new WatchEvent(WatchEvent.Kind.CREATED, fullPath);
+ watch.handleEvent(watch.relativize(created));
+ }
+ } catch (IOException e) {
+ // It can happen that, by the time a `CREATED`/`MODIFIED`
+ // event is handled above, getting the last-modified-time
+ // fails because the file has already been deleted. That's
+ // fine: we can just ignore the event. (The corresponding
+ // `DELETED` event will later be handled and remove the file
+ // from the index.) If the file exists, though, something
+ // went legitimately wrong, so it needs to be reported.
+ if (Files.exists(fullPath)) {
+ logger.error("Could not get modification time of: {} ({})", fullPath, e);
+ }
+ }
+ break;
+ case DELETED:
+ index.remove(fullPath);
+ break;
+ case OVERFLOW: // Already auto-handled above
+ break;
+ }
+ }
+}
diff --git a/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java
new file mode 100644
index 00000000..dad8528b
--- /dev/null
+++ b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java
@@ -0,0 +1,114 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl.overflows;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.impl.EventHandlingWatch;
+
+public class MemorylessRescanner implements BiConsumer {
+ private final Executor exec;
+
+ public MemorylessRescanner(Executor exec) {
+ this.exec = exec;
+ }
+
+ /**
+ * Rescan all files in the scope of `watch` and issue `CREATED` and
+ * `MODIFIED` events (not `DELETED` events) for each file. This method
+ * should typically be executed asynchronously (using `exec`).
+ */
+ protected void rescan(EventHandlingWatch watch) {
+ var generator = newGenerator(watch.getPath(), watch.getScope());
+ generator.walkFileTree();
+ generator.eventStream()
+ .map(watch::relativize)
+ .forEach(watch::handleEvent);
+ }
+
+ protected Generator newGenerator(Path path, WatchScope scope) {
+ return new Generator(path, scope);
+ }
+
+ protected class Generator extends BaseFileVisitor {
+ // When this class is used as intended, `events` is accessed only by one
+ // thread (the one that executes `Files.walkFileTree` via
+ // `BaseFileVisitor.walkFileTree`), so no additional thread-safety
+ // measures are needed to protect it from concurrent accesses.
+ protected final List events = new ArrayList<>();
+
+ public Generator(Path path, WatchScope scope) {
+ super(path, scope);
+ }
+
+ public Stream eventStream() {
+ return events.stream();
+ }
+
+ protected void generateEvents(Path path, BasicFileAttributes attrs) {
+ events.add(new WatchEvent(WatchEvent.Kind.CREATED, path));
+ if (attrs.isRegularFile() && attrs.size() > 0) {
+ events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path));
+ }
+ }
+
+ // -- BaseFileVisitor --
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ if (!path.equals(dir)) {
+ generateEvents(dir, attrs);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ generateEvents(file, attrs);
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ // -- BiConsumer --
+
+ @Override
+ public void accept(EventHandlingWatch watch, WatchEvent event) {
+ if (event.getKind() == WatchEvent.Kind.OVERFLOW) {
+ exec.execute(() -> rescan(watch));
+ }
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java
index 5faa2122..1f931207 100644
--- a/src/test/java/engineering/swat/watch/RecursiveWatchTests.java
+++ b/src/test/java/engineering/swat/watch/RecursiveWatchTests.java
@@ -31,8 +31,12 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,8 +45,11 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import engineering.swat.watch.WatchEvent.Kind;
+import engineering.swat.watch.impl.EventHandlingWatch;
class RecursiveWatchTests {
private final Logger logger = LogManager.getLogger();
@@ -141,4 +148,85 @@ void deleteOfFileInDirectoryShouldBeVisible() throws IOException {
}
}
+ @ParameterizedTest
+ @EnumSource // Repeat test for each `Approximation` value
+ void overflowsAreRecoveredFrom(Approximation whichFiles) throws IOException, InterruptedException {
+ var parent = testDir.getTestDirectory();
+ var descendants = new Path[] {
+ Path.of("foo"),
+ Path.of("bar"),
+ Path.of("bar", "x", "y", "z")
+ };
+
+ // Define a bunch of helper functions to test which events have happened
+ var events = ConcurrentHashMap. newKeySet(); // Stores all incoming events
+
+ BiPredicate eventsContains = (kind, descendant) ->
+ events.stream().anyMatch(e ->
+ e.getKind().equals(kind) &&
+ e.getRootPath().equals(parent) &&
+ e.getRelativePath().equals(descendant));
+
+ Consumer awaitCreation = p ->
+ await("Creation of `" + p + "` should be observed").until(
+ () -> eventsContains.test(Kind.CREATED, p));
+
+ Consumer awaitNotCreation = p ->
+ await("Creation of `" + p + "` shouldn't be observed: " + events)
+ .pollDelay(TestHelper.TINY_WAIT)
+ .until(() -> !eventsContains.test(Kind.CREATED, p));
+
+ // Configure and start watch
+ var dropEvents = new AtomicBoolean(false); // Toggles overflow simulation
+ var watchConfig = Watcher.watch(parent, WatchScope.PATH_AND_ALL_DESCENDANTS)
+ .withExecutor(ForkJoinPool.commonPool())
+ .onOverflow(whichFiles)
+ .filter(e -> !dropEvents.get())
+ .on(events::add);
+
+ try (var watch = (EventHandlingWatch) watchConfig.start()) {
+ // Begin overflow simulation
+ dropEvents.set(true);
+
+ // Create descendants and files. They *shouldn't* be observed yet.
+ var file1 = Path.of("file1.txt");
+ for (var descendant : descendants) {
+ Files.createDirectories(parent.resolve(descendant));
+ Files.createFile(parent.resolve(descendant).resolve(file1));
+ }
+ for (var descendant : descendants) {
+ awaitNotCreation.accept(descendant);
+ awaitNotCreation.accept(descendant.resolve(file1));
+ }
+
+ // End overflow simulation, and generate the `OVERFLOW` event. The
+ // previous creation of descendants and files *should* now be
+ // observed, unless no auto-handler for `OVERFLOW` events is
+ // configured.
+ dropEvents.set(false);
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, parent);
+ watch.handleEvent(overflow);
+
+ if (whichFiles != Approximation.NONE) { // Auto-handler is configured
+ for (var descendant : descendants) {
+ awaitCreation.accept(descendant);
+ awaitCreation.accept(descendant.resolve(file1));
+ }
+ } else {
+ // Give the watch some time to process the `OVERFLOW` event and
+ // do internal bookkeeping
+ Thread.sleep(TestHelper.TINY_WAIT.toMillis());
+ }
+
+ // Create more files. They *should* be observed (regardless of
+ // whether an auto-handler for `OVERFLOW` events is configured).
+ var file2 = Path.of("file2.txt");
+ for (var descendant : descendants) {
+ Files.createFile(parent.resolve(descendant).resolve(file2));
+ }
+ for (var descendant : descendants) {
+ awaitCreation.accept(descendant.resolve(file2));
+ }
+ }
+ }
}
diff --git a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java
index 72d0c656..2738fdb4 100644
--- a/src/test/java/engineering/swat/watch/SingleDirectoryTests.java
+++ b/src/test/java/engineering/swat/watch/SingleDirectoryTests.java
@@ -26,11 +26,15 @@
*/
package engineering.swat.watch;
+import static engineering.swat.watch.WatchEvent.Kind.OVERFLOW;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
@@ -39,6 +43,7 @@
import org.junit.jupiter.api.Test;
import engineering.swat.watch.WatchEvent.Kind;
+import engineering.swat.watch.impl.EventHandlingWatch;
class SingleDirectoryTests {
private TestDirectory testDir;
@@ -118,4 +123,161 @@ public void onDeleted(WatchEvent ev) {
.untilTrue(seenCreate);
}
}
+
+ @Test
+ void memorylessRescanOnOverflow() throws IOException, InterruptedException {
+ var directory = testDir.getTestDirectory();
+ Files.writeString(directory.resolve("a.txt"), "foo");
+ Files.writeString(directory.resolve("b.txt"), "bar");
+
+ var nCreated = new AtomicInteger();
+ var nModified = new AtomicInteger();
+ var nOverflow = new AtomicInteger();
+ var watchConfig = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN)
+ .onOverflow(Approximation.ALL)
+ .on(e -> {
+ switch (e.getKind()) {
+ case CREATED:
+ nCreated.incrementAndGet();
+ break;
+ case MODIFIED:
+ nModified.incrementAndGet();
+ break;
+ case OVERFLOW:
+ nOverflow.incrementAndGet();
+ break;
+ default:
+ break;
+ }
+ });
+
+ try (var watch = watchConfig.start()) {
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, directory);
+ ((EventHandlingWatch) watch).handleEvent(overflow);
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+
+ await("Overflow should trigger created events")
+ .until(nCreated::get, Predicate.isEqual(6)); // 3 directories + 3 files
+ await("Overflow should trigger modified events")
+ .until(nModified::get, Predicate.isEqual(2)); // 2 files (c.txt is still empty)
+ await("Overflow should be visible to user-defined event handler")
+ .until(nOverflow::get, Predicate.isEqual(1));
+ }
+ }
+
+ @Test
+ void indexingRescanOnOverflow() throws IOException, InterruptedException {
+ // Preface: This test looks a bit hacky because there's no API to
+ // directly manipulate, or prevent the auto-manipulation of, the index
+ // inside a watch. I've added some comments below to make it make sense.
+
+ var directory = testDir.getTestDirectory();
+ var semaphore = new Semaphore(0);
+
+ var nCreated = new AtomicInteger();
+ var nModified = new AtomicInteger();
+ var nDeleted = new AtomicInteger();
+
+ var watchConfig = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN)
+ .onOverflow(Approximation.DIFF)
+ .on(e -> {
+ var kind = e.getKind();
+ if (kind != OVERFLOW) {
+ // Threads can handle non-`OVERFLOW` events *only after*
+ // everything is "ready" for that (in which case a token is
+ // released to the semaphore, which is initially empty). See
+ // below for an explanation of "readiness".
+ semaphore.acquireUninterruptibly();
+ switch (e.getKind()) {
+ case CREATED:
+ nCreated.incrementAndGet();
+ break;
+ case MODIFIED:
+ nModified.incrementAndGet();
+ break;
+ case DELETED:
+ nDeleted.incrementAndGet();
+ break;
+ default:
+ break;
+ }
+ semaphore.release();
+ }
+ });
+
+ try (var watch = watchConfig.start()) {
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+ // At this point, the index of last-modified-times inside `watch` is
+ // populated with initial values.
+
+ Files.writeString(directory.resolve("a.txt"), "foo");
+ Files.writeString(directory.resolve("b.txt"), "bar");
+ Files.delete(directory.resolve("c.txt"));
+ Files.createFile(directory.resolve("d.txt"));
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+ // At this point, regular events have been generated for a.txt,
+ // b.txt, c.txt, and d.txt by the file system. These events won't be
+ // handled by `watch` just yet, though, because the semaphore is
+ // still empty (i.e., event-handling threads are blocked from making
+ // progress). Thus, the index inside `watch` still contains the
+ // initial last-modified-times. (Warning: The blockade works only
+ // when the rescanner runs after the user-defined event-handler.
+ // Currently, this is the case, but changing their order probably
+ // breaks this test.)
+
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, directory);
+ ((EventHandlingWatch) watch).handleEvent(overflow);
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+ // At this point, the current thread has presumably slept long
+ // enough for the `OVERFLOW` event to have been handled by the
+ // rescanner. This means that synthetic events must have been issued
+ // (because the index still contained the initial last-modified
+ // times).
+
+ // Readiness achieved: Threads can now start handling non-`OVERFLOW`
+ // events.
+ semaphore.release();
+
+ await("Overflow should trigger created events")
+ .until(nCreated::get, n -> n >= 2); // 1 synthetic event + >=1 regular event
+ await("Overflow should trigger modified events")
+ .until(nModified::get, n -> n >= 4); // 2 synthetic events + >=2 regular events
+ await("Overflow should trigger deleted events")
+ .until(nDeleted::get, n -> n >= 2); // 1 synthetic event + >=1 regular event
+
+ // Reset counters for next phase of the test
+ nCreated.set(0);
+ nModified.set(0);
+ nDeleted.set(0);
+
+ // Let's do some more file operations, trigger another `OVERFLOW`
+ // event, and observe that synthetic events *aren't* issued this
+ // time (because the index was already updated when the regular
+ // events were handled).
+ Files.writeString(directory.resolve("b.txt"), "baz");
+ Files.createFile(directory.resolve("c.txt"));
+ Files.delete(directory.resolve("d.txt"));
+
+ await("File create should trigger regular created event")
+ .until(nCreated::get, n -> n >= 1);
+ await("File write should trigger regular modified event")
+ .until(nModified::get, n -> n >= 1);
+ await("File delete should trigger regular deleted event")
+ .until(nDeleted::get, n -> n >= 1);
+
+ var nCreatedBeforeOverflow = nCreated.get();
+ var nModifiedBeforeOverflow = nModified.get();
+ var nDeletedBeforeOverflow = nDeleted.get();
+
+ ((EventHandlingWatch) watch).handleEvent(overflow);
+ Thread.sleep(TestHelper.NORMAL_WAIT.toMillis());
+
+ await("Overflow shouldn't trigger synthetic created event after file create (and index updated)")
+ .until(nCreated::get, Predicate.isEqual(nCreatedBeforeOverflow));
+ await("Overflow shouldn't trigger synthetic modified event after file write (and index updated)")
+ .until(nModified::get, Predicate.isEqual(nModifiedBeforeOverflow));
+ await("Overflow shouldn't trigger synthetic deleted event after file delete (and index updated)")
+ .until(nDeleted::get, Predicate.isEqual(nDeletedBeforeOverflow));
+ }
+ }
}
diff --git a/src/test/java/engineering/swat/watch/SingleFileTests.java b/src/test/java/engineering/swat/watch/SingleFileTests.java
index 206bc9bf..7e2fdb7c 100644
--- a/src/test/java/engineering/swat/watch/SingleFileTests.java
+++ b/src/test/java/engineering/swat/watch/SingleFileTests.java
@@ -26,13 +26,25 @@
*/
package engineering.swat.watch;
+import static engineering.swat.watch.WatchEvent.Kind.CREATED;
+import static engineering.swat.watch.WatchEvent.Kind.DELETED;
+import static engineering.swat.watch.WatchEvent.Kind.MODIFIED;
+import static engineering.swat.watch.WatchEvent.Kind.OVERFLOW;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
@@ -40,6 +52,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import engineering.swat.watch.impl.EventHandlingWatch;
+
class SingleFileTests {
private TestDirectory testDir;
@@ -117,4 +131,72 @@ void singleFileThatMonitorsOnlyADirectory() throws IOException, InterruptedExcep
.untilTrue(seen);
}
}
+
+ @Test
+ void noRescanOnOverflow() throws IOException, InterruptedException {
+ var bookkeeper = new Bookkeeper();
+ try (var watch = startWatchAndTriggerOverflow(Approximation.NONE, bookkeeper)) {
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+
+ await("Overflow shouldn't trigger created, modified, or deleted events")
+ .until(() -> bookkeeper.fullPaths(CREATED, MODIFIED, DELETED).count() == 0);
+ await("Overflow should be visible to user-defined event handler")
+ .until(() -> bookkeeper.fullPaths(OVERFLOW).count() == 1);
+ }
+ }
+
+ @Test
+ void memorylessRescanOnOverflow() throws IOException, InterruptedException {
+ var bookkeeper = new Bookkeeper();
+ try (var watch = startWatchAndTriggerOverflow(Approximation.ALL, bookkeeper)) {
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+
+ var isFile = Predicate.isEqual(watch.getPath());
+ var isNotFile = Predicate.not(isFile);
+
+ await("Overflow should trigger created event for `file`")
+ .until(() -> bookkeeper.fullPaths(CREATED).filter(isFile).count() == 1);
+ await("Overflow shouldn't trigger created events for other files")
+ .until(() -> bookkeeper.fullPaths(CREATED).filter(isNotFile).count() == 0);
+ await("Overflow shouldn't trigger modified events (`file` is empty)")
+ .until(() -> bookkeeper.fullPaths(MODIFIED).count() == 0);
+ await("Overflow shouldn't trigger deleted events")
+ .until(() -> bookkeeper.fullPaths(DELETED).count() == 0);
+ await("Overflow should be visible to user-defined event handler")
+ .until(() -> bookkeeper.fullPaths(OVERFLOW).count() == 1);
+ }
+ }
+
+ private ActiveWatch startWatchAndTriggerOverflow(Approximation whichFiles, Bookkeeper bookkeeper) throws IOException {
+ var parent = testDir.getTestDirectory();
+ var file = parent.resolve("a.txt");
+
+ var watch = Watcher
+ .watch(file, WatchScope.PATH_ONLY)
+ .onOverflow(whichFiles)
+ .on(bookkeeper)
+ .start();
+
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, parent);
+ ((EventHandlingWatch) watch).handleEvent(overflow);
+ return watch;
+ }
+
+ private static class Bookkeeper implements Consumer {
+ private final List events = Collections.synchronizedList(new ArrayList<>());
+
+ public Stream events(WatchEvent.Kind... kinds) {
+ var list = Arrays.asList(kinds.length == 0 ? WatchEvent.Kind.values() : kinds);
+ return events.stream().filter(e -> list.contains(e.getKind()));
+ }
+
+ public Stream fullPaths(WatchEvent.Kind... kinds) {
+ return events(kinds).map(WatchEvent::calculateFullPath);
+ }
+
+ @Override
+ public void accept(WatchEvent e) {
+ events.add(e);
+ }
+ }
}
diff --git a/src/test/java/engineering/swat/watch/TestDirectory.java b/src/test/java/engineering/swat/watch/TestDirectory.java
index 6df4d47c..8628cc2a 100644
--- a/src/test/java/engineering/swat/watch/TestDirectory.java
+++ b/src/test/java/engineering/swat/watch/TestDirectory.java
@@ -37,12 +37,11 @@
import java.util.Comparator;
import java.util.List;
-class TestDirectory implements Closeable {
+public class TestDirectory implements Closeable {
private final Path testDirectory;
private final List testFiles;
-
- TestDirectory() throws IOException {
+ public TestDirectory() throws IOException {
testDirectory = Files.createTempDirectory("java-watch-test");
List testFiles = new ArrayList<>();
add3Files(testFiles, testDirectory);
diff --git a/src/test/java/engineering/swat/watch/TestHelper.java b/src/test/java/engineering/swat/watch/TestHelper.java
index 6c2d3104..1eafeba5 100644
--- a/src/test/java/engineering/swat/watch/TestHelper.java
+++ b/src/test/java/engineering/swat/watch/TestHelper.java
@@ -27,9 +27,13 @@
package engineering.swat.watch;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
public class TestHelper {
+ public static final Duration TINY_WAIT;
public static final Duration SHORT_WAIT;
public static final Duration NORMAL_WAIT;
public static final Duration LONG_WAIT;
@@ -48,9 +52,26 @@ else if (os.contains("win")) {
// especially on small core systems
delayFactor *= 4;
}
+ TINY_WAIT = Duration.ofMillis(250 * delayFactor);
SHORT_WAIT = Duration.ofSeconds(1 * delayFactor);
NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor);
LONG_WAIT = Duration.ofSeconds(8 * delayFactor);
}
+ public static Stream streamOf(T[] values, int repetitions) {
+ return streamOf(values, repetitions, false);
+ }
+
+ public static Stream streamOf(T[] values, int repetitions, boolean sortByRepetition) {
+ if (sortByRepetition) {
+ return IntStream
+ .range(0, repetitions)
+ .boxed()
+ .flatMap(i -> Arrays.stream(values));
+ }
+ else { // Sort by value
+ return Arrays.stream(values).flatMap(v ->
+ IntStream.range(0, repetitions).mapToObj(i -> v));
+ }
+ }
}
diff --git a/src/test/java/engineering/swat/watch/TortureTests.java b/src/test/java/engineering/swat/watch/TortureTests.java
index c2e49568..740a76f3 100644
--- a/src/test/java/engineering/swat/watch/TortureTests.java
+++ b/src/test/java/engineering/swat/watch/TortureTests.java
@@ -46,6 +46,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,8 +55,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
class TortureTests {
@@ -141,17 +144,18 @@ Set stop() throws InterruptedException {
private static final int THREADS = 4;
- @Test
- void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException {
+ @ParameterizedTest
+ @EnumSource(names = { "ALL", "DIFF" })
+ void pressureOnFSShouldNotMissNewFilesAnything(Approximation whichFiles) throws InterruptedException, IOException {
final var root = testDir.getTestDirectory();
var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4);
var io = new IOGenerator(THREADS, root, pool);
-
var seenCreates = ConcurrentHashMap.newKeySet();
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.withExecutor(pool)
+ .onOverflow(whichFiles)
.on(ev -> {
var fullPath = ev.calculateFullPath();
switch (ev.getKind()) {
@@ -161,6 +165,10 @@ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IO
case MODIFIED:
// platform specific if this comes by or not
break;
+ case OVERFLOW:
+ // Overflows might happen, but they're auto-handled, so
+ // they can be ignored here
+ break;
default:
logger.error("Unexpected event: {}", ev);
break;
@@ -259,8 +267,14 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException {
}
}
- @RepeatedTest(failureThreshold=1, value = 20)
- void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException {
+ static Stream manyRegisterAndUnregisterSameTimeSource() {
+ Approximation[] values = { Approximation.ALL, Approximation.DIFF };
+ return TestHelper.streamOf(values, 5);
+ }
+
+ @ParameterizedTest
+ @MethodSource("manyRegisterAndUnregisterSameTimeSource")
+ void manyRegisterAndUnregisterSameTime(Approximation whichFiles) throws InterruptedException, IOException {
var startRegistering = new Semaphore(0);
var startedWatching = new Semaphore(0);
var stopAll = new Semaphore(0);
@@ -282,6 +296,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
for (int k = 0; k < 1000; k++) {
var watcher = Watcher
.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
+ .onOverflow(whichFiles)
.on(e -> {
if (e.calculateFullPath().equals(target)) {
seen.add(id);
@@ -324,13 +339,13 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
finally {
stopAll.release(amountOfWatchersActive);
}
-
}
- @Test
+ @ParameterizedTest
+ @EnumSource(names = { "ALL", "DIFF" })
//Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted
@EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true")
- void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException {
+ void pressureOnFSShouldNotMissDeletes(Approximation whichFiles) throws InterruptedException, IOException {
final var root = testDir.getTestDirectory();
var pool = Executors.newCachedThreadPool();
@@ -346,6 +361,7 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
final var happened = new Semaphore(0);
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.withExecutor(pool)
+ .onOverflow(whichFiles)
.on(ev -> {
events.getAndIncrement();
happened.release();
@@ -389,8 +405,6 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
}
}
-
-
private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException {
int lastEventCount = events.get();
int stableCount = 0;
diff --git a/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java b/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java
new file mode 100644
index 00000000..c1ccc3d9
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/impl/EventHandlingWatchTests.java
@@ -0,0 +1,73 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+
+class EventHandlingWatchTests {
+
+ private static EventHandlingWatch emptyFileWatch(Path path) {
+ return new EventHandlingWatch() {
+ @Override
+ public void handleEvent(WatchEvent event) {
+ // Nothing to handle
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to close
+ }
+
+ @Override
+ public WatchScope getScope() {
+ return WatchScope.PATH_ONLY;
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+ };
+ }
+
+ @Test
+ void relativizeTest() {
+ var e1 = new WatchEvent(WatchEvent.Kind.OVERFLOW, Path.of("foo"), Path.of("bar", "baz.txt"));
+ var e2 = new WatchEvent(WatchEvent.Kind.OVERFLOW, Path.of("foo", "bar", "baz.txt"));
+ var e3 = emptyFileWatch(Path.of("foo")).relativize(e2);
+ assertEquals(e1.getRootPath(), e3.getRootPath());
+ assertEquals(e1.getRelativePath(), e3.getRelativePath());
+ }
+}
diff --git a/src/test/java/engineering/swat/watch/impl/overflows/IndexingRescannerTests.java b/src/test/java/engineering/swat/watch/impl/overflows/IndexingRescannerTests.java
new file mode 100644
index 00000000..0915d5ce
--- /dev/null
+++ b/src/test/java/engineering/swat/watch/impl/overflows/IndexingRescannerTests.java
@@ -0,0 +1,105 @@
+/*
+ * BSD 2-Clause License
+ *
+ * Copyright (c) 2023, Swat.engineering
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package engineering.swat.watch.impl.overflows;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import engineering.swat.watch.Approximation;
+import engineering.swat.watch.TestDirectory;
+import engineering.swat.watch.TestHelper;
+import engineering.swat.watch.WatchEvent;
+import engineering.swat.watch.WatchScope;
+import engineering.swat.watch.Watcher;
+import engineering.swat.watch.impl.EventHandlingWatch;
+
+class IndexingRescannerTests {
+
+ private TestDirectory testDir;
+
+ @BeforeEach
+ void setup() throws IOException {
+ testDir = new TestDirectory();
+ }
+
+ @AfterEach
+ void cleanup() {
+ if (testDir != null) {
+ testDir.close();
+ }
+ }
+
+ @BeforeAll
+ static void setupEverything() {
+ Awaitility.setDefaultTimeout(TestHelper.NORMAL_WAIT);
+ }
+
+ @Test
+ void onlyEventsForFilesInScopeAreIssued() throws IOException, InterruptedException {
+ var path = testDir.getTestDirectory();
+
+ // Configure a non-recursive directory watch that monitors only the
+ // children (not all descendants) of `path`
+ var eventsOnlyForChildren = new AtomicBoolean(true);
+ var watchConfig = Watcher.watch(path, WatchScope.PATH_AND_CHILDREN)
+ .onOverflow(Approximation.NONE) // Disable the auto-handler here; we'll have an explicit one below
+ .on(e -> {
+ if (e.getRelativePath().getNameCount() > 1) {
+ eventsOnlyForChildren.set(false);
+ }
+ });
+
+ try (var watch = (EventHandlingWatch) watchConfig.start()) {
+ // Create a rescanner that initially indexes all descendants (not
+ // only the children) of `path`. The resulting initial index is an
+ // overestimation of the files monitored by the watch.
+ var rescanner = new IndexingRescanner(
+ ForkJoinPool.commonPool(), path,
+ WatchScope.PATH_AND_ALL_DESCENDANTS);
+
+ // Trigger a rescan. Because only the children (not all descendants)
+ // of `path` are watched, the rescan should issue events only for
+ // those children (even though the initial index contains entries
+ // for all descendants).
+ var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, path);
+ rescanner.accept(watch, overflow);
+ Thread.sleep(TestHelper.SHORT_WAIT.toMillis());
+
+ await("No events for non-children descendants should have been issued")
+ .until(eventsOnlyForChildren::get);
+ }
+ }
+}