Skip to content

Commit dec01d4

Browse files
committed
refactored cleanup and exitvalue
1 parent 00ac3e5 commit dec01d4

File tree

1 file changed

+99
-122
lines changed

1 file changed

+99
-122
lines changed

io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala

Lines changed: 99 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ import scala.scalanative.meta.LinktimeInfo
3434
import scala.scalanative.posix.unistd.*
3535
import scala.scalanative.posix.signal.*
3636
import java.io.IOException
37-
import scala.concurrent.duration.*
3837
import cats.effect.LiftIO
3938
import cats.effect.IO
40-
import cats.effect.implicits.*
4139
import org.typelevel.scalaccompat.annotation._
4240

4341
@extern
@@ -63,122 +61,121 @@ final case class NativeProcess(
6361
)
6462

6563
private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
64+
65+
private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = {
66+
val pathEnv = sys.env.getOrElse("PATH", "")
67+
pathEnv
68+
.split(':')
69+
.find { dir =>
70+
val full = s"$dir/$cmd"
71+
access(toCString(full), X_OK) == 0
72+
}
73+
.map(dir => s"$dir/$cmd")
74+
}
75+
76+
@inline private def closeAll(fds: Int*): Unit =
77+
fds.foreach(fd => if (fd >= 0) close(fd))
78+
6679
def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =
6780
if (LinktimeInfo.isMac || LinktimeInfo.isLinux) {
6881
new UnsealedProcesses[F] {
69-
7082
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {
7183

7284
def createProcess(): F[NativeProcess] = F.blocking {
7385
Zone { implicit z =>
74-
def findExecutable(command: String): Option[String] = {
75-
val pathEnv = sys.env.get("PATH").getOrElse("")
76-
val paths = pathEnv.split(":").toList
77-
78-
paths
79-
.find { dir =>
80-
val fullPath = s"$dir/$command"
81-
access(toCString(fullPath), X_OK) == 0
82-
}
83-
.map(dir => s"$dir/$command")
84-
85-
}
86-
87-
val envMap =
88-
if (process.inheritEnv)
89-
sys.env ++ process.extraEnv
90-
else process.extraEnv
91-
92-
val executable =
93-
if (process.command.contains("/")) {
94-
process.command
95-
} else {
96-
findExecutable(process.command).getOrElse(process.command)
97-
}
9886
val stdinPipe = stackalloc[CInt](2.toUInt)
9987
val stdoutPipe = stackalloc[CInt](2.toUInt)
10088
val stderrPipe = stackalloc[CInt](2.toUInt)
10189

10290
if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) {
10391
throw new RuntimeException("Failed to create pipes")
10492
}
93+
val envMap =
94+
if (process.inheritEnv)
95+
sys.env ++ process.extraEnv
96+
else process.extraEnv
10597

106-
val pid = fork()
107-
if (pid < 0) {
108-
close(stdinPipe(0)); close(stdinPipe(1))
109-
close(stdoutPipe(0)); close(stdoutPipe(1))
110-
close(stderrPipe(0)); close(stderrPipe(1))
111-
throw new RuntimeException("fork failed")
112-
} else if (pid == 0) {
113-
close(stdinPipe(1))
114-
close(stdoutPipe(0))
115-
close(stderrPipe(0))
98+
val envp = stackalloc[CString]((envMap.size + 1).toULong)
99+
envMap.zipWithIndex.foreach { case ((k, v), i) =>
100+
envp(i.toULong) = toCString(s"$k=$v")
101+
}
102+
envp(envMap.size.toULong) = null
116103

117-
if (
118-
dup2(stdinPipe(0), STDIN_FILENO) == -1 ||
119-
dup2(stdoutPipe(1), STDOUT_FILENO) == -1 ||
120-
dup2(stderrPipe(1), STDERR_FILENO) == -1
121-
) {
122-
_exit(1)
123-
}
104+
val allArgs = process.command +: process.args
105+
val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong)
106+
allArgs.zipWithIndex.foreach { case (arg, i) =>
107+
argv(i.toULong) = toCString(arg)
108+
}
109+
argv(allArgs.length.toULong) = null
124110

125-
close(stdinPipe(0))
126-
close(stdoutPipe(1))
127-
close(stderrPipe(1))
111+
val executable = findExecutable(process.command).getOrElse(process.command)
128112

129-
process.workingDirectory.foreach { dir =>
130-
if (chdir(toCString(dir.toString)) != 0) {
113+
fork() match {
114+
case -1 =>
115+
closeAll(
116+
stdinPipe(0),
117+
stdinPipe(1),
118+
stdoutPipe(0),
119+
stdoutPipe(1),
120+
stderrPipe(0),
121+
stderrPipe(1)
122+
)
123+
throw new IOException("Unable to fork process")
124+
case 0 =>
125+
closeAll(stdinPipe(1), stdoutPipe(0), stderrPipe(0))
126+
if (
127+
dup2(stdinPipe(0), STDIN_FILENO) == -1 ||
128+
dup2(stdoutPipe(1), STDOUT_FILENO) == -1 ||
129+
dup2(stderrPipe(1), STDERR_FILENO) == -1
130+
) {
131131
_exit(1)
132+
throw new IOException("Unable to redirect file descriptors")
132133
}
133-
}
134+
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
134135

135-
val allArgs = process.command +: process.args
136-
val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong)
137-
allArgs.zipWithIndex.foreach { case (arg, i) =>
138-
argv(i.toULong) = toCString(arg)
139-
}
140-
argv(allArgs.length.toULong) = null
141-
142-
val envp = stackalloc[CString]((envMap.size + 1).toULong)
143-
envMap.zipWithIndex.foreach { case ((k, v), i) =>
144-
envp(i.toULong) = toCString(s"$k=$v")
145-
}
146-
envp(envMap.size.toULong) = null
136+
process.workingDirectory.foreach { dir =>
137+
if ((dir != null) && (dir.toString != "."))
138+
chdir(toCString(dir.toString))
139+
}
147140

148-
execve(toCString(executable), argv, envp)
149-
_exit(1)
150-
throw new RuntimeException(s"execve failed")
151-
} else {
152-
close(stdinPipe(0))
153-
close(stdoutPipe(1))
154-
close(stderrPipe(1))
155-
NativeProcess(
156-
pid = pid,
157-
stdinFd = stdinPipe(1),
158-
stdoutFd = stdoutPipe(0),
159-
stderrFd = stderrPipe(0)
160-
)
141+
execve(toCString(executable), argv, envp)
142+
_exit(127)
143+
throw new IOException(s"Failed to create process for command: ${process.command}")
144+
case pid =>
145+
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
146+
NativeProcess(
147+
pid = pid,
148+
stdinFd = stdinPipe(1),
149+
stdoutFd = stdoutPipe(0),
150+
stderrFd = stderrPipe(0)
151+
)
161152
}
162153
}
163154
}
164155

165156
def cleanup(proc: NativeProcess): F[Unit] =
166157
F.blocking {
167-
close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd)
168-
} *>
169-
F.delay(kill(proc.pid, SIGKILL)) *>
170-
F.blocking {
158+
closeAll(proc.stdinFd, proc.stdoutFd, proc.stderrFd)
159+
val alive = {
160+
val res = kill(proc.pid, 0)
161+
res == 0 || errno.errno == EPERM
162+
}
163+
if (alive) {
164+
kill(proc.pid, SIGKILL)
165+
val status = stackalloc[CInt]()
166+
waitpid(proc.pid, status, 0)
167+
()
168+
} else {
171169
val status = stackalloc[CInt]()
172-
val r = waitpid(proc.pid, status, WNOHANG)
173-
if (r < 0 && errno.errno != ECHILD)
174-
throw new RuntimeException(s"waitpid failed: errno=${errno.errno}")
170+
waitpid(proc.pid, status, WNOHANG)
175171
()
176172
}
173+
}
177174

178175
Resource.make(createProcess())(cleanup).map { nativeProcess =>
179176
new UnsealedProcess[F] {
180177
def isAlive: F[Boolean] = F.delay {
181-
kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH
178+
kill(nativeProcess.pid, 0) == 0 || errno.errno != EPERM
182179
}
183180

184181
def exitValue: F[Int] =
@@ -191,23 +188,21 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
191188
.use { handle =>
192189
handle.pollReadRec(()) { _ =>
193190
IO {
194-
Zone { _ =>
195-
val statusPtr = stackalloc[CInt]()
196-
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
191+
val statusPtr = stackalloc[CInt]()
192+
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
197193

198-
if (result == nativeProcess.pid) {
199-
val exitCode = WEXITSTATUS(!statusPtr)
200-
Right(exitCode)
201-
} else if (result == 0) {
202-
Left(())
194+
if (result == nativeProcess.pid) {
195+
val exitCode = WEXITSTATUS(!statusPtr)
196+
Right(exitCode)
197+
} else if (result == 0) {
198+
Left(())
199+
} else {
200+
if (errno.errno == ECHILD) {
201+
throw new IOException("No such process")
203202
} else {
204-
if (errno.errno == ECHILD) {
205-
throw new IOException("No such process")
206-
} else {
207-
throw new IOException(
208-
s"waitpid failed with errno: ${errno.errno}"
209-
)
210-
}
203+
throw new IOException(
204+
s"waitpid failed with errno: ${errno.errno}"
205+
)
211206
}
212207
}
213208
}
@@ -249,31 +244,13 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
249244
}
250245
}
251246

252-
private def fallbackExitValue(pid: pid_t): F[Int] = {
253-
def loop: F[Int] =
254-
F.blocking {
255-
Zone { _ =>
256-
val status = stackalloc[CInt]()
257-
val result = waitpid(pid, status, WNOHANG)
258-
259-
if (result == pid) {
260-
Some(WEXITSTATUS(!status))
261-
} else if (result == 0) None
262-
else throw new IOException(s"waitpid failed with errno: ${errno.errno}")
263-
}
264-
}.flatMap {
265-
case Some(code) => F.pure(code)
266-
case None => F.sleep(100.millis) >> loop
267-
}
268-
269-
loop.onCancel {
270-
F.blocking {
271-
kill(pid, SIGKILL)
272-
()
273-
}
247+
private def fallbackExitValue(pid: pid_t): F[Int] =
248+
F.blocking {
249+
val status = stackalloc[CInt]()
250+
val result = waitpid(pid, status, 0)
251+
if (result == pid) WEXITSTATUS(!status)
252+
else throw new IOException(s"waitpid failed with errno: ${errno.errno}")
274253
}
275-
}
276-
277254
}
278255
} else super.forAsync[F]
279256
}

0 commit comments

Comments
 (0)