Skip to content

Commit b3be9fd

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feature/signal-get-and-discrete-updates
2 parents 7aca6e9 + a04e1be commit b3be9fd

File tree

16 files changed

+1240
-10
lines changed

16 files changed

+1240
-10
lines changed

build.sbt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,24 +211,28 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
211211
.settings(
212212
name := "fs2-core",
213213
libraryDependencies ++= Seq(
214+
"org.scodec" %%% "scodec-bits" % "1.1.34",
214215
"org.typelevel" %%% "cats-core" % "2.9.0",
215-
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
216216
"org.typelevel" %%% "cats-effect" % "3.4.6",
217217
"org.typelevel" %%% "cats-effect-laws" % "3.4.6" % Test,
218218
"org.typelevel" %%% "cats-effect-testkit" % "3.4.6" % Test,
219-
"org.scodec" %%% "scodec-bits" % "1.1.34",
220-
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
219+
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
220+
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
221221
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
222-
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test
222+
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test
223223
),
224-
tlJdkRelease := Some(8),
224+
tlJdkRelease := None,
225225
Compile / doc / scalacOptions ++= (if (scalaVersion.value.startsWith("2.")) Seq("-nowarn")
226226
else Nil)
227227
)
228228

229229
lazy val coreJVM = core.jvm
230230
.settings(
231231
Test / fork := true,
232+
libraryDependencies ++= Seq(
233+
"org.reactivestreams" % "reactive-streams-tck-flow" % "1.0.4" % Test,
234+
"org.scalatestplus" %% "testng-7-5" % "3.2.14.0" % Test
235+
),
232236
doctestIgnoreRegex := Some(".*NotGiven.scala")
233237
)
234238

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package interop
24+
package flow
25+
26+
import cats.effect.kernel.{Async, Resource}
27+
import cats.effect.std.Dispatcher
28+
29+
import java.util.Objects.requireNonNull
30+
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
31+
import scala.util.control.NoStackTrace
32+
33+
/** Implementation of a [[Publisher]].
34+
*
35+
* This is used to publish elements from a [[Stream]] to a downstream reactive-streams system.
36+
*
37+
* @note This Publisher can be reused for multiple Subscribers,
38+
* each subscription will re-run the [[Stream]] from the beginning.
39+
*
40+
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
41+
*/
42+
private[flow] final class StreamPublisher[F[_], A] private (
43+
stream: Stream[F, A],
44+
startDispatcher: Dispatcher[F]
45+
)(implicit F: Async[F])
46+
extends Publisher[A] {
47+
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
48+
requireNonNull(
49+
subscriber,
50+
"The subscriber provided to subscribe must not be null"
51+
)
52+
try
53+
startDispatcher.unsafeRunAndForget(
54+
StreamSubscription.subscribe(stream, subscriber)
55+
)
56+
catch {
57+
case _: IllegalStateException =>
58+
subscriber.onSubscribe(new Subscription {
59+
override def cancel(): Unit = ()
60+
override def request(x$1: Long): Unit = ()
61+
})
62+
subscriber.onError(StreamPublisher.CanceledStreamPublisherException)
63+
}
64+
}
65+
}
66+
67+
private[flow] object StreamPublisher {
68+
def apply[F[_], A](
69+
stream: Stream[F, A]
70+
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
71+
Dispatcher.parallel[F](await = false).map { startDispatcher =>
72+
new StreamPublisher(stream, startDispatcher)
73+
}
74+
75+
private object CanceledStreamPublisherException
76+
extends IllegalStateException(
77+
"This StreamPublisher is not longer accepting subscribers"
78+
)
79+
with NoStackTrace
80+
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package interop
24+
package flow
25+
26+
import cats.MonadThrow
27+
import cats.effect.kernel.Async
28+
import cats.syntax.all._
29+
30+
import java.util.Objects.requireNonNull
31+
import java.util.concurrent.Flow.{Subscriber, Subscription}
32+
import java.util.concurrent.atomic.AtomicReference
33+
34+
/** Implementation of a [[Subscriber]].
35+
*
36+
* This is used to obtain a [[Stream]] from an upstream reactive-streams system.
37+
*
38+
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code]]
39+
*/
40+
private[flow] final class StreamSubscriber[F[_], A] private (
41+
private[flow] val subscriber: StreamSubscriber.FSM[F, A]
42+
)(implicit
43+
F: MonadThrow[F]
44+
) extends Subscriber[A] {
45+
46+
/** Called by an upstream reactive-streams system. */
47+
override def onSubscribe(subscription: Subscription): Unit = {
48+
requireNonNull(
49+
subscription,
50+
"The subscription provided to onSubscribe must not be null"
51+
)
52+
subscriber.onSubscribe(subscription)
53+
}
54+
55+
/** Called by an upstream reactive-streams system. */
56+
override def onNext(a: A): Unit = {
57+
requireNonNull(
58+
a,
59+
"The element provided to onNext must not be null"
60+
)
61+
subscriber.onNext(a)
62+
}
63+
64+
/** Called by an upstream reactive-streams system. */
65+
override def onComplete(): Unit =
66+
subscriber.onComplete()
67+
68+
/** Called by an upstream reactive-streams system. */
69+
override def onError(t: Throwable): Unit = {
70+
requireNonNull(
71+
t,
72+
"The throwable provided to onError must not be null"
73+
)
74+
subscriber.onError(t)
75+
}
76+
77+
/** Creates a [[Stream]] from this [[Subscriber]]. */
78+
def stream(subscribe: F[Unit]): Stream[F, A] =
79+
subscriber.stream(subscribe)
80+
}
81+
82+
private[flow] object StreamSubscriber {
83+
84+
/** Instantiates a new [[StreamSubscriber]] for the given buffer size. */
85+
def apply[F[_], A](chunkSize: Int)(implicit F: Async[F]): F[StreamSubscriber[F, A]] = {
86+
require(chunkSize > 0, "The buffer size MUST be positive")
87+
fsm[F, A](chunkSize).map(fsm => new StreamSubscriber(subscriber = fsm))
88+
}
89+
90+
/** A finite state machine describing the subscriber. */
91+
private[flow] trait FSM[F[_], A] {
92+
93+
/** Receives a subscription from upstream. */
94+
def onSubscribe(s: Subscription): Unit
95+
96+
/** Receives next record from upstream. */
97+
def onNext(a: A): Unit
98+
99+
/** Receives error from upstream. */
100+
def onError(t: Throwable): Unit
101+
102+
/** Called when upstream has finished sending records. */
103+
def onComplete(): Unit
104+
105+
/** Called when downstream has finished consuming records. */
106+
def onFinalize: F[Unit]
107+
108+
/** Producer for downstream. */
109+
def dequeue1: F[Either[Throwable, Option[Chunk[A]]]]
110+
111+
/** Downstream [[Stream]]. */
112+
final def stream(subscribe: F[Unit])(implicit ev: MonadThrow[F]): Stream[F, A] =
113+
Stream.bracket(subscribe)(_ => onFinalize) >>
114+
Stream
115+
.repeatEval(dequeue1)
116+
.rethrow
117+
.unNoneTerminate
118+
.unchunks
119+
}
120+
121+
private def fsm[F[_], A](
122+
chunkSize: Int
123+
)(implicit F: Async[F]): F[FSM[F, A]] = {
124+
type Out = Either[Throwable, Option[Chunk[A]]]
125+
126+
sealed trait Input
127+
case class OnSubscribe(s: Subscription) extends Input
128+
case class OnNext(a: A) extends Input
129+
case class OnError(e: Throwable) extends Input
130+
case object OnComplete extends Input
131+
case object OnFinalize extends Input
132+
case class OnDequeue(response: Out => Unit) extends Input
133+
134+
sealed trait State
135+
case object Uninitialized extends State
136+
case class Idle(sub: Subscription, buffer: Chunk[A]) extends State
137+
case class RequestBeforeSubscription(req: Out => Unit) extends State
138+
case class WaitingOnUpstream(
139+
sub: Subscription,
140+
buffer: Chunk[A],
141+
elementRequest: Out => Unit
142+
) extends State
143+
case object UpstreamCompletion extends State
144+
case object DownstreamCancellation extends State
145+
case class UpstreamError(err: Throwable) extends State
146+
147+
def reportFailure(e: Throwable): Unit =
148+
Thread.getDefaultUncaughtExceptionHandler match {
149+
case null => e.printStackTrace()
150+
case h => h.uncaughtException(Thread.currentThread(), e)
151+
}
152+
153+
def step(in: Input): State => (State, () => Unit) =
154+
in match {
155+
case OnSubscribe(s) => {
156+
case RequestBeforeSubscription(req) =>
157+
WaitingOnUpstream(s, Chunk.empty, req) -> (() => s.request(chunkSize.toLong))
158+
159+
case Uninitialized =>
160+
Idle(s, Chunk.empty) -> (() => ())
161+
162+
case o =>
163+
val err = new Error(s"Received subscription in invalid state [${o}]")
164+
o -> { () =>
165+
s.cancel()
166+
reportFailure(err)
167+
}
168+
}
169+
170+
case OnNext(a) => {
171+
case WaitingOnUpstream(s, buffer, r) =>
172+
val newBuffer = buffer ++ Chunk.singleton(a)
173+
if (newBuffer.size == chunkSize)
174+
Idle(s, Chunk.empty) -> (() => r(newBuffer.some.asRight))
175+
else
176+
WaitingOnUpstream(s, newBuffer, r) -> (() => ())
177+
178+
case DownstreamCancellation =>
179+
DownstreamCancellation -> (() => ())
180+
181+
case o =>
182+
o -> (() => reportFailure(new Error(s"Received record [${a}] in invalid state [${o}]")))
183+
}
184+
185+
case OnComplete => {
186+
case WaitingOnUpstream(_, buffer, r) =>
187+
if (buffer.nonEmpty)
188+
UpstreamCompletion -> (() => r(buffer.some.asRight))
189+
else
190+
UpstreamCompletion -> (() => r(None.asRight))
191+
192+
case _ =>
193+
UpstreamCompletion -> (() => ())
194+
}
195+
196+
case OnError(e) => {
197+
case WaitingOnUpstream(_, _, r) =>
198+
UpstreamError(e) -> (() => r(e.asLeft))
199+
200+
case _ =>
201+
UpstreamError(e) -> (() => ())
202+
}
203+
204+
case OnFinalize => {
205+
case WaitingOnUpstream(sub, _, r) =>
206+
DownstreamCancellation -> { () =>
207+
sub.cancel()
208+
r(None.asRight)
209+
}
210+
211+
case Idle(sub, _) =>
212+
DownstreamCancellation -> (() => sub.cancel())
213+
214+
case o =>
215+
o -> (() => ())
216+
}
217+
218+
case OnDequeue(r) => {
219+
case Uninitialized =>
220+
RequestBeforeSubscription(r) -> (() => ())
221+
222+
case Idle(sub, buffer) =>
223+
WaitingOnUpstream(sub, buffer, r) -> (() => sub.request(chunkSize.toLong))
224+
225+
case err @ UpstreamError(e) =>
226+
err -> (() => r(e.asLeft))
227+
228+
case UpstreamCompletion =>
229+
UpstreamCompletion -> (() => r(None.asRight))
230+
231+
case o =>
232+
o -> (() => r(new Error(s"Received request in invalid state [${o}]").asLeft))
233+
}
234+
}
235+
236+
F.delay(new AtomicReference[(State, () => Unit)]((Uninitialized, () => ()))).map { ref =>
237+
new FSM[F, A] {
238+
def nextState(in: Input): Unit = {
239+
val (_, effect) = ref.updateAndGet { case (state, _) =>
240+
step(in)(state)
241+
}
242+
effect()
243+
}
244+
245+
override final def onSubscribe(s: Subscription): Unit =
246+
nextState(OnSubscribe(s))
247+
248+
override final def onNext(a: A): Unit =
249+
nextState(OnNext(a))
250+
251+
override final def onError(t: Throwable): Unit =
252+
nextState(OnError(t))
253+
254+
override final def onComplete(): Unit =
255+
nextState(OnComplete)
256+
257+
override final val onFinalize: F[Unit] =
258+
F.delay(nextState(OnFinalize))
259+
260+
override final val dequeue1: F[Either[Throwable, Option[Chunk[A]]]] =
261+
F.async_ { cb =>
262+
nextState(OnDequeue(out => cb(Right(out))))
263+
}
264+
}
265+
}
266+
}
267+
}

0 commit comments

Comments
 (0)