Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d51d88f
Update torture test to ignore overflows (they're auto-handled)
sungshik Mar 10, 2025
671d5eb
Add `JDKFileTreeWatch`
sungshik Mar 10, 2025
62f4b1c
Move `updateChildWatches` to inner class
sungshik Mar 10, 2025
36c4299
Merge branch 'improved-overflow-support-main' into improved-overflow-…
sungshik Mar 11, 2025
47b8a10
Use `computeIfAbsent` instead of `putIfAbsent`
sungshik Mar 11, 2025
500e238
Update `JDKRecursiveDirectoryWatch`
sungshik Mar 11, 2025
6a328bf
Add import
sungshik Mar 11, 2025
4aed091
Improve code quality of `JDKFileTreeWatch`
sungshik Mar 12, 2025
71ac833
Improve code quality of `JDKFileTreeWatch`
sungshik Mar 12, 2025
fa65b30
Add mechanism to avoid relativization in `JDKFileTreeWatch`
sungshik Mar 12, 2025
fdd24f8
Simplify relativization of paths in `JDKFileTreeWatch`
sungshik Mar 14, 2025
3bdafe6
Change order of closing internal/child watches in `JDKFileTreeWatch`
sungshik Mar 14, 2025
4582d23
Simplify relativization of paths in `JDKFileTreeWatch`
sungshik Mar 14, 2025
6a7df86
Use file names to store child watches (instead of full paths)
sungshik Mar 14, 2025
e676b5f
Use `JDKFileTreeWatch`
sungshik Mar 14, 2025
387e7c3
Add asynchronous bookkeeping of `CREATED` and `OVERFLOW` events
sungshik Mar 14, 2025
748e8ac
Fix issue that `JDKFileTreeWatch` relied on overflow handling to pres…
sungshik Mar 18, 2025
4a1423b
Add license
sungshik Mar 18, 2025
1ab8f29
Make the child watches updater asynchronous
sungshik Mar 18, 2025
385db76
Add code to close child watches when their directories no longer exis…
sungshik Mar 18, 2025
58d9561
Remove `JDKRecursiveDirectoryWatch` (replaced by `JDKFileTreeWatch`)
sungshik Mar 25, 2025
c96c943
Add filtering mechanism to `Watcher` and `JDK...` classes
sungshik Mar 25, 2025
eca305b
Move method implementation from base class to subclass (was already o…
sungshik Mar 25, 2025
408c9d7
Improve logic to close `JDK...Watch` classes (avoid event handling o…
sungshik Mar 25, 2025
e0f039d
Fix a few relativization issues in `JDKFileTreeWatch` and `IndexingRe…
sungshik Mar 25, 2025
b8adb45
Add event filter to test
sungshik Mar 25, 2025
bbd1d39
Add test to check if overflows are recoverd from
sungshik Mar 25, 2025
02b10b7
Fix JavaDoc
sungshik Mar 25, 2025
76ec380
Remove old test
sungshik Mar 25, 2025
2cc3c66
Remove `trySleep` helpers
sungshik Mar 26, 2025
b760db9
Rename method to better convey intent
sungshik Mar 26, 2025
9b58bc4
Revert change to `relativize` in `JDKFileTreeWatch` (and add comment …
sungshik Mar 26, 2025
84b627b
Move closed check to `handleEvent`
sungshik Mar 26, 2025
e53569a
Add general `handleEvent` implementation back to the base watch
sungshik Mar 26, 2025
6feac60
Fix race in closing child watches
sungshik Mar 26, 2025
3f80e77
Merge branch 'improved-overflow-support-main' into improved-overflow-…
sungshik Mar 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions src/main/java/engineering/swat/watch/Watcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -42,7 +43,6 @@
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
import engineering.swat.watch.impl.jdk.JDKFileTreeWatch;
import engineering.swat.watch.impl.jdk.JDKFileWatch;
import engineering.swat.watch.impl.jdk.JDKRecursiveDirectoryWatch;
import engineering.swat.watch.impl.overflows.IndexingRescanner;
import engineering.swat.watch.impl.overflows.MemorylessRescanner;

Expand All @@ -62,6 +62,8 @@

private static final BiConsumer<EventHandlingWatch, WatchEvent> EMPTY_HANDLER = (w, e) -> {};
private volatile BiConsumer<EventHandlingWatch, WatchEvent> eventHandler = EMPTY_HANDLER;
private static final Predicate<WatchEvent> TRUE_FILTER = e -> true;
private volatile Predicate<WatchEvent> eventFilter = TRUE_FILTER;

private Watcher(Path path, WatchScope scope) {
this.path = path;
Expand Down Expand Up @@ -139,6 +141,22 @@
return this;
}

/**
* Configures the event filter to determine which events should be passed to
* the event handler. By default (without calling this method), all events
* are passed. This method must be called at most once.
* @param predicate The predicate to determine an event should be kept
* (`true`) or dropped (`false`)
* @return {@code this} (to support method chaining)
*/
Watcher filter(Predicate<WatchEvent> predicate) {
if (this.eventFilter != TRUE_FILTER) {
throw new IllegalArgumentException("filter cannot be set more than once");

Check warning on line 154 in src/main/java/engineering/swat/watch/Watcher.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/Watcher.java#L154

Added line #L154 was not covered by tests
}
this.eventFilter = predicate;
return this;
}

/**
* Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
* If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
Expand Down Expand Up @@ -181,27 +199,26 @@

switch (scope) {
case PATH_AND_CHILDREN: {
var result = new JDKDirectoryWatch(path, executor, h);
var result = new JDKDirectoryWatch(path, executor, h, eventFilter);
result.open();
return result;
}
case PATH_AND_ALL_DESCENDANTS: {
try {
var result = new JDKDirectoryWatch(path, executor, h, true);
var result = new JDKDirectoryWatch(path, executor, h, eventFilter, true);
result.open();
return result;
} catch (Throwable ex) {
// no native support, use the simulation
logger.debug("Not possible to register the native watcher, using fallback for {}", path);
logger.trace(ex);
// var result = new JDKRecursiveDirectoryWatch(path, executor, h);
var result = new JDKFileTreeWatch(path, executor, h);
var result = new JDKFileTreeWatch(path, executor, h, eventFilter);
result.open();
return result;
}
}
case PATH_ONLY: {
var result = new JDKFileWatch(path, executor, h);
var result = new JDKFileWatch(path, executor, h, eventFilter);
result.open();
return result;
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKBaseWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -46,12 +47,17 @@ public abstract class JDKBaseWatch implements EventHandlingWatch {
protected final Path path;
protected final Executor exec;
protected final BiConsumer<EventHandlingWatch, WatchEvent> eventHandler;
protected final Predicate<WatchEvent> eventFilter;
protected final AtomicBoolean started = new AtomicBoolean();

protected JDKBaseWatch(Path path, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
protected JDKBaseWatch(Path path, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter) {

this.path = path;
this.exec = exec;
this.eventHandler = eventHandler;
this.eventFilter = eventFilter;
}

public void open() throws IOException {
Expand Down Expand Up @@ -118,11 +124,6 @@ protected WatchEvent.Kind translate(java.nio.file.WatchEvent.Kind<?> jdkKind) {

// -- EventHandlingWatch --

@Override
public void handleEvent(WatchEvent e) {
eventHandler.accept(this, e);
}

@Override
public Path getPath() {
return path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -47,27 +48,36 @@
private final Logger logger = LogManager.getLogger();
private final boolean nativeRecursive;
private volatile @MonotonicNonNull Closeable bundledJDKWatcher;
private volatile boolean closed = false;

private static final BundledSubscription<SubscriptionKey, List<java.nio.file.WatchEvent<?>>>
BUNDLED_JDK_WATCHERS = new BundledSubscription<>(JDKPoller::register);

public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
this(directory, exec, eventHandler, false);
public JDKDirectoryWatch(Path directory, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter) {

this(directory, exec, eventHandler, eventFilter, false);
}

public JDKDirectoryWatch(Path directory, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler, boolean nativeRecursive) {
super(directory, exec, eventHandler);
public JDKDirectoryWatch(Path directory, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter, boolean nativeRecursive) {

super(directory, exec, eventHandler, eventFilter);
this.nativeRecursive = nativeRecursive;
}

private void handleJDKEvents(List<java.nio.file.WatchEvent<?>> events) {
exec.execute(() -> {
for (var ev : events) {
try {
handleEvent(translate(ev));
}
catch (Throwable ignored) {
logger.error("Ignoring downstream exception:", ignored);
if (!closed) {
for (var ev : events) {
try {
handleEvent(translate(ev));
}
catch (Throwable ignored) {
logger.error("Ignoring downstream exception:", ignored);

Check warning on line 79 in src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKDirectoryWatch.java#L78-L79

Added lines #L78 - L79 were not covered by tests
}
}
}
});
Expand All @@ -80,10 +90,18 @@
return nativeRecursive ? WatchScope.PATH_AND_ALL_DESCENDANTS : WatchScope.PATH_AND_CHILDREN;
}

@Override
public void handleEvent(WatchEvent e) {
if (eventFilter.test(e)) {
eventHandler.accept(this, e);
}
}

@Override
public synchronized void close() throws IOException {
if (bundledJDKWatcher != null) {
if (!closed && bundledJDKWatcher != null) {
logger.trace("Closing watch for: {}", this.path);
closed = true;
bundledJDKWatcher.close();
}
}
Expand Down
78 changes: 44 additions & 34 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package engineering.swat.watch.impl.jdk;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -36,9 +37,11 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

import engineering.swat.watch.WatchEvent;
import engineering.swat.watch.WatchScope;
Expand All @@ -52,27 +55,39 @@
private final JDKBaseWatch internal;

public JDKFileTreeWatch(Path fullPath, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
this(fullPath, Path.of(""), exec, eventHandler);
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter) {

this(fullPath, Path.of(""), exec, eventHandler, eventFilter);
}

public JDKFileTreeWatch(Path rootPath, Path relativePathParent, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter) {

super(rootPath.resolve(relativePathParent), exec, eventHandler);
super(rootPath.resolve(relativePathParent), exec, eventHandler, eventFilter);
this.rootPath = rootPath;
this.relativePathParent = relativePathParent;

var internalEventHandler = eventHandler.andThen(new AsyncChildWatchesUpdater());
this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler) {
this.internal = new JDKDirectoryWatch(path, exec, internalEventHandler, eventFilter) {

// Override to ensure that this watch relativizes events wrt
// `rootPath` (instead of `path`, as is the default behavior)
@Override
public WatchEvent relativize(WatchEvent event) {
var fileName = event.getFileName();
return new WatchEvent(event.getKind(), rootPath,
fileName == null ? relativePathParent : relativePathParent.resolve(fileName));
var relativePath = relativePathParent;

// Append a file name to `relativePath` if it exists
var fullPath = event.calculateFullPath();
if (!fullPath.equals(path)) {
var fileName = fullPath.getFileName();
if (fileName != null) {
relativePath = relativePath.resolve(fileName);
}
}

return new WatchEvent(event.getKind(), rootPath, relativePath);
}

// Override to ensure that this watch translates JDK events using
Expand Down Expand Up @@ -125,7 +140,7 @@
if (child != null) {
consumer.accept(child);
} else {
logger.error("Could not get file name of event: {}", event);

Check warning on line 143 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L143

Added line #L143 was not covered by tests
}
}

Expand Down Expand Up @@ -167,28 +182,28 @@
toBeClosed.remove(child);
openChildWatch(child);
} else {
logger.error("File tree watch (for: {}) could not open a child watch for: {}", path, p);

Check warning on line 185 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L185

Added line #L185 was not covered by tests
}
});
} catch (IOException e) {
logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e);

Check warning on line 189 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L188-L189

Added lines #L188 - L189 were not covered by tests
}

for (var child : toBeClosed) {
tryCloseChildWatch(child);
}

Check warning on line 194 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L193-L194

Added lines #L193 - L194 were not covered by tests
}

private JDKFileTreeWatch openChildWatch(Path child) {
assert !child.isAbsolute();

Function<Path, JDKFileTreeWatch> newChildWatch = p -> new JDKFileTreeWatch(
rootPath, relativePathParent.resolve(child), exec, eventHandler);
rootPath, relativePathParent.resolve(child), exec, eventHandler, eventFilter);
var childWatch = childWatches.computeIfAbsent(child, newChildWatch);
try {
childWatch.startIfFirstTime();
} catch (IOException e) {
logger.error("Could not open (nested) file tree watch for: {} ({})", child, e);

Check warning on line 206 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L205-L206

Added lines #L205 - L206 were not covered by tests
}
return childWatch;
}
Expand All @@ -196,8 +211,8 @@
private void tryCloseChildWatch(Path child) {
try {
closeChildWatch(child);
} catch (IOException e) {
logger.error("Could not close (nested) file tree watch for: {} ({})", path.resolve(child), e);

Check warning on line 215 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L214-L215

Added lines #L214 - L215 were not covered by tests
}
}

Expand All @@ -206,15 +221,28 @@

var childWatch = childWatches.remove(child);
if (childWatch != null) {
childWatch.close();

Check warning on line 224 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L224

Added line #L224 was not covered by tests
}
}

private @Nullable IOException tryClose(Closeable c) {
try {
c.close();
return null;
} catch (IOException ex) {
logger.error("Could not close watch", ex);
return ex;
} catch (Exception ex) {
logger.error("Could not close watch", ex);
return new IOException("Unexpected exception when closing", ex);

Check warning on line 237 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L232-L237

Added lines #L232 - L237 were not covered by tests
}
}

// -- JDKBaseWatch --

@Override
public WatchScope getScope() {
return WatchScope.PATH_AND_ALL_DESCENDANTS;

Check warning on line 245 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L245

Added line #L245 was not covered by tests
}

@Override
Expand All @@ -224,37 +252,19 @@

@Override
public synchronized void close() throws IOException {
IOException firstFail = null;

var internalOpen = true;
var children = childWatches.keySet().iterator();
do {
try {
// First, close the internal watch to prevent new child watches
// from being opened concurrently while this method is running.
if (internalOpen) {
internal.close();
internalOpen = false;
}
// Next, close all child watches
else {
closeChildWatch(children.next());
}
} catch (IOException ex) {
logger.error("Could not close watch", ex);
firstFail = firstFail == null ? ex : firstFail;
} catch (Exception ex) {
logger.error("Could not close watch", ex);
firstFail = firstFail == null ? new IOException("Unexpected exception when closing", ex) : firstFail;
var firstFail = tryClose(internal);
for (var c : childWatches.values()) {
var currentFail = tryClose(c);
if (currentFail != null && firstFail == null) {
firstFail = currentFail;

Check warning on line 259 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L259

Added line #L259 was not covered by tests
}
} while (children.hasNext());

}
if (firstFail != null) {
throw firstFail;

Check warning on line 263 in src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java#L263

Added line #L263 was not covered by tests
}
}

@Override
@Override
protected synchronized void start() throws IOException {
internal.open();
openAndCloseChildWatches();
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKFileWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.file.Path;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -48,8 +49,11 @@ public class JDKFileWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final JDKBaseWatch internal;

public JDKFileWatch(Path file, Executor exec, BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {
super(file, exec, eventHandler);
public JDKFileWatch(Path file, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler,
Predicate<WatchEvent> eventFilter) {

super(file, exec, eventHandler, eventFilter);

var message = "The root path is not a valid path for a file watch";
var parent = requireNonNull(file.getParent(), message);
Expand All @@ -64,7 +68,7 @@ public JDKFileWatch(Path file, Executor exec, BiConsumer<EventHandlingWatch, Wat
if (fileName.equals(e.getRelativePath())) {
eventHandler.accept(w, e);
}
});
}, eventFilter);

logger.debug("File watch (for: {}) is in reality a directory watch (for: {}) with a filter (for: {})", file, parent, fileName);
}
Expand Down
Loading
Loading