@@ -23,8 +23,8 @@ package fs2.io.net.unixsocket
2323
2424import cats .effect .kernel .{Async , Resource }
2525import cats .effect .syntax .all ._
26- import fs2 .io .syntax .AsyncOps
2726import fs2 .io .file .Files
27+ import fs2 .io .evalOnVirtualThreadIfAvailable
2828import java .net .{StandardProtocolFamily , UnixDomainSocketAddress }
2929import java .nio .channels .{ServerSocketChannel , SocketChannel }
3030
@@ -42,33 +42,32 @@ object JdkUnixSockets {
4242private [unixsocket] class JdkUnixSocketsImpl [F [_]: Files ](implicit F : Async [F ])
4343 extends UnixSockets .AsyncUnixSockets [F ] {
4444 protected def openChannel (address : UnixSocketAddress ) =
45- Resource
46- .make(
47- F .blocking( SocketChannel .open( StandardProtocolFamily . UNIX )).evalOnVirtualThreadIfAvailable()
48- )(ch => F .blocking(ch.close() ))
49- . evalOnVirtualThreadIfAvailable()
50- .evalTap { ch =>
51- F .blocking(ch.connect(UnixDomainSocketAddress .of(address.path)))
52- .cancelable(F .blocking(ch.close()))
53- .evalOnVirtualThreadIfAvailable()
54- }
45+ evalOnVirtualThreadIfAvailable(
46+ Resource
47+ .make(
48+ F .blocking(SocketChannel .open( StandardProtocolFamily . UNIX ))
49+ )(ch => evalOnVirtualThreadIfAvailable(F .blocking(ch.close())) )
50+ .evalTap { ch =>
51+ F .blocking(ch.connect(UnixDomainSocketAddress .of(address.path)))
52+ .cancelable(F .blocking(ch.close()))
53+ }
54+ )
5555
5656 protected def openServerChannel (address : UnixSocketAddress ) =
57- Resource
58- .make(
59- F .blocking(ServerSocketChannel .open(StandardProtocolFamily .UNIX ))
60- .evalOnVirtualThreadIfAvailable()
61- )(ch => F .blocking(ch.close()).evalOnVirtualThreadIfAvailable())
62- .evalTap { sch =>
63- F .blocking(sch.bind(UnixDomainSocketAddress .of(address.path)))
64- .cancelable(F .blocking(sch.close()))
65- .evalOnVirtualThreadIfAvailable()
66- }
67- .map { sch =>
68- Resource .makeFull[F , SocketChannel ] { poll =>
69- poll(F .blocking(sch.accept).cancelable(F .blocking(sch.close())))
70- .evalOnVirtualThreadIfAvailable()
71- }(ch => F .blocking(ch.close()).evalOnVirtualThreadIfAvailable())
72- }
57+ evalOnVirtualThreadIfAvailable(
58+ Resource
59+ .make(
60+ F .blocking(ServerSocketChannel .open(StandardProtocolFamily .UNIX ))
61+ )(ch => F .blocking(ch.close()))
62+ .evalTap { sch =>
63+ F .blocking(sch.bind(UnixDomainSocketAddress .of(address.path)))
64+ .cancelable(evalOnVirtualThreadIfAvailable(F .blocking(sch.close())))
65+ }
66+ .map { sch =>
67+ Resource .makeFull[F , SocketChannel ] { poll =>
68+ poll(F .blocking(sch.accept).cancelable(F .blocking(sch.close())))
69+ }(ch => F .blocking(ch.close()))
70+ }
71+ )
7372
7473}
0 commit comments