diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java new file mode 100644 index 00000000..9aa575f7 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java @@ -0,0 +1,132 @@ +package engineering.swat.watch.impl.jdk; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class JDKFileTreeWatch extends JDKBaseWatch { + private final Logger logger = LogManager.getLogger(); + private final Map childWatches = new ConcurrentHashMap<>(); + private final JDKBaseWatch internal; + + public JDKFileTreeWatch(Path root, Executor exec, + BiConsumer eventHandler) { + + super(root, exec, eventHandler); + var internalEventHandler = updateChildWatches().andThen(eventHandler); + this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler); + } + + /** + * @return An event handler that updates the child watches according to the + * following rules: (a) when an overflow happens, it's propagated to each + * existing child watch; (b) when a subdirectory creation happens, a new + * child watch is opened for that subdirectory; (c) when a subdirectory + * deletion happens, an existing child watch is closed for that + * subdirectory. + */ + private BiConsumer updateChildWatches() { + return (watch, event) -> { + var kind = event.getKind(); + + if (kind == WatchEvent.Kind.OVERFLOW) { + forEachChild(this::reportOverflowToChildWatch); + return; + } + + var child = event.calculateFullPath(); + var directory = child.toFile().isDirectory(); + + if (kind == WatchEvent.Kind.CREATED && directory) { + openChildWatch(child); + // Events in the newly created directory (`child`) might have + // been missed between its creation (`event`) and setting up its + // watch. Erring on the side of caution, generate an overflow + // event for the watch. + reportOverflowToChildWatch(child); + } + + if (kind == WatchEvent.Kind.DELETED && directory) { + closeChildWatch(child); + } + }; + } + + private void openChildWatch(Path child) { + var childWatch = new JDKFileTreeWatch(child, exec, (w, e) -> + // Same as `eventHandler`, except each event is pre-processed such + // that the last segment of the root path becomes the first segment + // of the relative path. For instance, `foo/bar` (root path) and + // `baz.txt` (relative path) are pre-processed to `foo` (root path) + // and `bar/baz.txt` (relative path). This is to ensure the parent + // directory of a child directory is reported as the root directory + // of the event. + eventHandler.accept(w, relativize(e)) + ); + + if (childWatches.putIfAbsent(child, childWatch) == null) { + try { + childWatch.open(); + } catch (IOException e) { + logger.error("Could not open (nested) file tree watch for: {} ({})", child, e); + } + } + } + + private void closeChildWatch(Path child) { + var childWatch = childWatches.remove(child); + if (childWatch != null) { + try { + childWatch.close(); + } catch (IOException e) { + logger.error("Could not close (nested) file tree watch for: {} ({})", child, e); + } + } + } + + private void reportOverflowToChildWatch(Path child) { + var childWatch = childWatches.get(child); + if (childWatch != null) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child); + childWatch.handleEvent(overflow); + } + } + + private void forEachChild(Consumer action) { + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(action); + } catch (IOException e) { + logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e); + } + } + + // -- JDKBaseWatch -- + + @Override + public void handleEvent(WatchEvent event) { + internal.handleEvent(event); + } + + @Override + public synchronized void close() throws IOException { + forEachChild(this::closeChildWatch); + internal.close(); + } + + @Override + protected synchronized void start() throws IOException { + internal.open(); + forEachChild(this::openChildWatch); + } +} diff --git a/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch2.java b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch2.java new file mode 100644 index 00000000..fa738a93 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch2.java @@ -0,0 +1,134 @@ +package engineering.swat.watch.impl.jdk; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.ActiveWatch; +import engineering.swat.watch.WatchEvent; + +public class JDKRecursiveDirectoryWatch2 extends JDKBaseWatch { + private final Logger logger = LogManager.getLogger(); + private final Map childWatches = new ConcurrentHashMap<>(); + private final JDKBaseWatch delegate; + + public JDKRecursiveDirectoryWatch2(Path directory, Executor exec, BiConsumer eventHandler) { + super(directory, exec, eventHandler); + this.delegate = new JDKDirectoryWatch(directory, exec, updateChildWatches().andThen(eventHandler), false); + } + + /** + * @return An event handler that updates the child watches according to the + * following rules: (a) when an overflow happens, it's propagated to each + * existing child watch; (b) when a subdirectory creation happens, a new + * child watch is opened for that subdirectory; (c) when a subdirectory + * deletion happens, an existing child watch is closed for that + * subdirectory. + */ + private BiConsumer updateChildWatches() { + return (watch, event) -> { + var kind = event.getKind(); + + if (kind == WatchEvent.Kind.OVERFLOW) { + forEachChild(this::reportOverflowToChildWatch); + return; + } + + var child = event.calculateFullPath(); + var directory = child.toFile().isDirectory(); + + if (kind == WatchEvent.Kind.CREATED && directory) { + openChildWatch(child); + // Events in the newly created directory (`child`) might have + // been missed between its creation (`event`) and setting up its + // watch. Erring on the side of caution, generate an overflow + // event for the watch. + reportOverflowToChildWatch(child); + } + + if (kind == WatchEvent.Kind.DELETED && directory) { + closeChildWatch(child); + } + }; + } + + private void openChildWatch(Path child) { + var childWatch = new JDKRecursiveDirectoryWatch2(child, exec, (w, e) -> + // Same as `eventHandler`, except each event is pre-processed such + // that the last segment of the root path becomes the first segment + // of the relative path. For instance, `foo/bar` (root path) and + // `baz.txt` (relative path) are pre-processed to `foo` (root path) + // and `bar/baz.txt` (relative path). This is to ensure the parent + // directory of a child directory is reported as the root directory + // of the event. + eventHandler.accept(w, relativize(e)) + ); + + if (childWatches.putIfAbsent(child, childWatch) == null) { + try { + childWatch.open(); + } catch (IOException e) { + logger.error("Could not open (recursive) subdirectory watch for: {} ({})", child, e); + } + } + } + + private void closeChildWatch(Path child) { + var childWatch = childWatches.remove(child); + if (childWatch != null) { + try { + childWatch.close(); + } catch (IOException e) { + logger.error("Could not close (recursive) subdirectory watch for: {} ({})", child, e); + } + } + } + + private void reportOverflowToChildWatch(Path child) { + var childWatch = childWatches.get(child); + if (childWatch != null) { + var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child); + childWatch.handleEvent(overflow); + } + } + + private void forEachChild(Consumer action) { + try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) { + children.forEach(action); + } catch (IOException e) { + logger.error("Recursive directory watch (for: {}) could not iterate over its children ({})", path, e); + } + } + + // -- JDKBaseWatch -- + + @Override + public void handleEvent(WatchEvent event) { + delegate.handleEvent(event); + } + + @Override + public synchronized void close() throws IOException { + delegate.close(); + for (var childWatch : childWatches.values()) { + childWatch.close(); + } + } + + @Override + protected synchronized void start() throws IOException { + // It's important to open the watch for the parent first, before opening + // watches for the children. Otherwise, new directories created while + // the watches are being set up might remain unnoticed. + delegate.open(); + forEachChild(this::openChildWatch); + } +} diff --git a/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java new file mode 100644 index 00000000..e9137160 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java @@ -0,0 +1,98 @@ +package engineering.swat.watch.impl.overflows; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.stream.Stream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class IndexingRescanner extends MemorylessRescanner { + private final Logger logger = LogManager.getLogger(); + private final Map index = new ConcurrentHashMap<>(); + + public IndexingRescanner(Executor exec, boolean recursive) { + super(exec, recursive); + } + + public void indexContent(Path root, boolean recursive) { + var maxDepth = recursive ? Integer.MAX_VALUE : 1; + try (var content = contentOf(root, maxDepth, logger)) { + content.forEach(this::index); + } + } + + private void index(Path p) { + try { + index.put(p, Files.getLastModifiedTime(p)); + } catch (IOException e) { + logger.error("Could not get modification time of: {} ({})", p, e); + } + } + + // -- MemorylessRescanner -- + + @Override + protected Stream generateEvents(Path path) { + try { + var lastModifiedOld = index.get(path); + var lastModifiedNew = Files.getLastModifiedTime(path); + + // The file isn't indexed yet + if (lastModifiedOld == null) { + index.put(path, lastModifiedNew); + return super.generateEvents(path); + } + + // The file is already indexed, and the old modification time is + // strictly before the new modification time + else if (lastModifiedOld.compareTo(lastModifiedNew) < 0) { + index.put(path, lastModifiedNew); + return Stream.of(modified(path)); + } + + // The file is already indexed, but the old modification time isn't + // strictly before the new modification time + else { + return Stream.empty(); + } + + } catch (IOException e) { + logger.error("Could not generate events (while rescanning) for: {} ({})", path, e); + return Stream.empty(); + } + } + + @Override + public void accept(EventHandlingWatch watch, WatchEvent event) { + var kind = event.getKind(); + var fullPath = event.calculateFullPath(); + + switch (kind) { + case MODIFIED: + // If a modified event happens for a path that's not in the + // index yet, then a create event might have been missed. + if (!index.containsKey(fullPath)) { + watch.handleEvent(created(fullPath)); + } + // Fallthrough intended + case CREATED: + index(fullPath); + break; + case DELETED: + index.remove(fullPath); + break; + case OVERFLOW: + super.accept(watch, event); + break; + } + } +} diff --git a/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java new file mode 100644 index 00000000..29ada7f6 --- /dev/null +++ b/src/main/java/engineering/swat/watch/impl/overflows/MemorylessRescanner.java @@ -0,0 +1,74 @@ +package engineering.swat.watch.impl.overflows; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import engineering.swat.watch.WatchEvent; +import engineering.swat.watch.impl.EventHandlingWatch; + +public class MemorylessRescanner implements BiConsumer { + private final Logger logger = LogManager.getLogger(); + private final Executor exec; + private final int maxDepth; + + public MemorylessRescanner(Executor exec, boolean recursive) { + this.exec = exec; + this.maxDepth = recursive ? Integer.MAX_VALUE : 1; + } + + protected void rescan(EventHandlingWatch watch) { + var root = watch.getPath(); + try (var content = contentOf(root, maxDepth, logger)) { + content + .flatMap(this::generateEvents) + .map(watch::relativize) + .forEach(watch::handleEvent); + } + } + + protected Stream generateEvents(Path path) { + try { + if (Files.size(path) == 0) { + return Stream.of(created(path)); + } else { + return Stream.of(created(path), modified(path)); + } + } catch (IOException e) { + logger.error("Could not generate events (while rescanning) for: {} ({})", path, e); + return Stream.empty(); + } + } + + protected static WatchEvent created(Path path) { + return new WatchEvent(WatchEvent.Kind.CREATED, path); + } + + protected static WatchEvent modified(Path path) { + return new WatchEvent(WatchEvent.Kind.MODIFIED, path); + } + + protected static Stream contentOf(Path path, int maxDepth, Logger logger) { + try { + return Files.walk(path, maxDepth).filter(p -> p != path); + } catch (IOException e) { + logger.error("Could not walk: {} ({})", path, e); + return Stream.empty(); + } + } + + // -- BiConsumer -- + + @Override + public void accept(EventHandlingWatch watch, WatchEvent event) { + if (event.getKind() == WatchEvent.Kind.OVERFLOW) { + exec.execute(() -> rescan(watch)); + } + } +}