Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ object Deps {
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val commonsIo = ivy"commons-io:commons-io:2.18.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.24.3"
val osLib = ivy"com.lihaoyi::os-lib:0.11.4-M6"
val osLibVersion = "0.11.5-M7"
val osLib = ivy"com.lihaoyi::os-lib:${osLibVersion}"
val osLibWatch = ivy"com.lihaoyi::os-lib-watch:${osLibVersion}"
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
val mainargs = ivy"com.lihaoyi::mainargs:0.7.6"
val millModuledefsVersion = "0.11.2"
Expand Down
6 changes: 6 additions & 0 deletions main/util/src/mill/util/Watchable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ import mill.api.internal
*/
@internal
private[mill] trait Watchable {
/** @return the hashcode of a watched value. */
def poll(): Long

/** The initial hashcode of a watched value. */
def signature: Long

/** @return true if the watched value has not changed */
def validate(): Boolean = poll() == signature

def pretty: String
}
@internal
Expand Down
3 changes: 2 additions & 1 deletion runner/package.mill
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ object `package` extends RootModule with build.MillPublishScalaModule {
build.Deps.windowsAnsi,
build.Deps.coursier,
build.Deps.coursierJvm,
build.Deps.logback
build.Deps.logback,
build.Deps.osLibWatch
)
def buildInfoObjectName = "Versions"
def buildInfoMembers = Seq(
Expand Down
5 changes: 5 additions & 0 deletions runner/src/mill/runner/MillCliConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ case class MillCliConfig(
doc = """Watch and re-run the given tasks when when their inputs change."""
)
watch: Flag = Flag(),
@arg(
name = "watch-via-fs-notify",
doc = "Use filesystem based file watching instead of polling based one (experimental, defaults to false).",
)
watchViaFsNotify: Boolean = false,
@arg(
short = 's',
doc =
Expand Down
8 changes: 4 additions & 4 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,10 @@ object MillMain {
}
val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
watch = Option.when(config.watch.value)(Watching.WatchArgs(
setIdle, colors, useNotify = config.watchViaFsNotify, serverDir = serverDir
)),
streams = streams,
setIdle = setIdle,
evaluate = (enterKeyPressed: Boolean, prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

Expand Down Expand Up @@ -285,8 +286,7 @@ object MillMain {
}
}
}
},
colors = colors
}
)
bspContext.foreach { ctx =>
repeatForBsp =
Expand Down
211 changes: 167 additions & 44 deletions runner/src/mill/runner/Watching.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package mill.runner

import mill.api.internal
import mill.api.{SystemStreams, internal}
import mill.util.{Colors, Watchable}
import mill.api.SystemStreams

import java.io.InputStream
import scala.annotation.tailrec
import scala.util.Using

/**
* Logic around the "watch and wait" functionality in Mill: re-run on change,
Expand All @@ -15,72 +15,194 @@ import scala.annotation.tailrec
object Watching {
case class Result[T](watched: Seq[Watchable], error: Option[String], result: T)

trait Evaluate[T] {
def apply(enterKeyPressed: Boolean, previousState: Option[T]): Result[T]
}

/**
* @param useNotify whether to use filesystem based watcher. If it is false uses polling.
* @param serverDir the directory for storing logs of the mill server
*/
case class WatchArgs(
setIdle: Boolean => Unit,
colors: Colors,
useNotify: Boolean,
serverDir: os.Path
)

def watchLoop[T](
ringBell: Boolean,
watch: Boolean,
watch: Option[WatchArgs],
streams: SystemStreams,
setIdle: Boolean => Unit,
evaluate: (Boolean, Option[T]) => Result[T],
colors: Colors
evaluate: Evaluate[T]
): (Boolean, T) = {
var prevState: Option[T] = None
var enterKeyPressed = false
while (true) {
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
prevState = Some(result)
def handleError(errorOpt: Option[String]): Unit = {
errorOpt.foreach(streams.err.println)
if (ringBell) {
if (errorOpt.isEmpty) println("\u0007")
else {
println("\u0007")
Thread.sleep(250)
println("\u0007")
}
}
doRingBell(hasError = errorOpt.isDefined)
}

if (!watch) {
return (errorOpt.isEmpty, result)
}
def doRingBell(hasError: Boolean): Unit = {
if (!ringBell) return

val alreadyStale = watchables.exists(!_.validate())
enterKeyPressed = false
if (!alreadyStale) {
enterKeyPressed = Watching.watchAndWait(streams, setIdle, streams.in, watchables, colors)
println("\u0007")
if (hasError) {
// If we have an error ring the bell again
Thread.sleep(250)
println("\u0007")
}
}
???

watch match {
case None =>
val Result(watchables, errorOpt, result) =
evaluate(enterKeyPressed = false, previousState = None)
handleError(errorOpt)
(errorOpt.isEmpty, result)

case Some(watchArgs) =>
var prevState: Option[T] = None
var enterKeyPressed = false

// Exits when the thread gets interruped.
while (true) {
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
prevState = Some(result)
handleError(errorOpt)

// Do not enter watch if already stale, re-evaluate instantly.
val alreadyStale = watchables.exists(w => !w.validate())
if (alreadyStale) {
enterKeyPressed = false
} else {
enterKeyPressed = watchAndWait(
streams,
watchArgs.setIdle,
streams.in,
watchables,
watchArgs.colors,
useNotify = watchArgs.useNotify,
serverDir = watchArgs.serverDir
)
}
}
throw new IllegalStateException("unreachable")
}
}

def watchAndWait(
streams: SystemStreams,
setIdle: Boolean => Unit,
stdin: InputStream,
watched: Seq[Watchable],
colors: Colors
colors: Colors,
useNotify: Boolean,
serverDir: os.Path
): Boolean = {
setIdle(true)
val watchedPaths = watched.collect { case p: Watchable.Path => p.p.path }
val watchedValues = watched.size - watchedPaths.size
val (watchedPollables, watchedPathsSeq) = watched.partitionMap {
case w: Watchable.Value => Left(w)
case p: Watchable.Path => Right(p)
}
val watchedPathsSet = watchedPathsSeq.iterator.map(p => p.p.path).toSet
val watchedValueCount = watched.size - watchedPathsSeq.size

val watchedValueStr = if (watchedValues == 0) "" else s" and $watchedValues other values"
val watchedValueStr =
if (watchedValueCount == 0) "" else s" and $watchedValueCount other values"

streams.err.println(
streams.err.println {
val viaFsNotify = if (useNotify) " (via fsnotify)" else ""
colors.info(
s"Watching for changes to ${watchedPaths.size} paths$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
s"Watching for changes to ${watchedPathsSeq.size} paths$viaFsNotify$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
).toString
)
}

def doWatch(notifiablesChanged: () => Boolean) = {
val enterKeyPressed = statWatchWait(watchedPollables, stdin, notifiablesChanged)
setIdle(false)
enterKeyPressed
}

val enterKeyPressed = statWatchWait(watched, stdin)
setIdle(false)
enterKeyPressed
if (useNotify) Using.resource(os.write.outputStream(serverDir / "fsNotifyWatchLog")) { watchLog =>
def writeToWatchLog(s: String): Unit = {
watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
watchLog.write('\n')
}

@volatile var pathChangesDetected = false

// oslib watch only works with folders, so we have to watch the parent folders instead

writeToWatchLog(s"[watched-paths:unfiltered] ${watchedPathsSet.toSeq.sorted.mkString("\n")}")

val workspaceRoot = mill.api.WorkspaceRoot.workspaceRoot

/** Paths that are descendants of [[workspaceRoot]]. */
val pathsUnderWorkspaceRoot = watchedPathsSet.filter { path =>
val isUnderWorkspaceRoot = path.startsWith(workspaceRoot)
if (!isUnderWorkspaceRoot) {
streams.err.println(colors.error(
s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported."
).toString())
}

isUnderWorkspaceRoot
}

// If I have 'root/a/b/c'
//
// Then I want to watch:
// root/a/b/c
// root/a/b
// root/a
// root
val filterPaths = pathsUnderWorkspaceRoot.flatMap { path =>
path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments)
}
writeToWatchLog(s"[watched-paths:filtered] ${filterPaths.toSeq.sorted.mkString("\n")}")

Using.resource(os.watch.watch(
// Just watch the root folder
Seq(workspaceRoot),
filter = path => {
val shouldBeWatched =
filterPaths.contains(path) || watchedPathsSet.exists(watchedPath => path.startsWith(watchedPath))
writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path")
shouldBeWatched
},
onEvent = changedPaths => {
// Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the
// same folder
val hasWatchedPath =
changedPaths.exists(p => watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath)))
writeToWatchLog(s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}")
if (hasWatchedPath) {
pathChangesDetected = true
}
},
logger = (eventType, data) => writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}")
)) { _ =>
doWatch(notifiablesChanged = () => pathChangesDetected)
}
}
else {
doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !p.validate()))
}
}

// Returns `true` if enter key is pressed to re-run tasks explicitly
def statWatchWait(watched: Seq[Watchable], stdin: InputStream): Boolean = {
/**
* @param notifiablesChanged returns true if any of the notifiables have changed
*
* @return `true` if enter key is pressed to re-run tasks explicitly, false if changes in watched files occured.
*/
def statWatchWait(
watched: Seq[Watchable],
stdin: InputStream,
notifiablesChanged: () => Boolean
): Boolean = {
val buffer = new Array[Byte](4 * 1024)

@tailrec def statWatchWait0(): Boolean = {
if (watched.forall(_.validate())) {
if (!notifiablesChanged() && watched.forall(_.validate())) {
if (lookForEnterKey()) {
true
} else {
Expand All @@ -94,17 +216,18 @@ object Watching {
if (stdin.available() == 0) false
else stdin.read(buffer) match {
case 0 | -1 => false
case n =>
case bytesRead =>
buffer.indexOf('\n') match {
case -1 => lookForEnterKey()
case i =>
if (i >= n) lookForEnterKey()
case index =>
// If we found the newline further than the bytes read, that means it's not from this read and thus we
// should try reading again.
if (index >= bytesRead) lookForEnterKey()
else true
}
}
}

statWatchWait0()
}

}
Loading