Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
17f0cf7
Started initial implementation
DavyLandman Nov 20, 2023
bb6c56f
First working test of directory watcher
DavyLandman Dec 25, 2023
860ac75
Improved tests by removing sleeps
DavyLandman Dec 25, 2023
d3a47b4
Implemented initial recursive support
DavyLandman Dec 26, 2023
e6fc78a
Simplified recursive watcher
DavyLandman Dec 27, 2023
4313b3e
Simplified JDKPoller singleton and avoided extra thread
DavyLandman Dec 27, 2023
82615a3
Added recursive tests
DavyLandman Feb 24, 2024
a0ea7db
Implemented virtual events for new nested directories and files
DavyLandman Feb 24, 2024
a12b50e
Got checker framework to properly work
DavyLandman Feb 24, 2024
d58eff4
Added initial support for overflow event
DavyLandman Feb 25, 2024
4f22795
Rewrote tests to start with a fresh directory
DavyLandman Feb 26, 2024
4b3c167
Added full event support to the watcher interface
DavyLandman Feb 26, 2024
2194412
Making sure not to leak watches
DavyLandman Feb 27, 2024
515a56a
Added overflow support for the recursive watcher
DavyLandman Mar 17, 2024
6812d78
Refactoring the functions to make them smaller
DavyLandman Mar 17, 2024
aff2b72
Rewrote recursive watcher to report to relative root
DavyLandman Mar 30, 2024
829e506
Execute syntatic events in the correct order
DavyLandman Mar 31, 2024
c0a7f3f
Added torture tests to make sure everything works, even in very busy IO
DavyLandman Jul 19, 2024
0858ca2
Cleanup of code
DavyLandman Jul 19, 2024
46acd38
Bundle registered watches to avoid duplicate FS watches
DavyLandman Sep 5, 2024
de3ea33
Implemented single file support
DavyLandman Sep 5, 2024
939e57b
Improve how long we wait for failure condition
DavyLandman Sep 7, 2024
03ab088
Made the poller loop a bit more resistant to misbehaving callbacks
DavyLandman Sep 12, 2024
54d4d88
Added test for delete behavior
DavyLandman Sep 13, 2024
8cc2c39
Wrote javadocs
DavyLandman Sep 13, 2024
dd18b9b
Improved handling of deletes
DavyLandman Sep 13, 2024
6a41f57
[ci] starting with ci
DavyLandman Sep 13, 2024
1d4ced3
[ci] also run checker-framework
DavyLandman Sep 13, 2024
0ccca4c
Fixed broken tests
DavyLandman Sep 13, 2024
800d1b0
[ci] also test under different jdks
DavyLandman Sep 14, 2024
9cf1997
[ci] make sure deletes are also torture tested
DavyLandman Sep 14, 2024
e028181
Fixed null errors found by CF
DavyLandman Sep 14, 2024
174f4d6
Slight tweak to the documentation
DavyLandman Sep 14, 2024
fde2d6e
Improving the torture test to wait a bit long before events have stab…
DavyLandman Sep 14, 2024
2bf0c75
Cleanup of test api usage
DavyLandman Sep 14, 2024
66d588f
Trying to make the test run better on ci
DavyLandman Sep 14, 2024
bc777ef
Fixing broken test
DavyLandman Sep 16, 2024
c8d4fd5
Trying to make a better stabilizing torture test detection
DavyLandman Sep 16, 2024
524d690
Change smoke test to wait for appropriate time
DavyLandman Sep 16, 2024
d8170f8
Trying to make give the tests a bit more time
DavyLandman Sep 16, 2024
b26a1a3
Improved the tests by splitting up the 2 torture tests
DavyLandman Sep 16, 2024
6698332
Lets make sure sync events are generated more correctly
DavyLandman Sep 16, 2024
3157d62
Better print for torture test
DavyLandman Sep 16, 2024
44ed90c
Trying to unbreak the test suite
DavyLandman Sep 16, 2024
51a6930
Only start watch after the directory has been processed, to avoid a r…
DavyLandman Sep 16, 2024
c9cee56
Disable delete test, which will mostly fail, as it can be a race with…
DavyLandman Sep 16, 2024
424ad48
Fixed race on new directory watches in recursive watch in a better wa…
DavyLandman Sep 16, 2024
dae3f53
Trying to get the test to be faster
DavyLandman Sep 16, 2024
f282d49
Adding test to see if on linux the race breaks it
DavyLandman Sep 16, 2024
a5a76d6
Checking that we log what is thrown
DavyLandman Sep 16, 2024
6e9ca1b
Trying to give windows a bit more time to work through the tests
DavyLandman Sep 16, 2024
1666cf3
Trying to break the deadlock on linux
DavyLandman Sep 16, 2024
f97d8e0
Rewrote the register path to reduce the time in a limited thread spacve
DavyLandman Sep 17, 2024
7b0bfbc
Added a catchup-loop to make sure we're not missing events in the rec…
DavyLandman Sep 17, 2024
2090e35
Removed extra set that was just a premature optimization
DavyLandman Sep 17, 2024
51cfb9e
Trying to really wait for all events to have stabilized
DavyLandman Sep 18, 2024
5ca0fe3
Longer wait for stabilization
DavyLandman Sep 18, 2024
1fbc69b
Running sync in a background thread to get initial events faster
DavyLandman Sep 18, 2024
49da0a9
Trying my best to stabilize the torture tests
DavyLandman Sep 18, 2024
bfe3d74
Using await to just wait for the condition we care about
DavyLandman Sep 18, 2024
38f9b9d
Increased timeouts on windows
DavyLandman Sep 18, 2024
eb388e8
Increased timeouts on windows
DavyLandman Sep 18, 2024
14c501d
Increased timeouts on windows
DavyLandman Sep 18, 2024
bcc19a6
Moved towards native file watch support for windows and rewrote the A…
DavyLandman Sep 24, 2024
0d5a969
Nullable fix
DavyLandman Sep 24, 2024
6eca5ff
Increase the pressure a bit for the torture test
DavyLandman Sep 24, 2024
3a106ff
Do not print the exception everytime
DavyLandman Sep 24, 2024
e4f87a9
Applied all the comments from @sungshik
DavyLandman Sep 25, 2024
538b045
Added a bit more comment
DavyLandman Sep 25, 2024
39a8b74
Added registration test
DavyLandman Oct 1, 2024
e1c7186
Extra test around registration
DavyLandman Oct 1, 2024
096a6f9
Renamed enums and processes review comments
DavyLandman Oct 14, 2024
de22a03
Working on the fix for the race Sung found
DavyLandman Oct 14, 2024
7006548
Removed races around registering and unregistering by adding a big ol…
DavyLandman Oct 14, 2024
80628ab
Tweaked the test a bit
DavyLandman Oct 14, 2024
d6b1f28
Improved torture tests
DavyLandman Oct 14, 2024
4f8cf59
Fixed bug around fast registration and unregistration in JDKPoller
DavyLandman Oct 14, 2024
85efd25
Fixed test that would be triggered by events spread over different wo…
DavyLandman Oct 14, 2024
5bcf1ce
Remove JDK watch only after a delay, just to avoid hammering the regi…
DavyLandman Oct 14, 2024
ac9c73e
Extended readme with example and description
DavyLandman Oct 14, 2024
69870e2
Slight wording fix
DavyLandman Oct 14, 2024
9095927
Apply suggestions from code review
DavyLandman Oct 15, 2024
fb01d39
Improve documentation of `PATH_ONLY` watch scopes
sungshik Oct 18, 2024
23b239b
Refine test (deleteOfFileInDirectoryShouldBeVisible)
sungshik Oct 18, 2024
88170a2
Tweaked documentation a bit
DavyLandman Nov 11, 2024
87c63e6
Moved JDK classes to its own namespace
DavyLandman Nov 11, 2024
d392f1c
Improved impl directory
DavyLandman Nov 11, 2024
982ce65
Make sure to have a dedicated interface, to allow for future addition…
DavyLandman Nov 11, 2024
d1b6e9e
Missed refactor of test
DavyLandman Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,59 @@
# java-watch
a java file watcher that works across platforms and supports recursion and single file watches
a java file watcher that works across platforms and supports recursion, single file watches, and tries to make sure no events are missed.

## Features

Currently working features in java-watch:

- Recursive watches, even if platform doesn't support it natively.
- Recursive watches also work inside directories created after the watch started
- On overflow events no **new** directories (and it's recursive files) are missed, modification events will however not be simulated
- Single file watches
- Multiple watches for the same directory are merged to avoid overloading the kernel
- Events are process on a worker pool, which you can customize.

Future features:

- Avoid poll based watcher in macOS/OSX that only detects changes every 2 seconds
- Support file watches natively in linux
- Monitor only specific events (such as only CREATES)

## Usage

Import dependency in pom.xml:

```xml
<dependency>
<groupId>engineering.swat</groupId>
<artifactId>java-watch</artifactId>
<version>${java-watch-version}</version>
</dependency>
```

Start using java-watch:

```java
var directory = Path.of("tmp", "test-dir");
var watcherSetup = Watcher.watch(directory, WatchScope.PATH_AND_CHILDREN)
.withExecutor(Executors.newCachedThreadPool()) // optionally configure a custom thread pool
.onEvent(watchEvent -> {
System.err.println(watchEvent);
});

try(var active = watcherSetup.start()) {
System.out.println("Monitoring files, press any key to stop");
System.in.read();
}
// after active.close(), the watch is stopped and
// no new events will be scheduled on the threadpool
```

## Related work

Before starting this library, we wanted to use existing libraries, but they all lacked proper support for recursive file watches or lacked configurability. This library now has a growing collection of tests and a small API that should allow for future improvements without breaking compatibility.

The following section describes the related work research on the libraries and underlying limitations.

After reading the documentation of the following discussion on file system watches:

- [Paul Millr's nodejs chokidar](https://github.com/paulmillr/chokidar)
Expand Down
21 changes: 18 additions & 3 deletions src/main/java/engineering/swat/watch/WatchScope.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
package engineering.swat.watch;

/**
* Configure the depth of the events you want to receive for a given path
*/
public enum WatchScope {
SINGLE,
INCLUDING_CHILDREN,
INCLUDING_ALL_DESCENDANTS
/**
* Watch changes to a single file or (metadata of) a directory
*/
PATH_ONLY,
/**
* Watch changes to (metadata of) a directory and its content,
* non-recursively. That is, changes to the content of nested directories
* are not watched.
*/
PATH_AND_CHILDREN,
/**
* Watch changes to (metadata of) a directory and its content, recursively.
* That is, changes to the content of nested directories are also watched.
*/
PATH_AND_ALL_DESCENDANTS
}
15 changes: 7 additions & 8 deletions src/main/java/engineering/swat/watch/Watcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class Watcher {
private Watcher(WatchScope scope, Path path) {
this.scope = scope;
this.path = path;
logger.info("Constructor logger for: {} at {} level", path, scope);
}

/**
Expand All @@ -49,13 +48,13 @@ public static Watcher watch(Path path, WatchScope scope) {
throw new IllegalArgumentException("We can only watch absolute paths");
}
switch (scope) {
case INCLUDING_CHILDREN: // intended fallthrough
case INCLUDING_ALL_DESCENDANTS:
case PATH_AND_CHILDREN: // intended fallthrough
case PATH_AND_ALL_DESCENDANTS:
if (!Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
throw new IllegalArgumentException("Only directories are supported for this scope");
throw new IllegalArgumentException("Only directories are supported for this scope: " + scope);
}
break;
case SINGLE:
case PATH_ONLY:
if (Files.isSymbolicLink(path)) {
throw new IllegalArgumentException("Symlinks are not supported");
}
Expand Down Expand Up @@ -101,12 +100,12 @@ public Closeable start() throws IOException {
throw new IllegalStateException("There is no onEvent handler defined");
}
switch (scope) {
case INCLUDING_CHILDREN: {
case PATH_AND_CHILDREN: {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, false);
result.start();
return result;
}
case INCLUDING_ALL_DESCENDANTS: {
case PATH_AND_ALL_DESCENDANTS: {
try {
var result = new JDKDirectoryWatcher(path, executor, this.eventHandler, true);
result.start();
Expand All @@ -120,7 +119,7 @@ public Closeable start() throws IOException {
return result;
}
}
case SINGLE: {
case PATH_ONLY: {
var result = new JDKFileWatcher(path, executor, this.eventHandler);
result.start();
return result;
Expand Down
91 changes: 50 additions & 41 deletions src/main/java/engineering/swat/watch/impl/BundledSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

Expand All @@ -16,6 +20,7 @@
* This is used (for example) to avoid multiple JDKPoller registries for the same path
*/
public class BundledSubscription<Key extends @NonNull Object, Event extends @NonNull Object> implements ISubscribable<Key,Event> {
private static final Logger logger = LogManager.getLogger();
private final ISubscribable<Key, Event> wrapped;
private final ConcurrentMap<Key, Subscription<Event>> subscriptions = new ConcurrentHashMap<>();

Expand All @@ -27,18 +32,15 @@ public BundledSubscription(ISubscribable<Key, Event> wrapped) {
private static class Subscription<R> implements Consumer<R> {
private final List<Consumer<R>> consumers = new CopyOnWriteArrayList<>();
private volatile @MonotonicNonNull Closeable toBeClosed;
private volatile boolean closed = false;
Subscription() {
}

public void setToBeClosed(Closeable closer) {
this.toBeClosed = closer;
}

public void add(Consumer<R> newConsumer) {
void add(Consumer<R> newConsumer) {
consumers.add(newConsumer);
}

public void remove(Consumer<R> existingConsumer) {
void remove(Consumer<R> existingConsumer) {
consumers.remove(existingConsumer);
}

Expand All @@ -52,51 +54,58 @@ public void accept(R t) {
boolean hasActiveConsumers() {
return !consumers.isEmpty();
}


}

@Override
public Closeable subscribe(Key target, Consumer<Event> eventListener) throws IOException {
var active = this.subscriptions.computeIfAbsent(target, t -> new Subscription<>());
boolean first = false;
if (active.toBeClosed == null) {
// we just added a new one
// so lets take a lock on it, and try to be the one that gets to initialize it
while (true) {
Subscription<Event> active = this.subscriptions.computeIfAbsent(target, t -> new Subscription<>());
// after this, there will only be 1 instance of active subscription in the map.
// but we might have a race with remove, which can close the subscript between our get and our addition
// since this code is very hard to get right without locks, and shouldn't be run too often
// we take a big lock around the subscription management
synchronized(active) {
// now lock on it to make sure nobo
if (active.toBeClosed == null) {
first = true;
active.add(eventListener); // we know we already have the lock, and we need to do this before we register the watch
var newSubscriptions = wrapped.subscribe(target, active);
active.setToBeClosed(newSubscriptions);
if (active.closed) {
// we lost the race with closing the subscription, so we retry
continue;
}
else {
active.add(eventListener);
if (active.toBeClosed == null) {
// the watch is not active yet, and we were the first to get the lock
active.toBeClosed = wrapped.subscribe(target, active);
}
}
}
// at this point we have to be sure that we're not the first to in the list
// since we might have won the race on the compute, but lost the race
if (!first) {
active.add(eventListener);
}
return () -> {
active.remove(eventListener);
if (!active.hasActiveConsumers()) {
subscriptions.remove(target);
if (active.hasActiveConsumers()) {
// we lost the race, someone else added something again
// so we put it back in the list
subscriptions.put(target, active);
return () -> {
boolean scheduleClose = false;
synchronized(active) {
active.remove(eventListener);
scheduleClose = !active.hasActiveConsumers() && !active.closed;
}
else {
if (active.toBeClosed != null) {
active.toBeClosed.close();
}
if (scheduleClose) {
// to avoid hammering the system with closes & registers in a short periode
// we schedule the cleanup of watches in the background, when even after a small delay
// nobody is interested in a certain file anymore
CompletableFuture
.delayedExecutor(100, TimeUnit.MILLISECONDS)
.execute(() -> {
synchronized(active) {
if (!active.hasActiveConsumers() && !active.closed) {
// still ready to be closed
active.closed = true;
this.subscriptions.remove(target, active);
if (active.toBeClosed != null) {
try {
active.toBeClosed.close();
} catch (IOException e) {
logger.error("Unhandled exception while closing the watcher for {} in the background", target, e);
}
}
}
}
});
}
}
};

};
}
}


Expand Down
8 changes: 5 additions & 3 deletions src/main/java/engineering/swat/watch/impl/JDKPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -92,7 +93,7 @@ public static Closeable register(SubscriptionKey path, Consumer<List<WatchEvent<
try {
return CompletableFuture.supplyAsync(() -> {
try {
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[]{ ENTRY_CREATE, ENTRY_MODIFY, ENTRY_MODIFY, ENTRY_DELETE };
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[]{ ENTRY_CREATE, ENTRY_MODIFY, OVERFLOW, ENTRY_DELETE };
if (path.isRecursive()) {
return path.getPath().register(service, kinds, ExtendedWatchEventModifier.FILE_TREE);
}
Expand All @@ -109,8 +110,9 @@ public static Closeable register(SubscriptionKey path, Consumer<List<WatchEvent<
@Override
public void close() throws IOException {
logger.debug("Closing watch for: {}", path);
key.cancel();
watchers.remove(key);
if (watchers.remove(key, changesHandler)) {
key.cancel();
}
}
};
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public boolean equals(@Nullable Object obj) {
public int hashCode() {
return Objects.hash(path, recursive);
}

@Override
public String toString() {
return path.toString() + (recursive ? "[recursive]" : "");
}
}
6 changes: 3 additions & 3 deletions src/test/java/engineering/swat/watch/DeleteLockTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private void deleteAndVerify(Path target, WatchScope scope) throws IOException {
void watchedFileCanBeDeleted() throws IOException {
deleteAndVerify(
testDir.getTestFiles().get(0),
WatchScope.SINGLE
WatchScope.PATH_ONLY
);
}

Expand All @@ -70,7 +70,7 @@ void watchedFileCanBeDeleted() throws IOException {
void watchedDirectoryCanBeDeleted() throws IOException {
deleteAndVerify(
testDir.getTestDirectory(),
WatchScope.INCLUDING_CHILDREN
WatchScope.PATH_AND_CHILDREN
);
}

Expand All @@ -79,7 +79,7 @@ void watchedDirectoryCanBeDeleted() throws IOException {
void watchedRecursiveDirectoryCanBeDeleted() throws IOException {
deleteAndVerify(
testDir.getTestDirectory(),
WatchScope.INCLUDING_ALL_DESCENDANTS
WatchScope.PATH_AND_ALL_DESCENDANTS
);
}
}
6 changes: 3 additions & 3 deletions src/test/java/engineering/swat/watch/RecursiveWatchTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void newDirectoryWithFilesChangesDetected() throws IOException {
var target = new AtomicReference<Path>();
var created = new AtomicBoolean(false);
var changed = new AtomicBoolean(false);
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.INCLUDING_ALL_DESCENDANTS)
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.onEvent(ev -> {
logger.debug("Event received: {}", ev);
if (ev.calculateFullPath().equals(target.get())) {
Expand Down Expand Up @@ -79,7 +79,7 @@ void newDirectoryWithFilesChangesDetected() throws IOException {
void correctRelativePathIsReported() throws IOException {
Path relative = Path.of("a","b", "c", "d.txt");
var seen = new AtomicBoolean(false);
var watcher = Watcher.watch(testDir.getTestDirectory(), WatchScope.INCLUDING_ALL_DESCENDANTS)
var watcher = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
.onEvent(ev -> {
logger.debug("Seen event: {}", ev);
if (ev.getRelativePath().equals(relative)) {
Expand All @@ -104,7 +104,7 @@ void deleteOfFileInDirectoryShouldBeVisible() throws IOException, InterruptedExc
.findAny()
.orElseThrow();
var seen = new AtomicBoolean(false);
var watchConfig = Watcher.watch(target.getParent(), WatchScope.INCLUDING_CHILDREN)
var watchConfig = Watcher.watch(target.getParent(), WatchScope.PATH_AND_CHILDREN)
.onEvent(ev -> {
if (ev.getKind() == Kind.DELETED && ev.calculateFullPath().equals(target)) {
seen.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static void setupEverything() {
void deleteOfFileInDirectoryShouldBeVisible() throws IOException, InterruptedException {
var target = testDir.getTestFiles().get(0);
var seen = new AtomicBoolean(false);
var watchConfig = Watcher.watch(target.getParent(), WatchScope.INCLUDING_CHILDREN)
var watchConfig = Watcher.watch(target.getParent(), WatchScope.PATH_AND_CHILDREN)
.onEvent(ev -> {
if (ev.getKind() == Kind.DELETED && ev.calculateFullPath().equals(target)) {
seen.set(true);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/engineering/swat/watch/SingleFileTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void singleFileShouldNotTriggerOnOtherFilesInSameDir() throws IOException, Inter
var target = testDir.getTestFiles().get(0);
var seen = new AtomicBoolean(false);
var others = new AtomicBoolean(false);
var watchConfig = Watcher.watch(target, WatchScope.SINGLE)
var watchConfig = Watcher.watch(target, WatchScope.PATH_ONLY)
.onEvent(ev -> {
if (ev.calculateFullPath().equals(target)) {
seen.set(true);
Expand Down Expand Up @@ -68,7 +68,7 @@ void singleFileThatMonitorsOnlyADirectory() throws IOException, InterruptedExcep
var target = testDir.getTestDirectory();
var seen = new AtomicBoolean(false);
var others = new AtomicBoolean(false);
var watchConfig = Watcher.watch(target, WatchScope.SINGLE)
var watchConfig = Watcher.watch(target, WatchScope.PATH_ONLY)
.onEvent(ev -> {
if (ev.calculateFullPath().equals(target)) {
seen.set(true);
Expand Down
Loading