diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala similarity index 98% rename from io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala rename to io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala index 0c9f5452bf..d7e2da2d55 100644 --- a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala @@ -30,7 +30,7 @@ import fs2.io.CollectionCompat.* import java.lang -private[process] trait ProcessesCompanionPlatform { +private[process] trait ProcessesCompanionJvmNative { def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { def spawn(process: ProcessBuilder): Resource[F, Process[F]] = diff --git a/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala new file mode 100644 index 0000000000..7bd3f693c1 --- /dev/null +++ b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package process + +private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 919246af1c..57473b3cd0 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -31,7 +31,6 @@ import cats.effect.kernel.Resource import cats.effect.kernel.Sync import cats.syntax.all._ import fs2.io.internal.NativeUtil._ - import java.io.OutputStream import java.nio.charset.Charset import java.nio.charset.StandardCharsets @@ -60,36 +59,7 @@ private[fs2] trait ioplatform extends iojvmnative { /** Stream of bytes read asynchronously from standard input. */ def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] = if (LinktimeInfo.isLinux || LinktimeInfo.isMac) - Stream - .resource { - Resource - .eval { - setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] - } - .flatMap { poller => - poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK) - } - } - .flatMap { handle => - Stream.repeatEval { - handle - .pollReadRec(()) { _ => - IO { - val buf = new Array[Byte](bufSize) - val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toUSize)) - if (readed > 0) - Right(Some(Chunk.array(buf, 0, readed))) - else if (readed == 0) - Right(None) - else - Left(()) - } - } - .to - } - } - .unNoneTerminate - .unchunks + readFd(STDIN_FILENO, bufSize) else readInputStream(Sync[F].blocking(System.in), bufSize, false) @@ -107,7 +77,39 @@ private[fs2] trait ioplatform extends iojvmnative { else writeOutputStream(Sync[F].blocking(System.err), false) - private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => + private[fs2] def readFd[F[_]: Async: LiftIO](fd: Int, bufSize: Int): Stream[F, Byte] = + Stream + .resource { + Resource + .eval { + setNonBlocking(fd) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + Stream.repeatEval { + handle + .pollReadRec(()) { _ => + IO { + val buf = new Array[Byte](bufSize) + val readed = guard(read(fd, buf.atUnsafe(0), bufSize.toUSize)) + if (readed > 0) + Right(Some(Chunk.array(buf, 0, readed))) + else if (readed == 0) + Right(None) + else + Left(()) + } + } + .to + } + } + .unNoneTerminate + .unchunks + + private[fs2] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => Stream .resource { Resource diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala new file mode 100644 index 0000000000..cced898d4c --- /dev/null +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package process + +import cats.effect.kernel.{Async, Resource} +import cats.syntax.all.* +import scala.scalanative.unsafe.* +import scala.scalanative.unsigned.* +import scala.scalanative.libc.* +import scala.scalanative.posix.sys.wait.* +import scala.scalanative.posix.errno.{EPERM, ECHILD} +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.unistd.* +import scala.scalanative.posix.signal.* +import java.io.IOException +import cats.effect.LiftIO +import cats.effect.IO +import org.typelevel.scalaccompat.annotation._ +import fs2.io.internal.NativeUtil._ +import cats.effect.unsafe.KqueueSystem.Kqueue + +@extern +@nowarn212("cat=unused") +object LibC { + def pidfd_open(pid: CInt, flags: CInt): CInt = extern +} + +private final case class NativeProcess( + pid: pid_t, + stdinFd: Int, + stdoutFd: Int, + stderrFd: Int, + pidfd: Option[Int] = None +) + +private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative { + + private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = { + val pathEnv = sys.env.getOrElse("PATH", "") + pathEnv + .split(':') + .find { dir => + val full = s"$dir/$cmd" + access(toCString(full), X_OK) == 0 + } + .map(dir => s"$dir/$cmd") + } + + @inline private def closeAll(fds: Int*): Unit = + fds.foreach(close) + + def pipeResource[F[_]](implicit F: Async[F]): Resource[F, (Int, Int)] = + Resource.make { + F.blocking { + val fds = stackalloc[CInt](2.toUInt) + guard_(pipe(fds)) + (fds(0), fds(1)) + } + } { case (r, w) => F.blocking { close(r); close(w); () } } + + def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = + if (LinktimeInfo.isMac || LinktimeInfo.isLinux) { + new UnsealedProcesses[F] { + def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { + + val pipesResource: Resource[F, ((Int, Int), (Int, Int), (Int, Int))] = + for { + stdinPipe <- pipeResource[F] + stdoutPipe <- pipeResource[F] + stderrPipe <- pipeResource[F] + } yield (stdinPipe, stdoutPipe, stderrPipe) + + def createProcess( + stdinPipe: (Int, Int), + stdoutPipe: (Int, Int), + stderrPipe: (Int, Int) + ): F[NativeProcess] = F.blocking { + Zone.acquire { implicit z => + val envMap = + if (process.inheritEnv) + sys.env ++ process.extraEnv + else process.extraEnv + + val envp = stackalloc[CString]((envMap.size + 1).toUSize) + envMap.zipWithIndex.foreach { case ((k, v), i) => + envp(i.toUSize) = toCString(s"$k=$v") + } + envp(envMap.size.toUSize) = null + + val allArgs = process.command +: process.args + val argv = stackalloc[CString](allArgs.length.toUSize + 1.toUSize) + allArgs.zipWithIndex.foreach { case (arg, i) => + argv(i.toUSize) = toCString(arg) + } + argv(allArgs.length.toUSize) = null + + val executable = + if (process.command.startsWith("/")) + process.command + else + findExecutable(process.command).getOrElse(process.command) + val ret = guard(fork()) + ret match { + case 0 => + closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1) + guard_(dup2(stdinPipe._1, STDIN_FILENO)) + guard_(dup2(stdoutPipe._2, STDOUT_FILENO)) + guard_(dup2(stderrPipe._2, STDERR_FILENO)) + closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2) + + process.workingDirectory.foreach { dir => + if ((dir != null) && (dir.toString != ".")) { + guard_(chdir(toCString(dir.toString))) + } + } + + execve(toCString(executable), argv, envp) + _exit(127) + throw new AssertionError("unreachable") + case pid => + closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2) + val pidfd = + if (LinktimeInfo.isLinux) { + val fd = LibC.pidfd_open(pid, 0) + if (fd >= 0) Some(fd) else None + } else None + NativeProcess( + pid = pid, + stdinFd = stdinPipe._2, + stdoutFd = stdoutPipe._1, + stderrFd = stderrPipe._1, + pidfd + ) + } + } + } + + def cleanup(proc: NativeProcess): F[Unit] = + F.blocking { + closeAll(proc.stdinFd, proc.stdoutFd, proc.stderrFd) + val alive = { + val res = kill(proc.pid, 0) + res == 0 || errno.errno == EPERM + } + if (alive) { + kill(proc.pid, SIGKILL) + val status = stackalloc[CInt]() + waitpid(proc.pid, status, 0) + () + } else { + val status = stackalloc[CInt]() + waitpid(proc.pid, status, WNOHANG) + () + } + } + + pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) => + Resource + .make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup) + .flatMap { nativeProcess => + nativeProcess.pidfd match { + case Some(pidfd) => + for { + poller <- Resource.eval(fileDescriptorPoller[F]) + handle <- poller.registerFileDescriptor(pidfd, true, false).mapK(LiftIO.liftK) + } yield (nativeProcess, Some(handle)) + case None => + Resource.pure((nativeProcess, None)) + } + } + .map { case (nativeProcess, pollHandleOpt) => + new UnsealedProcess[F] { + def isAlive: F[Boolean] = F.delay { + kill(nativeProcess.pid, 0) == 0 || errno.errno == EPERM + } + + def exitValue: F[Int] = + if (LinktimeInfo.isLinux) { + (nativeProcess.pidfd, pollHandleOpt) match { + case (Some(_), Some(handle)) => + handle + .pollReadRec(()) { _ => + IO { + val statusPtr = stackalloc[CInt]() + val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) + if (result == nativeProcess.pid) { + val exitCode = WEXITSTATUS(!statusPtr) + Right(exitCode) + } else if (result == 0) { + Left(()) + } else { + if (errno.errno == ECHILD) + throw new IOException("No such process") + else + throw new IOException( + s"waitpid failed with errno: ${errno.errno}" + ) + } + } + } + .to + case _ => + fallbackExitValue(nativeProcess.pid) + } + } else { + fileDescriptorPoller[F] match { + case kq: Kqueue => + kq.awaitEvent(nativeProcess.pid, -5, 0x0005, 0x80000000).to.map(_.toInt) + case _ => fallbackExitValue(nativeProcess.pid) + } + } + + def stdin: Pipe[F, Byte, Nothing] = { in => + in + .through(writeFd(nativeProcess.stdinFd)) + .onFinalize { + F.blocking { + close(nativeProcess.stdinFd) + }.void + } + } + + def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stdoutFd) + }.void + } + + def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stderrFd) + }.void + } + } + } + } + } + + private def fallbackExitValue(pid: pid_t): F[Int] = F.delay { + val status = stackalloc[CInt]() + guard_(waitpid(pid, status, 0)) + WEXITSTATUS(!status) + } + + } + } else super.forAsync[F] +} diff --git a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala index 118aa0edd1..9d1bdb2943 100644 --- a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala +++ b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala @@ -72,20 +72,19 @@ class ProcessSuite extends Fs2Suite { } } - if (!isNative) - test("cat") { - ProcessBuilder("cat").spawn[IO].use { p => - val verySpecialMsg = "FS2 rocks!" - val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin) - val out = p.stdout.through(fs2.text.utf8.decode) - - out - .concurrently(in) - .compile - .string - .assertEquals(verySpecialMsg) - } + test("cat") { + ProcessBuilder("cat").spawn[IO].use { p => + val verySpecialMsg = "FS2 rocks!" + val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin) + val out = p.stdout.through(fs2.text.utf8.decode) + + out + .concurrently(in) + .compile + .string + .assertEquals(verySpecialMsg) } + } test("working directory") { Files[IO].tempDirectory.use { wd0 => @@ -125,60 +124,55 @@ class ProcessSuite extends Fs2Suite { } } - if (!isNative) - test("stdin cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use { p => - Stream - // apparently big enough to force `cat` to backpressure - .emit(Chunk.array(new Array[Byte](1024 * 1024))) - .unchunks - .repeat - .covary[IO] - .through(p.stdin) - .compile - .drain - } - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stdin cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use { p => + Stream + // apparently big enough to force `cat` to backpressure + .emit(Chunk.array(new Array[Byte](1024 * 1024))) + .unchunks + .repeat + .covary[IO] + .through(p.stdin) + .compile + .drain + } + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("stdout cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.stdout.compile.drain) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stdout cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.stdout.compile.drain) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("stderr cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.stderr.compile.drain) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stderr cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.stderr.compile.drain) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("exit value cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.exitValue.void) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("exit value cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.exitValue.void) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("flush") { - ProcessBuilder("cat").spawn[IO].use { p => - val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO]) - .through(fs2.text.utf8.encode) - .through(p.stdin) + test("flush") { + ProcessBuilder("cat").spawn[IO].use { p => + val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO]) + .through(fs2.text.utf8.encode) + .through(p.stdin) - val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean")) + val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean")) - out.concurrently(in).compile.drain // will hang if not flushed - } + out.concurrently(in).compile.drain // will hang if not flushed } + } test("close stdin") { ProcessBuilder("dd", "count=1").spawn[IO].use { p =>