Skip to content

Commit db7696e

Browse files
committed
unguard ProcessSuit and add nowarn212
1 parent 6a34d00 commit db7696e

File tree

3 files changed

+92
-81
lines changed

3 files changed

+92
-81
lines changed

io/native/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ private[fs2] trait ioplatform extends iojvmnative {
8080
.resource {
8181
Resource
8282
.eval {
83-
setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F]
83+
setNonBlocking(fd) *> fileDescriptorPoller[F]
8484
}
8585
.flatMap { poller =>
8686
poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)
@@ -92,7 +92,7 @@ private[fs2] trait ioplatform extends iojvmnative {
9292
.pollReadRec(()) { _ =>
9393
IO {
9494
val buf = new Array[Byte](bufSize)
95-
val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toULong))
95+
val readed = guard(read(fd, buf.atUnsafe(0), bufSize.toULong))
9696
if (readed > 0)
9797
Right(Some(Chunk.array(buf, 0, readed)))
9898
else if (readed == 0)

io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,13 @@ import scala.scalanative.posix.unistd.*
3636
import scala.scalanative.posix.spawn.*
3737
import scala.scalanative.posix.signal.*
3838
import java.io.IOException
39+
import scala.concurrent.duration.*
3940
import cats.effect.LiftIO
4041
import cats.effect.IO
42+
import org.typelevel.scalaccompat.annotation._
4143

4244
@extern
45+
@nowarn212("cat=unused")
4346
object SyscallBindings {
4447
def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern
4548
}
@@ -92,22 +95,19 @@ private[process] trait ProcessesCompanionPlatform {
9295
posix_spawn_file_actions_init(fileActions)
9396

9497
posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO)
95-
posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO)
96-
posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO)
97-
9898
posix_spawn_file_actions_addclose(fileActions, stdinPipe(1))
99+
posix_spawn_file_actions_addclose(fileActions, stdinPipe(0))
100+
101+
posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO)
99102
posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0))
103+
posix_spawn_file_actions_addclose(fileActions, stdoutPipe(1))
104+
105+
posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO)
100106
posix_spawn_file_actions_addclose(fileActions, stderrPipe(0))
107+
posix_spawn_file_actions_addclose(fileActions, stderrPipe(1))
101108

102109
val pid = stackalloc[pid_t]()
103-
println("hi")
104-
println("argv(0): " + fromCString(argv(0)))
105-
var i = 0
106-
while (argv(i) != null) {
107-
println(s" argv($i): " + fromCString(argv(i)))
108-
i += 1
109-
}
110-
val result = posix_spawn(
110+
val result = posix_spawnp(
111111
pid,
112112
argv(0),
113113
fileActions,
@@ -145,7 +145,7 @@ private[process] trait ProcessesCompanionPlatform {
145145
F.delay(kill(proc.pid, SIGKILL)) *>
146146
F.blocking {
147147
val status = stackalloc[CInt]()
148-
val r = waitpid(proc.pid, status, 0)
148+
val r = waitpid(proc.pid, status, WNOHANG)
149149
if (r < 0 && errno.errno != ECHILD)
150150
F.raiseError(new RuntimeException(s"waitpid failed: errno=${errno.errno}"))
151151
()
@@ -192,11 +192,11 @@ private[process] trait ProcessesCompanionPlatform {
192192
.to
193193
}
194194
} else {
195-
fallbackExitValue(nativeProcess.pid)
195+
fallbackExitValue(nativeProcess.pid).to
196196
}
197197
}
198198
} else {
199-
fallbackExitValue(nativeProcess.pid)
199+
fallbackExitValue(nativeProcess.pid).to
200200
}
201201

202202
def stdin: Pipe[F, Byte, Nothing] = writeFd(nativeProcess.stdinFd)
@@ -208,12 +208,29 @@ private[process] trait ProcessesCompanionPlatform {
208208
}
209209
}
210210

211-
private def fallbackExitValue(pid: pid_t): F[Int] = F.delay {
212-
val status = stackalloc[CInt]()
213-
val result = waitpid(pid, status, 0)
214-
if (result < 0) throw new IOException(s"waitpid failed with errno: ${errno.errno}")
215-
WEXITSTATUS(!status)
211+
private def fallbackExitValue(pid: pid_t): IO[Int] = {
212+
def loop: IO[Int] =
213+
IO.blocking {
214+
Zone { _ =>
215+
val status = stackalloc[CInt]()
216+
val result = waitpid(pid, status, WNOHANG)
217+
if (result == pid) Some(WEXITSTATUS(!status))
218+
else if (result == 0) None
219+
else throw new IOException(s"waitpid failed with errno: ${errno.errno}")
220+
}
221+
}.flatMap {
222+
case Some(code) => IO.pure(code)
223+
case None => IO.sleep(100.millis) >> loop
224+
}
225+
226+
loop.onCancel {
227+
IO.blocking {
228+
kill(pid, SIGKILL)
229+
()
230+
}
231+
}
216232
}
233+
217234
}
218235
}
219236

io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,19 @@ class ProcessSuite extends Fs2Suite {
7272
}
7373
}
7474

75-
if (!isNative)
76-
test("cat") {
77-
ProcessBuilder("cat").spawn[IO].use { p =>
78-
val verySpecialMsg = "FS2 rocks!"
79-
val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin)
80-
val out = p.stdout.through(fs2.text.utf8.decode)
81-
82-
out
83-
.concurrently(in)
84-
.compile
85-
.string
86-
.assertEquals(verySpecialMsg)
87-
}
75+
test("cat") {
76+
ProcessBuilder("cat").spawn[IO].use { p =>
77+
val verySpecialMsg = "FS2 rocks!"
78+
val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin)
79+
val out = p.stdout.through(fs2.text.utf8.decode)
80+
81+
out
82+
.concurrently(in)
83+
.compile
84+
.string
85+
.assertEquals(verySpecialMsg)
8886
}
87+
}
8988

9089
test("working directory") {
9190
Files[IO].tempDirectory.use { wd0 =>
@@ -125,60 +124,55 @@ class ProcessSuite extends Fs2Suite {
125124
}
126125
}
127126

128-
if (!isNative)
129-
test("stdin cancelation") {
130-
ProcessBuilder("cat")
131-
.spawn[IO]
132-
.use { p =>
133-
Stream
134-
// apparently big enough to force `cat` to backpressure
135-
.emit(Chunk.array(new Array[Byte](1024 * 1024)))
136-
.unchunks
137-
.repeat
138-
.covary[IO]
139-
.through(p.stdin)
140-
.compile
141-
.drain
142-
}
143-
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
144-
}
127+
test("stdin cancelation") {
128+
ProcessBuilder("cat")
129+
.spawn[IO]
130+
.use { p =>
131+
Stream
132+
// apparently big enough to force `cat` to backpressure
133+
.emit(Chunk.array(new Array[Byte](1024 * 1024)))
134+
.unchunks
135+
.repeat
136+
.covary[IO]
137+
.through(p.stdin)
138+
.compile
139+
.drain
140+
}
141+
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
142+
}
145143

146-
if (!isNative)
147-
test("stdout cancelation") {
148-
ProcessBuilder("cat")
149-
.spawn[IO]
150-
.use(_.stdout.compile.drain)
151-
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
152-
}
144+
test("stdout cancelation") {
145+
ProcessBuilder("cat")
146+
.spawn[IO]
147+
.use(_.stdout.compile.drain)
148+
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
149+
}
153150

154-
if (!isNative)
155-
test("stderr cancelation") {
156-
ProcessBuilder("cat")
157-
.spawn[IO]
158-
.use(_.stderr.compile.drain)
159-
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
160-
}
151+
test("stderr cancelation") {
152+
ProcessBuilder("cat")
153+
.spawn[IO]
154+
.use(_.stderr.compile.drain)
155+
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
156+
}
161157

162-
if (!isNative)
163-
test("exit value cancelation") {
164-
ProcessBuilder("cat")
165-
.spawn[IO]
166-
.use(_.exitValue.void)
167-
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
168-
}
158+
test("exit value cancelation") {
159+
ProcessBuilder("cat")
160+
.spawn[IO]
161+
.use(_.exitValue.void)
162+
.timeoutTo(1.second, IO.unit) // assert that cancelation does not hang
163+
}
169164

170-
if (!isNative)
171-
test("flush") {
172-
ProcessBuilder("cat").spawn[IO].use { p =>
173-
val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO])
174-
.through(fs2.text.utf8.encode)
175-
.through(p.stdin)
165+
test("flush") {
166+
ProcessBuilder("cat").spawn[IO].use { p =>
167+
val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO])
168+
.through(fs2.text.utf8.encode)
169+
.through(p.stdin)
176170

177-
val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean"))
171+
val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean"))
178172

179-
out.concurrently(in).compile.drain // will hang if not flushed
180-
}
173+
out.concurrently(in).compile.drain // will hang if not flushed
181174
}
175+
}
182176

183177
test("close stdin") {
184178
ProcessBuilder("dd", "count=1").spawn[IO].use { p =>

0 commit comments

Comments
 (0)