@@ -32,10 +32,8 @@ import java.nio.channels.{
3232}
3333import java .nio .channels .AsynchronousChannelGroup
3434import cats .syntax .all ._
35- import cats .effect .syntax .all ._
3635import cats .effect .kernel .{Async , Resource }
3736import com .comcast .ip4s .{Host , IpAddress , Port , SocketAddress }
38- import fs2 .concurrent .Channel
3937
4038private [net] trait SocketGroupCompanionPlatform { self : SocketGroup .type =>
4139 private [fs2] def unsafe [F [_]: Async ](channelGroup : AsynchronousChannelGroup ): SocketGroup [F ] =
@@ -86,94 +84,73 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
8684 options : List [SocketOption ]
8785 ): Resource [F , (SocketAddress [IpAddress ], Stream [F , Socket [F ]])] = {
8886
89- val setup : Resource [
90- F ,
91- (AsynchronousServerSocketChannel , Channel [F , Either [Throwable , AsynchronousSocketChannel ]])
92- ] =
87+ val setup : Resource [F , AsynchronousServerSocketChannel ] =
9388 Resource .eval(address.traverse(_.resolve[F ])).flatMap { addr =>
9489 Resource
9590 .make(
9691 Async [F ].delay(
9792 AsynchronousServerSocketChannel .open(channelGroup)
9893 )
9994 )(sch => Async [F ].delay(if (sch.isOpen) sch.close()))
100- .evalTap { sch =>
95+ .evalTap(ch =>
10196 Async [F ].delay(
102- sch .bind(
97+ ch .bind(
10398 new InetSocketAddress (
10499 addr.map(_.toInetAddress).orNull,
105100 port.map(_.value).getOrElse(0 )
106101 )
107102 )
108103 )
109- }
110- .mproduct { sch =>
111- def acceptChannel : F [AsynchronousSocketChannel ] =
112- Async [F ].async[AsynchronousSocketChannel ] { cb =>
113- Async [F ]
114- .delay {
115- sch.accept(
116- null ,
117- new CompletionHandler [AsynchronousSocketChannel , Void ] {
118- def completed (ch : AsynchronousSocketChannel , attachment : Void ): Unit =
119- cb(Right (ch))
120- def failed (rsn : Throwable , attachment : Void ): Unit =
121- cb(Left (rsn))
122- }
123- )
124- }
125- .as(Some (Async [F ].delay(sch.close())))
126- }
104+ )
105+ }
127106
128- Resource
129- .make(Channel .synchronous[F , Either [Throwable , AsynchronousSocketChannel ]]) {
130- accepted =>
131- accepted.close *>
132- accepted.stream
133- .foreach(_.traverse_(ch => Async [F ].delay(ch.close())))
134- .compile
135- .drain
136- }
137- .flatTap { accepted =>
138- Stream
139- .repeatEval(acceptChannel.attempt)
140- .through(accepted.sendAll)
141- .compile
142- .drain
143- .background
107+ def acceptIncoming (
108+ sch : AsynchronousServerSocketChannel
109+ ): Stream [F , Socket [F ]] = {
110+ def go : Stream [F , Socket [F ]] = {
111+ def acceptChannel : F [AsynchronousSocketChannel ] =
112+ Async [F ].async[AsynchronousSocketChannel ] { cb =>
113+ Async [F ]
114+ .delay {
115+ sch.accept(
116+ null ,
117+ new CompletionHandler [AsynchronousSocketChannel , Void ] {
118+ def completed (ch : AsynchronousSocketChannel , attachment : Void ): Unit =
119+ cb(Right (ch))
120+ def failed (rsn : Throwable , attachment : Void ): Unit =
121+ cb(Left (rsn))
122+ }
123+ )
144124 }
125+ .as(Some (Async [F ].delay(sch.close())))
145126 }
146127
147- }
148-
149- def acceptIncoming (sch : AsynchronousServerSocketChannel )(
150- incoming : Stream [F , Either [Throwable , AsynchronousSocketChannel ]]
151- ): Stream [F , Socket [F ]] = {
152- def setOpts (ch : AsynchronousSocketChannel ) =
153- Async [F ].delay {
154- options.foreach(o => ch.setOption(o.key, o.value))
155- }
128+ def setOpts (ch : AsynchronousSocketChannel ) =
129+ Async [F ].delay {
130+ options.foreach(o => ch.setOption(o.key, o.value))
131+ }
156132
157- incoming
158- .flatMap {
133+ Stream .eval(acceptChannel.attempt).flatMap {
159134 case Left (_) => Stream .empty[F ]
160135 case Right (accepted) =>
161136 Stream .resource(Socket .forAsync(accepted).evalTap(_ => setOpts(accepted)))
162- }
163- .handleErrorWith {
164- case err : AsynchronousCloseException =>
165- Stream .eval(Async [F ].delay(sch.isOpen)).flatMap { isOpen =>
166- if (isOpen) Stream .raiseError[F ](err)
167- else Stream .empty
168- }
169- case err => Stream .raiseError[F ](err)
170- }
137+ } ++ go
138+ }
139+
140+ go.handleErrorWith {
141+ case err : AsynchronousCloseException =>
142+ Stream .eval(Async [F ].delay(sch.isOpen)).flatMap { isOpen =>
143+ if (isOpen) Stream .raiseError[F ](err)
144+ else Stream .empty
145+ }
146+ case err => Stream .raiseError[F ](err)
147+ }
171148 }
172149
173- setup.map { case ( sch, incoming) =>
150+ setup.map { sch =>
174151 val jLocalAddress = sch.getLocalAddress.asInstanceOf [java.net.InetSocketAddress ]
175152 val localAddress = SocketAddress .fromInetSocketAddress(jLocalAddress)
176- (localAddress, incoming.stream.through( acceptIncoming(sch)(_) ))
153+ (localAddress, acceptIncoming(sch))
177154 }
178155 }
179156 }
0 commit comments