Skip to content

Commit d2df820

Browse files
authored
Catch fatal exceptions in futures (#4223)
Fixes #4221 Otherwise they just silently terminate the future and leave downstream `Await`s hanging, resulting in the process hanging waiting for a future that will never complete. `def reportFailure` doesn't seem to log stuff properly when this happens for some reason Covered with additional integration tests
1 parent c5cf3c8 commit d2df820

File tree

3 files changed

+119
-64
lines changed

3 files changed

+119
-64
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package build
2+
import mill._
3+
4+
def fatalException(msg: String) = {
5+
// Needs to be a fatal error according to scala.util.control.NonFatal,
6+
// not just any error!
7+
val ex = new java.lang.LinkageError(msg)
8+
assert(!scala.util.control.NonFatal.apply(ex))
9+
ex
10+
}
11+
def fatalTask = Task{
12+
throw fatalException("CUSTOM FATAL ERROR IN TASK")
13+
123
14+
}
15+
16+
def alwaysInvalidates = Task.Input(math.random())
17+
def fatalCloseWorker = Task.Worker{
18+
alwaysInvalidates()
19+
new AutoCloseable {
20+
override def close(): Unit =
21+
throw fatalException("CUSTOM FATAL ERROR ON CLOSE")
22+
}
23+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package mill.integration
2+
3+
import mill.testkit.UtestIntegrationTestSuite
4+
5+
import utest._
6+
7+
object CompileErrorTests extends UtestIntegrationTestSuite {
8+
val tests: Tests = Tests {
9+
test - integrationTest { tester =>
10+
val res = tester.eval("fatalTask")
11+
12+
assert(res.isSuccess == false)
13+
assert(res.err.contains("""java.lang.LinkageError: CUSTOM FATAL ERROR IN TASK"""))
14+
15+
// Only run this test in client-server mode, since workers are not shutdown
16+
// with `close()` in no-server mode so the error does not trigger
17+
if (clientServerMode) {
18+
// This worker invalidates re-evaluates every time due to being dependent on
19+
// an upstream `Task.Input`. Make sure that a fatal error in the `close()`
20+
// call does not hang the Mill process
21+
tester.eval("fatalCloseWorker")
22+
val res3 = tester.eval("fatalCloseWorker")
23+
assert(res3.err.contains("""java.lang.LinkageError: CUSTOM FATAL ERROR"""))
24+
}
25+
}
26+
}
27+
}

main/eval/src/mill/eval/EvaluatorCore.scala

Lines changed: 69 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -124,72 +124,77 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
124124
)
125125
} else {
126126
futures(terminal) = Future.sequence(deps.map(futures)).map { upstreamValues =>
127-
val countMsg = mill.util.Util.leftPad(
128-
count.getAndIncrement().toString,
129-
terminals.length.toString.length,
130-
'0'
131-
)
132-
133-
val verboseKeySuffix = s"/${terminals0.size}"
134-
logger.setPromptHeaderPrefix(s"$countMsg$verboseKeySuffix")
135-
if (failed.get()) None
136-
else {
137-
val upstreamResults = upstreamValues
138-
.iterator
139-
.flatMap(_.iterator.flatMap(_.newResults))
140-
.toMap
141-
142-
val startTime = System.nanoTime() / 1000
143-
144-
// should we log progress?
145-
val inputResults = for {
146-
target <- group.indexed.filterNot(upstreamResults.contains)
147-
item <- target.inputs.filterNot(group.contains)
148-
} yield upstreamResults(item).map(_._1)
149-
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])
150-
151-
val tickerPrefix = terminal.render.collect {
152-
case targetLabel if logRun && logger.enableTicker => targetLabel
153-
}
154-
155-
val contextLogger = new PrefixLogger(
156-
logger0 = logger,
157-
key0 = if (!logger.enableTicker) Nil else Seq(countMsg),
158-
verboseKeySuffix = verboseKeySuffix,
159-
message = tickerPrefix,
160-
noPrefix = exclusive
127+
try {
128+
val countMsg = mill.util.Util.leftPad(
129+
count.getAndIncrement().toString,
130+
terminals.length.toString.length,
131+
'0'
161132
)
162133

163-
val res = evaluateGroupCached(
164-
terminal = terminal,
165-
group = sortedGroups.lookupKey(terminal),
166-
results = upstreamResults,
167-
countMsg = countMsg,
168-
verboseKeySuffix = verboseKeySuffix,
169-
zincProblemReporter = reporter,
170-
testReporter = testReporter,
171-
logger = contextLogger,
172-
classToTransitiveClasses,
173-
allTransitiveClassMethods,
174-
forkExecutionContext,
175-
exclusive
176-
)
177-
178-
if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
179-
failed.set(true)
180-
181-
val endTime = System.nanoTime() / 1000
182-
val duration = endTime - startTime
183-
184-
val threadId = threadNumberer.getThreadId(Thread.currentThread())
185-
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)
186-
187-
if (!res.cached) uncached.put(terminal, ())
188-
if (res.valueHashChanged) changedValueHash.put(terminal, ())
189-
190-
profileLogger.log(terminal, duration, res, deps)
191-
192-
Some(res)
134+
val verboseKeySuffix = s"/${terminals0.size}"
135+
logger.setPromptHeaderPrefix(s"$countMsg$verboseKeySuffix")
136+
if (failed.get()) None
137+
else {
138+
val upstreamResults = upstreamValues
139+
.iterator
140+
.flatMap(_.iterator.flatMap(_.newResults))
141+
.toMap
142+
143+
val startTime = System.nanoTime() / 1000
144+
145+
// should we log progress?
146+
val inputResults = for {
147+
target <- group.indexed.filterNot(upstreamResults.contains)
148+
item <- target.inputs.filterNot(group.contains)
149+
} yield upstreamResults(item).map(_._1)
150+
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])
151+
152+
val tickerPrefix = terminal.render.collect {
153+
case targetLabel if logRun && logger.enableTicker => targetLabel
154+
}
155+
156+
val contextLogger = new PrefixLogger(
157+
logger0 = logger,
158+
key0 = if (!logger.enableTicker) Nil else Seq(countMsg),
159+
verboseKeySuffix = verboseKeySuffix,
160+
message = tickerPrefix,
161+
noPrefix = exclusive
162+
)
163+
164+
val res = evaluateGroupCached(
165+
terminal = terminal,
166+
group = sortedGroups.lookupKey(terminal),
167+
results = upstreamResults,
168+
countMsg = countMsg,
169+
verboseKeySuffix = verboseKeySuffix,
170+
zincProblemReporter = reporter,
171+
testReporter = testReporter,
172+
logger = contextLogger,
173+
classToTransitiveClasses,
174+
allTransitiveClassMethods,
175+
forkExecutionContext,
176+
exclusive
177+
)
178+
179+
if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
180+
failed.set(true)
181+
182+
val endTime = System.nanoTime() / 1000
183+
val duration = endTime - startTime
184+
185+
val threadId = threadNumberer.getThreadId(Thread.currentThread())
186+
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)
187+
188+
if (!res.cached) uncached.put(terminal, ())
189+
if (res.valueHashChanged) changedValueHash.put(terminal, ())
190+
191+
profileLogger.log(terminal, duration, res, deps)
192+
193+
Some(res)
194+
}
195+
} catch {
196+
case e: Throwable if !scala.util.control.NonFatal(e) =>
197+
throw new Exception(e)
193198
}
194199
}
195200
}

0 commit comments

Comments
 (0)