Skip to content

Commit 9f11811

Browse files
committed
revert
1 parent b007fbd commit 9f11811

File tree

1 file changed

+129
-116
lines changed

1 file changed

+129
-116
lines changed

io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala

Lines changed: 129 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,37 @@ import scala.scalanative.unsafe.*
2929
import scala.scalanative.unsigned.*
3030
import scala.scalanative.libc.*
3131
import scala.scalanative.posix.sys.wait.*
32-
import scala.scalanative.posix.errno.{EPERM, ECHILD}
32+
import scala.scalanative.posix.errno.*
3333
import scala.scalanative.meta.LinktimeInfo
3434
import scala.scalanative.posix.unistd.*
3535
import scala.scalanative.posix.signal.*
3636
import java.io.IOException
3737
import cats.effect.LiftIO
3838
import cats.effect.IO
3939
import org.typelevel.scalaccompat.annotation._
40-
import fs2.io.internal.NativeUtil._
40+
import scala.concurrent.duration.*
41+
import cats.effect.implicits.*
4142

4243
@extern
4344
@nowarn212("cat=unused")
44-
object LibC {
45-
def pidfd_open(pid: CInt, flags: CInt): CInt = extern
45+
object SyscallBindings {
46+
def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern
4647
}
4748

48-
private final case class NativeProcess(
49+
object pidFd {
50+
private val SYS_pidfd_open = 434L
51+
52+
def pidfd_open(pid: pid_t, flags: Int): Int = {
53+
val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong)
54+
fd.toInt
55+
}
56+
}
57+
58+
final case class NativeProcess(
4959
pid: pid_t,
5060
stdinFd: Int,
5161
stdoutFd: Int,
52-
stderrFd: Int,
53-
pidfd: Option[Int] = None
62+
stderrFd: Int
5463
)
5564

5665
private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative {
@@ -67,93 +76,86 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN
6776
}
6877

6978
@inline private def closeAll(fds: Int*): Unit =
70-
fds.foreach(close)
71-
72-
def pipeResource[F[_]](implicit F: Async[F]): Resource[F, (Int, Int)] =
73-
Resource.make {
74-
F.blocking {
75-
val fds = stackalloc[CInt](2.toUInt)
76-
guard_(pipe(fds))
77-
(fds(0), fds(1))
78-
}
79-
} { case (r, w) => F.blocking { close(r); close(w); () } }
79+
fds.foreach { fd => close(fd); () }
8080

8181
def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =
8282
if (LinktimeInfo.isMac || LinktimeInfo.isLinux) {
8383
new UnsealedProcesses[F] {
8484
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {
8585

86-
val pipesResource: Resource[F, ((Int, Int), (Int, Int), (Int, Int))] =
87-
for {
88-
stdinPipe <- pipeResource[F]
89-
stdoutPipe <- pipeResource[F]
90-
stderrPipe <- pipeResource[F]
91-
} yield (stdinPipe, stdoutPipe, stderrPipe)
86+
def createProcess(): F[NativeProcess] = F.blocking {
87+
Zone { implicit z =>
88+
val stdinPipe = stackalloc[CInt](2.toUInt)
89+
val stdoutPipe = stackalloc[CInt](2.toUInt)
90+
val stderrPipe = stackalloc[CInt](2.toUInt)
9291

93-
def createProcess(
94-
stdinPipe: (Int, Int),
95-
stdoutPipe: (Int, Int),
96-
stderrPipe: (Int, Int)
97-
): F[NativeProcess] = F.blocking {
98-
val nativeProcess: NativeProcess = Zone { implicit z =>
92+
if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) {
93+
throw new RuntimeException("Failed to create pipes")
94+
}
9995
val envMap =
10096
if (process.inheritEnv)
10197
sys.env ++ process.extraEnv
10298
else process.extraEnv
10399

104-
val envp = stackalloc[CString]((envMap.size + 1).toUSize)
100+
val envp = stackalloc[CString]((envMap.size + 1).toULong)
105101
envMap.zipWithIndex.foreach { case ((k, v), i) =>
106-
envp(i.toUSize) = toCString(s"$k=$v")
102+
envp(i.toULong) = toCString(s"$k=$v")
107103
}
108-
envp(envMap.size.toUSize) = null
104+
envp(envMap.size.toULong) = null
109105

110106
val allArgs = process.command +: process.args
111-
val argv = stackalloc[CString](allArgs.length.toUSize + 1.toUSize)
107+
val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong)
112108
allArgs.zipWithIndex.foreach { case (arg, i) =>
113-
argv(i.toUSize) = toCString(arg)
109+
argv(i.toULong) = toCString(arg)
114110
}
115-
argv(allArgs.length.toUSize) = null
111+
argv(allArgs.length.toULong) = null
116112

117-
val executable =
118-
if (process.command.startsWith("/"))
119-
process.command
120-
else
121-
findExecutable(process.command).getOrElse(process.command)
122-
val ret = guard(fork())
123-
ret match {
113+
val executable = findExecutable(process.command).getOrElse(process.command)
114+
115+
fork() match {
116+
case -1 =>
117+
closeAll(
118+
stdinPipe(0),
119+
stdinPipe(1),
120+
stdoutPipe(0),
121+
stdoutPipe(1),
122+
stderrPipe(0),
123+
stderrPipe(1)
124+
)
125+
throw new IOException("Unable to fork process")
124126
case 0 =>
125-
closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1)
126-
guard_(dup2(stdinPipe._1, STDIN_FILENO))
127-
guard_(dup2(stdoutPipe._2, STDOUT_FILENO))
128-
guard_(dup2(stderrPipe._2, STDERR_FILENO))
129-
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
127+
closeAll(stdinPipe(1), stdoutPipe(0), stderrPipe(0))
128+
if (
129+
dup2(stdinPipe(0), STDIN_FILENO) == -1 ||
130+
dup2(stdoutPipe(1), STDOUT_FILENO) == -1 ||
131+
dup2(stderrPipe(1), STDERR_FILENO) == -1
132+
) {
133+
_exit(1)
134+
throw new IOException("Unable to redirect file descriptors")
135+
}
136+
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
130137

131138
process.workingDirectory.foreach { dir =>
132139
if ((dir != null) && (dir.toString != ".")) {
133-
guard_(chdir(toCString(dir.toString)))
140+
val ret = chdir(toCString(dir.toString))
141+
if (ret != 0)
142+
throw new IOException(s"Failed to chdir to ${dir.toString}")
134143
}
135144
}
136145

137146
execve(toCString(executable), argv, envp)
138147
_exit(127)
139-
throw new AssertionError("unreachable")
148+
throw new IOException(s"Failed to create process for command: ${process.command}")
140149
case pid =>
141-
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
142-
val pidfd =
143-
if (LinktimeInfo.isLinux) {
144-
val fd = LibC.pidfd_open(pid, 0)
145-
if (fd >= 0) Some(fd) else None
146-
} else None
150+
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
147151
NativeProcess(
148152
pid = pid,
149-
stdinFd = stdinPipe._2,
150-
stdoutFd = stdoutPipe._1,
151-
stderrFd = stderrPipe._1,
152-
pidfd
153+
stdinFd = stdinPipe(1),
154+
stdoutFd = stdoutPipe(0),
155+
stderrFd = stderrPipe(0)
153156
)
154157
}
155-
}: NativeProcess
156-
nativeProcess
158+
}
157159
}
158160

159161
def cleanup(proc: NativeProcess): F[Unit] =
@@ -175,90 +177,101 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN
175177
}
176178
}
177179

178-
pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) =>
179-
Resource
180-
.make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup)
181-
.flatMap { nativeProcess =>
182-
nativeProcess.pidfd match {
183-
case Some(pidfd) =>
184-
for {
185-
poller <- Resource.eval(fileDescriptorPoller[F])
186-
handle <- poller.registerFileDescriptor(pidfd, true, false).mapK(LiftIO.liftK)
187-
} yield (nativeProcess, Some(handle))
188-
case None =>
189-
Resource.pure((nativeProcess, None))
190-
}
180+
Resource.make(createProcess())(cleanup).map { nativeProcess =>
181+
new UnsealedProcess[F] {
182+
def isAlive: F[Boolean] = F.delay {
183+
kill(nativeProcess.pid, 0) == 0 || errno.errno != EPERM
191184
}
192-
.map { case (nativeProcess, pollHandleOpt) =>
193-
new UnsealedProcess[F] {
194-
def isAlive: F[Boolean] = F.delay {
195-
kill(nativeProcess.pid, 0) == 0 || errno.errno == EPERM
196-
}
197185

198-
def exitValue: F[Int] =
199-
if (LinktimeInfo.isLinux) {
200-
(nativeProcess.pidfd, pollHandleOpt) match {
201-
case (Some(_), Some(handle)) =>
202-
handle
203-
.pollReadRec(()) { _ =>
186+
def exitValue: F[Int] =
187+
if (LinktimeInfo.isLinux) {
188+
F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd =>
189+
if (pidfd >= 0) {
190+
fileDescriptorPoller[F].flatMap { poller =>
191+
poller
192+
.registerFileDescriptor(pidfd, true, false)
193+
.use { handle =>
194+
handle.pollReadRec(()) { _ =>
204195
IO {
205196
val statusPtr = stackalloc[CInt]()
206197
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
198+
207199
if (result == nativeProcess.pid) {
208200
val exitCode = WEXITSTATUS(!statusPtr)
209201
Right(exitCode)
210202
} else if (result == 0) {
211203
Left(())
212204
} else {
213-
if (errno.errno == ECHILD)
205+
if (errno.errno == ECHILD) {
214206
throw new IOException("No such process")
215-
else
207+
} else {
216208
throw new IOException(
217209
s"waitpid failed with errno: ${errno.errno}"
218210
)
211+
}
219212
}
220213
}
221214
}
222-
.to
223-
case _ =>
224-
fallbackExitValue(nativeProcess.pid)
215+
}
216+
.to
225217
}
226218
} else {
227219
fallbackExitValue(nativeProcess.pid)
228220
}
229-
230-
def stdin: Pipe[F, Byte, Nothing] = { in =>
231-
in
232-
.through(writeFd(nativeProcess.stdinFd))
233-
.onFinalize {
234-
F.blocking {
235-
close(nativeProcess.stdinFd)
236-
}.void
237-
}
238221
}
222+
} else {
223+
fallbackExitValue(nativeProcess.pid)
224+
}
239225

240-
def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
241-
.onFinalize {
242-
F.blocking {
243-
close(nativeProcess.stdoutFd)
244-
}.void
245-
}
226+
def stdin: Pipe[F, Byte, Nothing] = { in =>
227+
in
228+
.through(writeFd(nativeProcess.stdinFd))
229+
.onFinalize {
230+
F.blocking {
231+
close(nativeProcess.stdinFd)
232+
}.void
233+
}
234+
}
235+
def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
236+
.onFinalize {
237+
F.blocking {
238+
close(nativeProcess.stdoutFd)
239+
}.void
240+
}
246241

247-
def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
248-
.onFinalize {
249-
F.blocking {
250-
close(nativeProcess.stderrFd)
251-
}.void
252-
}
242+
def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
243+
.onFinalize {
244+
F.blocking {
245+
close(nativeProcess.stderrFd)
246+
}.void
253247
}
254-
}
248+
}
255249
}
256250
}
257251

258-
private def fallbackExitValue(pid: pid_t): F[Int] = F.blocking {
259-
val status = stackalloc[CInt]()
260-
guard_(waitpid(pid, status, 0))
261-
WEXITSTATUS(!status)
252+
private def fallbackExitValue(pid: pid_t): F[Int] = {
253+
def loop: F[Int] =
254+
F.blocking {
255+
Zone { _ =>
256+
val status = stackalloc[CInt]()
257+
val result = waitpid(pid, status, WNOHANG)
258+
259+
if (result == pid) {
260+
Some(WEXITSTATUS(!status))
261+
} else if (result == 0) None
262+
else throw new IOException(s"waitpid failed with errno: ${errno.errno}")
263+
}
264+
}.flatMap {
265+
case Some(code) => F.pure(code)
266+
case None => F.sleep(10.millis) >> loop
267+
}
268+
269+
loop.onCancel {
270+
F.blocking {
271+
kill(pid, SIGKILL)
272+
()
273+
}
274+
}
262275
}
263276
}
264277
} else super.forAsync[F]

0 commit comments

Comments
 (0)