diff --git a/.gitignore b/.gitignore index c0b4b082..718ba353 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ target/ .sbtserver .scala-build/ .bsp/ +.bloop +.metals project/.sbtserver tags nohup.out diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..e72490fb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "files.watcherExclude": { + "**/target": true + } +} \ No newline at end of file diff --git a/os/test/src/TestUtil.scala b/os/test/src/TestUtil.scala index fc82a7c6..39746046 100644 --- a/os/test/src/TestUtil.scala +++ b/os/test/src/TestUtil.scala @@ -38,7 +38,8 @@ object TestUtil { str1Normalized == str2Normalized } - def prep[T](f: os.Path => T)(implicit tp: TestPath, fn: sourcecode.FullName) = { + /** Creates a temporary directory for the current test. */ + def mkDir[T](f: os.Path => T)(implicit tp: TestPath, fn: sourcecode.FullName) = { val segments = Seq("out", "scratch") ++ fn.value.split('.').drop(2) ++ tp.value val directory = Paths.get(segments.mkString("/")) @@ -58,6 +59,15 @@ object TestUtil { } ) + val wd = os.Path(directory.toAbsolutePath) + os.makeDir.all.apply(wd) + f(wd) + } + + /** Populates the directory with test resources. */ + def populate[T](directory: Path)(implicit tp: TestPath, fn: sourcecode.FullName): Unit = { + if (os.exists(os.Path(directory))) os.remove.all(os.Path(directory)) + val original = Paths.get(sys.env("OS_TEST_RESOURCE_FOLDER"), "test") Files.walkFileTree( original, @@ -73,8 +83,14 @@ object TestUtil { } } ) + } - f(os.Path(directory.toAbsolutePath)) + /** Creates a temporary directory for the current test and populates it with test resources. */ + def prep[T](f: os.Path => T)(implicit tp: TestPath, fn: sourcecode.FullName) = { + mkDir { wd => + populate(wd.toNIO) + f(wd) + } } def prepChecker[T](f: os.Path => T)(implicit tp: TestPath, fn: sourcecode.FullName): T = diff --git a/os/watch/src/CarbonApi.scala b/os/watch/src/CarbonApi.scala deleted file mode 100644 index 8cb9ce0b..00000000 --- a/os/watch/src/CarbonApi.scala +++ /dev/null @@ -1,104 +0,0 @@ -package os.watch - -import com.sun.jna._ - -import com.sun.jna.ptr.PointerByReference - -object CarbonApi { - def apply() = INSTANCE - val INSTANCE = Native.load("Carbon", classOf[CarbonApi]).asInstanceOf[CarbonApi] -} - -trait FSEventStreamCallback extends Callback { - def invoke( - streamRef: FSEventStreamRef, - clientCallBackInfo: Pointer, - numEvents: NativeLong, - eventPaths: Pointer, - eventFlags: Pointer, - eventIds: Pointer - ): Unit -} - -trait CarbonApi extends Library { - def CFRelease(cfTypeRef: Any): Unit - - def CFArrayCreate( - allocator: CFAllocatorRef, // always set to Pointer.NULL - values: Array[Pointer], - numValues: CFIndex, - callBacks: Void - ): CFArrayRef - - // always set to Pointer.NULL): CFArrayRef - def CFStringCreateWithCharacters( - alloc: Void, // always pass NULL - chars: Array[Char], - numChars: CFIndex - ): CFStringRef - - def FSEventStreamCreate( - v: Pointer, // always use Pointer.NULL - callback: FSEventStreamCallback, - context: Pointer, - pathsToWatch: CFArrayRef, - sinceWhen: Long, // use -1 for events since now - latency: Double, // in seconds - flags: Int - ): FSEventStreamRef - - // 0 is good for now): FSEventStreamRef - def FSEventStreamStart(streamRef: FSEventStreamRef): Boolean - - def FSEventStreamStop(streamRef: FSEventStreamRef): Unit - - def FSEventStreamInvalidate(streamRef: FSEventStreamRef): Unit - def FSEventStreamUnscheduleFromRunLoop( - streamRef: FSEventStreamRef, - runLoop: CFRunLoopRef, - runLoopMode: CFStringRef - ): Unit - def FSEventStreamRelease(streamRef: FSEventStreamRef): Unit - - def FSEventStreamScheduleWithRunLoop( - streamRef: FSEventStreamRef, - runLoop: CFRunLoopRef, - runLoopMode: CFStringRef - ): Unit - - def CFRunLoopGetCurrent(): CFRunLoopRef - - def CFRunLoopRun(): Unit - - def CFRunLoopStop(rl: CFRunLoopRef): Unit -} - -class CFAllocatorRef extends PointerByReference {} - -class CFArrayRef extends PointerByReference {} - -@SerialVersionUID(0) -object CFIndex { - def valueOf(i: Int) = { - val idx = new CFIndex - idx.setValue(i) - idx - } -} - -@SerialVersionUID(0) -class CFIndex extends NativeLong {} - -class CFRunLoopRef extends PointerByReference {} - -object CFStringRef { - def toCFString(s: String) = { - val chars = s.toCharArray - val length = chars.length - CarbonApi.INSTANCE.CFStringCreateWithCharacters(null, chars, CFIndex.valueOf(length)) - } -} - -class CFStringRef extends PointerByReference {} - -class FSEventStreamRef extends PointerByReference {} diff --git a/os/watch/src/FSEventsWatcher.scala b/os/watch/src/FSEventsWatcher.scala deleted file mode 100644 index d6011eab..00000000 --- a/os/watch/src/FSEventsWatcher.scala +++ /dev/null @@ -1,110 +0,0 @@ -package os.watch - -import com.sun.jna.{NativeLong, Pointer} - -import scala.util.control.NonFatal - -class FSEventsWatcher( - srcs: Seq[os.Path], - onEvent: Set[os.Path] => Unit, - filter: os.Path => Boolean, - logger: (String, Any) => Unit, - latency: Double -) extends Watcher { - private[this] var closed = false - private[this] val callback = new FSEventStreamCallback { - def invoke( - streamRef: FSEventStreamRef, - clientCallBackInfo: Pointer, - numEvents: NativeLong, - eventPaths: Pointer, - eventFlags: Pointer, - eventIds: Pointer - ) = { - val length = numEvents.intValue - val pathStrings = eventPaths.getStringArray(0, length) - logger("FSEVENT", pathStrings) - val paths = pathStrings.iterator.map(os.Path(_)).filter(filter).toArray - val nestedPaths = collection.mutable.Buffer.empty[os.Path] - // When folders are moved, OS-X does not emit file events for all sub-paths - // within the new folder, so we are forced to walk that folder and emit the - // paths ourselves - for (p <- paths) { - if (os.isDir(p, followLinks = false)) { - try os.walk.stream(p).foreach(p => if (filter(p)) nestedPaths.append(p)) - catch { case NonFatal(_) => /*do nothing*/ } - } - } - onEvent((paths.iterator ++ nestedPaths.iterator).toSet) - } - } - - // Not sure why this is necessary, but without it the `CarbonApi()` calls in the `run()` - // method running on a separate thread hang for unknown reasons. - val dummyCfString = CFStringRef.toCFString("dummy") - - private[this] var current: CFRunLoopRef = null - - def run() = { - assert(!closed) - val cfStrings = srcs.map(p => CFStringRef.toCFString(p.toString).getPointer).toArray - val cfArray = - CarbonApi().CFArrayCreate(null, cfStrings, CFIndex.valueOf(srcs.length), null) - val kCFRunLoopDefaultMode = CFStringRef.toCFString("kCFRunLoopDefaultMode") - val streamRef = CarbonApi().FSEventStreamCreate( - Pointer.NULL, - callback, - Pointer.NULL, - cfArray, - -1, - latency, - // Flags defined at https://developer.apple.com/documentation/coreservices - // /1455376-fseventstreamcreateflags?language=objc - // - // File-level notifications https://developer.apple.com/documentation/coreservices - // /1455376-fseventstreamcreateflags/kfseventstreamcreateflagfileevents?language=objc - 0x00000010 | - // - // Don't defer https://developer.apple.com/documentation/coreservices - // /1455376-fseventstreamcreateflags/kfseventstreamcreateflagnodefer?language=objc - // - 0x00000002 - ) - - current = CarbonApi().CFRunLoopGetCurrent() - CarbonApi().FSEventStreamScheduleWithRunLoop( - streamRef, - current, - kCFRunLoopDefaultMode - ) - CarbonApi().FSEventStreamStart(streamRef) - logger("FSLOOP RUN", ()) - try CarbonApi().CFRunLoopRun() - finally { - CarbonApi().FSEventStreamStop(streamRef) - - CarbonApi().FSEventStreamUnscheduleFromRunLoop( - streamRef, - current, - kCFRunLoopDefaultMode - ) - CarbonApi().FSEventStreamInvalidate(streamRef) - CarbonApi().FSEventStreamRelease(streamRef) - CarbonApi().CFRelease(cfArray) - for (s <- cfStrings) CarbonApi().CFRelease(s) - CarbonApi().CFRelease(kCFRunLoopDefaultMode) - } - logger("FSLOOP END", ()) - } - - def close() = { - assert(!closed) - closed = true - logger("FSLOOP CLOSE", ()) - - CarbonApi().CFRunLoopStop(current) - CarbonApi().CFRelease(dummyCfString) - - logger("FSLOOP CLOSE END", ()) - } -} diff --git a/os/watch/src/macos/CoreServicesApi.scala b/os/watch/src/macos/CoreServicesApi.scala new file mode 100644 index 00000000..5e2b2cd5 --- /dev/null +++ b/os/watch/src/macos/CoreServicesApi.scala @@ -0,0 +1,191 @@ +package os.watch.macos + +import com.sun.jna._ + +object CoreServicesApi { + def apply(): CoreServicesApi = INSTANCE + val INSTANCE: CoreServicesApi = Native.load("CoreServices", classOf[CoreServicesApi]) +} +trait CoreServicesApi extends Library { + + /** + * Objective-C definition: + * {{{ + * void CFRelease(CFTypeRef cf); + * typedef const void * CFTypeRef; + * }}} + * + * @see https://developer.apple.com/documentation/corefoundation/1521153-cfrelease + */ + def CFRelease(cfTypeRef: PointerType): Unit + + /** + * Objective-C definition: + * {{{ + * extern CFArrayRef CFArrayCreate( + * CFAllocatorRef allocator, const void * * values, CFIndex numValues, const CFArrayCallBacks * callBacks + * ); + * }}} + * + * @param allocator Always pass NULL to use the current default allocator. + * @see https://developer.apple.com/documentation/corefoundation/cfarraycreate(_:_:_:_:) + * @return can return NULL if there was a problem creating the array + */ + def CFArrayCreate( + allocator: Void, + values: Array[Pointer], + numValues: CFIndex, + callBacks: Void + ): CFArrayRef + + def CFArrayCreate(values: Array[Pointer]): Option[CFArrayRef] = { + val numValues = CFIndex(values.length) + val maybeCfArrayRef = CFArrayCreate(allocator = null, values, numValues, callBacks = null) + NativeUtils.optionOf(maybeCfArrayRef) + } + + /** + * @param alloc Always pass NULL to use the current default allocator. + * @param chars The buffer of Unicode characters to copy into the new string. + * @param numChars The number of characters in the buffer pointed to by chars. Only this number of characters will + * be copied to internal storage. + * @return An immutable string containing chars, or NULL if there was a problem creating the object. + * @see https://developer.apple.com/documentation/corefoundation/cfstringcreatewithcharacters(_:_:_:)?language=objc + */ + def CFStringCreateWithCharacters( + alloc: Void, + chars: Array[Char], + numChars: CFIndex + ): CFStringRef + + def CFStringCreateWithCharacters(chars: Array[Char]): Option[CFStringRef] = { + val numChars = CFIndex(chars.length) + val maybeCfStringRef = CFStringCreateWithCharacters(alloc = null, chars, numChars) + NativeUtils.optionOf(maybeCfStringRef) + } + + /** + * @param allocator Always pass NULL to use the current default allocator. + * @param context A pointer to the FSEventStreamContext structure the client wants to associate with this stream. + * Its fields are copied out into the stream itself so its memory can be released after the stream + * is created. Passing NULL is allowed and has the same effect as passing a structure whose fields + * are all set to zero. We don't use this context, so we only allow passing NULL. + * @param sinceWhen The time to start watching for events. Use -1 for events since now. + * @param latency The latency of the stream, in seconds. + * @param flags The flags for the stream. See + * https://developer.apple.com/documentation/coreservices/1455376-fseventstreamcreateflags?language=objc + * @see https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate?language=objc + */ + def FSEventStreamCreate( + allocator: Void, + callback: FSEventStreamCallback, + context: Void, + pathsToWatch: CFArrayRef, + sinceWhen: Long, + latency: Double, + flags: Int + ): FSEventStreamRef + + def FSEventStreamCreate( + callback: FSEventStreamCallback, + pathsToWatch: CFArrayRef, + sinceWhen: Long, + latency: Double, + flags: Int + ): FSEventStreamRef = { + FSEventStreamCreate( + allocator = null, + callback = callback, + context = null, + pathsToWatch = pathsToWatch, + sinceWhen = sinceWhen, + latency = latency, + flags = flags + ) + } + + /** + * Objective-C definition: + * {{{ + * void FSEventStreamSetDispatchQueue(FSEventStreamRef streamRef, dispatch_queue_t q); + * }}} + * + * @see https://developer.apple.com/documentation/coreservices/1444164-fseventstreamsetdispatchqueue?language=objc + */ + def FSEventStreamSetDispatchQueue(streamRef: FSEventStreamRef, queue: dispatch_queue_t): Unit + + /** + * @return True if it succeeds, otherwise False if it fails. It ought to always succeed, but in the event it does + * not then your code should fall back to performing recursive scans of the directories of interest as + * appropriate. + * + * @see https://developer.apple.com/documentation/coreservices/1448000-fseventstreamstart?language=objc + */ + def FSEventStreamStart(streamRef: FSEventStreamRef): Boolean + + /** + * Objective-C definition: + * {{{ + * void FSEventStreamStop(FSEventStreamRef streamRef); + * }}} + * + * @see https://developer.apple.com/documentation/coreservices/1447673-fseventstreamstop?language=objc + */ + def FSEventStreamStop(streamRef: FSEventStreamRef): Unit + + /** @see https://developer.apple.com/documentation/coreservices/1446990-fseventstreaminvalidate?language=objc */ + def FSEventStreamInvalidate(streamRef: FSEventStreamRef): Unit + + /** @see https://developer.apple.com/documentation/coreservices/1445989-fseventstreamrelease?language=objc */ + def FSEventStreamRelease(streamRef: FSEventStreamRef): Unit +} + +trait FSEventStreamCallback extends Callback { + def invoke( + streamRef: FSEventStreamRef, + clientCallBackInfo: Pointer, + numEvents: NativeLong, + eventPaths: Pointer, + eventFlags: Pointer, + eventIds: Pointer + ): Unit +} + +/** + * Objective-C definition: `typedef const struct __CFArray * CFArrayRef;` + * + * @see https://developer.apple.com/documentation/corefoundation/cfarray?language=objc + */ +class CFArrayRef extends PointerType + +object CFIndex { + def apply(value: Long): CFIndex = new CFIndex(value) +} + +/** @see https://developer.apple.com/documentation/corefoundation/cfindex */ +class CFIndex(value: Long) extends NativeLong(value) { + // Required by JNA + def this() = this(0) +} + +/** + * Objective-C definition: `typedef const struct __CFString * CFStringRef;` + * + * @see https://developer.apple.com/documentation/corefoundation/cfstring?language=objc + */ +class CFStringRef extends PointerType +object CFStringRef { + + /** + * @return An immutable string containing chars, or [[None]] if there was a problem creating the object. + */ + def apply(s: String): Option[CFStringRef] = + CoreServicesApi.INSTANCE.CFStringCreateWithCharacters(s.toCharArray) +} + +/** + * Objective-C definition: `typedef struct __FSEventStream *FSEventStreamRef;` + * + * @see https://developer.apple.com/documentation/coreservices/fseventstreamref + */ +class FSEventStreamRef extends PointerType diff --git a/os/watch/src/macos/DispatchApi.scala b/os/watch/src/macos/DispatchApi.scala new file mode 100644 index 00000000..e3ca4903 --- /dev/null +++ b/os/watch/src/macos/DispatchApi.scala @@ -0,0 +1,30 @@ +package os.watch.macos + +import com.sun.jna._ + +/** @see https://developer.apple.com/documentation/dispatch?language=objc */ +object DispatchApi { + def apply(): DispatchApi = INSTANCE + val INSTANCE: DispatchApi = Native.load("c", classOf[DispatchApi]) +} +trait DispatchApi extends Library { + + /** + * @param attr The queue type. We always use `null` to indicate a serial queue. + * + * @see https://developer.apple.com/documentation/dispatch/1453030-dispatch_queue_create/ + */ + def dispatch_queue_create(label: String, attr: Void): dispatch_queue_t + + def dispatch_queue_create(label: String): dispatch_queue_t = dispatch_queue_create(label, null) + + /** @see https://developer.apple.com/documentation/dispatch/1496328-dispatch_release */ + def dispatch_release(queue: dispatch_queue_t): Unit +} + +/** + * Objective-C definition: `typedef NSObject * dispatch_queue_t;` + * + * @see https://developer.apple.com/documentation/dispatch/dispatch_queue_t?language=objc + */ +class dispatch_queue_t extends PointerType diff --git a/os/watch/src/macos/FSEventsWatcher.scala b/os/watch/src/macos/FSEventsWatcher.scala new file mode 100644 index 00000000..b931a8b2 --- /dev/null +++ b/os/watch/src/macos/FSEventsWatcher.scala @@ -0,0 +1,128 @@ +package os.watch.macos + +import com.sun.jna.{NativeLong, Pointer} +import os.watch.Watcher + +import scala.util.control.NonFatal + +/** + * Implements a watcher using FSEvents on macOS. + */ +class FSEventsWatcher( + srcs: Seq[os.Path], + onEvent: Set[os.Path] => Unit, + filter: os.Path => Boolean, + logger: (String, Any) => Unit, + latency: Double +) extends Watcher { + import CoreServicesApi.INSTANCE._ + import DispatchApi.INSTANCE._ + + @volatile private var closed = false + + // Start with an empty filter to make sure sentinels are picked up. + @volatile private var actualFilter: os.Path => Boolean = (_ => true) + + def sentinelsPickedUp(): Unit = { + // Apply the actual filter after watch has been set up. + actualFilter = filter + } + + private val callback = new FSEventStreamCallback { + def invoke( + streamRef: FSEventStreamRef, + clientCallBackInfo: Pointer, + numEvents: NativeLong, + eventPaths: Pointer, + eventFlags: Pointer, + eventIds: Pointer + ): Unit = { + try { + val length = numEvents.intValue + val pathStrings = eventPaths.getStringArray(0, length) + logger("FSEVENT", pathStrings) + val paths = pathStrings.iterator.map(os.Path(_)).filter(actualFilter).toArray + val nestedPaths = collection.mutable.Buffer.empty[os.Path] + // When folders are moved, OS-X does not emit file events for all sub-paths + // within the new folder, so we are forced to walk that folder and emit the + // paths ourselves + for (p <- paths) { + if (os.isDir(p, followLinks = false)) { + try os.walk.stream(p).foreach(p => if (actualFilter(p)) nestedPaths.append(p)) + catch { + case NonFatal(_) => /*do nothing*/ + } + } + } +// logger("FSEVENT paths", paths.toVector) +// logger("FSEVENT nested paths", nestedPaths.toVector) + onEvent((paths.iterator ++ nestedPaths.iterator).toSet) + } catch { + case e: Throwable => + logger("FSEVENT CALLBACK ERROR", e) + } + } + } + + /** Used to signal the watcher to stop. */ + private val signal: Object = new Object + + def run(): Unit = { + assert(!closed, "Cannot `run()` a closed watcher.") + + def startFsEventStream[A](streamRef: FSEventStreamRef)(f: => A): A = { + val success = FSEventStreamStart(streamRef) + if (!success) throw new IllegalStateException("FSEventStreamStart returned false") + + try f + finally FSEventStreamStop(streamRef) + } + + val pathsToWatchCfStrings = NativeUtils.assertAllSome( + srcs.iterator.map(path => CFStringRef(path.toString)).toArray, + CFRelease + ).getOrElse(throw new IllegalStateException( + s"Can't create CFStrings for $srcs, some values were null" + )) + try { + val pathsToWatchArrayRef = CFArrayCreate(pathsToWatchCfStrings.map(_.getPointer)).getOrElse( + throw new IllegalStateException(s"CFArrayCreate returned null for $srcs") + ) + try { + val streamRef = FSEventStreamCreate( + callback, + pathsToWatchArrayRef, + sinceWhen = -1, + latency, + flags = + // Flags defined at https://developer.apple.com/documentation/coreservices/1455376-fseventstreamcreateflags?language=objc + // + // File-level notifications https://developer.apple.com/documentation/coreservices/1455376-fseventstreamcreateflags/kfseventstreamcreateflagfileevents?language=objc + 0x00000010 | + // Don't defer https://developer.apple.com/documentation/coreservices/1455376-fseventstreamcreateflags/kfseventstreamcreateflagnodefer?language=objc + 0x00000002 + ) + try { + val queue = dispatch_queue_create("os.watch.macos.FSEventsWatcher") + try { + FSEventStreamSetDispatchQueue(streamRef, queue) + try { + startFsEventStream(streamRef) { + // We don't really need to have this thread blocked, as macOS has its own dispatch threads, + // but because Linux/Windows watchers are blocking, we do the same here in the sake of + // simplicity. + signal.synchronized(signal.wait()) + } + } finally FSEventStreamInvalidate(streamRef) + } finally dispatch_release(queue) + } finally FSEventStreamRelease(streamRef) + } finally CFRelease(pathsToWatchArrayRef) + } finally pathsToWatchCfStrings.foreach(CFRelease) + } + + /** This is thread-safe and idempotent. */ + def close(): Unit = { + signal.synchronized(signal.notify()) + closed = true + } +} diff --git a/os/watch/src/macos/NativeUtils.scala b/os/watch/src/macos/NativeUtils.scala new file mode 100644 index 00000000..578ee2ff --- /dev/null +++ b/os/watch/src/macos/NativeUtils.scala @@ -0,0 +1,28 @@ +package os.watch.macos + +import com.sun.jna.PointerType + +import scala.reflect.ClassTag + +object NativeUtils { + + /** Checks if a type-safe pointer is null. */ + def optionOf[A <: PointerType](a: A): Option[A] = + if (a.getPointer == null) None else Some(a) + + /** + * Ensures that all elements of `iterable` are [[Some]]. If any of them are [[None]], all the [[Some]] elements are + * released and then [[None]] is returned. + */ + def assertAllSome[A: ClassTag]( + array: Array[Option[A]], + release: A => Unit + ): Option[Array[A]] = { + if (array.forall(_.isDefined)) { + Some(array.map(_.get)) + } else { + array.foreach(_.foreach(release)) + None + } + } +} diff --git a/os/watch/src/package.scala b/os/watch/src/package.scala index 0520d441..995c60bd 100644 --- a/os/watch/src/package.scala +++ b/os/watch/src/package.scala @@ -1,10 +1,19 @@ package os +import java.util.UUID +import scala.concurrent.TimeoutException +import scala.util.Random +import scala.concurrent.duration._ +import scala.util.control.NonFatal + package object watch { + private type OnEvent = Set[os.Path] => Unit /** - * Efficiently watches the given `roots` folders for changes. Any time the - * filesystem is modified within those folders, the `onEvent` callback is + * Efficiently watches the given `roots` folders for changes. Note that these folders need + * to exist before the method is called. + * + * Any time the filesystem is modified within those folders, the `onEvent` callback is * called with the paths to the changed files or folders. * * Once the call to `watch` returns, `onEvent` is guaranteed to receive a @@ -25,6 +34,8 @@ package object watch { * at which the change happened. It is up to the `onEvent` handler to query * the filesystem and figure out what happened, and what it wants to do. * + * @param onEvent a callback that is called with the paths to the changed. Only starts emitting events once this + * method returns. * @param filter when new paths under `roots` are created, this function is * invoked with each path. If it returns `false`, the path is * not watched. @@ -35,18 +46,141 @@ package object watch { logger: (String, Any) => Unit = (_, _) => (), filter: os.Path => Boolean = _ => true ): AutoCloseable = { - val watcher = System.getProperty("os.name") match { - case "Mac OS X" => new os.watch.FSEventsWatcher(roots, onEvent, filter, logger, 0.05) - case _ => new os.watch.WatchServiceWatcher(roots, onEvent, filter, logger) + val errors = roots.iterator.flatMap { path => + if (!os.exists(path)) Some(s"Root path does not exist: $path") + else if (!os.isDir(path)) Some(s"Root path is not a directory: $path") + else None + }.toVector + + if (errors.nonEmpty) { + throw new IllegalArgumentException(errors.mkString("\n")) } + val sentinelFiles = roots.iterator.map(_ / s".os-lib-watch-sentinel-${UUID.randomUUID()}").toSet + + @volatile var customOnEvent: OnEvent = onEvent + // Needed because the function passed to the watcher implementation is stable and we need to change it + // upon every call. + val onEvent0: OnEvent = paths => customOnEvent(paths) + + val watcherEither = System.getProperty("os.name") match { + case "Mac OS X" => + Left(new os.watch.macos.FSEventsWatcher(roots, onEvent0, filter, logger, latency = 0.05)) + case _ => Right(new os.watch.WatchServiceWatcher(roots, onEvent0, filter, logger)) + } + val watcher = watcherEither.merge + val thread = new Thread { override def run(): Unit = { - watcher.run() + try { + watcher.run() + } catch { + case NonFatal(t) => + logger("EXCEPTION", t) + Console.err.println( + s"""Watcher thread failed: + | roots = $roots + | exception = $t""".stripMargin + ) + } } } thread.setDaemon(true) thread.start() + + logger("WAITING FOR SENTINELS", sentinelFiles) + sentinelFiles.foreach { sentinel => + waitUntilWatchIsSetUp( + sentinel, + setCustomOnEvent = { + case Some(custom) => customOnEvent = custom + case None => customOnEvent = onEvent + }, + logger + ) + } + watcherEither match { + case Left(macWatcher) => + // Apply the actual filter once watch has been set up + macWatcher.sentinelsPickedUp() + case Right(_) => + // nothing is necessary as `filter` for this watcher is only used for filtering out watched subdirectories + // and thus does not need to be updated + } + logger("SENTINELS PICKED UP", sentinelFiles) + watcher } + + private def waitUntilWatchIsSetUp( + sentinelFile: os.Path, + setCustomOnEvent: Option[OnEvent] => Unit, + logger: (String, Any) => Unit + ): Unit = { + def writeSentinel() = os.write.over( + sentinelFile, + s"""This file was created because Scala `os-lib` library is trying to set up a watch for this + |directory. It is automatically deleted when the watch is set up. + | + |If you are seeing this file, it means that the watch is not being set up correctly. + | + |Raise an issue at https://github.com/lihaoyi/os-lib/issues + |""".stripMargin + ) + + val timeout = 5.seconds + val timeoutNanos = timeout.toNanos + + def waitUntilPickedUp( + wasPickedUp: () => Boolean, + changeType: String, + afterSleep: () => Unit + ): Unit = { + logger(s"WAITING FOR SENTINEL TO BE PICKED UP ($changeType)", sentinelFile) + val start = System.nanoTime() + while (!wasPickedUp()) { + val taken = System.nanoTime() - start + if (taken >= timeoutNanos) + throw new TimeoutException( + s"can't set up watch, no file system changes detected (expected $changeType) within $timeout " + + s"for sentinel file $sentinelFile" + ) + Thread.sleep(5) + + afterSleep() + } + logger(s"SENTINEL PICKED UP ($changeType)", sentinelFile) + } + + try { + logger("WRITING SENTINEL", sentinelFile) + @volatile var pickedUp = false + setCustomOnEvent(Some { changed => + if (changed.contains(sentinelFile)) pickedUp = true + }) + writeSentinel() + + waitUntilPickedUp( + wasPickedUp = () => pickedUp, + changeType = "write", + afterSleep = writeSentinel + ) + } finally { + @volatile var pickedUp = false + setCustomOnEvent(Some { changed => + if (changed.contains(sentinelFile)) pickedUp = true + }) + try { + logger("REMOVING SENTINEL", sentinelFile) + os.remove(sentinelFile) + waitUntilPickedUp( + wasPickedUp = () => pickedUp, + changeType = "removal", + afterSleep = () => {} + ) + } finally { + setCustomOnEvent(None) + } + } + } } diff --git a/os/watch/test/src/WatchTests.scala b/os/watch/test/src/WatchTests.scala index 620c0660..8c4716ad 100644 --- a/os/watch/test/src/WatchTests.scala +++ b/os/watch/test/src/WatchTests.scala @@ -1,9 +1,14 @@ package test.os.watch -import scala.util.Properties.isWin +import os.Path + +import scala.util.Properties.{isWin, isMac} import scala.util.Random import utest._ +import scala.concurrent.{Await, Future, TimeoutException} +import scala.concurrent.duration._ + object WatchTests extends TestSuite with TestSuite.Retries { override val utestRetryCount = if (sys.env.get("CI").contains("true")) { @@ -13,161 +18,427 @@ object WatchTests extends TestSuite with TestSuite.Retries { 0 } - val tests = Tests { - test("singleFolder") - _root_.test.os.TestUtil.prep { wd => - val changedPaths = collection.mutable.Set.empty[os.Path] - _root_.os.watch.watch( + class ChangedPaths(wd: os.Path, filter: os.Path => Boolean = _ => true) { + private val changed = collection.mutable.Set.empty[os.Path] + + def withFilter(f: os.Path => Boolean) = new ChangedPaths(wd, filter = f) + + def withWatcher[A](f: => A) = { + val watcher = _root_.os.watch.watch( Seq(wd), - onEvent = _.foreach(changedPaths.add) + onEvent = onEvent, + filter = filter +// logger = (str, value) => println(s"$str $value") ) + try f + finally watcher.close() + } -// os.write(wd / "lols", "") -// Thread.sleep(100) + def onEvent(paths: Set[os.Path]): Unit = synchronized { + changed ++= paths + } - changedPaths.clear() + def clear(): Unit = synchronized { + changed.clear() + } - def checkFileManglingChanges(p: os.Path) = { + def checkChanges(action: => Unit, expectedChangedPaths: Set[os.SubPath]) = { + synchronized { changed.clear() } + action + Thread.sleep(200) + val changedSubPaths = synchronized { changed.map(_.subRelativeTo(wd)) } - checkChanges( - os.write(p, Random.nextString(100)), - Set(p.subRelativeTo(wd)) - ) + // on Windows sometimes we get more changes + if (isWin) assert(expectedChangedPaths.subsetOf(changedSubPaths)) + else assert(expectedChangedPaths == changedSubPaths) + } + } + object ChangedPaths { + def apply[A](wd: os.Path)(f: ChangedPaths => A): A = + apply(wd, identity)(f) - checkChanges( - os.write.append(p, "hello"), - Set(p.subRelativeTo(wd)) - ) + def apply[A](wd: os.Path, mod: ChangedPaths => ChangedPaths)(f: ChangedPaths => A): A = { + val changedPaths0 = new ChangedPaths(wd) + val changedPaths = mod(changedPaths0) + changedPaths.withWatcher(f(changedPaths)) + } + } - checkChanges( - os.write.over(p, "world"), - Set(p.subRelativeTo(wd)) - ) + val tests = Tests { + // Watching a non-existent folder throws + test("nonExistentFolder") - _root_.test.os.TestUtil.prep { wd => + intercept[IllegalArgumentException] { + _root_.os.watch.watch(Seq(wd / "does-not-exist"), onEvent = _ => ()) + } + } - checkChanges( - os.truncate(p, 1), - Set(p.subRelativeTo(wd)) - ) + test("emptyFolder") { + // Watching an empty folder does not leave a sentinel file there. + test("doesNotLeaveSentinel") - _root_.test.os.TestUtil.mkDir { wd => + val filesBefore = os.list(wd) + assert(filesBefore.isEmpty) - checkChanges( - os.remove(p), - Set(p.subRelativeTo(wd)) - ) + ChangedPaths(wd) { _ => + val files = os.list(wd) + assert(files.isEmpty) + } } - def checkChanges(action: => Unit, expectedChangedPaths: Set[os.SubPath]) = synchronized { - changedPaths.clear() - action - Thread.sleep(200) - val changedSubPaths = changedPaths.map(_.subRelativeTo(wd)) - // on Windows sometimes we get more changes - if (isWin) assert(expectedChangedPaths.subsetOf(changedSubPaths)) - else assert(expectedChangedPaths == changedSubPaths) + + // Sentinel file creation works even when filter is set + test("worksWithFilter") - _root_.test.os.TestUtil.mkDir { wd => + val filesBefore = os.list(wd) + assert(filesBefore.isEmpty) + + ChangedPaths(wd, _.withFilter(_ => false /* ignore everything */ )) { _ => + val files = os.list(wd) + assert(files.isEmpty) + } + } + + // Watching an empty folder only emits events for the file we change and not the sentinel + test("singleFileChangeManyTimes") - _root_.test.os.TestUtil.mkDir { wd => + val file = wd / "the-file" + os.write(file, "hello") + ChangedPaths(wd) { changedPaths => + (0 to 20).foreach { idx => + println(s"#$idx") + changedPaths.checkChanges( + os.write.over(file, s"#$idx: ${Random.nextInt()}"), + Set(file.subRelativeTo(wd)) + ) + } + } } + } - checkFileManglingChanges(wd / "test") + test("singleFolder") - _root_.test.os.TestUtil.prep { wd => + ChangedPaths(wd) { changedPaths => + // os.write(wd / "lols", "") + // Thread.sleep(100) - checkChanges( - os.remove(wd / "File.txt"), - Set(os.sub / "File.txt") - ) + changedPaths.clear() - checkChanges( - os.makeDir(wd / "my-new-folder"), - Set(os.sub / "my-new-folder") - ) + def checkFileManglingChanges(p: os.Path) = { + + changedPaths.checkChanges( + os.write(p, Random.nextString(100)), + Set(p.subRelativeTo(wd)) + ) + + changedPaths.checkChanges( + os.write.append(p, "hello"), + Set(p.subRelativeTo(wd)) + ) + + changedPaths.checkChanges( + os.write.over(p, "world"), + Set(p.subRelativeTo(wd)) + ) + + changedPaths.checkChanges( + os.truncate(p, 1), + Set(p.subRelativeTo(wd)) + ) + + changedPaths.checkChanges( + os.remove(p), + Set(p.subRelativeTo(wd)) + ) + } - checkFileManglingChanges(wd / "my-new-folder/test") + checkFileManglingChanges(wd / "test") - locally { - val expectedChanges = if (isWin) Set( - os.sub / "folder2", - os.sub / "folder3" + changedPaths.checkChanges( + os.remove(wd / "File.txt"), + Set(os.sub / "File.txt") ) - else Set( - os.sub / "folder2", - os.sub / "folder3", - os.sub / "folder3/nestedA", - os.sub / "folder3/nestedA/a.txt", - os.sub / "folder3/nestedB", - os.sub / "folder3/nestedB/b.txt" + + changedPaths.checkChanges( + os.makeDir(wd / "my-new-folder"), + Set(os.sub / "my-new-folder") ) - checkChanges( - os.move(wd / "folder2", wd / "folder3"), - expectedChanges + + checkFileManglingChanges(wd / "my-new-folder/test") + + locally { + val expectedChanges = if (isWin) Set( + os.sub / "folder2", + os.sub / "folder3" + ) + else Set( + os.sub / "folder2", + os.sub / "folder3", + os.sub / "folder3/nestedA", + os.sub / "folder3/nestedA/a.txt", + os.sub / "folder3/nestedB", + os.sub / "folder3/nestedB/b.txt" + ) + changedPaths.checkChanges( + os.move(wd / "folder2", wd / "folder3"), + expectedChanges + ) + } + + changedPaths.checkChanges( + os.copy(wd / "folder3", wd / "folder4"), + Set( + os.sub / "folder4", + os.sub / "folder4/nestedA", + os.sub / "folder4/nestedA/a.txt", + os.sub / "folder4/nestedB", + os.sub / "folder4/nestedB/b.txt" + ) ) - } - checkChanges( - os.copy(wd / "folder3", wd / "folder4"), - Set( - os.sub / "folder4", - os.sub / "folder4/nestedA", - os.sub / "folder4/nestedA/a.txt", - os.sub / "folder4/nestedB", - os.sub / "folder4/nestedB/b.txt" + changedPaths.checkChanges( + os.remove.all(wd / "folder4"), + Set( + os.sub / "folder4", + os.sub / "folder4/nestedA", + os.sub / "folder4/nestedA/a.txt", + os.sub / "folder4/nestedB", + os.sub / "folder4/nestedB/b.txt" + ) ) - ) - checkChanges( - os.remove.all(wd / "folder4"), - Set( - os.sub / "folder4", - os.sub / "folder4/nestedA", - os.sub / "folder4/nestedA/a.txt", - os.sub / "folder4/nestedB", - os.sub / "folder4/nestedB/b.txt" + checkFileManglingChanges(wd / "folder3/nestedA/double-nested-file") + checkFileManglingChanges(wd / "folder3/nestedB/double-nested-file") + + changedPaths.checkChanges( + os.symlink(wd / "newlink", wd / "doesntexist"), + Set(os.sub / "newlink") ) - ) - checkFileManglingChanges(wd / "folder3/nestedA/double-nested-file") - checkFileManglingChanges(wd / "folder3/nestedB/double-nested-file") + changedPaths.checkChanges( + os.symlink(wd / "newlink2", wd / "folder3"), + Set(os.sub / "newlink2") + ) - checkChanges( - os.symlink(wd / "newlink", wd / "doesntexist"), - Set(os.sub / "newlink") - ) + changedPaths.checkChanges( + os.hardlink(wd / "newlink3", wd / "folder3/nestedA/a.txt"), + System.getProperty("os.name") match { + case "Mac OS X" => + Set( + os.sub / "newlink3", + os.sub / "folder3/nestedA", + os.sub / "folder3/nestedA/a.txt" + ) + case _ => Set(os.sub / "newlink3") + } + ) + } + } - checkChanges( - os.symlink(wd / "newlink2", wd / "folder3"), - Set(os.sub / "newlink2") - ) + def createManyFilesInManyFolders(wd: Path, numPaths: Int) = { + val rng = new Random(100) + val paths = generateNRandomPaths(numPaths, wd, random = rng) + val directories = paths.iterator.map(_.toNIO.getParent.toAbsolutePath).toSet + directories.foreach(dir => os.makeDir.all.apply(Path(dir))) + paths.foreach(p => os.write.over(p, rng.nextString(100))) + paths + } - checkChanges( - os.hardlink(wd / "newlink3", wd / "folder3/nestedA/a.txt"), - System.getProperty("os.name") match { - case "Mac OS X" => - Set( - os.sub / "newlink3", - os.sub / "folder3/nestedA", - os.sub / "folder3/nestedA/a.txt" - ) - case _ => Set(os.sub / "newlink3") + def testManyFilesInManyFolders(wd: Path, paths: Vector[Path]): Unit = { + val changedPaths = collection.mutable.Set.empty[os.Path] + + def waitUntilFinished(): Unit = { + val timeoutMs = 500 + // print("Waiting for events to stop coming") + // System.out.flush() + var last = changedPaths.size + Thread.sleep(timeoutMs) + var current = last + while ({ current = changedPaths.size; last != current }) { + last = current + // print(".") + // System.out.flush() + Thread.sleep(timeoutMs) } + // println(" Done.") + } + + // println(s"Watching $wd") + val watcher = os.watch.watch( + Seq(wd), + onEvent = paths => changedPaths ++= paths + // logger = (evt, data) => println(s"$evt $data") ) + try { + // On mac os if you create a bunch of files and then start watching the directory + // AFTER those files are created, you will get events about those files. + // + // Which makes no sense, but it is what it is. Thus we wait until we aren't getting + // any more events and then make sure to clear the set before actually running our test. + waitUntilFinished() + changedPaths.clear() + val willChange = paths.iterator.take(paths.size / 2).toSet + willChange.foreach(p => os.write.over(p, "changed")) + waitUntilFinished() + + val unexpectedChanges = changedPaths.toSet -- willChange + val unexpectedChangeCount = unexpectedChanges.size + assert(unexpectedChangeCount == 0) + + val missingChanges = willChange -- changedPaths + val missingChangeCount = missingChanges.size + assert(missingChangeCount == 0) + } finally { + watcher.close() + } } - test("openClose") { - _root_.test.os.TestUtil.prep { wd => - println("openClose in " + wd) - for (index <- Range(0, 200)) { - println("watch index " + index) - @volatile var done = false - val res = os.watch.watch( - Seq(wd), - filter = _ => true, - onEvent = path => { - println(path); - done = true - }, - logger = (event, data) => println(event) - ) - Thread.sleep(10) - os.write.append(wd / s"file.txt", "" + index) - try { - while (!done) Thread.sleep(1) - } finally res.close() + test("manyFiles") { + test("inManyFoldersSmall") - _root_.test.os.TestUtil.prep { wd => + val paths = createManyFilesInManyFolders(wd, numPaths = 1000) + testManyFilesInManyFolders(wd, paths) + } + + test("inManyFoldersMedium") - _root_.test.os.TestUtil.prep { wd => + val paths = createManyFilesInManyFolders(wd, numPaths = 5000) + testManyFilesInManyFolders(wd, paths) + } + + test("inManyFoldersLarge") - _root_.test.os.TestUtil.prep { wd => + val paths = createManyFilesInManyFolders(wd, numPaths = 10000) + testManyFilesInManyFolders(wd, paths) + } + + test("inManyFoldersLargest") - _root_.test.os.TestUtil.prep { wd => + // On macOS this always fails, some changes are lost with that many files. + if (!isMac) { + val numPaths = + 12000 // My Linux machine starts overflowing and losing events at 13k files. + val paths = createManyFilesInManyFolders(wd, numPaths) + testManyFilesInManyFolders(wd, paths) + } + } + + test("inManyFoldersThreaded") - _root_.test.os.TestUtil.prep { wd => + import scala.concurrent.ExecutionContext.Implicits.global + + val numPaths = 1000 + val futures = (0 to 100).map { idx => + Future { + val myWd = wd / s"job-$idx" + val paths = createManyFilesInManyFolders(myWd, numPaths) + testManyFilesInManyFolders(myWd, paths) + } } + futures.foreach(Await.result(_, 20.seconds)) + } + + test("inManyFoldersThreadedSequential") - _root_.test.os.TestUtil.prep { wd => + import scala.concurrent.ExecutionContext.Implicits.global + + val numPaths = 1000 + val lock = new Object + val futures = (0 to 100).map { idx => + Future { + val myWd = wd / s"job-$idx" + val paths = createManyFilesInManyFolders(myWd, numPaths) + lock.synchronized { + Future(testManyFilesInManyFolders(myWd, paths)) + } + } + } + futures.foreach(Await.result(_, 20.seconds)) + } + } + + def testOpenClose(wd: os.Path, count: Int): Unit = { + println("openClose in " + wd) + for (index <- Range(0, count)) { + println("watch index " + index) + @volatile var done = false + val res = os.watch.watch( + Seq(wd), + filter = _ => true, + onEvent = path => { + println(path) + done = true + }, + logger = (event, data) => println(event) + ) + os.write.append(wd / s"file.txt", "" + index) + + val startTimeNanos = System.nanoTime() + val timeout = 3.seconds + val timeoutNanos = timeout.toNanos + try { + while (!done) { + val taken = System.nanoTime() - startTimeNanos + if (taken >= timeoutNanos) + throw new TimeoutException(s"no file system changes detected within $timeout") + Thread.sleep(1) + } + } finally res.close() + } + } + + test("openClose") { + test("once") { + _root_.test.os.TestUtil.prep(testOpenClose(_, 1)) + } + + test("manyTimes") { + _root_.test.os.TestUtil.prep(testOpenClose(_, 200)) } } + + test("closeIsSafeToInvokeMultipleTimes") - _root_.test.os.TestUtil.mkDir { wd => + import scala.concurrent.ExecutionContext.Implicits.global + + val res = os.watch.watch(Seq(wd), onEvent = _ => ()) + try { + val futures = (0 to 100).map { _ => Future(res.close()) } + futures.foreach(Await.result(_, 20.seconds)) + } finally { + res.close() + } + } + } + + /** + * Generates N random paths, arbitrarily nested under a given subdirectory. + * + * @param count The number of random paths to generate. + * @param baseSubdirectory Subdirectory under which paths will be generated. + * @param maxNestingDepth The maximum number of directory levels (0 means files directly in baseSubdirectory). + * @return A Vector of strings, where each string is a fully formed random path. + * @throws IllegalArgumentException if N is negative, or maxNestingDepth is negative. + */ + def generateNRandomPaths( + count: Int, + baseSubdirectory: Path, + maxNestingDepth: Int = 5, + random: Random + ): Vector[Path] = { + def randomAlphanumeric(length: Int): String = + random.alphanumeric.take(length).mkString + + def generateSingleRandomPath(baseDir: Path) = { + // actualNestingDepth can be 0 (file directly in baseDir) up to maxNestingDepth + val actualNestingDepth = random.nextInt(maxNestingDepth + 1) + + var currentPath: Path = baseDir + + // Create random subdirectories + for (_ <- 0 until actualNestingDepth) { + currentPath = currentPath / randomAlphanumeric(3).toLowerCase + } + + // Create random filename with extension + val fileName = s"${randomAlphanumeric(8)}.${randomAlphanumeric(3).toLowerCase}" + currentPath = currentPath / fileName + + currentPath + } + + if (count < 0) throw new IllegalArgumentException("Number of paths cannot be negative.") + if (maxNestingDepth < 0) + throw new IllegalArgumentException("maxNestingDepth cannot be negative.") + + Vector.fill(count)(generateSingleRandomPath(baseSubdirectory)) } }