
Commit 26b2ce6

Process piping (#200)
Resolves #187. This PR adds a way to pipe processes concisely and conveniently. Previously, creating pipelines that behave like Unix pipelines was impossible (or at least hard): the `Broken pipe` error was handled by the parent process, so a pipeline like `yes | head -n 5` would run forever, and the app itself would crash or throw an error. Additionally, there was no way to pass options like `pipefail`. All of this is done with custom handler threads rather than the Java 9 API, which is likewise incapable of handling `Broken pipe`. On Windows, broken pipe handling is not supported, since there is no `broken pipe` error there. Pull request: #200
1 parent 94b2691 commit 26b2ce6
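
In practice the new API makes the `yes | head -n 5` case from the description terminate cleanly. A minimal sketch, adapted from the Readme changes in this commit (only `-n 5` is changed to match the description):

[source,scala]
----
// Equivalent of `yes | head -n 5`: once `head` exits, the library's
// broken-pipe handling terminates `yes` instead of letting it run forever.
val out = os.proc("yes")
  .pipeTo(os.proc("head", "-n", "5"))
  .call()
  .out.text()
----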

9 files changed: +706 / -26 lines

.github/workflows/build.yml
Lines changed: 0 additions & 1 deletion

@@ -25,7 +25,6 @@ jobs:
         with:
           distribution: 'temurin'
           java-version: ${{ matrix.java-version }}
-
       - name: Fetch millw launcher (Windows)
         run: curl -Lo mill.bat "https://raw.githubusercontent.com/lefou/millw/main/millw.bat"
         if: matrix.os == 'windows-latest'

Readme.adoc
Lines changed: 42 additions & 0 deletions

@@ -1654,6 +1654,48 @@ val sha = os.proc("shasum", "-a", "256").spawn(stdin = gzip.stdout)
 sha.stdout.trim ==> "acc142175fa520a1cb2be5b97cbbe9bea092e8bba3fe2e95afa645615908229e -"
 ----
 
+=== Spawning Pipelines of Subprocesses
+
+After constructing a subprocess with `os.proc`, you can use the `pipeTo` method
+to pipe its output to another subprocess:
+
+[source,scala]
+----
+val wc = os.proc("ls", "-l")
+  .pipeTo(os.proc("wc", "-l"))
+  .call()
+  .out.text()
+----
+
+This is equivalent to the shell command `ls -l | wc -l`. You can chain together
+as many subprocesses as you like, and the pipeline follows the broken-pipe
+behaviour of Unix systems. For example, you can take the first 10 lines of
+output from the `yes` command; once the `head` command terminates, the `yes`
+command is terminated as well:
+
+[source,scala]
+----
+val yes10 = os.proc("yes")
+  .pipeTo(os.proc("head", "-n", "10"))
+  .call()
+  .out.text()
+----
+
+This feature is implemented inside the library: whenever an IO error occurs while
+forwarding output to the next process in the pipeline, the writing process is
+terminated. This behavior can be disabled via the `handleBrokenPipe` flag on the
+`call` and `spawn` methods. Note that Windows does not support broken-pipe
+behaviour, so a command like `yes` would run forever; `handleBrokenPipe` is
+therefore set to false by default on Windows.
+
+Both `call` and `spawn` correspond in their behavior to their counterparts on
+`os.proc`, but `spawn` returns an `os.ProcessPipeline` instance instead. It offers
+the same API as `SubProcess`, but operates on the whole set of processes rather
+than on a single one.
+
+Pipefail is enabled by default, so if any process in the pipeline fails, the whole
+pipeline will have a non-zero exit code. This behavior can be disabled via the
+`pipefail` flag on the `call` and `spawn` methods. Note that pipefail does not kill
+the other processes in the pipeline; it just sets the exit code of the pipeline to
+the exit code of the failing process.
+
 === Watching for Changes
 
 ==== `os.watch.watch`
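
The Readme changes above describe `spawn` on a pipeline but only show `call`. A rough sketch of the `spawn` variant, using only the `join()` and `exitCode()` members that this commit's own code relies on, and assuming Unix-style `ls` and `wc` are available (illustrative, not taken from the diff):

[source,scala]
----
// Spawn the pipeline without blocking, stream the final output to our stdout,
// then wait for every process in the pipeline and check how it exited.
val pipeline = os.proc("ls", "-l")
  .pipeTo(os.proc("wc", "-l"))
  .spawn(stdout = os.Inherit)

pipeline.join()
assert(pipeline.exitCode() == 0)
----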

build.sc
Lines changed: 10 additions & 1 deletion

@@ -58,6 +58,10 @@ trait MiMaChecks extends Mima {
   )
 }
 
+object testJarWriter extends JavaModule
+object testJarReader extends JavaModule
+object testJarExit extends JavaModule
+
 trait OsLibModule
     extends CrossScalaModule
     with PublishModule
@@ -84,7 +88,12 @@ trait OsLibModule
     def ivyDeps = Agg(Deps.utest, Deps.sourcecode)
 
     // we check the textual output of system commands and expect it in english
-    def forkEnv = super.forkEnv() ++ Map("LC_ALL" -> "C")
+    def forkEnv = super.forkEnv() ++ Map(
+      "LC_ALL" -> "C",
+      "TEST_JAR_WRITER_ASSEMBLY" -> testJarWriter.assembly().path.toString,
+      "TEST_JAR_READER_ASSEMBLY" -> testJarReader.assembly().path.toString,
+      "TEST_JAR_EXIT_ASSEMBLY" -> testJarExit.assembly().path.toString
+    )
   }
 }
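
Presumably the test suite locates the helper jars through these environment variables; the following is a hypothetical sketch of the test-side usage (the module names and variable names are from the diff, but the invocation itself is an assumption and not part of this commit):

[source,scala]
----
// Hypothetical test-side usage: resolve the helper jar assemblies passed via forkEnv
val writerJar = os.Path(sys.env("TEST_JAR_WRITER_ASSEMBLY"))
val readerJar = os.Path(sys.env("TEST_JAR_READER_ASSEMBLY"))

// and pipe the writer's output into the reader using the new API
val res = os.proc("java", "-jar", writerJar)
  .pipeTo(os.proc("java", "-jar", readerJar))
  .call()
----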

os/src-jvm/ProcessOps.scala
Lines changed: 235 additions & 23 deletions

@@ -1,8 +1,14 @@
 package os
 
 import java.util.concurrent.{ArrayBlockingQueue, Semaphore, TimeUnit}
-
+import collection.JavaConverters._
 import scala.annotation.tailrec
+import java.lang.ProcessBuilder.Redirect
+import os.SubProcess.InputStream
+import java.io.IOException
+import java.util.concurrent.LinkedBlockingQueue
+import ProcessOps._
+import scala.util.Try
 
 /**
  * Convenience APIs around [[java.lang.Process]] and [[java.lang.ProcessBuilder]]:
@@ -21,6 +27,7 @@ import scala.annotation.tailrec
  * the standard stdin/stdout/stderr streams, using whatever protocol you
  * want
  */
+
 case class proc(command: Shellable*) {
   def commandChunks: Seq[String] = command.flatMap(_.value)
 
@@ -79,7 +86,6 @@ case class proc(command: Shellable*) {
       mergeErrIntoOut,
       propagateEnv
     )
-    import collection.JavaConverters._
 
     sub.join(timeout)
 
@@ -94,9 +100,6 @@
   * and starts a subprocess, and returns it as a `java.lang.Process` for you to
   * interact with however you like.
   *
-  * To implement pipes, you can spawn a process, take it's stdout, and pass it
-  * as the stdin of a second spawned process.
-  *
   * Note that if you provide `ProcessOutput` callbacks to `stdout`/`stderr`,
   * the calls to those callbacks take place on newly spawned threads that
   * execute in parallel with the main thread. Thus make sure any data
@@ -111,28 +114,13 @@
       mergeErrIntoOut: Boolean = false,
       propagateEnv: Boolean = true
   ): SubProcess = {
-    val builder = new java.lang.ProcessBuilder()
-
-    val baseEnv =
-      if (propagateEnv) sys.env
-      else Map()
-    for ((k, v) <- baseEnv ++ Option(env).getOrElse(Map())) {
-      if (v != null) builder.environment().put(k, v)
-      else builder.environment().remove(k)
-    }
-
-    builder.directory(Option(cwd).getOrElse(os.pwd).toIO)
+    val builder =
+      buildProcess(commandChunks, cwd, env, stdin, stdout, stderr, mergeErrIntoOut, propagateEnv)
 
     val cmdChunks = commandChunks
     val commandStr = cmdChunks.mkString(" ")
     lazy val proc: SubProcess = new SubProcess(
-      builder
-        .command(cmdChunks: _*)
-        .redirectInput(stdin.redirectFrom)
-        .redirectOutput(stdout.redirectTo)
-        .redirectError(stderr.redirectTo)
-        .redirectErrorStream(mergeErrIntoOut)
-        .start(),
+      builder.start(),
       stdin.processInput(proc.stdin).map(new Thread(_, commandStr + " stdin thread")),
      stdout.processOutput(proc.stdout).map(new Thread(_, commandStr + " stdout thread")),
      stderr.processOutput(proc.stderr).map(new Thread(_, commandStr + " stderr thread"))
@@ -143,4 +131,228 @@
     proc.errorPumperThread.foreach(_.start())
     proc
   }
+
+  /**
+   * Pipes the output of this process into the input of the [[next]] process. Returns a
+   * [[ProcGroup]] containing both processes, which you can then either execute or
+   * pipe further.
+   */
+  def pipeTo(next: proc): ProcGroup = ProcGroup(Seq(this, next))
+}
+
+/**
+ * A group of processes that are piped together, corresponding to e.g. `ls -l | grep .scala`.
+ * You can create a `ProcGroup` by calling `.pipeTo` on a [[proc]] multiple times.
+ * Contains methods corresponding to the methods on [[proc]], but defined for pipelines
+ * of processes.
+ */
+case class ProcGroup private[os] (commands: Seq[proc]) {
+  assert(commands.size >= 2)
+
+  private lazy val isWindows = sys.props("os.name").toLowerCase().contains("windows")
+
+  /**
+   * Invokes the given pipeline like a function, passing in input and returning a
+   * [[CommandResult]]. You can then call `result.exitCode` to see how it exited, or
+   * `result.out.bytes` or `result.err.string` to access the aggregated stdout and
+   * stderr of the pipeline in a number of convenient ways. If a non-zero exit code
+   * is returned, this throws a [[os.SubprocessException]] containing the
+   * [[CommandResult]], unless you pass in `check = false`.
+   *
+   * For each process in the pipeline, the output is forwarded to the input of the next
+   * process. The input of the first process is set to the provided [[stdin]]. The output
+   * of the last process is returned as the output of the pipeline. [[stderr]] is set for
+   * all processes.
+   *
+   * `call` provides a number of parameters that let you configure how the pipeline
+   * is run:
+   *
+   * @param cwd the working directory of the pipeline
+   * @param env any additional environment variables you wish to set in the pipeline
+   * @param stdin any data you wish to pass to the pipeline's standard input (to the first process)
+   * @param stdout how the pipeline's output stream is configured (the last process's stdout)
+   * @param stderr how the processes' error streams are configured (set for all processes)
+   * @param mergeErrIntoOut merges the pipeline's stderr stream into its stdout. Note that the
+   *                        stderr will then be forwarded with stdout to subsequent processes in the pipeline.
+   * @param timeout how long to wait in milliseconds for the pipeline to complete
+   * @param check disable this to avoid throwing an exception if the pipeline
+   *              fails with a non-zero exit code
+   * @param propagateEnv disable this to avoid passing in this parent process's
+   *                     environment variables to the pipeline
+   * @param pipefail if true, the pipeline's exitCode will be the exit code of the first
+   *                 failing process. If no process fails, the exit code will be 0.
+   * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
+   *                         will be caught and handled by killing the writing process. This behaviour
+   *                         is consistent with handlers of SIGPIPE signals in most programs
+   *                         supporting interruptible piping. Disabled by default on Windows.
+   */
+  def call(
+      cwd: Path = null,
+      env: Map[String, String] = null,
+      stdin: ProcessInput = Pipe,
+      stdout: ProcessOutput = Pipe,
+      stderr: ProcessOutput = os.Inherit,
+      mergeErrIntoOut: Boolean = false,
+      timeout: Long = -1,
+      check: Boolean = true,
+      propagateEnv: Boolean = true,
+      pipefail: Boolean = true,
+      handleBrokenPipe: Boolean = !isWindows
+  ): CommandResult = {
+    val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
+
+    val sub = spawn(
+      cwd,
+      env,
+      stdin,
+      if (stdout ne os.Pipe) stdout
+      else os.ProcessOutput.ReadBytes((buf, n) =>
+        chunks.add(Left(new geny.Bytes(java.util.Arrays.copyOf(buf, n))))
+      ),
+      if (stderr ne os.Pipe) stderr
+      else os.ProcessOutput.ReadBytes((buf, n) =>
+        chunks.add(Right(new geny.Bytes(java.util.Arrays.copyOf(buf, n))))
+      ),
+      mergeErrIntoOut,
+      propagateEnv,
+      pipefail
+    )
+
+    sub.join(timeout)
+
+    val chunksSeq = chunks.iterator.asScala.toIndexedSeq
+    val res =
+      CommandResult(commands.flatMap(_.commandChunks :+ "|").init, sub.exitCode(), chunksSeq)
+    if (res.exitCode == 0 || !check) res
+    else throw SubprocessException(res)
+  }
+
+  /**
+   * The most flexible of the [[os.ProcGroup]] calls. It sets up a pipeline of processes,
+   * and returns a [[ProcessPipeline]] for you to interact with however you like.
+   *
+   * Note that if you provide `ProcessOutput` callbacks to `stdout`/`stderr`,
+   * the calls to those callbacks take place on newly spawned threads that
+   * execute in parallel with the main thread. Thus make sure any data
+   * processing you do in those callbacks is thread safe!
+   *
+   * @param cwd the working directory of the pipeline
+   * @param env any additional environment variables you wish to set in the pipeline
+   * @param stdin any data you wish to pass to the pipeline's standard input (to the first process)
+   * @param stdout how the pipeline's output stream is configured (the last process's stdout)
+   * @param stderr how the processes' error streams are configured (set for all processes)
+   * @param mergeErrIntoOut merges the pipeline's stderr stream into its stdout. Note that the
+   *                        stderr will then be forwarded with stdout to subsequent processes in the pipeline.
+   * @param propagateEnv disable this to avoid passing in this parent process's
+   *                     environment variables to the pipeline
+   * @param pipefail if true, the pipeline's exitCode will be the exit code of the first
+   *                 failing process. If no process fails, the exit code will be 0.
+   * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
+   *                         will be caught and handled by killing the writing process. This behaviour
+   *                         is consistent with handlers of SIGPIPE signals in most programs
+   *                         supporting interruptible piping. Disabled by default on Windows.
+   */
+  def spawn(
+      cwd: Path = null,
+      env: Map[String, String] = null,
+      stdin: ProcessInput = Pipe,
+      stdout: ProcessOutput = Pipe,
+      stderr: ProcessOutput = os.Inherit,
+      mergeErrIntoOut: Boolean = false,
+      propagateEnv: Boolean = true,
+      pipefail: Boolean = true,
+      handleBrokenPipe: Boolean = !isWindows
+  ): ProcessPipeline = {
+    val brokenPipeQueue = new LinkedBlockingQueue[Int]()
+    val (_, procs) =
+      commands.zipWithIndex.foldLeft((Option.empty[ProcessInput], Seq.empty[SubProcess])) {
+        case ((None, _), (proc, _)) =>
+          val spawned = proc.spawn(cwd, env, stdin, Pipe, stderr, mergeErrIntoOut, propagateEnv)
+          (Some(spawned.stdout), Seq(spawned))
+        case ((Some(input), acc), (proc, index)) if index == commands.length - 1 =>
+          val spawned = proc.spawn(
+            cwd,
+            env,
+            wrapWithBrokenPipeHandler(input, index - 1, brokenPipeQueue),
+            stdout,
+            stderr,
+            mergeErrIntoOut,
+            propagateEnv
+          )
+          (None, acc :+ spawned)
+        case ((Some(input), acc), (proc, index)) =>
+          val spawned = proc.spawn(
+            cwd,
+            env,
+            wrapWithBrokenPipeHandler(input, index - 1, brokenPipeQueue),
+            Pipe,
+            stderr,
+            mergeErrIntoOut,
+            propagateEnv
+          )
+          (Some(spawned.stdout), acc :+ spawned)
+      }
+    val pipeline =
+      new ProcessPipeline(procs, pipefail, if (handleBrokenPipe) Some(brokenPipeQueue) else None)
+    pipeline.brokenPipeHandler.foreach(_.start())
+    pipeline
+  }
+
+  private def wrapWithBrokenPipeHandler(
+      wrapped: ProcessInput,
+      index: Int,
+      queue: LinkedBlockingQueue[Int]
+  ) =
+    new ProcessInput {
+      override def redirectFrom: Redirect = wrapped.redirectFrom
+      override def processInput(stdin: => InputStream): Option[Runnable] =
+        wrapped.processInput(stdin).map { runnable =>
+          new Runnable {
+            def run() = {
+              try {
+                runnable.run()
+              } catch {
+                case e: IOException =>
+                  println(s"Broken pipe in process $index")
+                  queue.put(index)
+              }
+            }
+          }
+        }
+    }
+
+  /**
+   * Pipes the output of this pipeline into the input of the [[next]] process.
+   */
+  def pipeTo(next: proc) = ProcGroup(commands :+ next)
+}
+
+private[os] object ProcessOps {
+  def buildProcess(
+      command: Seq[String],
+      cwd: Path = null,
+      env: Map[String, String] = null,
+      stdin: ProcessInput = Pipe,
+      stdout: ProcessOutput = Pipe,
+      stderr: ProcessOutput = os.Inherit,
+      mergeErrIntoOut: Boolean = false,
+      propagateEnv: Boolean = true
+  ): ProcessBuilder = {
+    val builder = new java.lang.ProcessBuilder()
+
+    val baseEnv =
+      if (propagateEnv) sys.env
+      else Map()
+    for ((k, v) <- baseEnv ++ Option(env).getOrElse(Map())) {
+      if (v != null) builder.environment().put(k, v)
+      else builder.environment().remove(k)
+    }
+
+    builder.directory(Option(cwd).getOrElse(os.pwd).toIO)
+
+    builder
+      .command(command: _*)
+      .redirectInput(stdin.redirectFrom)
+      .redirectOutput(stdout.redirectTo)
+      .redirectError(stderr.redirectTo)
+      .redirectErrorStream(mergeErrIntoOut)
+  }
 }
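
Taken together, the new `ProcGroup` API added above can be exercised roughly as follows. This is a sketch built only from the signatures in this diff (`pipeTo`, `call(check = ..., pipefail = ...)`, `CommandResult`); the commands themselves are arbitrary illustrative choices:

[source,scala]
----
// A three-stage pipeline. pipefail is on by default, so a failure in any stage
// shows up in exitCode; check = false keeps call() from throwing so the result
// can be inspected directly.
val result = os.proc("cat", "build.sc")
  .pipeTo(os.proc("grep", "forkEnv"))
  .pipeTo(os.proc("wc", "-l"))
  .call(check = false)

println(result.exitCode)    // 0 only if every stage succeeded
println(result.out.text())  // aggregated stdout of the last process
----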
