Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
17f0cf7
Started initial implementation
DavyLandman Nov 20, 2023
bb6c56f
First working test of directory watcher
DavyLandman Dec 25, 2023
860ac75
Improved tests by removing sleeps
DavyLandman Dec 25, 2023
d3a47b4
Implemented initial recursive support
DavyLandman Dec 26, 2023
e6fc78a
Simplified recursive watcher
DavyLandman Dec 27, 2023
4313b3e
Simplified JDKPoller singleton and avoided extra thread
DavyLandman Dec 27, 2023
82615a3
Added recursive tests
DavyLandman Feb 24, 2024
a0ea7db
Implemented virtual events for new nested directories and files
DavyLandman Feb 24, 2024
a12b50e
Got checker framework to properly work
DavyLandman Feb 24, 2024
d58eff4
Added initial support for overflow event
DavyLandman Feb 25, 2024
4f22795
Rewrote tests to start with a fresh directory
DavyLandman Feb 26, 2024
4b3c167
Added full event support to the watcher interface
DavyLandman Feb 26, 2024
2194412
Making sure not to leak watches
DavyLandman Feb 27, 2024
515a56a
Added overflow support for the recursive watcher
DavyLandman Mar 17, 2024
6812d78
Refactoring the functions to make them smaller
DavyLandman Mar 17, 2024
aff2b72
Rewrote recursive watcher to report to relative root
DavyLandman Mar 30, 2024
829e506
Execute syntatic events in the correct order
DavyLandman Mar 31, 2024
c0a7f3f
Added torture tests to make sure everything works, even in very busy IO
DavyLandman Jul 19, 2024
0858ca2
Cleanup of code
DavyLandman Jul 19, 2024
46acd38
Bundle registered watches to avoid duplicate FS watches
DavyLandman Sep 5, 2024
de3ea33
Implemented single file support
DavyLandman Sep 5, 2024
939e57b
Improve how long we wait for failure condition
DavyLandman Sep 7, 2024
03ab088
Made the poller loop a bit more resistant to misbehaving callbacks
DavyLandman Sep 12, 2024
54d4d88
Added test for delete behavior
DavyLandman Sep 13, 2024
8cc2c39
Wrote javadocs
DavyLandman Sep 13, 2024
dd18b9b
Improved handling of deletes
DavyLandman Sep 13, 2024
6a41f57
[ci] starting with ci
DavyLandman Sep 13, 2024
1d4ced3
[ci] also run checker-framework
DavyLandman Sep 13, 2024
0ccca4c
Fixed broken tests
DavyLandman Sep 13, 2024
800d1b0
[ci] also test under different jdks
DavyLandman Sep 14, 2024
9cf1997
[ci] make sure deletes are also torture tested
DavyLandman Sep 14, 2024
e028181
Fixed null errors found by CF
DavyLandman Sep 14, 2024
174f4d6
Slight tweak to the documentation
DavyLandman Sep 14, 2024
fde2d6e
Improving the torture test to wait a bit long before events have stab…
DavyLandman Sep 14, 2024
2bf0c75
Cleanup of test api usage
DavyLandman Sep 14, 2024
66d588f
Trying to make the test run better on ci
DavyLandman Sep 14, 2024
bc777ef
Fixing broken test
DavyLandman Sep 16, 2024
c8d4fd5
Trying to make a better stabilizing torture test detection
DavyLandman Sep 16, 2024
524d690
Change smoke test to wait for appropriate time
DavyLandman Sep 16, 2024
d8170f8
Trying to make give the tests a bit more time
DavyLandman Sep 16, 2024
b26a1a3
Improved the tests by splitting up the 2 torture tests
DavyLandman Sep 16, 2024
6698332
Lets make sure sync events are generated more correctly
DavyLandman Sep 16, 2024
3157d62
Better print for torture test
DavyLandman Sep 16, 2024
44ed90c
Trying to unbreak the test suite
DavyLandman Sep 16, 2024
51a6930
Only start watch after the directory has been processed, to avoid a r…
DavyLandman Sep 16, 2024
c9cee56
Disable delete test, which will mostly fail, as it can be a race with…
DavyLandman Sep 16, 2024
424ad48
Fixed race on new directory watches in recursive watch in a better wa…
DavyLandman Sep 16, 2024
dae3f53
Trying to get the test to be faster
DavyLandman Sep 16, 2024
f282d49
Adding test to see if on linux the race breaks it
DavyLandman Sep 16, 2024
a5a76d6
Checking that we log what is thrown
DavyLandman Sep 16, 2024
6e9ca1b
Trying to give windows a bit more time to work through the tests
DavyLandman Sep 16, 2024
1666cf3
Trying to break the deadlock on linux
DavyLandman Sep 16, 2024
f97d8e0
Rewrote the register path to reduce the time in a limited thread spacve
DavyLandman Sep 17, 2024
7b0bfbc
Added a catchup-loop to make sure we're not missing events in the rec…
DavyLandman Sep 17, 2024
2090e35
Removed extra set that was just a premature optimization
DavyLandman Sep 17, 2024
51cfb9e
Trying to really wait for all events to have stabilized
DavyLandman Sep 18, 2024
5ca0fe3
Longer wait for stabilization
DavyLandman Sep 18, 2024
1fbc69b
Running sync in a background thread to get initial events faster
DavyLandman Sep 18, 2024
49da0a9
Trying my best to stabilize the torture tests
DavyLandman Sep 18, 2024
bfe3d74
Using await to just wait for the condition we care about
DavyLandman Sep 18, 2024
38f9b9d
Increased timeouts on windows
DavyLandman Sep 18, 2024
eb388e8
Increased timeouts on windows
DavyLandman Sep 18, 2024
14c501d
Increased timeouts on windows
DavyLandman Sep 18, 2024
bcc19a6
Moved towards native file watch support for windows and rewrote the A…
DavyLandman Sep 24, 2024
0d5a969
Nullable fix
DavyLandman Sep 24, 2024
6eca5ff
Increase the pressure a bit for the torture test
DavyLandman Sep 24, 2024
3a106ff
Do not print the exception everytime
DavyLandman Sep 24, 2024
e4f87a9
Applied all the comments from @sungshik
DavyLandman Sep 25, 2024
538b045
Added a bit more comment
DavyLandman Sep 25, 2024
39a8b74
Added registration test
DavyLandman Oct 1, 2024
e1c7186
Extra test around registration
DavyLandman Oct 1, 2024
096a6f9
Renamed enums and processes review comments
DavyLandman Oct 14, 2024
de22a03
Working on the fix for the race Sung found
DavyLandman Oct 14, 2024
7006548
Removed races around registering and unregistering by adding a big ol…
DavyLandman Oct 14, 2024
80628ab
Tweaked the test a bit
DavyLandman Oct 14, 2024
d6b1f28
Improved torture tests
DavyLandman Oct 14, 2024
4f8cf59
Fixed bug around fast registration and unregistration in JDKPoller
DavyLandman Oct 14, 2024
85efd25
Fixed test that would be triggered by events spread over different wo…
DavyLandman Oct 14, 2024
5bcf1ce
Remove JDK watch only after a delay, just to avoid hammering the regi…
DavyLandman Oct 14, 2024
ac9c73e
Extended readme with example and description
DavyLandman Oct 14, 2024
69870e2
Slight wording fix
DavyLandman Oct 14, 2024
9095927
Apply suggestions from code review
DavyLandman Oct 15, 2024
fb01d39
Improve documentation of `PATH_ONLY` watch scopes
sungshik Oct 18, 2024
23b239b
Refine test (deleteOfFileInDirectoryShouldBeVisible)
sungshik Oct 18, 2024
88170a2
Tweaked documentation a bit
DavyLandman Nov 11, 2024
87c63e6
Moved JDK classes to its own namespace
DavyLandman Nov 11, 2024
d392f1c
Improved impl directory
DavyLandman Nov 11, 2024
982ce65
Make sure to have a dedicated interface, to allow for future addition…
DavyLandman Nov 11, 2024
d1b6e9e
Missed refactor of test
DavyLandman Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/engineering/swat/watch/WatchEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public enum Kind {
DELETED,
/**
* Rare event where there were so many file events, that the kernel lost a few.
* In that case you'll have to consider the whole directory (and it's sub directories) as modified.
* In that case you'll have to consider the whole directory (and its sub directories) as modified.
* The library will try and send events for new and deleted files, but it won't be able to detect modified files.
*/
OVERFLOW
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/engineering/swat/watch/WatchScope.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package engineering.swat.watch;

public enum WatchScope {
SINGLE,
INCLUDING_CHILDREN,
INCLUDING_ALL_DESCENDANTS
}
107 changes: 46 additions & 61 deletions src/main/java/engineering/swat/watch/Watcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,71 +24,47 @@
*/
public class Watcher {
private final Logger logger = LogManager.getLogger();
private final WatcherKind kind;
private final WatchScope scope;
private final Path path;
private Executor executor = CompletableFuture::runAsync;
private volatile Executor executor = CompletableFuture::runAsync;

private static final Consumer<WatchEvent> NULL_HANDLER = p -> {};
private Consumer<WatchEvent> eventHandler = NULL_HANDLER;
private volatile Consumer<WatchEvent> eventHandler = NULL_HANDLER;


private Watcher(WatcherKind kind, Path path) {
this.kind = kind;
private Watcher(WatchScope scope, Path path) {
this.scope = scope;
this.path = path;
logger.info("Constructor logger for: {} at {} level", path, kind);
}

private enum WatcherKind {
FILE,
DIRECTORY,
RECURSIVE_DIRECTORY
logger.info("Constructor logger for: {} at {} level", path, scope);
}

/**
* Request a watcher for a single path (file or directory).
* If it's a file, depending on the platform this will watch the whole directory and filter the results, or only watch a single file.
* @param path a single path entry, either a file or a directory
* @return a watcher that only fires events related to the requested path
* @throws IOException in case the path is not absolute
* Watch a path for updates, optionally also get events for its children/descendants
* @param path which absolute path to monitor, can be a file or a directory, but has to be absolute
* @param scope for directories you can also choose to monitor it's direct children or all it's descendants
* @throws IllegalArgumentException in case a path is not supported (in relation to the scope)
*/
public static Watcher single(Path path) throws IOException {
public static Watcher watch(Path path, WatchScope scope) {
if (!path.isAbsolute()) {
throw new IOException("We can only watch absolute paths");
}
return new Watcher(WatcherKind.FILE, path);
}

/**
* Request a watcher for a directory, getting events for its direct children.
* @param path a directory to monitor for changes
* @return a watcher that fires events for any of the direct children (and its self).
* @throws IOException in cas the path is not absolute or it's not an directory
*/
public static Watcher singleDirectory(Path path) throws IOException {
if (!path.isAbsolute()) {
throw new IOException("We can only watch absolute paths");
}
if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
throw new IOException("Only directories are supported");
throw new IllegalArgumentException("We can only watch absolute paths");
}
return new Watcher(WatcherKind.DIRECTORY, path);
}
switch (scope) {
case INCLUDING_CHILDREN: // intended fallthrough
case INCLUDING_ALL_DESCENDANTS:
if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
throw new IllegalArgumentException("Only directories are supported for this scope");
}
break;
case SINGLE:
if (Files.isSymbolicLink(path)) {
throw new IllegalArgumentException("Symlinks are not supported");
}
break;
default:
throw new IllegalArgumentException("Unsupported scope: " + scope);

/**
* Request a watcher for a directory, getting events for all of its children. Even those added after the watch started.
* On some platforms, this can be quite expansive, so be sure you want this.
* @param path a directory to monitor for changes
* @return a watcher that fires events for any of its children (and its self).
* @throws IOException in case the path is not absolute or it's not an directory
*/
public static Watcher recursiveDirectory(Path path) throws IOException {
if (!path.isAbsolute()) {
throw new IOException("We can only watch absolute paths");
}
if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
throw new IOException("Only directories are supported");
}
return new Watcher(WatcherKind.RECURSIVE_DIRECTORY, path);
return new Watcher(scope, path);
}

/**
Expand Down Expand Up @@ -118,24 +94,33 @@ public Watcher withExecutor(Executor callbackHandler) {
* Start watch the path for events.
* @return a subscription for the watch, when closed, new events will stop being registered to the worker pool.
* @throws IOException in case the starting of the watcher caused an underlying IO exception
* @throws IllegalStateException the watchers is not configured correctly (for example, missing {@link #onEvent(Consumer)})
* @throws IllegalStateException the watchers is not configured correctly (for example, missing {@link #onEvent(Consumer)}, or a watcher is started twice)
*/
public Closeable start() throws IOException, IllegalStateException {
public Closeable start() throws IOException {
if (this.eventHandler == NULL_HANDLER) {
throw new IllegalStateException("There is no onEvent handler defined");
}
switch (kind) {
case DIRECTORY: {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler);
switch (scope) {
case INCLUDING_CHILDREN: {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, false);
result.start();
return result;
}
case RECURSIVE_DIRECTORY: {
var result = new JDKRecursiveDirectoryWatcher(path, executor, this.eventHandler);
result.start();
return result;
case INCLUDING_ALL_DESCENDANTS: {
try {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, true);
result.start();
return result;
} catch (Throwable ex) {
// no native support, use the simulation
logger.debug("Not possible to register the native watcher, using fallback for {}", path);
logger.trace(ex);
var result = new JDKRecursiveDirectoryWatcher(path, executor, this.eventHandler);
result.start();
return result;
}
}
case FILE: {
case SINGLE: {
var result = new JDKFileWatcher(path, executor, this.eventHandler);
result.start();
return result;
Expand Down
73 changes: 39 additions & 34 deletions src/main/java/engineering/swat/watch/impl/BundledSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,35 @@
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

public class BundledSubscription<A extends @NonNull Object, R extends @NonNull Object> implements ISubscribable<A,R> {
private final ISubscribable<A, R> around;
private final ConcurrentMap<A, Subscription<R>> subscriptions = new ConcurrentHashMap<>();
/**
* This is an internal class where we can join multiple subscriptions to the same target by only taking 1 actual subscription but forwarding them to all the interested parties.
* This is used (for example) to avoid multiple JDKPoller registries for the same path
*/
public class BundledSubscription<Key extends @NonNull Object, Event extends @NonNull Object> implements ISubscribable<Key,Event> {
private final ISubscribable<Key, Event> wrapped;
private final ConcurrentMap<Key, Subscription<Event>> subscriptions = new ConcurrentHashMap<>();

public BundledSubscription(ISubscribable<A, R> around) {
this.around = around;
public BundledSubscription(ISubscribable<Key, Event> wrapped) {
this.wrapped = wrapped;

}

private static class Subscription<R> implements Consumer<R> {
private final List<Consumer<R>> consumers = new CopyOnWriteArrayList<>();
private volatile @MonotonicNonNull Closeable closer;
Subscription(Consumer<R> initialConsumer) {
consumers.add(initialConsumer);
private volatile @MonotonicNonNull Closeable toBeClosed;
Subscription() {
}

public void setCloser(Closeable closer) {
this.closer = closer;
public void setToBeClosed(Closeable closer) {
this.toBeClosed = closer;
}

void add(Consumer<R> newConsumer) {
public void add(Consumer<R> newConsumer) {
consumers.add(newConsumer);
}

synchronized boolean remove(Consumer<R> existingConsumer) {
public void remove(Consumer<R> existingConsumer) {
consumers.remove(existingConsumer);
return consumers.isEmpty();
}

@Override
Expand All @@ -55,38 +57,41 @@ boolean hasActiveConsumers() {
}

@Override
public Closeable subscribe(A target, Consumer<R> eventListener) throws IOException {
var active = this.subscriptions.get(target);
if (active == null) {
active = new Subscription<>(eventListener);
var newSubscriptions = around.subscribe(target, active);
active.setCloser(newSubscriptions);
var lostRace = this.subscriptions.putIfAbsent(target, active);
if (lostRace != null) {
try {
newSubscriptions.close();
} catch (IOException _ignore) {
// ignore
public Closeable subscribe(Key target, Consumer<Event> eventListener) throws IOException {
var active = this.subscriptions.computeIfAbsent(target, t -> new Subscription<>());
boolean first = false;
if (active.toBeClosed == null) {
// we just added a new one
// so lets take a lock on it, and try to be the one that gets to initialize it
synchronized(active) {
// now lock on it to make sure nobo
if (active.toBeClosed == null) {
first = true;
active.add(eventListener); // we know we already have the lock, and we need to do this before we register the watch
var newSubscriptions = wrapped.subscribe(target, active);
active.setToBeClosed(newSubscriptions);
}
else {
}
lostRace.add(eventListener);
active = lostRace;
}
}
else {
// at this point we have to be sure that we're not the first to in the list
// since we might have won the race on the compute, but lost the race
if (!first) {
active.add(eventListener);
}
var finalActive = active;
return () -> {
if (finalActive.remove(eventListener)) {
active.remove(eventListener);
if (!active.hasActiveConsumers()) {
subscriptions.remove(target);
if (finalActive.hasActiveConsumers()) {
if (active.hasActiveConsumers()) {
// we lost the race, someone else added something again
// so we put it back in the list
subscriptions.put(target, finalActive);
subscriptions.put(target, active);
}
else {
if (finalActive.closer != null) {
finalActive.closer.close();
if (active.toBeClosed != null) {
active.toBeClosed.close();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/engineering/swat/watch/impl/ISubscribable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
import java.util.function.Consumer;

@FunctionalInterface
public interface ISubscribable<A, R> {
Closeable subscribe(A target, Consumer<R> eventListener) throws IOException;
public interface ISubscribable<Key, Event> {
Closeable subscribe(Key target, Consumer<Event> eventListener) throws IOException;
}
15 changes: 11 additions & 4 deletions src/main/java/engineering/swat/watch/impl/JDKDirectoryWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,35 @@ public class JDKDirectoryWatcher implements Closeable {
private final Executor exec;
private final Consumer<WatchEvent> eventHandler;
private volatile @MonotonicNonNull Closeable activeWatch;
private final boolean nativeRecursive;

private static final BundledSubscription<Path, List<java.nio.file.WatchEvent<?>>>
private static final BundledSubscription<SubscriptionKey, List<java.nio.file.WatchEvent<?>>>
BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);

public JDKDirectoryWatcher(Path directory, Executor exec, Consumer<WatchEvent> eventHandler) {
this(directory, exec, eventHandler, false);
}

public JDKDirectoryWatcher(Path directory, Executor exec, Consumer<WatchEvent> eventHandler, boolean nativeRecursive) {
this.directory = directory;
this.exec = exec;
this.eventHandler = eventHandler;
this.nativeRecursive = nativeRecursive;
}


synchronized boolean safeStart() throws IOException {
if (activeWatch != null) {
return false;
}
activeWatch = BUNDLED_JDK_WATCHERS.subscribe(directory, this::handleChanges);
activeWatch = BUNDLED_JDK_WATCHERS.subscribe(new SubscriptionKey(directory, nativeRecursive), this::handleChanges);
return true;
}

public void start() throws IOException {
try {
if (!safeStart()) {
throw new IOException("Cannot start a watcher twice");
throw new IllegalStateException("Cannot start a watcher twice");
}
logger.debug("Started watch for: {}", directory);
} catch (IOException e) {
Expand Down Expand Up @@ -86,7 +93,7 @@ else if (ev.kind() == StandardWatchEventKinds.OVERFLOW) {
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (activeWatch != null) {
logger.debug("Closing watch for: {}", this.directory);
activeWatch.close();
Expand Down
28 changes: 17 additions & 11 deletions src/main/java/engineering/swat/watch/impl/JDKFileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

/**
* It's not possible to monitor a single file (or directory), so we have to find a directory watcher, and connect to that
*
* Note that you should take care to call start only once.
*/
public class JDKFileWatcher implements Closeable {
private final Logger logger = LogManager.getLogger();
Expand All @@ -34,23 +36,27 @@ public JDKFileWatcher(Path file, Executor exec, Consumer<WatchEvent> eventHandle
this.eventHandler = eventHandler;
}

/**
* Start the file watcher, but only do it once
* @throws IOException
*/
public void start() throws IOException {
try {
var dir = file.getParent();
if (dir == null) {
throw new IllegalArgumentException("cannot watch a single entry that is on the root");

}
assert !dir.equals(file);
JDKDirectoryWatcher parentWatch;
synchronized(this) {
if (activeWatch != null) {
throw new IOException("Cannot start an already started watch");
}
var dir = file.getParent();
if (dir == null) {
throw new IllegalArgumentException("cannot watch a single entry that is on the root");

}
assert !dir.equals(file);
var parentWatch = new JDKDirectoryWatcher(dir, exec, this::filter);
activeWatch = parentWatch;
activeWatch = parentWatch = new JDKDirectoryWatcher(dir, exec, this::filter);
parentWatch.start();
logger.debug("Started file watch for {} (in reality a watch on {}): {}", file, dir, parentWatch);
}
logger.debug("Started file watch for {} (in reality a watch on {}): {}", file, dir, parentWatch);

} catch (IOException e) {
throw new IOException("Could not register file watcher for: " + file, e);
Expand All @@ -59,12 +65,12 @@ public void start() throws IOException {

private void filter(WatchEvent event) {
if (fileName.equals(event.getRelativePath())) {
exec.execute(() -> eventHandler.accept(event));
eventHandler.accept(event);
}
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (activeWatch != null) {
activeWatch.close();
}
Expand Down
Loading