Skip to content

Commit cef5cd5

Browse files
authored
Merge pull request #3548 from onsah/issue/3182-jdk-unix-sockets
Use virtual threads when possible for JDK `UnixSocket`s
2 parents 6c8fd4a + 279f33c commit cef5cd5

File tree

2 files changed

+42
-31
lines changed

2 files changed

+42
-31
lines changed

io/jvm/src/main/scala/fs2/io/net/unixsocket/JdkUnixSockets.scala

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package fs2.io.net.unixsocket
2424
import cats.effect.kernel.{Async, Resource}
2525
import cats.effect.syntax.all._
2626
import fs2.io.file.Files
27+
import fs2.io.evalOnVirtualThreadIfAvailable
2728
import java.net.{StandardProtocolFamily, UnixDomainSocketAddress}
2829
import java.nio.channels.{ServerSocketChannel, SocketChannel}
2930

@@ -41,28 +42,32 @@ object JdkUnixSockets {
4142
private[unixsocket] class JdkUnixSocketsImpl[F[_]: Files](implicit F: Async[F])
4243
extends UnixSockets.AsyncUnixSockets[F] {
4344
protected def openChannel(address: UnixSocketAddress) =
44-
Resource
45-
.make(F.blocking(SocketChannel.open(StandardProtocolFamily.UNIX)))(ch =>
46-
F.blocking(ch.close())
47-
)
48-
.evalTap { ch =>
49-
F.blocking(ch.connect(UnixDomainSocketAddress.of(address.path)))
50-
.cancelable(F.blocking(ch.close()))
51-
}
45+
evalOnVirtualThreadIfAvailable(
46+
Resource
47+
.make(
48+
F.blocking(SocketChannel.open(StandardProtocolFamily.UNIX))
49+
)(ch => F.blocking(ch.close()))
50+
.evalTap { ch =>
51+
F.blocking(ch.connect(UnixDomainSocketAddress.of(address.path)))
52+
.cancelable(F.blocking(ch.close()))
53+
}
54+
)
5255

5356
protected def openServerChannel(address: UnixSocketAddress) =
54-
Resource
55-
.make(F.blocking(ServerSocketChannel.open(StandardProtocolFamily.UNIX)))(ch =>
56-
F.blocking(ch.close())
57-
)
58-
.evalTap { sch =>
59-
F.blocking(sch.bind(UnixDomainSocketAddress.of(address.path)))
60-
.cancelable(F.blocking(sch.close()))
61-
}
62-
.map { sch =>
63-
Resource.makeFull[F, SocketChannel] { poll =>
64-
poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close())))
65-
}(ch => F.blocking(ch.close()))
66-
}
57+
evalOnVirtualThreadIfAvailable(
58+
Resource
59+
.make(
60+
F.blocking(ServerSocketChannel.open(StandardProtocolFamily.UNIX))
61+
)(ch => F.blocking(ch.close()))
62+
.evalTap { sch =>
63+
F.blocking(sch.bind(UnixDomainSocketAddress.of(address.path)))
64+
.cancelable(F.blocking(sch.close()))
65+
}
66+
.map { sch =>
67+
Resource.makeFull[F, SocketChannel] { poll =>
68+
poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close())))
69+
}(ch => F.blocking(ch.close()))
70+
}
71+
)
6772

6873
}

io/jvm/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import com.comcast.ip4s.{IpAddress, SocketAddress}
3131
import fs2.{Chunk, Stream}
3232
import fs2.io.file.{Files, Path}
3333
import fs2.io.net.Socket
34+
import fs2.io.evalOnVirtualThreadIfAvailable
3435
import java.nio.ByteBuffer
3536
import java.nio.channels.SocketChannel
3637

@@ -101,15 +102,16 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
101102
extends Socket.BufferedReads[F](readMutex) {
102103

103104
def readChunk(buff: ByteBuffer): F[Int] =
104-
F.blocking(ch.read(buff)).cancelable(close)
105+
evalOnVirtualThreadIfAvailable(F.blocking(ch.read(buff)))
106+
.cancelable(close)
105107

106108
def write(bytes: Chunk[Byte]): F[Unit] = {
107109
def go(buff: ByteBuffer): F[Unit] =
108110
F.blocking(ch.write(buff)).cancelable(close) *>
109111
F.delay(buff.remaining <= 0).ifM(F.unit, go(buff))
110112

111113
writeMutex.lock.surround {
112-
F.delay(bytes.toByteBuffer).flatMap(go)
114+
F.delay(bytes.toByteBuffer).flatMap(buffer => evalOnVirtualThreadIfAvailable(go(buffer)))
113115
}
114116
}
115117

@@ -118,15 +120,19 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
118120
private def raiseIpAddressError[A]: F[A] =
119121
F.raiseError(new UnsupportedOperationException("UnixSockets do not use IP addressing"))
120122

121-
def isOpen: F[Boolean] = F.blocking(ch.isOpen())
122-
def close: F[Unit] = F.blocking(ch.close())
123+
def isOpen: F[Boolean] = evalOnVirtualThreadIfAvailable(F.blocking(ch.isOpen()))
124+
def close: F[Unit] = evalOnVirtualThreadIfAvailable(F.blocking(ch.close()))
123125
def endOfOutput: F[Unit] =
124-
F.blocking {
125-
ch.shutdownOutput(); ()
126-
}
126+
evalOnVirtualThreadIfAvailable(
127+
F.blocking {
128+
ch.shutdownOutput(); ()
129+
}
130+
)
127131
def endOfInput: F[Unit] =
128-
F.blocking {
129-
ch.shutdownInput(); ()
130-
}
132+
evalOnVirtualThreadIfAvailable(
133+
F.blocking {
134+
ch.shutdownInput(); ()
135+
}
136+
)
131137
}
132138
}

0 commit comments

Comments
 (0)