Skip to content

Commit 6a34d00

Browse files
committed
Added Processes to IONative
1 parent 51ff6f2 commit 6a34d00

File tree

3 files changed

+259
-31
lines changed

3 files changed

+259
-31
lines changed

io/native/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,36 +57,7 @@ private[fs2] trait ioplatform extends iojvmnative {
5757
/** Stream of bytes read asynchronously from standard input. */
5858
def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] =
5959
if (LinktimeInfo.isLinux || LinktimeInfo.isMac)
60-
Stream
61-
.resource {
62-
Resource
63-
.eval {
64-
setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F]
65-
}
66-
.flatMap { poller =>
67-
poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK)
68-
}
69-
}
70-
.flatMap { handle =>
71-
Stream.repeatEval {
72-
handle
73-
.pollReadRec(()) { _ =>
74-
IO {
75-
val buf = new Array[Byte](bufSize)
76-
val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toULong))
77-
if (readed > 0)
78-
Right(Some(Chunk.array(buf, 0, readed)))
79-
else if (readed == 0)
80-
Right(None)
81-
else
82-
Left(())
83-
}
84-
}
85-
.to
86-
}
87-
}
88-
.unNoneTerminate
89-
.unchunks
60+
readFd(STDIN_FILENO, bufSize)
9061
else
9162
readInputStream(Sync[F].blocking(System.in), bufSize, false)
9263

@@ -104,7 +75,39 @@ private[fs2] trait ioplatform extends iojvmnative {
10475
else
10576
writeOutputStream(Sync[F].blocking(System.err), false)
10677

107-
private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in =>
78+
private[fs2] def readFd[F[_]: Async: LiftIO](fd: Int, bufSize: Int): Stream[F, Byte] =
79+
Stream
80+
.resource {
81+
Resource
82+
.eval {
83+
setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F]
84+
}
85+
.flatMap { poller =>
86+
poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)
87+
}
88+
}
89+
.flatMap { handle =>
90+
Stream.repeatEval {
91+
handle
92+
.pollReadRec(()) { _ =>
93+
IO {
94+
val buf = new Array[Byte](bufSize)
95+
val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toULong))
96+
if (readed > 0)
97+
Right(Some(Chunk.array(buf, 0, readed)))
98+
else if (readed == 0)
99+
Right(None)
100+
else
101+
Left(())
102+
}
103+
}
104+
.to
105+
}
106+
}
107+
.unNoneTerminate
108+
.unchunks
109+
110+
private[fs2] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in =>
108111
Stream
109112
.resource {
110113
Resource
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package process
25+
26+
import cats.effect.kernel.{Async, Resource}
27+
import cats.syntax.all.*
28+
import fs2.{Stream, Pipe}
29+
import scala.scalanative.unsafe.*
30+
import scala.scalanative.unsigned.*
31+
import scala.scalanative.libc.*
32+
import scala.scalanative.posix.sys.wait.*
33+
import scala.scalanative.posix.errno.*
34+
import scala.scalanative.meta.LinktimeInfo
35+
import scala.scalanative.posix.unistd.*
36+
import scala.scalanative.posix.spawn.*
37+
import scala.scalanative.posix.signal.*
38+
import java.io.IOException
39+
import cats.effect.LiftIO
40+
import cats.effect.IO
41+
42+
@extern
43+
object SyscallBindings {
44+
def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern
45+
}
46+
47+
object PidFd {
48+
private val SYS_pidfd_open = 434L
49+
val PIDFD_NONBLOCK = 1
50+
51+
def pidfd_open(pid: pid_t, flags: Int): Int = {
52+
val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong)
53+
fd.toInt
54+
}
55+
}
56+
57+
private[process] trait ProcessesCompanionPlatform {
58+
def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
59+
60+
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {
61+
62+
def createProcess(): F[NativeProcess] = F.blocking {
63+
Zone { implicit z =>
64+
val allArgs = process.command +: process.args
65+
val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong)
66+
allArgs.zipWithIndex.foreach { case (arg, i) =>
67+
argv(i.toULong) = toCString(arg)
68+
}
69+
argv(allArgs.length.toULong) = null
70+
71+
val envMap =
72+
if (process.inheritEnv)
73+
sys.env ++ process.extraEnv
74+
else
75+
process.extraEnv
76+
77+
val envp = stackalloc[CString]((envMap.size + 1).toULong)
78+
envMap.zipWithIndex.foreach { case ((k, v), i) =>
79+
envp(i.toULong) = toCString(s"$k=$v")
80+
}
81+
envp(envMap.size.toULong) = null
82+
83+
val stdinPipe = stackalloc[CInt](2)
84+
val stdoutPipe = stackalloc[CInt](2)
85+
val stderrPipe = stackalloc[CInt](2)
86+
87+
if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) {
88+
F.raiseError(new RuntimeException("Failed to create stdin pipe"))
89+
}
90+
91+
val fileActions = stackalloc[posix_spawn_file_actions_t]()
92+
posix_spawn_file_actions_init(fileActions)
93+
94+
posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO)
95+
posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO)
96+
posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO)
97+
98+
posix_spawn_file_actions_addclose(fileActions, stdinPipe(1))
99+
posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0))
100+
posix_spawn_file_actions_addclose(fileActions, stderrPipe(0))
101+
102+
val pid = stackalloc[pid_t]()
103+
println("hi")
104+
println("argv(0): " + fromCString(argv(0)))
105+
var i = 0
106+
while (argv(i) != null) {
107+
println(s" argv($i): " + fromCString(argv(i)))
108+
i += 1
109+
}
110+
val result = posix_spawn(
111+
pid,
112+
argv(0),
113+
fileActions,
114+
null,
115+
argv,
116+
envp
117+
)
118+
119+
posix_spawn_file_actions_destroy(fileActions)
120+
121+
if (result != 0) {
122+
close(stdinPipe(0)); close(stdinPipe(1))
123+
close(stdoutPipe(0)); close(stdoutPipe(1))
124+
close(stderrPipe(0)); close(stderrPipe(1))
125+
throw new RuntimeException(s"posix_spawn failed: $result")
126+
}
127+
128+
close(stdinPipe(0))
129+
close(stdoutPipe(1))
130+
close(stderrPipe(1))
131+
132+
NativeProcess(
133+
pid = !pid,
134+
stdinFd = stdinPipe(1),
135+
stdoutFd = stdoutPipe(0),
136+
stderrFd = stderrPipe(0)
137+
)
138+
}
139+
}
140+
141+
def cleanup(proc: NativeProcess): F[Unit] =
142+
F.blocking {
143+
close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd)
144+
} *>
145+
F.delay(kill(proc.pid, SIGKILL)) *>
146+
F.blocking {
147+
val status = stackalloc[CInt]()
148+
val r = waitpid(proc.pid, status, 0)
149+
if (r < 0 && errno.errno != ECHILD)
150+
F.raiseError(new RuntimeException(s"waitpid failed: errno=${errno.errno}"))
151+
()
152+
}
153+
154+
Resource.make(createProcess())(cleanup).map { nativeProcess =>
155+
new UnsealedProcess[F] {
156+
def isAlive: F[Boolean] = F.delay {
157+
kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH
158+
}
159+
160+
def exitValue: F[Int] =
161+
if (LinktimeInfo.isLinux) {
162+
F.delay(PidFd.pidfd_open(nativeProcess.pid, PidFd.PIDFD_NONBLOCK)).flatMap { pidfd =>
163+
if (pidfd >= 0) {
164+
fileDescriptorPoller[F].flatMap { poller =>
165+
poller
166+
.registerFileDescriptor(pidfd, true, false)
167+
.use { handle =>
168+
handle.pollReadRec(()) { _ =>
169+
IO {
170+
Zone { _ =>
171+
val statusPtr = stackalloc[CInt]()
172+
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
173+
174+
if (result == nativeProcess.pid) {
175+
val exitCode = WEXITSTATUS(!statusPtr)
176+
Right(exitCode)
177+
} else if (result == 0) {
178+
Left(())
179+
} else {
180+
if (errno.errno == ECHILD) {
181+
throw new IOException("No such process")
182+
} else {
183+
throw new IOException(
184+
s"waitpid failed with errno: ${errno.errno}"
185+
)
186+
}
187+
}
188+
}
189+
}
190+
}
191+
}
192+
.to
193+
}
194+
} else {
195+
fallbackExitValue(nativeProcess.pid)
196+
}
197+
}
198+
} else {
199+
fallbackExitValue(nativeProcess.pid)
200+
}
201+
202+
def stdin: Pipe[F, Byte, Nothing] = writeFd(nativeProcess.stdinFd)
203+
204+
def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
205+
206+
def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
207+
}
208+
}
209+
}
210+
211+
private def fallbackExitValue(pid: pid_t): F[Int] = F.delay {
212+
val status = stackalloc[CInt]()
213+
val result = waitpid(pid, status, 0)
214+
if (result < 0) throw new IOException(s"waitpid failed with errno: ${errno.errno}")
215+
WEXITSTATUS(!status)
216+
}
217+
}
218+
}
219+
220+
case class NativeProcess(
221+
pid: pid_t,
222+
stdinFd: Int,
223+
stdoutFd: Int,
224+
stderrFd: Int
225+
)

0 commit comments

Comments
 (0)