Skip to content

Commit 0c1f1a1

Browse files
committed
.
1 parent 9e7efc3 commit 0c1f1a1

File tree

2 files changed

+150
-12
lines changed

2 files changed

+150
-12
lines changed

os/src/ProcessOps.scala

Lines changed: 129 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package os
22

3-
import java.util.concurrent.{ArrayBlockingQueue, Semaphore, TimeUnit}
43
import collection.JavaConverters._
5-
import scala.annotation.tailrec
64
import java.lang.ProcessBuilder.Redirect
75
import os.SubProcess.InputStream
86
import java.io.IOException
97
import java.util.concurrent.LinkedBlockingQueue
108
import ProcessOps._
11-
import scala.util.Try
129

1310
object call {
1411

@@ -28,7 +25,8 @@ object call {
2825
timeout: Long = -1,
2926
check: Boolean = true,
3027
propagateEnv: Boolean = true,
31-
timeoutGracePeriod: Long = 100
28+
timeoutGracePeriod: Long = 100,
29+
shutdownHook: Boolean = false
3230
): CommandResult = {
3331
os.proc(cmd).call(
3432
cwd = cwd,
@@ -40,7 +38,38 @@ object call {
4038
timeout = timeout,
4139
check = check,
4240
propagateEnv = propagateEnv,
43-
timeoutGracePeriod = timeoutGracePeriod
41+
timeoutGracePeriod = timeoutGracePeriod,
42+
shutdownHook = shutdownHook
43+
)
44+
}
45+
def apply(
46+
cmd: Shellable,
47+
env: Map[String, String],
48+
// Make sure `cwd` only comes after `env`, so `os.call("foo", path)` is a compile error
49+
// since the correct syntax is `os.call(("foo", path))`
50+
cwd: Path,
51+
stdin: ProcessInput,
52+
stdout: ProcessOutput,
53+
stderr: ProcessOutput,
54+
mergeErrIntoOut: Boolean,
55+
timeout: Long,
56+
check: Boolean,
57+
propagateEnv: Boolean,
58+
timeoutGracePeriod: Long,
59+
): CommandResult = {
60+
call(
61+
cmd = cmd,
62+
cwd = cwd,
63+
env = env,
64+
stdin = stdin,
65+
stdout = stdout,
66+
stderr = stderr,
67+
mergeErrIntoOut = mergeErrIntoOut,
68+
timeout = timeout,
69+
check = check,
70+
propagateEnv = propagateEnv,
71+
timeoutGracePeriod = timeoutGracePeriod,
72+
shutdownHook = false
4473
)
4574
}
4675
}
@@ -59,7 +88,8 @@ object spawn {
5988
stdout: ProcessOutput = Pipe,
6089
stderr: ProcessOutput = os.Inherit,
6190
mergeErrIntoOut: Boolean = false,
62-
propagateEnv: Boolean = true
91+
propagateEnv: Boolean = true,
92+
shutdownHook: Boolean = false
6393
): SubProcess = {
6494
os.proc(cmd).spawn(
6595
cwd = cwd,
@@ -68,7 +98,32 @@ object spawn {
6898
stdout = stdout,
6999
stderr = stderr,
70100
mergeErrIntoOut = mergeErrIntoOut,
71-
propagateEnv = propagateEnv
101+
propagateEnv = propagateEnv,
102+
shutdownHook = shutdownHook
103+
)
104+
}
105+
def apply(
106+
cmd: Shellable,
107+
// Make sure `cwd` only comes after `env`, so `os.spawn("foo", path)` is a compile error
108+
// since the correct syntax is `os.spawn(("foo", path))`
109+
env: Map[String, String],
110+
cwd: Path,
111+
stdin: ProcessInput,
112+
stdout: ProcessOutput,
113+
stderr: ProcessOutput,
114+
mergeErrIntoOut: Boolean,
115+
propagateEnv: Boolean
116+
): SubProcess = {
117+
spawn(
118+
cmd = cmd,
119+
cwd = cwd,
120+
env = env,
121+
stdin = stdin,
122+
stdout = stdout,
123+
stderr = stderr,
124+
mergeErrIntoOut = mergeErrIntoOut,
125+
propagateEnv = propagateEnv,
126+
shutdownHook = false
72127
)
73128
}
74129
}
@@ -138,7 +193,8 @@ case class proc(command: Shellable*) {
138193
check: Boolean = true,
139194
propagateEnv: Boolean = true,
140195
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
141-
timeoutGracePeriod: Long = 100
196+
timeoutGracePeriod: Long = 100,
197+
shutdownHook: Boolean = false
142198
): CommandResult = {
143199

144200
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
@@ -191,6 +247,32 @@ case class proc(command: Shellable*) {
191247
timeoutGracePeriod = 100
192248
)
193249

250+
private[os] def call(
251+
cwd: Path,
252+
env: Map[String, String],
253+
stdin: ProcessInput,
254+
stdout: ProcessOutput,
255+
stderr: ProcessOutput,
256+
mergeErrIntoOut: Boolean,
257+
timeout: Long,
258+
check: Boolean,
259+
propagateEnv: Boolean,
260+
timeoutGracePeriod: Long
261+
): CommandResult = call(
262+
cwd,
263+
env,
264+
stdin,
265+
stdout,
266+
stderr,
267+
mergeErrIntoOut,
268+
timeout,
269+
check,
270+
propagateEnv,
271+
timeoutGracePeriod,
272+
shutdownHook = false
273+
)
274+
275+
194276
/**
195277
* The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures
196278
* and starts a subprocess, and returns it as a `java.lang.Process` for you to
@@ -208,7 +290,9 @@ case class proc(command: Shellable*) {
208290
stdout: ProcessOutput = Pipe,
209291
stderr: ProcessOutput = os.Inherit,
210292
mergeErrIntoOut: Boolean = false,
211-
propagateEnv: Boolean = true
293+
propagateEnv: Boolean = true,
294+
shutdownGracePeriod: Long = 100,
295+
shutdownHook: Boolean = false
212296
): SubProcess = {
213297

214298
val cmdChunks = commandChunks
@@ -230,19 +314,54 @@ case class proc(command: Shellable*) {
230314
propagateEnv
231315
)
232316

317+
lazy val shutdownHookThread =
318+
if (!shutdownHook) None
319+
else Some(new Thread("subprocess-shutdown-hook") {
320+
override def run(): Unit = proc.destroyForcibly(shutdownGracePeriod)
321+
})
322+
323+
lazy val shutdownHookMonitorThread = shutdownHookThread.map(t =>
324+
new Thread("subprocess-shutdown-hook-monitor") {
325+
override def run(): Unit = {
326+
while(proc.wrapped.isAlive) Thread.sleep(1)
327+
Runtime.getRuntime().removeShutdownHook(t)
328+
}
329+
}
330+
)
331+
332+
shutdownHookThread.foreach(Runtime.getRuntime().addShutdownHook)
333+
233334
lazy val proc: SubProcess = new SubProcess(
234335
builder.start(),
235336
resolvedStdin.processInput(proc.stdin).map(new Thread(_, commandStr + " stdin thread")),
236337
resolvedStdout.processOutput(proc.stdout).map(new Thread(_, commandStr + " stdout thread")),
237-
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread"))
338+
resolvedStderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread")),
339+
shutdownGracePeriod = shutdownGracePeriod,
340+
shutdownHookMonitorThread = shutdownHookMonitorThread
238341
)
239342

343+
shutdownHookMonitorThread.foreach(_.start())
344+
240345
proc.inputPumperThread.foreach(_.start())
241346
proc.outputPumperThread.foreach(_.start())
242347
proc.errorPumperThread.foreach(_.start())
243348
proc
244349
}
245350

351+
def spawn(
352+
cwd: Path,
353+
env: Map[String, String],
354+
stdin: ProcessInput,
355+
stdout: ProcessOutput,
356+
stderr: ProcessOutput,
357+
mergeErrIntoOut: Boolean,
358+
propagateEnv: Boolean
359+
): SubProcess = spawn(
360+
cwd = cwd, env = env, stdin = stdin, stdout = stdout, stderr = stderr,
361+
mergeErrIntoOut = mergeErrIntoOut,
362+
propagateEnv = propagateEnv,
363+
shutdownGracePeriod = 100, shutdownHook = false
364+
)
246365
/**
247366
* Pipes the output of this process into the input of the [[next]] process. Returns a
248367
* [[ProcGroup]] containing both processes, which you can then either execute or

os/src/SubProcess.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,17 @@ class SubProcess(
106106
val wrapped: java.lang.Process,
107107
val inputPumperThread: Option[Thread],
108108
val outputPumperThread: Option[Thread],
109-
val errorPumperThread: Option[Thread]
109+
val errorPumperThread: Option[Thread],
110+
val shutdownGracePeriod: Long,
111+
val shutdownHookMonitorThread: Option[Thread]
110112
) extends ProcessLike {
113+
def this(
114+
wrapped: java.lang.Process,
115+
inputPumperThread: Option[Thread],
116+
outputPumperThread: Option[Thread],
117+
errorPumperThread: Option[Thread]) = this(
118+
wrapped, inputPumperThread, outputPumperThread, errorPumperThread, 100, None
119+
)
111120
val stdin: SubProcess.InputStream = new SubProcess.InputStream(wrapped.getOutputStream)
112121
val stdout: SubProcess.OutputStream = new SubProcess.OutputStream(wrapped.getInputStream)
113122
val stderr: SubProcess.OutputStream = new SubProcess.OutputStream(wrapped.getErrorStream)
@@ -133,7 +142,17 @@ class SubProcess(
133142
/**
134143
* Force-destroys the subprocess, via the underlying JVM APIs
135144
*/
136-
def destroyForcibly(): Unit = wrapped.destroyForcibly()
145+
def destroyForcibly(shutdownGracePeriod: Long = this.shutdownGracePeriod): Unit = {
146+
val now = System.currentTimeMillis()
147+
148+
while(wrapped.isAlive && System.currentTimeMillis() - now < shutdownGracePeriod){
149+
Thread.sleep(1)
150+
}
151+
152+
if (wrapped.isAlive) wrapped.destroyForcibly()
153+
}
154+
155+
def destroyForcibly(): Unit = destroyForcibly(shutdownGracePeriod = this.shutdownGracePeriod)
137156

138157
/**
139158
* Alias for [[destroy]]

0 commit comments

Comments
 (0)