Skip to content

Commit 500e238

Browse files
committed
Update JDKRecursiveDirectoryWatch
1 parent 47b8a10 commit 500e238

File tree

2 files changed

+41
-168
lines changed

2 files changed

+41
-168
lines changed

src/main/java/engineering/swat/watch/Watcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ public ActiveWatch start() throws IOException {
193193
// no native support, use the simulation
194194
logger.debug("Not possible to register the native watcher, using fallback for {}", path);
195195
logger.trace(ex);
196-
var result = new JDKFileTreeWatch(path, executor, h);
196+
var result = new JDKRecursiveDirectoryWatch(path, executor, h);
197+
// var result = new JDKFileTreeWatch(path, executor, h);
197198
result.open();
198199
return result;
199200
}

src/main/java/engineering/swat/watch/impl/jdk/JDKRecursiveDirectoryWatch.java

Lines changed: 39 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,6 @@
3232
import java.nio.file.Path;
3333
import java.nio.file.SimpleFileVisitor;
3434
import java.nio.file.attribute.BasicFileAttributes;
35-
import java.util.ArrayDeque;
36-
import java.util.ArrayList;
37-
import java.util.Deque;
38-
import java.util.HashSet;
39-
import java.util.List;
40-
import java.util.Set;
41-
import java.util.concurrent.CompletableFuture;
4235
import java.util.concurrent.ConcurrentHashMap;
4336
import java.util.concurrent.ConcurrentMap;
4437
import java.util.concurrent.Executor;
@@ -59,9 +52,9 @@ public JDKRecursiveDirectoryWatch(Path directory, Executor exec, BiConsumer<Even
5952
super(directory, exec, eventHandler);
6053
}
6154

62-
private void processEvents(WatchEvent ev) {
55+
private void processEvents(EventHandlingWatch w, WatchEvent ev) {
6356
logger.trace("Forwarding event: {}", ev);
64-
eventHandler.accept(this, ev);
57+
eventHandler.accept(w, ev);
6558
logger.trace("Unwrapping event: {}", ev);
6659
switch (ev.getKind()) {
6760
case CREATED: handleCreate(ev); break;
@@ -71,11 +64,6 @@ private void processEvents(WatchEvent ev) {
7164
}
7265
}
7366

74-
private void publishExtraEvents(List<WatchEvent> ev) {
75-
logger.trace("Reporting new nested directories & files: {}", ev);
76-
ev.forEach(e -> eventHandler.accept(this, e));
77-
}
78-
7967
private void handleCreate(WatchEvent ev) {
8068
// between the event and the current state of the file system
8169
// we might have some nested directories we missed
@@ -85,27 +73,24 @@ private void handleCreate(WatchEvent ev) {
8573
// create till after the processing is done, so we schedule it in the background
8674
var fullPath = ev.calculateFullPath();
8775
if (!activeWatches.containsKey(fullPath)) {
88-
CompletableFuture
89-
.completedFuture(fullPath)
90-
.thenApplyAsync(this::registerForNewDirectory, exec)
91-
.thenAcceptAsync(this::publishExtraEvents, exec)
92-
.exceptionally(ex -> {
93-
logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex);
94-
return null;
95-
});
76+
try {
77+
if (Files.isDirectory(fullPath)) {
78+
addNewDirectory(fullPath);
79+
triggerOverflow(fullPath);
80+
}
81+
} catch (IOException ex) {
82+
logger.error("Could not locate new sub directories for: {}", ev.calculateFullPath(), ex);
83+
}
9684
}
9785
}
9886

9987
private void handleOverflow(WatchEvent ev) {
100-
logger.info("Overflow detected, rescanning to find missed entries in {}", path);
101-
CompletableFuture
102-
.completedFuture(ev.calculateFullPath())
103-
.thenApplyAsync(this::syncAfterOverflow, exec)
104-
.thenAcceptAsync(this::publishExtraEvents, exec)
105-
.exceptionally(ex -> {
106-
logger.error("Could not register new watch for: {} ({})", ev.calculateFullPath(), ex);
107-
return null;
108-
});
88+
var fullPath = ev.calculateFullPath();
89+
try (var children = Files.find(fullPath, 1, (p, attrs) -> p != fullPath && attrs.isDirectory())) {
90+
children.forEach(JDKRecursiveDirectoryWatch.this::triggerOverflow);
91+
} catch (IOException e) {
92+
logger.error("Could not handle overflow for: {} ({})", fullPath, e);
93+
}
10994
}
11095

11196
private void handleDeleteDirectory(WatchEvent ev) {
@@ -147,153 +132,40 @@ public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws I
147132
}
148133
return FileVisitResult.CONTINUE;
149134
}
150-
151-
private void addNewDirectory(Path dir) throws IOException {
152-
var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir)));
153-
try {
154-
if (!watch.startIfFirstTime()) {
155-
logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir);
156-
}
157-
} catch (IOException ex) {
158-
activeWatches.remove(dir);
159-
logger.error("Could not register a watch for: {} ({})", dir, ex);
160-
throw ex;
161-
}
162-
}
163-
164-
/** Make sure that the events are relative to the actual root of the recursive watch */
165-
private BiConsumer<EventHandlingWatch, WatchEvent> relocater(Path subRoot) {
166-
final Path newRelative = path.relativize(subRoot);
167-
return (w, ev) -> {
168-
var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath()));
169-
processEvents(rewritten);
170-
};
171-
}
172135
}
173136

174-
/** register watch for new sub-dir, but also simulate event for every file & subdir found */
175-
private class NewDirectoryScan extends InitialDirectoryScan {
176-
protected final List<WatchEvent> events;
177-
protected final Set<Path> seenFiles;
178-
protected final Set<Path> seenDirs;
179-
private boolean hasFiles = false;
180-
public NewDirectoryScan(Path subRoot, List<WatchEvent> events, Set<Path> seenFiles, Set<Path> seenDirs) {
181-
super(subRoot);
182-
this.events = events;
183-
this.seenFiles = seenFiles;
184-
this.seenDirs = seenDirs;
185-
}
186-
187-
@Override
188-
public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
189-
try {
190-
hasFiles = false;
191-
if (!seenDirs.contains(subdir)) {
192-
if (!subdir.equals(subRoot)) {
193-
events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, path.relativize(subdir)));
194-
}
195-
return super.preVisitDirectory(subdir, attrs);
196-
}
197-
// our children might have newer results
198-
return FileVisitResult.CONTINUE;
199-
} finally {
200-
seenDirs.add(subdir);
201-
}
202-
}
203-
204-
@Override
205-
public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
206-
if (hasFiles) {
207-
events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, path.relativize(subdir)));
208-
}
209-
return super.postVisitDirectory(subdir, exc);
210-
}
211-
212-
@Override
213-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
214-
if (!seenFiles.contains(file)) {
215-
hasFiles = true;
216-
217-
var relative = path.relativize(file);
218-
events.add(new WatchEvent(WatchEvent.Kind.CREATED, path, relative));
219-
if (attrs.size() > 0) {
220-
events.add(new WatchEvent(WatchEvent.Kind.MODIFIED, path, relative));
221-
}
222-
seenFiles.add(file);
137+
private void addNewDirectory(Path dir) throws IOException {
138+
var watch = activeWatches.computeIfAbsent(dir, d -> new JDKDirectoryWatch(d, exec, relocater(dir)));
139+
try {
140+
if (!watch.startIfFirstTime()) {
141+
logger.debug("We lost the race on starting a nested watch, that shouldn't be a problem, but it's a very busy, so we might have lost a few events in {}", dir);
223142
}
224-
return FileVisitResult.CONTINUE;
143+
} catch (IOException ex) {
144+
activeWatches.remove(dir);
145+
logger.error("Could not register a watch for: {} ({})", dir, ex);
146+
throw ex;
225147
}
226148
}
227149

228-
/** detect directories that aren't tracked yet, and generate events only for new entries */
229-
private class OverflowSyncScan extends NewDirectoryScan {
230-
private final Deque<Boolean> isNewDirectory = new ArrayDeque<>();
231-
public OverflowSyncScan(Path subRoot, List<WatchEvent> events, Set<Path> seenFiles, Set<Path> seenDirs) {
232-
super(subRoot, events, seenFiles, seenDirs);
233-
}
234-
@Override
235-
public FileVisitResult preVisitDirectory(Path subdir, BasicFileAttributes attrs) throws IOException {
236-
if (!activeWatches.containsKey(subdir)) {
237-
isNewDirectory.addLast(true);
238-
return super.preVisitDirectory(subdir, attrs);
239-
}
240-
isNewDirectory.addLast(false);
241-
return FileVisitResult.CONTINUE;
242-
}
243-
@Override
244-
public FileVisitResult postVisitDirectory(Path subdir, IOException exc) throws IOException {
245-
isNewDirectory.removeLast();
246-
return super.postVisitDirectory(subdir, exc);
247-
}
248-
@Override
249-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
250-
if (isNewDirectory.peekLast() == Boolean.TRUE || !seenFiles.contains(file)) {
251-
return super.visitFile(file, attrs);
252-
}
253-
return FileVisitResult.CONTINUE;
254-
}
150+
/** Make sure that the events are relative to the actual root of the recursive watch */
151+
private BiConsumer<EventHandlingWatch, WatchEvent> relocater(Path subRoot) {
152+
final Path newRelative = path.relativize(subRoot);
153+
return (w, ev) -> {
154+
var rewritten = new WatchEvent(ev.getKind(), path, newRelative.resolve(ev.getRelativePath()));
155+
processEvents(w, rewritten);
156+
};
255157
}
256158

257159
private void registerInitialWatches(Path dir) throws IOException {
258160
Files.walkFileTree(dir, new InitialDirectoryScan(dir));
161+
triggerOverflow(dir);
259162
}
260163

261-
private List<WatchEvent> registerForNewDirectory(Path dir) {
262-
var events = new ArrayList<WatchEvent>();
263-
var seenFiles = new HashSet<Path>();
264-
var seenDirectories = new HashSet<Path>();
265-
try {
266-
Files.walkFileTree(dir, new NewDirectoryScan(dir, events, seenFiles, seenDirectories));
267-
detectedMissingEntries(dir, events, seenFiles, seenDirectories);
268-
return events;
269-
} catch (IOException ex) {
270-
throw new RuntimeException(ex);
271-
}
272-
}
273-
274-
private List<WatchEvent> syncAfterOverflow(Path dir) {
275-
var events = new ArrayList<WatchEvent>();
276-
var seenFiles = new HashSet<Path>();
277-
var seenDirectories = new HashSet<Path>();
278-
try {
279-
Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
280-
detectedMissingEntries(dir, events, seenFiles, seenDirectories);
281-
return events;
282-
} catch (IOException ex) {
283-
throw new RuntimeException(ex);
284-
}
285-
}
286-
287-
private void detectedMissingEntries(Path dir, ArrayList<WatchEvent> events, HashSet<Path> seenFiles, HashSet<Path> seenDirectories) throws IOException {
288-
// why a second round? well there is a race, between iterating the directory (and sending events)
289-
// and when the watches are active. so after we know all the new watches have been registered
290-
// we do a second scan and make sure to find paths that weren't visible the first time
291-
// and emulate events for them (and register new watches)
292-
// In essence this is the same as when an Overflow happened, so we can reuse that handler.
293-
int directoryCount = seenDirectories.size() - 1;
294-
while (directoryCount != seenDirectories.size()) {
295-
Files.walkFileTree(dir, new OverflowSyncScan(dir, events, seenFiles, seenDirectories));
296-
directoryCount = seenDirectories.size();
164+
private void triggerOverflow(Path p) {
165+
var w = activeWatches.get(p);
166+
if (w != null) {
167+
var overflow = new WatchEvent(WatchEvent.Kind.OVERFLOW, p);
168+
w.handleEvent(overflow);
297169
}
298170
}
299171

@@ -306,7 +178,7 @@ public WatchScope getScope() {
306178

307179
@Override
308180
public void handleEvent(WatchEvent event) {
309-
processEvents(event);
181+
processEvents(this, event);
310182
}
311183

312184
@Override

0 commit comments

Comments
 (0)