Skip to content

Commit 51ca834

Browse files
authored
Use a single Mill server process (#5066)
Fixes #5031 * We now only have a single server process running at a time, and no longer keep up to 5 separate `serverDir-1` to `serverDir-5` folders. Essentially we are migrating from the style of concurrency used by `gradle` and `mvnd` to the style of concurrency used by `bazel` * We now only hold the `clientLock` until the server process is initialized (and has taken it's `processLock`) to avoid redundant initialization, and after that we release it to allow multiple clients to connect * Since there is only one server and one `serverDir`, we have to remove several per-server files since there is no way to ensure they are unique in the presence of multiple clients * `runArgs` can no longer be a file, and instead we send its contents over the socket before starting the `InputPumper` * We can no longer rely on `clientLock` probing to check if the client has disconnected, and so we instead have the server send `ProxyStream.HEARTBEAT` bytes (127) and look for errors * `Server#run` can no longer accept socket connections and process them in a single-threaded loop, and instead needs to spawn off threads to handle each connected socket * We cannot perform the mutex at the server/client/connection level because there are scenarios we want multiple clients connected at once, e.g. when one client is in `--watch` mode and idle we want to allow another client to proceed with execution. So locking needs to be done at a finer-grained granularity within the server * The original `millLock` file (renamed `millOutLock`) has been combined with a `MemoryLock` via `DoubleLock`, which lets us check the memory-lock first to avoid taking the file-lock twice on the same JVM and causing a `OverlappingFileLockException`. We still need to keep the file-lock to provide mutex between server and no-server processes, or between multiple no-server processes Now that things are in one process, that opens up the possibility of loosening the concurrency constraints in future, e.g. allowing multiple concurrent evaluations that do not interfere with each other.
1 parent 5b88d1e commit 51ca834

File tree

17 files changed

+222
-114
lines changed

17 files changed

+222
-114
lines changed

core/constants/src/mill/constants/OutFiles.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class OutFiles {
6060
/**
6161
* Lock file used for exclusive access to the Mill output directory
6262
*/
63-
public static final String millLock = "mill-lock";
63+
public static final String millOutLock = "mill-out-lock";
6464

6565
/**
6666
* Any active Mill command that is currently run, for debugging purposes

core/constants/src/mill/constants/ProxyStream.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ProxyStream {
3535
public static final int OUT = 1;
3636
public static final int ERR = -1;
3737
public static final int END = 0;
38+
public static final int HEARTBEAT = 127;
3839

3940
public static void sendEnd(OutputStream out) throws IOException {
4041
synchronized (out) {
@@ -43,6 +44,13 @@ public static void sendEnd(OutputStream out) throws IOException {
4344
}
4445
}
4546

47+
public static void sendHeartbeat(OutputStream out) throws IOException {
48+
synchronized (out) {
49+
out.write(ProxyStream.HEARTBEAT);
50+
out.flush();
51+
}
52+
}
53+
4654
public static class Output extends java.io.OutputStream {
4755
private final java.io.OutputStream destination;
4856
private final int key;
@@ -75,7 +83,7 @@ public void write(byte[] b, int off, int len) throws IOException {
7583
synchronized (destination) {
7684
int i = 0;
7785
while (i < len && i + off < b.length) {
78-
int chunkLength = Math.min(len - i, 127);
86+
int chunkLength = Math.min(len - i, 126);
7987
if (chunkLength > 0) {
8088
destination.write(chunkLength * key);
8189
destination.write(b, off + i, Math.min(b.length - off - i, chunkLength));
@@ -136,7 +144,8 @@ public void run() {
136144
// that only header values > 0 represent actual data to read:
137145
// - sign((byte)header) represents which stream the data should be sent to
138146
// - abs((byte)header) represents the length of the data to read and send
139-
if (header == -1 || header == 0) break;
147+
if (header == -1 || header == END) break;
148+
else if (header == HEARTBEAT) continue;
140149
else {
141150
int stream = (byte) header > 0 ? 1 : -1;
142151
int quantity0 = (byte) header;

core/constants/src/mill/constants/ServerFiles.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class ServerFiles {
2020
* folder. If multiple servers are spawned in the same folder, only one takes
2121
* the lock and the others fail to do so and terminate immediately.
2222
*/
23-
public static final String processLock = "processLock";
23+
public static final String serverLock = "serverLock";
2424

2525
/**
2626
* The port used to connect between server and client
@@ -48,12 +48,6 @@ public static String pipe(String base) {
4848
*/
4949
public static final String serverLog = "server.log";
5050

51-
/**
52-
* File that the client writes to pass the arguments, environment variables,
53-
* and other necessary metadata to the Mill server to kick off a run
54-
*/
55-
public static final String runArgs = "runArgs";
56-
5751
/**
5852
* File the server writes to pass the exit code of a completed run back to the
5953
* client

example/fundamentals/out-dir/1-out-files/build.mill

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ out/mill-build/...
2929
out/mill-profile.json
3030
out/mill-runner-state.json
3131
out/mill-dependency-tree.json
32-
out/mill-lock
32+
out/mill-out-lock
3333
out/mill-invalidation-tree.json
3434
out/mill-chrome-profile.json
3535
out/mill-server/...

runner/bsp/worker/src/mill/bsp/worker/BspWorkerImpl.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import mill.bsp.BuildInfo
55
import mill.api.internal.{BspServerHandle, BspServerResult, EvaluatorApi}
66
import mill.bsp.Constants
77
import mill.api.{Result, SystemStreams}
8+
import mill.client.lock.Lock
89
import org.eclipse.lsp4j.jsonrpc.Launcher
910

1011
import java.io.PrintWriter
@@ -17,7 +18,8 @@ object BspWorkerImpl {
1718
topLevelBuildRoot: os.Path,
1819
streams: SystemStreams,
1920
logDir: os.Path,
20-
canReload: Boolean
21+
canReload: Boolean,
22+
outLock: Lock
2123
): mill.api.Result[BspServerHandle] = {
2224

2325
try {
@@ -33,7 +35,8 @@ object BspWorkerImpl {
3335
logStream = streams.err,
3436
canReload = canReload,
3537
debugMessages = Option(System.getenv("MILL_BSP_DEBUG")).contains("true"),
36-
onShutdown = () => listening.cancel(true)
38+
onShutdown = () => listening.cancel(true),
39+
outLock = outLock
3740
) with MillJvmBuildServer with MillJavaBuildServer with MillScalaBuildServer
3841

3942
lazy val launcher = new Launcher.Builder[BuildClient]()

runner/bsp/worker/src/mill/bsp/worker/MillBuildServer.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import com.google.gson.JsonObject
66
import mill.api.*
77
import mill.api.internal.{JvmBuildTarget, ScalaBuildTarget, *}
88
import mill.api.Segment.Label
9-
import mill.bsp.{Constants}
9+
import mill.bsp.Constants
1010
import mill.bsp.worker.Utils.{makeBuildTarget, outputPaths, sanitizeUri}
11+
import mill.client.lock.Lock
1112
import mill.server.Server
1213

1314
import java.io.PrintStream
1415
import java.util.concurrent.CompletableFuture
16+
import java.util.concurrent.locks.ReentrantLock
1517
import scala.collection.mutable
1618
import scala.concurrent.Promise
1719
import scala.jdk.CollectionConverters.*
@@ -28,7 +30,8 @@ private class MillBuildServer(
2830
logStream: PrintStream,
2931
canReload: Boolean,
3032
debugMessages: Boolean,
31-
onShutdown: () => Unit
33+
onShutdown: () => Unit,
34+
outLock: Lock
3235
)(implicit ec: scala.concurrent.ExecutionContext) extends BuildServer {
3336

3437
import MillBuildServer._
@@ -724,7 +727,8 @@ private class MillBuildServer(
724727
case n: NamedTaskApi[_] => n.label
725728
case t => t.toString
726729
},
727-
streams = logger0.streams
730+
streams = logger0.streams,
731+
outLock = outLock
728732
) {
729733
evaluator.executeApi(
730734
goals,

runner/client/src/mill/client/ServerLauncher.java

Lines changed: 30 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.nio.file.Path;
1010
import java.util.Map;
1111
import mill.client.lock.Locks;
12-
import mill.client.lock.TryLocked;
1312
import mill.constants.InputPumper;
1413
import mill.constants.ProxyStream;
1514
import mill.constants.ServerFiles;
@@ -23,10 +22,10 @@
2322
*
2423
* - Client:
2524
* - Take clientLock
26-
* - If processLock is not yet taken, it means server is not running, so spawn a server
25+
* - If serverLock is not yet taken, it means server is not running, so spawn a server
2726
* - Wait for server socket to be available for connection
2827
* - Server:
29-
* - Take processLock.
28+
* - Take serverLock.
3029
* - If already taken, it means another server was running
3130
* (e.g. spawned by a different client) so exit immediately
3231
* - Server: loop:
@@ -46,7 +45,6 @@ public static class Result {
4645
public Path serverDir;
4746
}
4847

49-
final int serverProcessesLimit = 5;
5048
final int serverInitWaitMillis = 10000;
5149

5250
public abstract void initServer(Path serverDir, boolean b, Locks locks) throws Exception;
@@ -58,7 +56,7 @@ public static class Result {
5856
PrintStream stderr;
5957
Map<String, String> env;
6058
String[] args;
61-
Locks[] memoryLocks;
59+
Locks memoryLock;
6260
int forceFailureForTestingMillisDelay;
6361

6462
public ServerLauncher(
@@ -67,7 +65,7 @@ public ServerLauncher(
6765
PrintStream stderr,
6866
Map<String, String> env,
6967
String[] args,
70-
Locks[] memoryLocks,
68+
Locks memoryLock,
7169
int forceFailureForTestingMillisDelay) {
7270
this.stdin = stdin;
7371
this.stdout = stdout;
@@ -78,56 +76,32 @@ public ServerLauncher(
7876
// For testing in memory, we need to pass in the locks separately, so that the
7977
// locks can be shared between the different instances of `ServerLauncher` the
8078
// same way file locks are shared between different Mill client/server processes
81-
this.memoryLocks = memoryLocks;
79+
this.memoryLock = memoryLock;
8280

8381
this.forceFailureForTestingMillisDelay = forceFailureForTestingMillisDelay;
8482
}
8583

86-
public Result acquireLocksAndRun(Path serverDir0) throws Exception {
87-
84+
public Result acquireLocksAndRun(Path serverDir) throws Exception {
8885
final boolean setJnaNoSys = System.getProperty("jna.nosys") == null;
89-
if (setJnaNoSys) {
90-
System.setProperty("jna.nosys", "true");
91-
}
86+
if (setJnaNoSys) System.setProperty("jna.nosys", "true");
9287

93-
int serverIndex = 0;
94-
while (serverIndex < serverProcessesLimit) { // Try each possible server process (-1 to -5)
95-
serverIndex++;
96-
final Path serverDir =
97-
serverDir0.getParent().resolve(serverDir0.getFileName() + "-" + serverIndex);
98-
99-
Files.createDirectories(serverDir);
100-
101-
try (Locks locks = memoryLocks != null
102-
? memoryLocks[serverIndex - 1]
103-
: Locks.files(serverDir.toString());
104-
TryLocked clientLocked = locks.clientLock.tryLock()) {
105-
if (clientLocked.isLocked()) {
106-
Result result = new Result();
107-
preRun(serverDir);
108-
result.exitCode = run(serverDir, setJnaNoSys, locks);
109-
result.serverDir = serverDir;
110-
return result;
111-
}
112-
}
113-
}
114-
throw new ServerCouldNotBeStarted(
115-
"Reached max server processes limit: " + serverProcessesLimit);
116-
}
117-
118-
int run(Path serverDir, boolean setJnaNoSys, Locks locks) throws Exception {
88+
Files.createDirectories(serverDir);
11989

120-
try (OutputStream f = Files.newOutputStream(serverDir.resolve(ServerFiles.runArgs))) {
121-
f.write(Util.hasConsole() ? 1 : 0);
122-
ClientUtil.writeString(f, BuildInfo.millVersion);
123-
ClientUtil.writeArgs(args, f);
124-
ClientUtil.writeMap(env, f);
125-
}
90+
Result result = new Result();
91+
preRun(serverDir);
92+
result.exitCode = run(serverDir, setJnaNoSys);
93+
result.serverDir = serverDir;
94+
return result;
95+
}
12696

127-
if (locks.processLock.probe()) initServer(serverDir, setJnaNoSys, locks);
97+
int run(Path serverDir, boolean setJnaNoSys) throws Exception {
12898

129-
while (locks.processLock.probe()) Thread.sleep(1);
99+
try (Locks locks = memoryLock != null ? memoryLock : Locks.files(serverDir.toString());
100+
mill.client.lock.Locked locked = locks.clientLock.lock()) {
130101

102+
if (locks.serverLock.probe()) initServer(serverDir, setJnaNoSys, locks);
103+
while (locks.serverLock.probe()) Thread.sleep(1);
104+
}
131105
long retryStart = System.currentTimeMillis();
132106
Socket ioSocket = null;
133107
Throwable socketThrowable = null;
@@ -147,6 +121,10 @@ int run(Path serverDir, boolean setJnaNoSys, Locks locks) throws Exception {
147121

148122
InputStream outErr = ioSocket.getInputStream();
149123
OutputStream in = ioSocket.getOutputStream();
124+
in.write(Util.hasConsole() ? 1 : 0);
125+
ClientUtil.writeString(in, BuildInfo.millVersion);
126+
ClientUtil.writeArgs(args, in);
127+
ClientUtil.writeMap(env, in);
150128
ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr);
151129
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
152130
Thread outPumperThread = new Thread(outPumper, "outPump");
@@ -156,13 +134,13 @@ int run(Path serverDir, boolean setJnaNoSys, Locks locks) throws Exception {
156134
outPumperThread.start();
157135
inThread.start();
158136

159-
if (forceFailureForTestingMillisDelay > 0) {
160-
Thread.sleep(forceFailureForTestingMillisDelay);
161-
throw new Exception("Force failure for testing: " + serverDir);
162-
}
163-
outPumperThread.join();
164-
165137
try {
138+
if (forceFailureForTestingMillisDelay > 0) {
139+
Thread.sleep(forceFailureForTestingMillisDelay);
140+
throw new Exception("Force failure for testing: " + serverDir);
141+
}
142+
outPumperThread.join();
143+
166144
Path exitCodeFile = serverDir.resolve(ServerFiles.exitCode);
167145
if (Files.exists(exitCodeFile)) {
168146
return Integer.parseInt(Files.readAllLines(exitCodeFile).get(0));
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package mill.client.lock;
2+
3+
public class DoubleLock extends Lock {
4+
5+
private final Lock lock1;
6+
private final Lock lock2;
7+
8+
public DoubleLock(Lock lock1, Lock lock2) throws Exception {
9+
this.lock1 = lock1;
10+
this.lock2 = lock2;
11+
}
12+
13+
@Override
14+
public Locked lock() throws Exception {
15+
return new DoubleLocked(lock1.lock(), lock2.lock());
16+
}
17+
18+
@Override
19+
public TryLocked tryLock() throws Exception {
20+
TryLocked l1 = lock1.tryLock();
21+
TryLocked l2 = lock2.tryLock();
22+
if (l1.isLocked() && l2.isLocked()) {
23+
return new DoubleTryLocked(l1, l2);
24+
} else {
25+
l1.release();
26+
l2.release();
27+
return new DoubleTryLocked(null, null);
28+
}
29+
}
30+
31+
@Override
32+
public boolean probe() throws Exception {
33+
TryLocked tl = tryLock();
34+
if (!tl.isLocked()) return true;
35+
else {
36+
tl.release();
37+
return false;
38+
}
39+
}
40+
41+
@Override
42+
public void close() throws Exception {
43+
lock1.close();
44+
lock2.close();
45+
}
46+
47+
@Override
48+
public void delete() throws Exception {
49+
close();
50+
}
51+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package mill.client.lock;
2+
3+
class DoubleLocked implements Locked {
4+
5+
protected final Locked lock1;
6+
protected final Locked lock2;
7+
8+
public DoubleLocked(Locked lock1, Locked lock2) {
9+
this.lock1 = lock1;
10+
this.lock2 = lock2;
11+
}
12+
13+
@Override
14+
public void release() throws Exception {
15+
this.lock1.release();
16+
this.lock2.release();
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package mill.client.lock;
2+
3+
class DoubleTryLocked extends DoubleLocked implements TryLocked {
4+
5+
public DoubleTryLocked(TryLocked lock1, TryLocked lock2) {
6+
super(lock1, lock2);
7+
}
8+
9+
@Override
10+
public boolean isLocked() {
11+
return lock1 != null && lock2 != null;
12+
}
13+
14+
@Override
15+
public void release() throws Exception {
16+
if (isLocked()) super.release();
17+
}
18+
}

0 commit comments

Comments
 (0)