Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/main/java/engineering/swat/watch/impl/jdk/JDKFileTreeWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package engineering.swat.watch.impl.jdk;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import engineering.swat.watch.WatchEvent;
import engineering.swat.watch.impl.EventHandlingWatch;

public class JDKFileTreeWatch extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final Map<Path, JDKFileTreeWatch> childWatches = new ConcurrentHashMap<>();
private final JDKBaseWatch internal;

public JDKFileTreeWatch(Path root, Executor exec,
BiConsumer<EventHandlingWatch, WatchEvent> eventHandler) {

super(root, exec, eventHandler);
var internalEventHandler = updateChildWatches().andThen(eventHandler);
this.internal = new JDKDirectoryWatch(root, exec, internalEventHandler);
}

/**
* @return An event handler that updates the child watches according to the
* following rules: (a) when an overflow happens, it's propagated to each
* existing child watch; (b) when a subdirectory creation happens, a new
* child watch is opened for that subdirectory; (c) when a subdirectory
* deletion happens, an existing child watch is closed for that
* subdirectory.
*/
private BiConsumer<EventHandlingWatch, WatchEvent> updateChildWatches() {
return (watch, event) -> {
var kind = event.getKind();

if (kind == WatchEvent.Kind.OVERFLOW) {
forEachChild(this::reportOverflowToChildWatch);
return;
}

var child = event.calculateFullPath();
var directory = child.toFile().isDirectory();

if (kind == WatchEvent.Kind.CREATED && directory) {
openChildWatch(child);
// Events in the newly created directory (`child`) might have
// been missed between its creation (`event`) and setting up its
// watch. Erring on the side of caution, generate an overflow
// event for the watch.
reportOverflowToChildWatch(child);
}

if (kind == WatchEvent.Kind.DELETED && directory) {
closeChildWatch(child);
}
};
}

private void openChildWatch(Path child) {
var childWatch = new JDKFileTreeWatch(child, exec, (w, e) ->
// Same as `eventHandler`, except each event is pre-processed such
// that the last segment of the root path becomes the first segment
// of the relative path. For instance, `foo/bar` (root path) and
// `baz.txt` (relative path) are pre-processed to `foo` (root path)
// and `bar/baz.txt` (relative path). This is to ensure the parent
// directory of a child directory is reported as the root directory
// of the event.
eventHandler.accept(w, relativize(e))
);

if (childWatches.putIfAbsent(child, childWatch) == null) {
try {
childWatch.open();
} catch (IOException e) {
logger.error("Could not open (nested) file tree watch for: {} ({})", child, e);
}
}
}

private void closeChildWatch(Path child) {
var childWatch = childWatches.remove(child);
if (childWatch != null) {
try {
childWatch.close();
} catch (IOException e) {
logger.error("Could not close (nested) file tree watch for: {} ({})", child, e);
}
}
}

private void reportOverflowToChildWatch(Path child) {
var childWatch = childWatches.get(child);
if (childWatch != null) {
var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child);
childWatch.handleEvent(overflow);
}
}

private void forEachChild(Consumer<Path> action) {
try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) {
children.forEach(action);
} catch (IOException e) {
logger.error("File tree watch (for: {}) could not iterate over its children ({})", path, e);
}
}

// -- JDKBaseWatch --

@Override
public void handleEvent(WatchEvent event) {
internal.handleEvent(event);
}

@Override
public synchronized void close() throws IOException {
forEachChild(this::closeChildWatch);
internal.close();
}

@Override
protected synchronized void start() throws IOException {
internal.open();
forEachChild(this::openChildWatch);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package engineering.swat.watch.impl.jdk;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import engineering.swat.watch.ActiveWatch;
import engineering.swat.watch.WatchEvent;

public class JDKRecursiveDirectoryWatch2 extends JDKBaseWatch {
private final Logger logger = LogManager.getLogger();
private final Map<Path, JDKRecursiveDirectoryWatch2> childWatches = new ConcurrentHashMap<>();
private final JDKBaseWatch delegate;

public JDKRecursiveDirectoryWatch2(Path directory, Executor exec, BiConsumer<ActiveWatch, WatchEvent> eventHandler) {
super(directory, exec, eventHandler);
this.delegate = new JDKDirectoryWatch(directory, exec, updateChildWatches().andThen(eventHandler), false);
}

/**
* @return An event handler that updates the child watches according to the
* following rules: (a) when an overflow happens, it's propagated to each
* existing child watch; (b) when a subdirectory creation happens, a new
* child watch is opened for that subdirectory; (c) when a subdirectory
* deletion happens, an existing child watch is closed for that
* subdirectory.
*/
private BiConsumer<ActiveWatch, WatchEvent> updateChildWatches() {
return (watch, event) -> {
var kind = event.getKind();

if (kind == WatchEvent.Kind.OVERFLOW) {
forEachChild(this::reportOverflowToChildWatch);
return;
}

var child = event.calculateFullPath();
var directory = child.toFile().isDirectory();

if (kind == WatchEvent.Kind.CREATED && directory) {
openChildWatch(child);
// Events in the newly created directory (`child`) might have
// been missed between its creation (`event`) and setting up its
// watch. Erring on the side of caution, generate an overflow
// event for the watch.
reportOverflowToChildWatch(child);
}

if (kind == WatchEvent.Kind.DELETED && directory) {
closeChildWatch(child);
}
};
}

private void openChildWatch(Path child) {
var childWatch = new JDKRecursiveDirectoryWatch2(child, exec, (w, e) ->
// Same as `eventHandler`, except each event is pre-processed such
// that the last segment of the root path becomes the first segment
// of the relative path. For instance, `foo/bar` (root path) and
// `baz.txt` (relative path) are pre-processed to `foo` (root path)
// and `bar/baz.txt` (relative path). This is to ensure the parent
// directory of a child directory is reported as the root directory
// of the event.
eventHandler.accept(w, relativize(e))
);

if (childWatches.putIfAbsent(child, childWatch) == null) {
try {
childWatch.open();
} catch (IOException e) {
logger.error("Could not open (recursive) subdirectory watch for: {} ({})", child, e);
}
}
}

private void closeChildWatch(Path child) {
var childWatch = childWatches.remove(child);
if (childWatch != null) {
try {
childWatch.close();
} catch (IOException e) {
logger.error("Could not close (recursive) subdirectory watch for: {} ({})", child, e);
}
}
}

private void reportOverflowToChildWatch(Path child) {
var childWatch = childWatches.get(child);
if (childWatch != null) {
var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, child);
childWatch.handleEvent(overflow);
}
}

private void forEachChild(Consumer<Path> action) {
try (var children = Files.find(path, 1, (p, attrs) -> p != path && attrs.isDirectory())) {
children.forEach(action);
} catch (IOException e) {
logger.error("Recursive directory watch (for: {}) could not iterate over its children ({})", path, e);
}
}

// -- JDKBaseWatch --

@Override
public void handleEvent(WatchEvent event) {
delegate.handleEvent(event);
}

@Override
public synchronized void close() throws IOException {
delegate.close();
for (var childWatch : childWatches.values()) {
childWatch.close();
}
}

@Override
protected synchronized void start() throws IOException {
// It's important to open the watch for the parent first, before opening
// watches for the children. Otherwise, new directories created while
// the watches are being set up might remain unnoticed.
delegate.open();
forEachChild(this::openChildWatch);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package engineering.swat.watch.impl.overflows;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import engineering.swat.watch.WatchEvent;
import engineering.swat.watch.impl.EventHandlingWatch;

public class IndexingRescanner extends MemorylessRescanner {
private final Logger logger = LogManager.getLogger();
private final Map<Path, FileTime> index = new ConcurrentHashMap<>();

public IndexingRescanner(Executor exec, boolean recursive) {
super(exec, recursive);
}

public void indexContent(Path root, boolean recursive) {
var maxDepth = recursive ? Integer.MAX_VALUE : 1;
try (var content = contentOf(root, maxDepth, logger)) {
content.forEach(this::index);
}
}

private void index(Path p) {
try {
index.put(p, Files.getLastModifiedTime(p));
} catch (IOException e) {
logger.error("Could not get modification time of: {} ({})", p, e);
}
}

// -- MemorylessRescanner --

@Override
protected Stream<WatchEvent> generateEvents(Path path) {
try {
var lastModifiedOld = index.get(path);
var lastModifiedNew = Files.getLastModifiedTime(path);

// The file isn't indexed yet
if (lastModifiedOld == null) {
index.put(path, lastModifiedNew);
return super.generateEvents(path);
}

// The file is already indexed, and the old modification time is
// strictly before the new modification time
else if (lastModifiedOld.compareTo(lastModifiedNew) < 0) {
index.put(path, lastModifiedNew);
return Stream.of(modified(path));
}

// The file is already indexed, but the old modification time isn't
// strictly before the new modification time
else {
return Stream.empty();
}

} catch (IOException e) {
logger.error("Could not generate events (while rescanning) for: {} ({})", path, e);
return Stream.empty();
}
}

@Override
public void accept(EventHandlingWatch watch, WatchEvent event) {
var kind = event.getKind();
var fullPath = event.calculateFullPath();

switch (kind) {
case MODIFIED:
// If a modified event happens for a path that's not in the
// index yet, then a create event might have been missed.
if (!index.containsKey(fullPath)) {
watch.handleEvent(created(fullPath));
}
// Fallthrough intended
case CREATED:
index(fullPath);
break;
case DELETED:
index.remove(fullPath);
break;
case OVERFLOW:
super.accept(watch, event);
break;
}
}
}
Loading
Loading