3232import java .nio .file .Path ;
3333import java .util .concurrent .CompletableFuture ;
3434import java .util .concurrent .Executor ;
35+ import java .util .function .BiConsumer ;
3536import java .util .function .Consumer ;
37+ import java .util .function .Predicate ;
3638
3739import org .apache .logging .log4j .LogManager ;
3840import org .apache .logging .log4j .Logger ;
3941
42+ import engineering .swat .watch .impl .EventHandlingWatch ;
4043import engineering .swat .watch .impl .jdk .JDKDirectoryWatch ;
44+ import engineering .swat .watch .impl .jdk .JDKFileTreeWatch ;
4145import engineering .swat .watch .impl .jdk .JDKFileWatch ;
42- import engineering .swat .watch .impl .jdk .JDKRecursiveDirectoryWatch ;
46+ import engineering .swat .watch .impl .overflows .IndexingRescanner ;
47+ import engineering .swat .watch .impl .overflows .MemorylessRescanner ;
4348
4449/**
4550 * <p>Watch a path for changes.</p>
5055 */
5156public class Watcher {
5257 private final Logger logger = LogManager .getLogger ();
53- private final WatchScope scope ;
5458 private final Path path ;
59+ private final WatchScope scope ;
60+ private volatile Approximation approximateOnOverflow = Approximation .ALL ;
5561 private volatile Executor executor = CompletableFuture ::runAsync ;
5662
57- private static final Consumer <WatchEvent > EMPTY_HANDLER = p -> {};
58- private volatile Consumer <WatchEvent > eventHandler = EMPTY_HANDLER ;
59-
63+ private static final BiConsumer <EventHandlingWatch , WatchEvent > EMPTY_HANDLER = (w , e ) -> {};
64+ private volatile BiConsumer <EventHandlingWatch , WatchEvent > eventHandler = EMPTY_HANDLER ;
65+ private static final Predicate <WatchEvent > TRUE_FILTER = e -> true ;
66+ private volatile Predicate <WatchEvent > eventFilter = TRUE_FILTER ;
6067
61- private Watcher (WatchScope scope , Path path ) {
62- this .scope = scope ;
68+ private Watcher (Path path , WatchScope scope ) {
6369 this .path = path ;
70+ this .scope = scope ;
6471 }
6572
6673 /**
@@ -87,9 +94,8 @@ public static Watcher watch(Path path, WatchScope scope) {
8794 break ;
8895 default :
8996 throw new IllegalArgumentException ("Unsupported scope: " + scope );
90-
9197 }
92- return new Watcher (scope , path );
98+ return new Watcher (path , scope );
9399 }
94100
95101 /**
@@ -103,7 +109,7 @@ public Watcher on(Consumer<WatchEvent> eventHandler) {
103109 if (this .eventHandler != EMPTY_HANDLER ) {
104110 throw new IllegalArgumentException ("on handler cannot be set more than once" );
105111 }
106- this .eventHandler = eventHandler ;
112+ this .eventHandler = ( w , e ) -> eventHandler . accept ( e ) ;
107113 return this ;
108114 }
109115
@@ -114,7 +120,7 @@ public Watcher on(WatchEventListener listener) {
114120 if (this .eventHandler != EMPTY_HANDLER ) {
115121 throw new IllegalArgumentException ("on handler cannot be set more than once" );
116122 }
117- this .eventHandler = ev -> {
123+ this .eventHandler = ( w , ev ) -> {
118124 switch (ev .getKind ()) {
119125 case CREATED :
120126 listener .onCreated (ev );
@@ -135,6 +141,22 @@ public Watcher on(WatchEventListener listener) {
135141 return this ;
136142 }
137143
144+ /**
145+ * Configures the event filter to determine which events should be passed to
146+ * the event handler. By default (without calling this method), all events
147+ * are passed. This method must be called at most once.
148+ * @param predicate The predicate to determine an event should be kept
149+ * ({@code true}) or dropped ({@code false})
150+ * @return {@code this} (to support method chaining)
151+ */
152+ Watcher filter (Predicate <WatchEvent > predicate ) {
153+ if (this .eventFilter != TRUE_FILTER ) {
154+ throw new IllegalArgumentException ("filter cannot be set more than once" );
155+ }
156+ this .eventFilter = predicate ;
157+ return this ;
158+ }
159+
138160 /**
139161 * Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
140162 * If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
@@ -146,6 +168,22 @@ public Watcher withExecutor(Executor callbackHandler) {
146168 return this ;
147169 }
148170
171+ /**
172+ * Optionally configure which regular files/directories in the scope of the
173+ * watch an <i>approximation</i> of synthetic events (of kinds
174+ * {@link WatchEvent.Kind#CREATED}, {@link WatchEvent.Kind#MODIFIED}, and/or
175+ * {@link WatchEvent.Kind#DELETED}) should be issued when an overflow event
176+ * happens. If not defined before this watcher is started, the
177+ * {@link Approximation#ALL} approach will be used.
178+ * @param whichFiles Constant to indicate for which regular
179+ * files/directories to approximate
180+ * @return This watcher for optional method chaining
181+ */
182+ public Watcher onOverflow (Approximation whichFiles ) {
183+ this .approximateOnOverflow = whichFiles ;
184+ return this ;
185+ }
186+
149187 /**
150188 * Start watch the path for events.
151189 * @return a subscription for the watch, when closed, new events will stop being registered to the worker pool.
@@ -157,33 +195,48 @@ public ActiveWatch start() throws IOException {
157195 throw new IllegalStateException ("There is no onEvent handler defined" );
158196 }
159197
198+ var h = applyApproximateOnOverflow ();
199+
160200 switch (scope ) {
161201 case PATH_AND_CHILDREN : {
162- var result = new JDKDirectoryWatch (path , executor , eventHandler , false );
202+ var result = new JDKDirectoryWatch (path , executor , h , eventFilter );
163203 result .open ();
164204 return result ;
165205 }
166206 case PATH_AND_ALL_DESCENDANTS : {
167207 try {
168- var result = new JDKDirectoryWatch (path , executor , eventHandler , true );
208+ var result = new JDKDirectoryWatch (path , executor , h , eventFilter , true );
169209 result .open ();
170210 return result ;
171211 } catch (Throwable ex ) {
172212 // no native support, use the simulation
173213 logger .debug ("Not possible to register the native watcher, using fallback for {}" , path );
174214 logger .trace (ex );
175- var result = new JDKRecursiveDirectoryWatch (path , executor , eventHandler );
215+ var result = new JDKFileTreeWatch (path , executor , h , eventFilter );
176216 result .open ();
177217 return result ;
178218 }
179219 }
180220 case PATH_ONLY : {
181- var result = new JDKFileWatch (path , executor , eventHandler );
221+ var result = new JDKFileWatch (path , executor , h , eventFilter );
182222 result .open ();
183223 return result ;
184224 }
185225 default :
186226 throw new IllegalStateException ("Not supported yet" );
187227 }
188228 }
229+
230+ private BiConsumer <EventHandlingWatch , WatchEvent > applyApproximateOnOverflow () {
231+ switch (approximateOnOverflow ) {
232+ case NONE :
233+ return eventHandler ;
234+ case ALL :
235+ return eventHandler .andThen (new MemorylessRescanner (executor ));
236+ case DIFF :
237+ return eventHandler .andThen (new IndexingRescanner (executor , path , scope ));
238+ default :
239+ throw new UnsupportedOperationException ("No event handler has been defined yet for this overflow policy" );
240+ }
241+ }
189242}
0 commit comments