Skip to content

Commit 50c1094

Browse files
committed
Fix Socket resource handling
1 parent 2adcb36 commit 50c1094

File tree

2 files changed

+27
-27
lines changed

2 files changed

+27
-27
lines changed

io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
7575
}
7676
}
7777

78-
setup.flatMap(ch => Resource.eval(connect(ch))).flatMap(Socket.forAsync(_))
78+
setup.evalMap(ch => connect(ch) *> Socket.forAsync(ch))
7979
}
8080

8181
def serverResource(
@@ -108,32 +108,34 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
108108
sch: AsynchronousServerSocketChannel
109109
): Stream[F, Socket[F]] = {
110110
def go: Stream[F, Socket[F]] = {
111-
def acceptChannel: F[AsynchronousSocketChannel] =
112-
Async[F].async[AsynchronousSocketChannel] { cb =>
113-
Async[F]
114-
.delay {
115-
sch.accept(
116-
null,
117-
new CompletionHandler[AsynchronousSocketChannel, Void] {
118-
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
119-
cb(Right(ch))
120-
def failed(rsn: Throwable, attachment: Void): Unit =
121-
cb(Left(rsn))
122-
}
123-
)
124-
}
125-
.as(Some(Async[F].delay(sch.close())))
111+
def acceptChannel = Resource.makeFull[F, AsynchronousSocketChannel] { poll =>
112+
poll {
113+
Async[F].async[AsynchronousSocketChannel] { cb =>
114+
Async[F]
115+
.delay {
116+
sch.accept(
117+
null,
118+
new CompletionHandler[AsynchronousSocketChannel, Void] {
119+
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
120+
cb(Right(ch))
121+
def failed(rsn: Throwable, attachment: Void): Unit =
122+
cb(Left(rsn))
123+
}
124+
)
125+
}
126+
.as(Some(Async[F].delay(sch.close())))
127+
}
126128
}
129+
}(ch => Async[F].delay(if (ch.isOpen) ch.close else ()))
127130

128131
def setOpts(ch: AsynchronousSocketChannel) =
129132
Async[F].delay {
130133
options.foreach(o => ch.setOption(o.key, o.value))
131134
}
132135

133-
Stream.eval(acceptChannel.attempt).flatMap {
134-
case Left(_) => Stream.empty[F]
135-
case Right(accepted) =>
136-
Stream.resource(Socket.forAsync(accepted).evalTap(_ => setOpts(accepted)))
136+
Stream.resource(acceptChannel.attempt).flatMap {
137+
case Left(_) => Stream.empty[F]
138+
case Right(accepted) => Stream.eval(setOpts(accepted) *> Socket.forAsync(accepted))
137139
} ++ go
138140
}
139141

io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ package io
2424
package net
2525

2626
import com.comcast.ip4s.{IpAddress, SocketAddress}
27-
import cats.effect.{Async, Resource}
27+
import cats.effect.Async
2828
import cats.effect.std.Semaphore
2929
import cats.syntax.all._
3030

@@ -35,12 +35,10 @@ import java.nio.{Buffer, ByteBuffer}
3535
private[net] trait SocketCompanionPlatform {
3636
private[net] def forAsync[F[_]: Async](
3737
ch: AsynchronousSocketChannel
38-
): Resource[F, Socket[F]] =
39-
Resource.make {
40-
(Semaphore[F](1), Semaphore[F](1)).mapN { (readSemaphore, writeSemaphore) =>
41-
new AsyncSocket[F](ch, readSemaphore, writeSemaphore)
42-
}
43-
}(_ => Async[F].delay(if (ch.isOpen) ch.close else ()))
38+
): F[Socket[F]] =
39+
(Semaphore[F](1), Semaphore[F](1)).mapN { (readSemaphore, writeSemaphore) =>
40+
new AsyncSocket[F](ch, readSemaphore, writeSemaphore)
41+
}
4442

4543
private[net] abstract class BufferedReads[F[_]](
4644
readSemaphore: Semaphore[F]

0 commit comments

Comments
 (0)