Skip to content

Commit 7643782

Browse files
authored
Merge pull request #3240 from armanbilge/feature/polling
I/O polling
2 parents c6de493 + b16280b commit 7643782

31 files changed

+1608
-110
lines changed

build.sbt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
299299
libraryDependencies ++= Seq(
300300
"org.scodec" %%% "scodec-bits" % "1.1.38",
301301
"org.typelevel" %%% "cats-core" % "2.11.0",
302-
"org.typelevel" %%% "cats-effect" % "3.5.7",
303-
"org.typelevel" %%% "cats-effect-laws" % "3.5.7" % Test,
304-
"org.typelevel" %%% "cats-effect-testkit" % "3.5.7" % Test,
302+
"org.typelevel" %%% "cats-effect" % "3.6.0-RC2",
303+
"org.typelevel" %%% "cats-effect-laws" % "3.6.0-RC1" % Test,
304+
"org.typelevel" %%% "cats-effect-testkit" % "3.6.0-RC1" % Test,
305305
"org.typelevel" %%% "cats-laws" % "2.11.0" % Test,
306306
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
307307
"org.typelevel" %%% "munit-cats-effect" % "2.0.0" % Test,
@@ -370,9 +370,6 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
370370
.nativeEnablePlugins(ScalaNativeBrewedConfigPlugin)
371371
.nativeSettings(commonNativeSettings)
372372
.nativeSettings(
373-
libraryDependencies ++= Seq(
374-
"com.armanbilge" %%% "epollcat" % "0.1.6" % Test
375-
),
376373
Test / nativeBrewFormulas += "s2n",
377374
Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1")
378375
)

core/shared/src/main/scala/fs2/concurrent/Signal.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ object Signal extends SignalInstances {
215215
* need looping even without any other writers.
216216
*/
217217
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A] {
218-
def mapK[G[_]](
218+
override def mapK[G[_]](
219219
f: FunctionK[F, G]
220220
)(implicit G: Functor[G], dummy: DummyImplicit): SignallingRef[G, A] =
221221
new TransformedSignallingRef(this, f)

io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,15 @@
2222
package fs2
2323
package io
2424

25-
import cats._
2625
import cats.effect.kernel.Sync
2726
import cats.syntax.all._
2827

29-
import java.nio.charset.Charset
30-
import java.nio.charset.StandardCharsets
3128
import scala.reflect.ClassTag
3229

3330
private[fs2] trait iojvmnative {
3431
type InterruptedIOException = java.io.InterruptedIOException
3532
type ClosedChannelException = java.nio.channels.ClosedChannelException
3633

37-
//
38-
// STDIN/STDOUT Helpers
39-
40-
/** Pipe of bytes that writes emitted values to standard output asynchronously. */
41-
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
42-
writeOutputStream(Sync[F].blocking(System.out), false)
43-
44-
/** Pipe of bytes that writes emitted values to standard error asynchronously. */
45-
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
46-
writeOutputStream(Sync[F].blocking(System.err), false)
47-
48-
/** Writes this stream to standard output asynchronously, converting each element to
49-
* a sequence of bytes via `Show` and the given `Charset`.
50-
*/
51-
def stdoutLines[F[_]: Sync, O: Show](
52-
charset: Charset = StandardCharsets.UTF_8
53-
): Pipe[F, O, Nothing] =
54-
_.map(_.show).through(text.encode(charset)).through(stdout)
55-
5634
/** Stream of bytes read asynchronously from the specified resource relative to the class `C`.
5735
* @see [[readClassLoaderResource]] for a resource relative to a classloader.
5836
*/

io/jvm/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package fs2
2323
package io
2424

25+
import cats.Show
2526
import cats.effect.kernel.{Async, Outcome, Resource, Sync}
2627
import cats.effect.kernel.implicits._
2728
import cats.effect.kernel.Deferred
@@ -30,6 +31,8 @@ import fs2.internal.ThreadFactories
3031
import fs2.io.internal.PipedStreamBuffer
3132

3233
import java.io.{InputStream, OutputStream}
34+
import java.nio.charset.Charset
35+
import java.nio.charset.StandardCharsets
3336
import java.util.concurrent.Executors
3437
import scala.concurrent.ExecutionContext
3538
import java.util.concurrent.ExecutorService
@@ -73,6 +76,22 @@ private[fs2] trait ioplatform extends iojvmnative {
7376
def stdinUtf8[F[_]](bufSize: Int, F: Sync[F]): Stream[F, String] =
7477
stdin(bufSize, F).through(text.utf8.decode)
7578

79+
/** Pipe of bytes that writes emitted values to standard output asynchronously. */
80+
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
81+
writeOutputStream(Sync[F].blocking(System.out), false)
82+
83+
/** Pipe of bytes that writes emitted values to standard error asynchronously. */
84+
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
85+
writeOutputStream(Sync[F].blocking(System.err), false)
86+
87+
/** Writes this stream to standard output asynchronously, converting each element to
88+
* a sequence of bytes via `Show` and the given `Charset`.
89+
*/
90+
def stdoutLines[F[_]: Sync, O: Show](
91+
charset: Charset = StandardCharsets.UTF_8
92+
): Pipe[F, O, Nothing] =
93+
_.map(_.show).through(text.encode(charset)).through(stdout)
94+
7695
/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
7796
* that is closed whenever the resulting stream terminates.
7897
*

io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package net
2525

2626
import cats.effect.IO
2727
import cats.effect.LiftIO
28+
import cats.effect.Selector
2829
import cats.effect.kernel.{Async, Resource}
2930

3031
import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress}
@@ -78,10 +79,62 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
7879

7980
def forIO: Network[IO] = forLiftIO
8081

81-
implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] = {
82-
val _ = LiftIO[F]
83-
forAsync
84-
}
82+
implicit def forLiftIO[F[_]: Async: LiftIO]: Network[F] =
83+
new UnsealedNetwork[F] {
84+
private lazy val fallback = forAsync[F]
85+
86+
private def tryGetSelector =
87+
IO.pollers.map(_.collectFirst { case selector: Selector => selector }).to[F]
88+
89+
private implicit def dns: Dns[F] = Dns.forAsync[F]
90+
91+
def socketGroup(threadCount: Int, threadFactory: ThreadFactory): Resource[F, SocketGroup[F]] =
92+
Resource.eval(tryGetSelector).flatMap {
93+
case Some(selector) => Resource.pure(new SelectingSocketGroup[F](selector))
94+
case None => fallback.socketGroup(threadCount, threadFactory)
95+
}
96+
97+
def datagramSocketGroup(threadFactory: ThreadFactory): Resource[F, DatagramSocketGroup[F]] =
98+
fallback.datagramSocketGroup(threadFactory)
99+
100+
def client(
101+
to: SocketAddress[Host],
102+
options: List[SocketOption]
103+
): Resource[F, Socket[F]] = Resource.eval(tryGetSelector).flatMap {
104+
case Some(selector) => new SelectingSocketGroup(selector).client(to, options)
105+
case None => fallback.client(to, options)
106+
}
107+
108+
def server(
109+
address: Option[Host],
110+
port: Option[Port],
111+
options: List[SocketOption]
112+
): Stream[F, Socket[F]] = Stream.eval(tryGetSelector).flatMap {
113+
case Some(selector) => new SelectingSocketGroup(selector).server(address, port, options)
114+
case None => fallback.server(address, port, options)
115+
}
116+
117+
def serverResource(
118+
address: Option[Host],
119+
port: Option[Port],
120+
options: List[SocketOption]
121+
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
122+
Resource.eval(tryGetSelector).flatMap {
123+
case Some(selector) =>
124+
new SelectingSocketGroup(selector).serverResource(address, port, options)
125+
case None => fallback.serverResource(address, port, options)
126+
}
127+
128+
def openDatagramSocket(
129+
address: Option[Host],
130+
port: Option[Port],
131+
options: List[SocketOption],
132+
protocolFamily: Option[ProtocolFamily]
133+
): Resource[F, DatagramSocket[F]] =
134+
fallback.openDatagramSocket(address, port, options, protocolFamily)
135+
136+
def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync[F]
137+
}
85138

86139
def forAsync[F[_]](implicit F: Async[F]): Network[F] =
87140
forAsyncAndDns(F, Dns.forAsync(F))
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io.net
24+
25+
import cats.effect.LiftIO
26+
import cats.effect.Selector
27+
import cats.effect.kernel.Async
28+
import cats.effect.std.Mutex
29+
import cats.syntax.all._
30+
import com.comcast.ip4s.IpAddress
31+
import com.comcast.ip4s.SocketAddress
32+
33+
import java.nio.ByteBuffer
34+
import java.nio.channels.SelectionKey.OP_READ
35+
import java.nio.channels.SelectionKey.OP_WRITE
36+
import java.nio.channels.SocketChannel
37+
38+
private final class SelectingSocket[F[_]: LiftIO] private (
39+
selector: Selector,
40+
ch: SocketChannel,
41+
readMutex: Mutex[F],
42+
writeMutex: Mutex[F],
43+
val localAddress: F[SocketAddress[IpAddress]],
44+
val remoteAddress: F[SocketAddress[IpAddress]]
45+
)(implicit F: Async[F])
46+
extends Socket.BufferedReads(readMutex) {
47+
48+
protected def readChunk(buf: ByteBuffer): F[Int] =
49+
F.delay(ch.read(buf)).flatMap { readed =>
50+
if (readed == 0) selector.select(ch, OP_READ).to *> readChunk(buf)
51+
else F.pure(readed)
52+
}
53+
54+
def write(bytes: Chunk[Byte]): F[Unit] = {
55+
def go(buf: ByteBuffer): F[Unit] =
56+
F.delay {
57+
ch.write(buf)
58+
buf.remaining()
59+
}.flatMap { remaining =>
60+
if (remaining > 0) {
61+
selector.select(ch, OP_WRITE).to *> go(buf)
62+
} else F.unit
63+
}
64+
writeMutex.lock.surround {
65+
F.delay(bytes.toByteBuffer).flatMap(go)
66+
}
67+
}
68+
69+
def isOpen: F[Boolean] = F.delay(ch.isOpen)
70+
71+
def endOfOutput: F[Unit] =
72+
F.delay {
73+
ch.shutdownOutput(); ()
74+
}
75+
76+
def endOfInput: F[Unit] =
77+
F.delay {
78+
ch.shutdownInput(); ()
79+
}
80+
81+
}
82+
83+
private object SelectingSocket {
84+
def apply[F[_]: LiftIO](
85+
selector: Selector,
86+
ch: SocketChannel,
87+
localAddress: F[SocketAddress[IpAddress]],
88+
remoteAddress: F[SocketAddress[IpAddress]]
89+
)(implicit F: Async[F]): F[Socket[F]] =
90+
(Mutex[F], Mutex[F]).flatMapN { (readMutex, writeMutex) =>
91+
F.delay {
92+
new SelectingSocket[F](
93+
selector,
94+
ch,
95+
readMutex,
96+
writeMutex,
97+
localAddress,
98+
remoteAddress
99+
)
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)