Skip to content

Commit 61ebbb8

Browse files
authored
Merge pull request #29 from SWAT-engineering/improved-overflow-support/parameterized-torture-tests
Improved overflow support: Parameterized torture tests
2 parents fd68764 + 23506ae commit 61ebbb8

File tree

3 files changed

+166
-19
lines changed

3 files changed

+166
-19
lines changed

src/main/java/engineering/swat/watch/impl/overflows/IndexingRescanner.java

Lines changed: 125 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,130 @@
3333
import java.nio.file.attribute.BasicFileAttributes;
3434
import java.nio.file.attribute.FileTime;
3535
import java.util.ArrayDeque;
36+
import java.util.Collections;
3637
import java.util.Deque;
3738
import java.util.HashSet;
3839
import java.util.Map;
3940
import java.util.Set;
4041
import java.util.concurrent.ConcurrentHashMap;
4142
import java.util.concurrent.Executor;
43+
import java.util.function.BiFunction;
4244

4345
import org.apache.logging.log4j.LogManager;
4446
import org.apache.logging.log4j.Logger;
47+
import org.checkerframework.checker.nullness.qual.Nullable;
4548

4649
import engineering.swat.watch.WatchEvent;
4750
import engineering.swat.watch.WatchScope;
4851
import engineering.swat.watch.impl.EventHandlingWatch;
4952

5053
public class IndexingRescanner extends MemorylessRescanner {
5154
private final Logger logger = LogManager.getLogger();
52-
private final Map<Path, FileTime> index = new ConcurrentHashMap<>();
55+
private final PathMap<FileTime> index = new PathMap<>();
5356

5457
public IndexingRescanner(Executor exec, Path path, WatchScope scope) {
5558
super(exec);
5659
new Indexer(path, scope).walkFileTree(); // Make an initial scan to populate the index
5760
}
5861

62+
private static class PathMap<V> {
63+
private final Map<Path, Map<Path, V>> values = new ConcurrentHashMap<>();
64+
// ^^^^ ^^^^
65+
// Parent File name (regular file or directory)
66+
67+
public @Nullable V put(Path p, V value) {
68+
return apply(put(value), p);
69+
}
70+
71+
public @Nullable V get(Path p) {
72+
return apply(this::get, p);
73+
}
74+
75+
public Set<Path> getParents() {
76+
return (Set<Path>) values.keySet(); // Cast for Checker Framework
77+
}
78+
79+
public Set<Path> getFileNames(Path parent) {
80+
var inner = values.get(parent);
81+
return inner == null ? Collections.emptySet() : (Set<Path>) inner.keySet(); // Cast for Checker Framework
82+
}
83+
84+
public @Nullable V remove(Path p) {
85+
return apply(this::remove, p);
86+
}
87+
88+
private static <V> @Nullable V apply(BiFunction<Path, Path, @Nullable V> action, Path p) {
89+
var parent = p.getParent();
90+
var fileName = p.getFileName();
91+
if (parent != null && fileName != null) {
92+
return action.apply(parent, fileName);
93+
} else {
94+
throw new IllegalArgumentException("The path should have both a parent and a file name");
95+
}
96+
}
97+
98+
private BiFunction<Path, Path, @Nullable V> put(V value) {
99+
return (parent, fileName) -> put(parent, fileName, value);
100+
}
101+
102+
private @Nullable V put(Path parent, Path fileName, V value) {
103+
var inner = values.computeIfAbsent(parent, x -> new ConcurrentHashMap<>());
104+
105+
// This thread (henceforth: "here") optimistically puts a new entry
106+
// in `inner`. However, another thread (henceforth: "there") may
107+
// concurrently remove `inner` from `values`. Thus, the new entry
108+
// may be lost. The comments below explain the countermeasures.
109+
var previous = inner.put(fileName, value);
110+
111+
// <-- At this point "here", if `values.remove(parent)` happens
112+
// "there", then `values.get(parent) != inner` becomes true
113+
// "here", so the new entry will be re-put "here".
114+
if (values.get(parent) != inner) {
115+
previous = put(parent, fileName, value);
116+
}
117+
// <-- At this point "here", `!inner.isEmpty()` has become true
118+
// "there", so if `values.remove(parent)` happens "there", then
119+
// the new entry will be re-put "there".
120+
return previous;
121+
}
122+
123+
private @Nullable V get(Path parent, Path fileName) {
124+
var inner = values.get(parent);
125+
return inner == null ? null : inner.get(fileName);
126+
}
127+
128+
private @Nullable V remove(Path parent, Path fileName) {
129+
var inner = values.get(parent);
130+
if (inner != null) {
131+
var removed = inner.remove(fileName);
132+
133+
// This thread (henceforth: "here") optimistically removes
134+
// `inner` from `values` when it has become empty. However,
135+
// another thread (henceforth: "there") may concurrently put a
136+
// new entry in `inner`. Thus, the new entry may be lost. The
137+
// comments below explain the countermeasures.
138+
if (inner.isEmpty() && values.remove(parent, inner)) {
139+
140+
// <-- At this point "here", if `inner.put(...)` happens
141+
// "there", then `!inner.isEmpty()` becomes true "here",
142+
// so the new entry is re-put "here".
143+
if (!inner.isEmpty()) {
144+
for (var e : inner.entrySet()) {
145+
put(parent, e.getKey(), e.getValue());
146+
}
147+
}
148+
// <-- At this point "here", `values.get(parent) != inner`
149+
// has become true "there", so if `inner.put(...)`
150+
// happens "there", then the new entry will be re-put
151+
// "there".
152+
}
153+
return removed;
154+
} else {
155+
return null;
156+
}
157+
}
158+
}
159+
59160
private class Indexer extends BaseFileVisitor {
60161
public Indexer(Path path, WatchScope scope) {
61162
super(path, scope);
@@ -96,10 +197,11 @@ public Generator(Path path, WatchScope scope) {
96197
this.visited.push(new HashSet<>()); // Initial set for content of `path`
97198
}
98199

99-
private <T> void addToPeeked(Deque<Set<T>> deque, T t) {
200+
private void addToPeeked(Deque<Set<Path>> deque, Path p) {
100201
var peeked = deque.peek();
101-
if (peeked != null) {
102-
peeked.add(t);
202+
var fileName = p.getFileName();
203+
if (peeked != null && fileName != null) {
204+
peeked.add(fileName);
103205
}
104206
}
105207

@@ -140,9 +242,15 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
140242
// Issue `DELETED` events based on the set of paths visited in `dir`
141243
var visitedInDir = visited.pop();
142244
if (visitedInDir != null) {
143-
for (var p : index.keySet()) {
144-
if (dir.equals(p.getParent()) && !visitedInDir.contains(p)) {
145-
events.add(new WatchEvent(WatchEvent.Kind.DELETED, p));
245+
for (var p : index.getFileNames(dir)) {
246+
if (!visitedInDir.contains(p)) {
247+
var fullPath = dir.resolve(p);
248+
// The index may have been updated during the visit, so
249+
// even if `p` isn't contained in `visitedInDir`, by
250+
// now, it may have come into existence.
251+
if (!Files.exists(fullPath)) {
252+
events.add(new WatchEvent(WatchEvent.Kind.DELETED, fullPath));
253+
}
146254
}
147255
}
148256
}
@@ -176,7 +284,16 @@ public void accept(EventHandlingWatch watch, WatchEvent event) {
176284
watch.handleEvent(watch.relativize(created));
177285
}
178286
} catch (IOException e) {
179-
logger.error("Could not get modification time of: {} ({})", fullPath, e);
287+
// It can happen that, by the time a `CREATED`/`MODIFIED`
288+
// event is handled above, getting the last-modified-time
289+
// fails because the file has already been deleted. That's
290+
// fine: we can just ignore the event. (The corresponding
291+
// `DELETED` event will later be handled and remove the file
292+
// from the index.) If the file exists, though, something
293+
// went legitimately wrong, so it needs to be reported.
294+
if (Files.exists(fullPath)) {
295+
logger.error("Could not get modification time of: {} ({})", fullPath, e);
296+
}
180297
}
181298
break;
182299
case DELETED:

src/test/java/engineering/swat/watch/TestHelper.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
package engineering.swat.watch;
2828

2929
import java.time.Duration;
30+
import java.util.Arrays;
31+
import java.util.stream.IntStream;
32+
import java.util.stream.Stream;
3033

3134
public class TestHelper {
3235

@@ -54,4 +57,21 @@ else if (os.contains("win")) {
5457
NORMAL_WAIT = Duration.ofSeconds(4 * delayFactor);
5558
LONG_WAIT = Duration.ofSeconds(8 * delayFactor);
5659
}
60+
61+
public static <T> Stream<T> streamOf(T[] values, int repetitions) {
62+
return streamOf(values, repetitions, false);
63+
}
64+
65+
public static <T> Stream<T> streamOf(T[] values, int repetitions, boolean sortByRepetition) {
66+
if (sortByRepetition) {
67+
return IntStream
68+
.range(0, repetitions)
69+
.boxed()
70+
.flatMap(i -> Arrays.stream(values));
71+
}
72+
else { // Sort by value
73+
return Arrays.stream(values).flatMap(v ->
74+
IntStream.range(0, repetitions).mapToObj(i -> v));
75+
}
76+
}
5777
}

src/test/java/engineering/swat/watch/TortureTests.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.function.Predicate;
49+
import java.util.stream.Stream;
4950

5051
import org.apache.logging.log4j.LogManager;
5152
import org.apache.logging.log4j.Logger;
@@ -54,8 +55,10 @@
5455
import org.junit.jupiter.api.BeforeAll;
5556
import org.junit.jupiter.api.BeforeEach;
5657
import org.junit.jupiter.api.RepeatedTest;
57-
import org.junit.jupiter.api.Test;
5858
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
59+
import org.junit.jupiter.params.ParameterizedTest;
60+
import org.junit.jupiter.params.provider.EnumSource;
61+
import org.junit.jupiter.params.provider.MethodSource;
5962

6063
class TortureTests {
6164

@@ -141,17 +144,18 @@ Set<Path> stop() throws InterruptedException {
141144

142145
private static final int THREADS = 4;
143146

144-
@Test
145-
void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IOException {
147+
@ParameterizedTest
148+
@EnumSource(names = { "ALL", "DIRTY" })
149+
void pressureOnFSShouldNotMissNewFilesAnything(OnOverflow whichFiles) throws InterruptedException, IOException {
146150
final var root = testDir.getTestDirectory();
147151
var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4);
148152

149153
var io = new IOGenerator(THREADS, root, pool);
150154

151-
152155
var seenCreates = ConcurrentHashMap.<Path>newKeySet();
153156
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
154157
.withExecutor(pool)
158+
.approximate(whichFiles)
155159
.on(ev -> {
156160
var fullPath = ev.calculateFullPath();
157161
switch (ev.getKind()) {
@@ -263,8 +267,14 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException {
263267
}
264268
}
265269

266-
@RepeatedTest(failureThreshold=1, value = 20)
267-
void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException {
270+
static Stream<OnOverflow> manyRegisterAndUnregisterSameTimeSource() {
271+
OnOverflow[] values = { OnOverflow.ALL, OnOverflow.DIRTY };
272+
return TestHelper.streamOf(values, 5);
273+
}
274+
275+
@ParameterizedTest
276+
@MethodSource("manyRegisterAndUnregisterSameTimeSource")
277+
void manyRegisterAndUnregisterSameTime(OnOverflow whichFiles) throws InterruptedException, IOException {
268278
var startRegistering = new Semaphore(0);
269279
var startedWatching = new Semaphore(0);
270280
var stopAll = new Semaphore(0);
@@ -286,6 +296,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
286296
for (int k = 0; k < 1000; k++) {
287297
var watcher = Watcher
288298
.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
299+
.approximate(whichFiles)
289300
.on(e -> {
290301
if (e.calculateFullPath().equals(target)) {
291302
seen.add(id);
@@ -328,13 +339,13 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
328339
finally {
329340
stopAll.release(amountOfWatchersActive);
330341
}
331-
332342
}
333343

334-
@Test
344+
@ParameterizedTest
345+
@EnumSource(names = { "ALL", "DIRTY" })
335346
//Deletes can race the filesystem, so you might miss a few files in a dir, if that dir is already deleted
336347
@EnabledIfEnvironmentVariable(named="TORTURE_DELETE", matches="true")
337-
void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException {
348+
void pressureOnFSShouldNotMissDeletes(OnOverflow whichFiles) throws InterruptedException, IOException {
338349
final var root = testDir.getTestDirectory();
339350
var pool = Executors.newCachedThreadPool();
340351

@@ -350,6 +361,7 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
350361
final var happened = new Semaphore(0);
351362
var watchConfig = Watcher.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_ALL_DESCENDANTS)
352363
.withExecutor(pool)
364+
.approximate(whichFiles)
353365
.on(ev -> {
354366
events.getAndIncrement();
355367
happened.release();
@@ -393,8 +405,6 @@ void pressureOnFSShouldNotMissDeletes() throws InterruptedException, IOException
393405
}
394406
}
395407

396-
397-
398408
private void waitForStable(final AtomicInteger events, final Semaphore happened) throws InterruptedException {
399409
int lastEventCount = events.get();
400410
int stableCount = 0;

0 commit comments

Comments
 (0)