Skip to content

Commit 773351b

Browse files
committed
Use virtual threads in UnixSocketsPlatform
1 parent bf021a7 commit 773351b

File tree

1 file changed

+19
-11
lines changed

1 file changed

+19
-11
lines changed

io/jvm/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import com.comcast.ip4s.{IpAddress, SocketAddress}
3131
import fs2.{Chunk, Stream}
3232
import fs2.io.file.{Files, Path}
3333
import fs2.io.net.Socket
34+
import fs2.io.evalOnVirtualThreadIfAvailable
3435
import java.nio.ByteBuffer
3536
import java.nio.channels.SocketChannel
3637

@@ -101,12 +102,15 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
101102
extends Socket.BufferedReads[F](readMutex) {
102103

103104
def readChunk(buff: ByteBuffer): F[Int] =
104-
F.blocking(ch.read(buff)).cancelable(close)
105+
evalOnVirtualThreadIfAvailable(F.blocking(ch.read(buff)))
106+
.cancelable(close)
105107

106108
def write(bytes: Chunk[Byte]): F[Unit] = {
107109
def go(buff: ByteBuffer): F[Unit] =
108-
F.blocking(ch.write(buff)).cancelable(close) *>
109-
F.delay(buff.remaining <= 0).ifM(F.unit, go(buff))
110+
evalOnVirtualThreadIfAvailable(
111+
F.blocking(ch.write(buff)).cancelable(close) *>
112+
F.delay(buff.remaining <= 0).ifM(F.unit, go(buff))
113+
)
110114

111115
writeMutex.lock.surround {
112116
F.delay(bytes.toByteBuffer).flatMap(go)
@@ -118,15 +122,19 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
118122
private def raiseIpAddressError[A]: F[A] =
119123
F.raiseError(new UnsupportedOperationException("UnixSockets do not use IP addressing"))
120124

121-
def isOpen: F[Boolean] = F.blocking(ch.isOpen())
122-
def close: F[Unit] = F.blocking(ch.close())
125+
def isOpen: F[Boolean] = evalOnVirtualThreadIfAvailable(F.blocking(ch.isOpen()))
126+
def close: F[Unit] = evalOnVirtualThreadIfAvailable(F.blocking(ch.close()))
123127
def endOfOutput: F[Unit] =
124-
F.blocking {
125-
ch.shutdownOutput(); ()
126-
}
128+
evalOnVirtualThreadIfAvailable(
129+
F.blocking {
130+
ch.shutdownOutput(); ()
131+
}
132+
)
127133
def endOfInput: F[Unit] =
128-
F.blocking {
129-
ch.shutdownInput(); ()
130-
}
134+
evalOnVirtualThreadIfAvailable(
135+
F.blocking {
136+
ch.shutdownInput(); ()
137+
}
138+
)
131139
}
132140
}

0 commit comments

Comments
 (0)