Skip to content

Commit 3f2cab8

Browse files
committed
Minor improvements
1 parent f326b98 commit 3f2cab8

File tree

4 files changed

+23
-21
lines changed

4 files changed

+23
-21
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,49 +50,49 @@ private[flow] final class StreamSubscriber[F[_], A] private (
5050
// Subscriber API.
5151

5252
/** Receives a subscription from the upstream reactive-streams system. */
53-
override def onSubscribe(subscription: Subscription): Unit = {
53+
override final def onSubscribe(subscription: Subscription): Unit = {
5454
requireNonNull(
5555
subscription,
5656
"The subscription provided to onSubscribe must not be null"
5757
)
58-
nextState(Subscribe(subscription))
58+
nextState(input = Subscribe(subscription))
5959
}
6060

6161
/** Receives the next record from the upstream reactive-streams system. */
62-
override def onNext(a: A): Unit = {
62+
override final def onNext(a: A): Unit = {
6363
requireNonNull(
6464
a,
6565
"The element provided to onNext must not be null"
6666
)
67-
nextState(Next(a))
67+
nextState(input = Next(a))
6868
}
6969

7070
/** Called by the upstream reactive-streams system when it fails. */
71-
override def onError(ex: Throwable): Unit = {
71+
override final def onError(ex: Throwable): Unit = {
7272
requireNonNull(
7373
ex,
7474
"The throwable provided to onError must not be null"
7575
)
76-
nextState(Error(ex))
76+
nextState(input = Error(ex))
7777
}
7878

7979
/** Called by the upstream reactive-streams system when it has finished sending records. */
80-
override def onComplete(): Unit =
81-
nextState(Complete(canceled = false))
80+
override final def onComplete(): Unit =
81+
nextState(input = Complete(canceled = false))
8282

8383
// Interop API.
8484

8585
/** Creates a downstream [[Stream]] from this [[Subscriber]]. */
8686
private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = {
8787
// Called when downstream has finished consuming records.
8888
val finalize =
89-
F.delay(nextState(Complete(canceled = true)))
89+
F.delay(nextState(input = Complete(canceled = true)))
9090

9191
// Producer for downstream.
9292
val dequeue1 =
9393
F.async[Option[Chunk[Any]]] { cb =>
9494
F.delay {
95-
nextState(Dequeue(cb))
95+
nextState(input = Dequeue(cb))
9696

9797
Some(finalize)
9898
}
@@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private (
112112
private def run(block: => Unit): () => Unit = () => block
113113

114114
/** Runs a single step of the state machine. */
115-
private def step(in: Input): State => (State, () => Unit) =
116-
in match {
115+
private def step(input: Input): State => (State, () => Unit) =
116+
input match {
117117
case Subscribe(s) => {
118118
case Uninitialized(None) =>
119119
Idle(s) -> noop
@@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
263263
* + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error.
264264
* + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`.
265265
*/
266-
private def nextState(in: Input): Unit = {
266+
private def nextState(input: Input): Unit = {
267267
val (_, effect) = currentState.updateAndGet { case (state, _) =>
268-
step(in)(state)
268+
step(input)(state)
269269
}
270270
// Only run the effect after the state update took place.
271271
effect()

core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
3333

3434
/** Implementation of a [[Subscription]].
3535
*
36-
* This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
36+
* This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
3737
*
3838
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
3939
*/
@@ -58,7 +58,7 @@ private[flow] final class StreamSubscription[F[_], A] private (
5858
sub.onComplete()
5959
}
6060

61-
private[flow] def run: F[Unit] = {
61+
val run: F[Unit] = {
6262
val subscriptionPipe: Pipe[F, A, A] = in => {
6363
def go(s: Stream[F, A]): Pull[F, A, Unit] =
6464
Pull.eval(F.delay(requests.get())).flatMap { n =>
@@ -133,14 +133,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
133133
// then the request must be a NOOP.
134134
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
135135
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
136-
override def cancel(): Unit = {
136+
override final def cancel(): Unit = {
137137
val cancelCB = canceled.getAndSet(null)
138138
if (cancelCB ne null) {
139139
cancelCB.apply()
140140
}
141141
}
142142

143-
override def request(n: Long): Unit =
143+
override final def request(n: Long): Unit =
144144
// First, confirm we are not yet cancelled.
145145
if (canceled.get() ne null) {
146146
// Second, ensure we were requested a positive number of elements.

core/shared/src/main/scala/fs2/interop/flow/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ package object flow {
102102
subscriber.stream(subscribe(subscriber))
103103
}
104104

105-
/** Creates a [[Stream]] from an [[Publisher]].
105+
/** Creates a [[Stream]] from a [[Publisher]].
106106
*
107107
* @example {{{
108108
* scala> import cats.effect.IO
@@ -118,7 +118,7 @@ package object flow {
118118
* res0: Stream[IO, Int] = Stream(..)
119119
* }}}
120120
*
121-
* @note The publisher will not receive a subscriber until the stream is run.
121+
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
122122
*
123123
* @see the `toStream` extension method added to `Publisher`
124124
*

core/shared/src/main/scala/fs2/interop/flow/syntax.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import java.util.concurrent.Flow.{Publisher, Subscriber}
3232
object syntax {
3333
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {
3434

35-
/** Creates a [[Stream]] from an [[Publisher]].
35+
/** Creates a [[Stream]] from a [[Publisher]].
3636
*
3737
* @example {{{
3838
* scala> import cats.effect.IO
@@ -49,6 +49,8 @@ object syntax {
4949
* res0: Stream[IO, Int] = Stream(..)
5050
* }}}
5151
*
52+
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
53+
*
5254
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
5355
* A high number may be useful if the publisher is triggering from IO,
5456
* like requesting elements from a database.

0 commit comments

Comments
 (0)