Skip to content

Commit d9612e1

Browse files
authored
Stream exit code over ProxyStreams rather than writing to a file on disk (#5131)
Fixes #5130 We already send a `ProxyStream.END` byte for any normal termination, so it's straightforward to make the server send one byte after representing the exit code, and have the client read the exit code byte before closing the connection This also explicitly closes the `serverSocket` and releases the `serverLock` when shutting down, as leaving those open sometimes keeps the server running 500-1000ms longer than expected. This should fix the occasional `exitCode file not found` errors we have been seeing since time immemorial
1 parent c3162da commit d9612e1

File tree

12 files changed

+108
-70
lines changed

12 files changed

+108
-70
lines changed

core/constants/src/mill/constants/ProxyStream.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ public class ProxyStream {
3737
public static final int END = 0;
3838
public static final int HEARTBEAT = 127;
3939

40-
public static void sendEnd(OutputStream out) throws IOException {
40+
public static void sendEnd(OutputStream out, int exitCode) throws IOException {
4141
synchronized (out) {
4242
out.write(ProxyStream.END);
43+
out.write(exitCode);
4344
out.flush();
4445
}
4546
}
@@ -113,6 +114,7 @@ public static class Pumper implements Runnable {
113114
private final OutputStream destOut;
114115
private final OutputStream destErr;
115116
private final Object synchronizer;
117+
public volatile int exitCode = 255;
116118

117119
public Pumper(
118120
InputStream src, OutputStream destOut, OutputStream destErr, Object synchronizer) {
@@ -144,8 +146,11 @@ public void run() {
144146
// that only header values > 0 represent actual data to read:
145147
// - sign((byte)header) represents which stream the data should be sent to
146148
// - abs((byte)header) represents the length of the data to read and send
147-
if (header == -1 || header == END) break;
148-
else if (header == HEARTBEAT) continue;
149+
if (header == -1) break;
150+
else if (header == END) {
151+
exitCode = src.read();
152+
break;
153+
} else if (header == HEARTBEAT) continue;
149154
else {
150155
int stream = (byte) header > 0 ? 1 : -1;
151156
int quantity0 = (byte) header;

core/constants/src/mill/constants/ServerFiles.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,6 @@ public static String pipe(String base) {
4848
*/
4949
public static final String serverLog = "server.log";
5050

51-
/**
52-
* File the server writes to pass the exit code of a completed run back to the
53-
* client
54-
*/
55-
public static final String exitCode = "exitCode";
56-
5751
/**
5852
* Where the server's stdout is piped to
5953
*/

core/constants/test/src/mill/client/ProxyStreamTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void test0(byte[] outData, byte[] errData, int repeats, boolean gracefulE
7777
srcErr.write(errData);
7878
}
7979

80-
if (gracefulEnd) ProxyStream.sendEnd(pipedOutputStream);
80+
if (gracefulEnd) ProxyStream.sendEnd(pipedOutputStream, 0);
8181
else {
8282
pipedOutputStream.close();
8383
}

core/internal/src/mill/internal/PromptLogger.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,10 @@ private[mill] object PromptLogger {
337337
// Close the write side of the pipe first but do not close the read side, so
338338
// the `pumperThread` can continue reading remaining text in the pipe buffer
339339
// before terminating on its own
340-
ProxyStream.sendEnd(pipe.output)
340+
ProxyStream.sendEnd(
341+
pipe.output,
342+
0 // exit code value is not used since this ProxyStream doesn't wrap a subprocess
343+
)
341344
pipe.output.close()
342345
pumperThread.join()
343346
}

integration/feature/shutdown-exit-code/src/ShutdownExitCodeTests.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ object ShutdownExitCodeTests extends UtestIntegrationTestSuite {
1010
test("test") - integrationTest { tester =>
1111
val result1 = tester.eval(("resolve", "_"))
1212
assert(result1.isSuccess == true)
13+
1314
val result2 = tester.eval("shutdown")
1415
assert(result2.isSuccess == true)
1516

1617
val result3 = tester.eval("doesnt-exit")
17-
assert(result3.isSuccess == false)
18+
assert(result3.exitCode == 1)
19+
1820
val result4 = tester.eval("shutdown")
1921
assert(result4.isSuccess == true)
2022
}

integration/ide/bsp-modules/src/BspModulesTests.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ object BspModulesTests extends UtestIntegrationTestSuite {
1818
os.read(workspacePath / Constants.bspDir / s"${Constants.serverName}.json")
1919
)
2020

21+
eval("shutdown")
22+
Thread.sleep(1000)
2123
val executable = json("argv").arr(0).str
2224
val checkRes = os.call((executable, "checkExecutable"), cwd = workspacePath)
2325
assert(checkRes.exitCode == 0)

integration/invalidation/version-change/src/VersionChangeTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ object VersionChangeTests extends UtestIntegrationTestSuite {
1414
os.write.over(workspacePath / ".mill-jvm-version", "temurin:19.0.2")
1515

1616
val javaVersion2 = eval(("show", "javaVersion"))
17-
assert(javaVersion2.out == "\"19.0.2\"")
17+
assert(javaVersion2.out.contains("\"19.0.2\""))
1818

1919
}
2020
}

runner/client/src/mill/client/ServerLauncher.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package mill.client;
22

3-
import java.io.IOException;
43
import java.io.InputStream;
54
import java.io.OutputStream;
65
import java.io.PrintStream;
@@ -90,17 +89,17 @@ public Result run(Path serverDir, String javaHome) throws Exception {
9089

9190
Socket ioSocket = launchConnectToServer(serverDir);
9291

92+
Result result = new Result();
9393
try {
94-
Thread outPumperThread = startStreamPumpers(ioSocket, javaHome);
94+
PumperThread outPumperThread = startStreamPumpers(ioSocket, javaHome);
9595
forceTestFailure(serverDir);
9696
outPumperThread.join();
97+
result.exitCode = outPumperThread.exitCode();
98+
result.serverDir = serverDir;
9799
} finally {
98100
ioSocket.close();
99101
}
100102

101-
Result result = new Result();
102-
result.exitCode = readExitCode(serverDir);
103-
result.serverDir = serverDir;
104103
return result;
105104
}
106105

@@ -137,7 +136,20 @@ private void forceTestFailure(Path serverDir) throws Exception {
137136
}
138137
}
139138

140-
Thread startStreamPumpers(Socket ioSocket, String javaHome) throws Exception {
139+
class PumperThread extends Thread {
140+
ProxyStream.Pumper runnable;
141+
142+
public PumperThread(ProxyStream.Pumper runnable, String name) {
143+
super(runnable, name);
144+
this.runnable = runnable;
145+
}
146+
147+
public int exitCode() {
148+
return runnable.exitCode;
149+
}
150+
}
151+
152+
PumperThread startStreamPumpers(Socket ioSocket, String javaHome) throws Exception {
141153
InputStream outErr = ioSocket.getInputStream();
142154
OutputStream in = ioSocket.getOutputStream();
143155
in.write(Util.hasConsole() ? 1 : 0);
@@ -147,22 +159,12 @@ Thread startStreamPumpers(Socket ioSocket, String javaHome) throws Exception {
147159
ClientUtil.writeMap(env, in);
148160
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
149161
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
150-
Thread outPumperThread = new Thread(outPumper, "outPump");
162+
PumperThread outPumperThread = new PumperThread(outPumper, "outPump");
151163
outPumperThread.setDaemon(true);
152164
Thread inThread = new Thread(inPump, "inPump");
153165
inThread.setDaemon(true);
154166
outPumperThread.start();
155167
inThread.start();
156168
return outPumperThread;
157169
}
158-
159-
int readExitCode(Path serverDir) throws IOException {
160-
Path exitCodeFile = serverDir.resolve(ServerFiles.exitCode);
161-
if (Files.exists(exitCodeFile)) {
162-
return Integer.parseInt(Files.readAllLines(exitCodeFile).get(0));
163-
} else {
164-
System.err.println("mill-server/ exitCode file not found");
165-
return 1;
166-
}
167-
}
168170
}

runner/launcher/src/mill/launcher/MillProcessLauncher.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,6 @@ public static boolean checkTputExists() {
364364
public static void prepareMillRunFolder(Path serverDir) throws Exception {
365365
// Clear out run-related files from the server folder to make sure we
366366
// never hit issues where we are reading the files from a previous run
367-
Files.deleteIfExists(serverDir.resolve(ServerFiles.exitCode));
368367
Files.deleteIfExists(serverDir.resolve(ServerFiles.terminfo));
369368

370369
Path sandbox = serverDir.resolve(ServerFiles.sandbox);

runner/server/src/mill/server/Server.scala

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import mill.constants.InputPumper
99
import mill.constants.ProxyStream
1010

1111
import java.io.*
12-
import java.net.{InetAddress, Socket}
12+
import java.net.{InetAddress, Socket, ServerSocket}
1313
import scala.jdk.CollectionConverters.*
1414
import scala.util.Try
1515
import scala.util.Using
1616
import mill.constants.OutFiles
1717

18+
import java.util.concurrent.atomic.AtomicBoolean
19+
1820
/**
1921
* Models a long-lived server that receives requests from a client and calls a [[main0]]
2022
* method to run the commands in-process. Provides the command args, env variables,
@@ -52,7 +54,7 @@ abstract class Server[T](
5254
val initialSystemProperties = sys.props.toMap
5355

5456
try {
55-
Server.tryLockBlock(locks.serverLock) {
57+
Server.tryLockBlock(locks.serverLock) { locked =>
5658
serverLog("server file locked")
5759
Server.watchProcessIdFile(
5860
serverDir / ServerFiles.processId,
@@ -64,27 +66,42 @@ abstract class Server[T](
6466
}
6567
)
6668
val serverSocket = new java.net.ServerSocket(0, 0, InetAddress.getByName(null))
67-
os.write.over(serverDir / ServerFiles.socketPort, serverSocket.getLocalPort.toString)
68-
serverLog("listening on port " + serverSocket.getLocalPort)
69-
while (
70-
running && {
71-
interruptWithTimeout(() => serverSocket.close(), () => serverSocket.accept()) match {
72-
case None => false
73-
case Some(sock) =>
74-
serverLog("handling run")
75-
new Thread(
76-
() =>
77-
try handleRun(sock, initialSystemProperties)
78-
catch {
79-
case e: Throwable =>
80-
serverLog(e.toString + "\n" + e.getStackTrace.mkString("\n"))
81-
} finally sock.close();,
82-
"HandleRunThread"
83-
).start()
84-
true
85-
}
69+
try {
70+
os.write.over(serverDir / ServerFiles.socketPort, serverSocket.getLocalPort.toString)
71+
serverLog("listening on port " + serverSocket.getLocalPort)
72+
73+
def systemExit(exitCode: Int) = {
74+
// Explicitly close serverSocket before exiting otherwise it can keep the
75+
// server alive 500-1000ms before letting it exit properly
76+
serverSocket.close()
77+
// Explicitly release process lock to indicate this serverwill not be
78+
// taking any more requests, and a new server should be spawned if necessary.
79+
// Otherwise launchers may continue trying to connect to the server and
80+
// failing since the socket is closed.
81+
locked.release()
82+
sys.exit(exitCode)
8683
}
87-
) ()
84+
85+
while (
86+
running && {
87+
interruptWithTimeout(() => serverSocket.close(), () => serverSocket.accept()) match {
88+
case None => false
89+
case Some(sock) =>
90+
serverLog("handling run")
91+
new Thread(
92+
() =>
93+
try handleRun(systemExit, sock, initialSystemProperties)
94+
catch {
95+
case e: Throwable =>
96+
serverLog(e.toString + "\n" + e.getStackTrace.mkString("\n"))
97+
} finally sock.close();,
98+
"HandleRunThread"
99+
).start()
100+
true
101+
}
102+
}
103+
) ()
104+
} finally serverSocket.close()
88105
serverLog("server loop ended")
89106
}.getOrElse(throw new Exception("Mill server process already present"))
90107
} catch {
@@ -142,9 +159,19 @@ abstract class Server[T](
142159
}
143160
}
144161

145-
def handleRun(clientSocket: Socket, initialSystemProperties: Map[String, String]): Unit = {
146-
162+
def handleRun(
163+
systemExit: Int => Nothing,
164+
clientSocket: Socket,
165+
initialSystemProperties: Map[String, String]
166+
): Unit = {
147167
val currentOutErr = clientSocket.getOutputStream
168+
val writtenExitCode = AtomicBoolean()
169+
def writeExitCode(code: Int) = {
170+
if (!writtenExitCode.getAndSet(true)) {
171+
ProxyStream.sendEnd(currentOutErr, code)
172+
}
173+
}
174+
148175
var clientDisappeared = false
149176
// We cannot use Socket#{isConnected, isClosed, isBound} because none of these
150177
// detect client-side connection closing, so instead we send a no-op heartbeat
@@ -181,6 +208,7 @@ abstract class Server[T](
181208

182209
val millVersionChanged = lastMillVersion.exists(_ != clientMillVersion)
183210
val javaVersionChanged = lastJavaVersion.exists(_ != clientJavaVersion)
211+
184212
if (millVersionChanged || javaVersionChanged) {
185213
Server.withOutLock(
186214
noBuildLock = false,
@@ -204,11 +232,9 @@ abstract class Server[T](
204232
s"Java version changed ($lastJavaVersion -> $clientJavaVersion), re-starting server"
205233
)
206234
}
207-
os.write(
208-
serverDir / ServerFiles.exitCode,
209-
ClientUtil.ExitServerCodeWhenVersionMismatch().toString.getBytes()
210-
)
211-
System.exit(ClientUtil.ExitServerCodeWhenVersionMismatch())
235+
236+
writeExitCode(ClientUtil.ExitServerCodeWhenVersionMismatch())
237+
systemExit(ClientUtil.ExitServerCodeWhenVersionMismatch())
212238
}
213239
}
214240
lastMillVersion = Some(clientMillVersion)
@@ -228,15 +254,15 @@ abstract class Server[T](
228254
Map(),
229255
initialSystemProperties,
230256
systemExit = exitCode => {
231-
os.write.over(serverDir / ServerFiles.exitCode, exitCode.toString)
232-
sys.exit(exitCode)
257+
writeExitCode(exitCode)
258+
systemExit(exitCode)
233259
}
234260
)
235261

236262
stateCache = newStateCache
237-
val exitCode = if (result) "0" else "1"
263+
val exitCode = if (result) 0 else 1
238264
serverLog("exitCode " + exitCode)
239-
os.write.over(serverDir / ServerFiles.exitCode, exitCode)
265+
writeExitCode(exitCode)
240266
} finally {
241267
done = true
242268
idle = true
@@ -271,7 +297,10 @@ abstract class Server[T](
271297
System.err.flush()
272298

273299
} finally {
274-
if (!clientDisappeared) ProxyStream.sendEnd(currentOutErr) // Send a termination
300+
try writeExitCode(1) // Send a termination if it has not already happened
301+
catch {
302+
case e: Throwable => /*donothing*/
303+
}
275304
}
276305
}
277306

@@ -329,12 +358,12 @@ object Server {
329358
processIdThread.start()
330359
}
331360

332-
def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
361+
def tryLockBlock[T](lock: Lock)(block: mill.client.lock.TryLocked => T): Option[T] = {
333362
lock.tryLock() match {
334363
case null => None
335364
case l =>
336365
if (l.isLocked) {
337-
try Some(t)
366+
try Some(block(l))
338367
finally l.release()
339368
} else {
340369
None

0 commit comments

Comments
 (0)