Skip to content

Commit d6b1f28

Browse files
committed
Improved torture tests
1 parent 80628ab commit d6b1f28

File tree

4 files changed

+99
-31
lines changed

4 files changed

+99
-31
lines changed

src/main/java/engineering/swat/watch/impl/BundledSubscription.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,11 @@ public Closeable subscribe(Key target, Consumer<Event> eventListener) throws IOE
5757
Subscription<Event> active = this.subscriptions.computeIfAbsent(target, t -> new Subscription<>());
5858
// after this, there will only be 1 instance of active subscription in the map.
5959
// but we might have a race with remove, which can close the subscript between our get and our addition
60+
// since this code is very hard to get right without locks, and shouldn't be run too often
61+
// we take a big lock around the subscription management
6062
synchronized(active) {
6163
if (active.closed) {
62-
// we lost the race with closing the subscription
63-
// so we retry
64+
// we lost the race with closing the subscription, so we retry
6465
continue;
6566
}
6667
active.add(eventListener);

src/main/java/engineering/swat/watch/impl/SubscriptionKey.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ public boolean equals(@Nullable Object obj) {
3636
public int hashCode() {
3737
return Objects.hash(path, recursive);
3838
}
39+
40+
@Override
41+
public String toString() {
42+
return path.toString() + (recursive ? "[recursive]" : "");
43+
}
3944
}

src/test/java/engineering/swat/watch/TortureTests.java

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package engineering.swat.watch;
22

3-
import static org.awaitility.Awaitility.doNotCatchUncaughtExceptionsByDefault;
43
import static org.awaitility.Awaitility.await;
54
import static org.junit.jupiter.api.Assertions.assertTrue;
65
import static org.junit.jupiter.api.Assertions.fail;
@@ -11,8 +10,8 @@
1110
import java.time.Duration;
1211
import java.time.LocalTime;
1312
import java.util.Random;
13+
import java.util.Collections;
1414
import java.util.Set;
15-
import java.util.concurrent.CompletableFuture;
1615
import java.util.concurrent.ConcurrentHashMap;
1716
import java.util.concurrent.ConcurrentLinkedDeque;
1817
import java.util.concurrent.Executor;
@@ -27,10 +26,8 @@
2726
import org.apache.logging.log4j.Logger;
2827
import org.junit.jupiter.api.AfterEach;
2928
import org.junit.jupiter.api.BeforeEach;
30-
import org.junit.jupiter.api.Disabled;
29+
import org.junit.jupiter.api.RepeatedTest;
3130
import org.junit.jupiter.api.Test;
32-
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
33-
import org.junit.jupiter.api.condition.EnabledIf;
3431
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
3532

3633
import engineering.swat.watch.WatchEvent.Kind;
@@ -173,24 +170,30 @@ void pressureOnFSShouldNotMissNewFilesAnything() throws InterruptedException, IO
173170
}
174171
}
175172

176-
@Test
173+
private final int TORTURE_REGISTRATION_THREADS = THREADS * 500;
174+
175+
@RepeatedTest(failureThreshold=1, value = 20)
177176
void manyRegistrationsForSamePath() throws InterruptedException, IOException {
178177
var startRegistering = new Semaphore(0);
178+
var startedWatching = new Semaphore(0);
179179
var startDeregistring = new Semaphore(0);
180180
var done = new Semaphore(0);
181181
var seen = ConcurrentHashMap.<Path>newKeySet();
182182
var exceptions = new LinkedBlockingDeque<Exception>();
183-
for (int t = 0; t < THREADS * 100; t++) {
183+
184+
for (int t = 0; t < TORTURE_REGISTRATION_THREADS; t++) {
184185
var r = new Thread(() -> {
185186
try {
186187
var watcher = Watcher
187188
.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
188-
.onEvent(e -> { if (e.getKind() == Kind.CREATED) seen.add(e.calculateFullPath()); });
189+
.onEvent(e -> seen.add(e.calculateFullPath()));
189190
startRegistering.acquire();
190191
try (var c = watcher.start()) {
192+
startedWatching.release();
191193
startDeregistring.acquire();
192194
}
193195
catch(Exception e) {
196+
startedWatching.release();
194197
exceptions.push(e);
195198
}
196199
} catch (InterruptedException e1) {
@@ -203,29 +206,39 @@ void manyRegistrationsForSamePath() throws InterruptedException, IOException {
203206
r.start();
204207
}
205208

206-
startRegistering.release(THREADS * 100);
207-
startDeregistring.release((THREADS * 100) - 1);
208-
done.acquire((THREADS * 100) - 1);
209-
assertTrue(seen.isEmpty(), "No events should have been sent");
210-
Files.writeString(testDir.getTestDirectory().resolve("test124.txt"), "Hello World");
211-
await("We should see only one event")
212-
.failFast(() -> !exceptions.isEmpty())
213-
.timeout(TestHelper.LONG_WAIT)
214-
.pollInterval(Duration.ofMillis(10))
215-
.until(seen::size, s -> s == 1);
216-
if (!exceptions.isEmpty()) {
217-
fail(exceptions.pop());
209+
try {
210+
startRegistering.release(TORTURE_REGISTRATION_THREADS);
211+
startDeregistring.release(TORTURE_REGISTRATION_THREADS - 1);
212+
startedWatching.acquire(TORTURE_REGISTRATION_THREADS); // make sure they area ll started
213+
done.acquire(TORTURE_REGISTRATION_THREADS - 1);
214+
assertTrue(seen.isEmpty(), "No events should have been sent");
215+
var target = testDir.getTestDirectory().resolve("test124.txt");
216+
//logger.info("Writing: {}", target);
217+
Files.writeString(target, "Hello World");
218+
var expected = Collections.singleton(target);
219+
await("We should see only one event")
220+
.failFast(() -> !exceptions.isEmpty())
221+
.timeout(TestHelper.LONG_WAIT)
222+
.pollInterval(Duration.ofMillis(10))
223+
.until(() -> seen, expected::equals);
224+
if (!exceptions.isEmpty()) {
225+
fail(exceptions.pop());
226+
}
227+
}
228+
finally {
229+
startDeregistring.release(TORTURE_REGISTRATION_THREADS);
218230
}
219-
startDeregistring.release();
220231
}
221232

222-
@Test
233+
@RepeatedTest(failureThreshold=1, value = 20)
223234
void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOException {
224235
var startRegistering = new Semaphore(0);
236+
var startedWatching = new Semaphore(0);
225237
var stopAll = new Semaphore(0);
226238
var done = new Semaphore(0);
227-
var seen = new ConcurrentLinkedDeque<Path>();
239+
var seen = ConcurrentHashMap.<Long>newKeySet();
228240
var exceptions = new LinkedBlockingDeque<Exception>();
241+
var target = testDir.getTestDirectory().resolve("test124.txt");
229242
int amountOfWatchersActive = 0;
230243
try {
231244
for (int t = 0; t < THREADS; t++) {
@@ -239,10 +252,14 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
239252
for (int k = 0; k < 1000; k++) {
240253
var watcher = Watcher
241254
.watch(testDir.getTestDirectory(), WatchScope.PATH_AND_CHILDREN)
242-
.onEvent(e -> { if (e.getKind() == Kind.CREATED) seen.add(e.calculateFullPath()); });
255+
.onEvent(e -> {
256+
if (e.calculateFullPath().equals(target)) {
257+
seen.add(Thread.currentThread().getId());
258+
}
259+
});
243260
try (var c = watcher.start()) {
244261
if (finishWatching && k + 1 == 1000) {
245-
logger.info("Waiting on stop signal");
262+
startedWatching.release();
246263
stopAll.acquire();
247264
}
248265
}
@@ -253,6 +270,7 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
253270
} catch (InterruptedException e1) {
254271
}
255272
finally {
273+
startedWatching.release();
256274
done.release();
257275
}
258276
});
@@ -262,9 +280,10 @@ void manyRegisterAndUnregisterSameTime() throws InterruptedException, IOExceptio
262280

263281
startRegistering.release(THREADS);
264282
done.acquire(THREADS - amountOfWatchersActive);
283+
startedWatching.acquire(THREADS);
265284
assertTrue(seen.isEmpty(), "No events should have been sent");
266-
Files.writeString(testDir.getTestDirectory().resolve("test124.txt"), "Hello World");
267-
await("We should see only exactly the events we expect")
285+
Files.writeString(target, "Hello World");
286+
await("We should see only exactly the " + amountOfWatchersActive + " events we expect")
268287
.failFast(() -> !exceptions.isEmpty())
269288
.pollDelay(TestHelper.NORMAL_WAIT.minusMillis(100))
270289
.until(seen::size, Predicate.isEqual(amountOfWatchersActive))

src/test/java/engineering/swat/watch/impl/BundlingTests.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
import java.util.concurrent.CompletableFuture;
1212
import java.util.concurrent.ConcurrentHashMap;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.Semaphore;
1415
import java.util.concurrent.atomic.AtomicInteger;
1516
import java.util.function.Consumer;
1617

1718
import org.apache.logging.log4j.LogManager;
1819
import org.apache.logging.log4j.Logger;
1920
import org.awaitility.Awaitility;
21+
import org.hamcrest.core.IsEqual;
2022
import org.junit.jupiter.api.BeforeAll;
2123
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.RepeatedTest;
2225
import org.junit.jupiter.api.Test;
2326

2427
import engineering.swat.watch.TestHelper;
@@ -36,7 +39,7 @@ private static class FakeSubscribable implements ISubscribable<Long, Boolean> {
3639
public Closeable subscribe(Long target, Consumer<Boolean> eventListener) throws IOException {
3740
subs.put(target, eventListener);
3841
return () -> {
39-
subs.remove(target);
42+
subs.remove(target, eventListener);
4043
};
4144
}
4245

@@ -60,7 +63,7 @@ static void setupEverything() {
6063
Awaitility.setDefaultTimeout(TestHelper.LONG_WAIT.getSeconds(), TimeUnit.SECONDS);
6164
}
6265

63-
private static final long SUBs = 100;
66+
private static final int SUBs = 100;
6467
private static final long MSGs = 100_000;
6568

6669
@Test
@@ -101,4 +104,44 @@ void manySubscriptions() throws IOException {
101104

102105

103106
}
107+
108+
@RepeatedTest(failureThreshold = 1, value=50)
109+
void parallelSubscriptions() throws IOException, InterruptedException {
110+
var hits = new AtomicInteger();
111+
var endPointReached = new Semaphore(0);
112+
var waitingForClose = new Semaphore(0);
113+
var done = new Semaphore(0);
114+
115+
int active = 0;
116+
for (int j = 0; j < SUBs; j++) {
117+
boolean keepAround = j % 2 == 0;
118+
if (keepAround) {
119+
active++;
120+
}
121+
var t = new Thread(() -> {
122+
for (int k =0; k < 1000; k++) {
123+
try (var c = target.subscribe(Long.valueOf(0), b -> hits.incrementAndGet())) {
124+
if (keepAround && k + 1 == 1000) {
125+
endPointReached.release();
126+
waitingForClose.acquire();
127+
}
128+
} catch (Exception ignored) {
129+
logger.catching(ignored);
130+
}
131+
}
132+
done.release();
133+
});
134+
t.setDaemon(true);
135+
t.start();
136+
}
137+
138+
endPointReached.acquire(active);
139+
done.acquire(SUBs - active);
140+
fakeSubs.publish(Long.valueOf(0));
141+
142+
await("Subscriptions should have hit")
143+
.untilAtomic(hits, IsEqual.equalTo(active));
144+
waitingForClose.release(active);
145+
}
146+
104147
}

0 commit comments

Comments
 (0)