Skip to content

Commit c063710

Browse files
committed
Add smoke test (and a corresponding auxiliary main) to start/stop an FS event stream using the native macOS APIs
1 parent 0c16dae commit c063710

File tree

1 file changed

+180
-0
lines changed
  • src/test/java/engineering/swat/watch/impl/mac

1 file changed

+180
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package engineering.swat.watch.impl.mac;
2+
3+
import static engineering.swat.watch.impl.mac.apis.FileSystemEvents.FSEventStreamCreateFlag.FILE_EVENTS;
4+
import static engineering.swat.watch.impl.mac.apis.FileSystemEvents.FSEventStreamCreateFlag.NO_DEFER;
5+
import static engineering.swat.watch.impl.mac.apis.FileSystemEvents.FSEventStreamCreateFlag.WATCH_ROOT;
6+
import static org.awaitility.Awaitility.await;
7+
8+
import java.io.BufferedReader;
9+
import java.io.Closeable;
10+
import java.io.IOException;
11+
import java.io.InputStreamReader;
12+
import java.nio.file.Files;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.stream.Collectors;
16+
import java.util.stream.Stream;
17+
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.junit.jupiter.api.Test;
21+
22+
import com.sun.jna.Memory;
23+
import com.sun.jna.Native;
24+
import com.sun.jna.Pointer;
25+
import com.sun.jna.platform.mac.CoreFoundation;
26+
import com.sun.jna.platform.mac.CoreFoundation.CFArrayRef;
27+
import com.sun.jna.platform.mac.CoreFoundation.CFIndex;
28+
import com.sun.jna.platform.mac.CoreFoundation.CFStringRef;
29+
30+
import engineering.swat.watch.TestDirectory;
31+
import engineering.swat.watch.impl.mac.apis.DispatchObjects;
32+
import engineering.swat.watch.impl.mac.apis.DispatchQueue;
33+
import engineering.swat.watch.impl.mac.apis.FileSystemEvents;
34+
import engineering.swat.watch.impl.mac.apis.FileSystemEvents.FSEventStreamEventFlag;
35+
36+
class APIs {
37+
private static final Logger LOGGER = LogManager.getLogger();
38+
39+
// Native APIs
40+
private static final CoreFoundation CF = CoreFoundation.INSTANCE;
41+
private static final DispatchObjects DO = DispatchObjects.INSTANCE;
42+
private static final DispatchQueue DQ = DispatchQueue.INSTANCE;
43+
private static final FileSystemEvents FSE = FileSystemEvents.INSTANCE;
44+
45+
@Test
46+
void smokeTest() throws IOException {
47+
try (var test = new TestDirectory()) {
48+
var ready = new AtomicBoolean(false);
49+
var paths = ConcurrentHashMap.<String> newKeySet();
50+
51+
var s = test.getTestDirectory().toString();
52+
var handler = (MinimalWorkingExample.EventHandler) (path, flags, id) -> {
53+
synchronized (ready) {
54+
while (!ready.get()) {
55+
try {
56+
ready.wait();
57+
} catch (InterruptedException e) {
58+
LOGGER.error("Unexpected interrupt. Test likely to fail. Event ignored ({}).", prettyPrint(path, flags, id));
59+
Thread.currentThread().interrupt();
60+
return;
61+
}
62+
}
63+
}
64+
paths.remove(path);
65+
};
66+
67+
try (var mwe = new MinimalWorkingExample(s, handler)) {
68+
var dir = test.getTestDirectory().toRealPath();
69+
paths.add(Files.writeString(dir.resolve("a.txt"), "foo").toString());
70+
paths.add(Files.writeString(dir.resolve("b.txt"), "bar").toString());
71+
paths.add(Files.createFile(dir.resolve("d.txt")).toString());
72+
73+
synchronized (ready) {
74+
ready.set(true);
75+
ready.notifyAll();
76+
}
77+
78+
await("The event handler has been called").until(paths::isEmpty);
79+
}
80+
}
81+
}
82+
83+
public static void main(String[] args) throws IOException {
84+
var s = "/Users/sungshik/Desktop/tmp";
85+
var handler = (MinimalWorkingExample.EventHandler) (path, flags, id) -> {
86+
LOGGER.info(prettyPrint(path, flags, id));
87+
};
88+
89+
try (var mwe = new MinimalWorkingExample(s, handler)) {
90+
// Block the program from terminating until `ENTER` is pressed
91+
new BufferedReader(new InputStreamReader(System.in)).readLine();
92+
}
93+
}
94+
95+
private static String prettyPrint(String path, int flags, long id) {
96+
var flagsPrettyPrinted = Stream
97+
.of(FSEventStreamEventFlag.values())
98+
.filter(f -> (f.mask & flags) == f.mask)
99+
.map(Object::toString)
100+
.collect(Collectors.joining(", "));
101+
102+
var format = "path: \"%s\", flags: [%s], id: %s";
103+
return String.format(format, path, flagsPrettyPrinted, id);
104+
}
105+
106+
private static class MinimalWorkingExample implements Closeable {
107+
FileSystemEvents.FSEventStreamCallback callback;
108+
Pointer stream;
109+
Pointer queue;
110+
111+
public MinimalWorkingExample(String s, EventHandler handler) {
112+
113+
// Allocate singleton array of paths
114+
CFStringRef pathToWatch = CFStringRef.createCFString(s);
115+
CFArrayRef pathsToWatch = null;
116+
{
117+
var values = new Memory(Native.getNativeSize(CFStringRef.class));
118+
values.setPointer(0, pathToWatch.getPointer());
119+
pathsToWatch = CF.CFArrayCreate(
120+
CF.CFAllocatorGetDefault(),
121+
values,
122+
new CFIndex(1),
123+
null);
124+
} // Automatically free `values` when it goes out of scope
125+
126+
// Allocate callback
127+
this.callback = (x1, x2, x3, x4, x5, x6) -> {
128+
var paths = x4.getStringArray(0, (int) x3);
129+
var flags = x5.getIntArray(0, (int) x3);
130+
var ids = x6.getLongArray(0, (int) x3);
131+
for (int i = 0; i < x3; i++) {
132+
handler.handle(paths[i], flags[i], ids[i]);
133+
}
134+
};
135+
136+
// Allocate stream
137+
this.stream = FSE.FSEventStreamCreate(
138+
CF.CFAllocatorGetDefault(),
139+
callback,
140+
Pointer.NULL,
141+
pathsToWatch,
142+
FSE.FSEventsGetCurrentEventId(),
143+
0.15,
144+
NO_DEFER.mask | WATCH_ROOT.mask | FILE_EVENTS.mask);
145+
146+
// Deallocate array of paths
147+
pathsToWatch.release();
148+
pathToWatch.release();
149+
150+
// Allocate queue
151+
this.queue = DQ.dispatch_queue_create("q", null);
152+
153+
// Start stream
154+
FSE.FSEventStreamSetDispatchQueue(stream, queue);
155+
FSE.FSEventStreamStart(stream);
156+
FSE.FSEventStreamShow(stream);
157+
}
158+
159+
@Override
160+
public void close() throws IOException {
161+
162+
// Stop stream
163+
FSE.FSEventStreamStop(stream);
164+
FSE.FSEventStreamSetDispatchQueue(stream, Pointer.NULL);
165+
FSE.FSEventStreamInvalidate(stream);
166+
FSE.FSEventStreamRelease(stream);
167+
DO.dispatch_release(queue);
168+
169+
// Deallocate queue, stream, and callback
170+
this.queue = null;
171+
this.stream = null;
172+
this.callback = null;
173+
}
174+
175+
@FunctionalInterface
176+
private static interface EventHandler {
177+
void handle(String path, int flags, long id);
178+
}
179+
}
180+
}

0 commit comments

Comments
 (0)