@@ -37,32 +37,23 @@ import java.io.IOException
3737import cats .effect .LiftIO
3838import cats .effect .IO
3939import org .typelevel .scalaccompat .annotation ._
40- import scala .concurrent .duration .*
41- import cats .effect .implicits .*
40+ import fs2 .io .internal .NativeUtil ._
4241
4342@ extern
4443@ nowarn212(" cat=unused" )
45- object SyscallBindings {
46- def syscall ( number : CLong , arg1 : CLong , arg2 : CLong ): CLong = extern
44+ object LibC {
45+ def pidfd_open ( pid : CInt , flags : CInt ): CInt = extern
4746}
4847
49- object pidFd {
50- private val SYS_pidfd_open = 434L
51-
52- def pidfd_open (pid : pid_t, flags : Int ): Int = {
53- val fd = SyscallBindings .syscall(SYS_pidfd_open , pid.toLong, flags.toLong)
54- fd.toInt
55- }
56- }
57-
58- final case class NativeProcess (
48+ private final case class NativeProcess (
5949 pid : pid_t,
6050 stdinFd : Int ,
6151 stdoutFd : Int ,
62- stderrFd : Int
52+ stderrFd : Int ,
53+ pidfd : Option [Int ] = None
6354)
6455
65- private [process] trait ProcessesCompanionPlatform extends Processesjvmnative {
56+ private [process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative {
6657
6758 private def findExecutable (cmd : String )(implicit z : Zone ): Option [String ] = {
6859 val pathEnv = sys.env.getOrElse(" PATH" , " " )
@@ -76,22 +67,35 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
7667 }
7768
7869 @ inline private def closeAll (fds : Int * ): Unit =
79- fds.foreach { fd => close(fd); () }
70+ fds.foreach(close)
71+
72+ def pipeResource [F [_]](implicit F : Async [F ]): Resource [F , (Int , Int )] =
73+ Resource .make {
74+ F .blocking {
75+ val fds = stackalloc[CInt ](2 .toUInt)
76+ guard_(pipe(fds))
77+ (fds(0 ), fds(1 ))
78+ }
79+ } { case (r, w) => F .blocking { close(r); close(w); () } }
8080
8181 def forAsync [F [_]: LiftIO ](implicit F : Async [F ]): Processes [F ] =
8282 if (LinktimeInfo .isMac || LinktimeInfo .isLinux) {
8383 new UnsealedProcesses [F ] {
8484 def spawn (process : ProcessBuilder ): Resource [F , Process [F ]] = {
8585
86- def createProcess (): F [NativeProcess ] = F .blocking {
87- Zone { implicit z =>
88- val stdinPipe = stackalloc[CInt ](2 .toUInt)
89- val stdoutPipe = stackalloc[CInt ](2 .toUInt)
90- val stderrPipe = stackalloc[CInt ](2 .toUInt)
86+ val pipesResource : Resource [F , ((Int , Int ), (Int , Int ), (Int , Int ))] =
87+ for {
88+ stdinPipe <- pipeResource[F ]
89+ stdoutPipe <- pipeResource[F ]
90+ stderrPipe <- pipeResource[F ]
91+ } yield (stdinPipe, stdoutPipe, stderrPipe)
9192
92- if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0 ) {
93- throw new RuntimeException (" Failed to create pipes" )
94- }
93+ def createProcess (
94+ stdinPipe : (Int , Int ),
95+ stdoutPipe : (Int , Int ),
96+ stderrPipe : (Int , Int )
97+ ): F [NativeProcess ] = F .blocking {
98+ Zone { implicit z =>
9599 val envMap =
96100 if (process.inheritEnv)
97101 sys.env ++ process.extraEnv
@@ -110,49 +114,42 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
110114 }
111115 argv(allArgs.length.toULong) = null
112116
113- val executable = findExecutable(process.command).getOrElse(process.command)
114-
115- fork() match {
116- case - 1 =>
117- closeAll(
118- stdinPipe(0 ),
119- stdinPipe(1 ),
120- stdoutPipe(0 ),
121- stdoutPipe(1 ),
122- stderrPipe(0 ),
123- stderrPipe(1 )
124- )
125- throw new IOException (" Unable to fork process" )
117+ val executable =
118+ if (process.command.startsWith(" /" ))
119+ process.command
120+ else
121+ findExecutable(process.command).getOrElse(process.command)
122+ val ret = guard(fork())
123+ ret match {
126124 case 0 =>
127- closeAll(stdinPipe(1 ), stdoutPipe(0 ), stderrPipe(0 ))
128- if (
129- dup2(stdinPipe(0 ), STDIN_FILENO ) == - 1 ||
130- dup2(stdoutPipe(1 ), STDOUT_FILENO ) == - 1 ||
131- dup2(stderrPipe(1 ), STDERR_FILENO ) == - 1
132- ) {
133- _exit(1 )
134- throw new IOException (" Unable to redirect file descriptors" )
135- }
136- closeAll(stdinPipe(0 ), stdoutPipe(1 ), stderrPipe(1 ))
125+ closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1)
126+ guard_(dup2(stdinPipe._1, STDIN_FILENO ))
127+ guard_(dup2(stdoutPipe._2, STDOUT_FILENO ))
128+ guard_(dup2(stderrPipe._2, STDERR_FILENO ))
129+ closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
137130
138131 process.workingDirectory.foreach { dir =>
139132 if ((dir != null ) && (dir.toString != " ." )) {
140- val ret = chdir(toCString(dir.toString))
141- if (ret != 0 )
142- throw new IOException (s " Failed to chdir to ${dir.toString}" )
133+ guard_(chdir(toCString(dir.toString)))
143134 }
144135 }
145136
146137 execve(toCString(executable), argv, envp)
147138 _exit(127 )
148- throw new IOException ( s " Failed to create process for command: ${process.command} " )
139+ throw new AssertionError ( " unreachable " )
149140 case pid =>
150- closeAll(stdinPipe(0 ), stdoutPipe(1 ), stderrPipe(1 ))
141+ closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
142+ val pidfd =
143+ if (LinktimeInfo .isLinux) {
144+ val fd = LibC .pidfd_open(pid, 0 )
145+ if (fd >= 0 ) Some (fd) else None
146+ } else None
151147 NativeProcess (
152148 pid = pid,
153- stdinFd = stdinPipe(1 ),
154- stdoutFd = stdoutPipe(0 ),
155- stderrFd = stderrPipe(0 )
149+ stdinFd = stdinPipe._2,
150+ stdoutFd = stdoutPipe._1,
151+ stderrFd = stderrPipe._1,
152+ pidfd
156153 )
157154 }
158155 }
@@ -177,101 +174,90 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
177174 }
178175 }
179176
180- Resource .make(createProcess())(cleanup).map { nativeProcess =>
181- new UnsealedProcess [F ] {
182- def isAlive : F [Boolean ] = F .delay {
183- kill(nativeProcess.pid, 0 ) == 0 || errno.errno != EPERM
177+ pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) =>
178+ Resource
179+ .make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup)
180+ .flatMap { nativeProcess =>
181+ nativeProcess.pidfd match {
182+ case Some (pidfd) =>
183+ for {
184+ poller <- Resource .eval(fileDescriptorPoller[F ])
185+ handle <- poller.registerFileDescriptor(pidfd, true , false ).mapK(LiftIO .liftK)
186+ } yield (nativeProcess, Some (handle))
187+ case None =>
188+ Resource .pure((nativeProcess, None ))
189+ }
184190 }
191+ .map { case (nativeProcess, pollHandleOpt) =>
192+ new UnsealedProcess [F ] {
193+ def isAlive : F [Boolean ] = F .delay {
194+ kill(nativeProcess.pid, 0 ) == 0 || errno.errno == EPERM
195+ }
185196
186- def exitValue : F [Int ] =
187- if (LinktimeInfo .isLinux) {
188- F .delay(pidFd.pidfd_open(nativeProcess.pid, 0 )).flatMap { pidfd =>
189- if (pidfd >= 0 ) {
190- fileDescriptorPoller[F ].flatMap { poller =>
191- poller
192- .registerFileDescriptor(pidfd, true , false )
193- .use { handle =>
194- handle.pollReadRec(()) { _ =>
197+ def exitValue : F [Int ] =
198+ if (LinktimeInfo .isLinux) {
199+ (nativeProcess.pidfd, pollHandleOpt) match {
200+ case (Some (_), Some (handle)) =>
201+ handle
202+ .pollReadRec(()) { _ =>
195203 IO {
196204 val statusPtr = stackalloc[CInt ]()
197205 val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG )
198-
199206 if (result == nativeProcess.pid) {
200207 val exitCode = WEXITSTATUS (! statusPtr)
201208 Right (exitCode)
202209 } else if (result == 0 ) {
203210 Left (())
204211 } else {
205- if (errno.errno == ECHILD ) {
212+ if (errno.errno == ECHILD )
206213 throw new IOException (" No such process" )
207- } else {
214+ else
208215 throw new IOException (
209216 s " waitpid failed with errno: ${errno.errno}"
210217 )
211- }
212218 }
213219 }
214220 }
215- }
216- .to
221+ .to
222+ case _ =>
223+ fallbackExitValue(nativeProcess.pid)
217224 }
218225 } else {
219226 fallbackExitValue(nativeProcess.pid)
220227 }
221- }
222- } else {
223- fallbackExitValue(nativeProcess.pid)
224- }
225228
226- def stdin : Pipe [F , Byte , Nothing ] = { in =>
227- in
228- .through(writeFd(nativeProcess.stdinFd))
229- .onFinalize {
230- F .blocking {
231- close(nativeProcess.stdinFd)
232- }.void
229+ def stdin : Pipe [F , Byte , Nothing ] = { in =>
230+ in
231+ .through(writeFd(nativeProcess.stdinFd))
232+ .onFinalize {
233+ F .blocking {
234+ close(nativeProcess.stdinFd)
235+ }.void
236+ }
233237 }
234- }
235- def stdout : Stream [F , Byte ] = readFd(nativeProcess.stdoutFd, 8192 )
236- .onFinalize {
237- F .blocking {
238- close(nativeProcess.stdoutFd)
239- }.void
240- }
241238
242- def stderr : Stream [F , Byte ] = readFd(nativeProcess.stderrFd, 8192 )
243- .onFinalize {
244- F .blocking {
245- close(nativeProcess.stderrFd)
246- }.void
239+ def stdout : Stream [F , Byte ] = readFd(nativeProcess.stdoutFd, 8192 )
240+ .onFinalize {
241+ F .blocking {
242+ close(nativeProcess.stdoutFd)
243+ }.void
244+ }
245+
246+ def stderr : Stream [F , Byte ] = readFd(nativeProcess.stderrFd, 8192 )
247+ .onFinalize {
248+ F .blocking {
249+ close(nativeProcess.stderrFd)
250+ }.void
251+ }
247252 }
248- }
253+ }
249254 }
250255 }
251256
252- private def fallbackExitValue (pid : pid_t): F [Int ] = {
253- def loop : F [Int ] =
254- F .blocking {
255- Zone { _ =>
256- val status = stackalloc[CInt ]()
257- val result = waitpid(pid, status, WNOHANG )
258-
259- if (result == pid) {
260- Some (WEXITSTATUS (! status))
261- } else if (result == 0 ) None
262- else throw new IOException (s " waitpid failed with errno: ${errno.errno}" )
263- }
264- }.flatMap {
265- case Some (code) => F .pure(code)
266- case None => F .sleep(10 .millis) >> loop
267- }
268-
269- loop.onCancel {
270- F .blocking {
271- kill(pid, SIGKILL )
272- ()
273- }
274- }
257+ private def fallbackExitValue (pid : pid_t): F [Int ] = F .blocking {
258+ val status = stackalloc[CInt ]()
259+ guard_(waitpid(pid, status, 0 ))
260+ WEXITSTATUS (! status)
275261 }
276262 }
277263 } else super .forAsync[F ]
0 commit comments