Skip to content

Commit a9e0747

Browse files
committed
Optimize StreamPublisher
1 parent b0e5710 commit a9e0747

File tree

1 file changed

+35
-16
lines changed

1 file changed

+35
-16
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ import scala.util.control.NoStackTrace
4141
*
4242
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
4343
*/
44-
private[flow] final class StreamPublisher[F[_], A] private (
45-
stream: Stream[F, A],
46-
runSubscription: F[Unit] => Unit
47-
)(implicit F: Async[F])
48-
extends Publisher[A] {
49-
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
44+
private[flow] sealed abstract class StreamPublisher[F[_], A] private (
45+
stream: Stream[F, A]
46+
)(implicit
47+
F: Async[F]
48+
) extends Publisher[A] {
49+
protected def runSubscription(subscribe: F[Unit]): Unit
50+
51+
override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
5052
requireNonNull(
5153
subscriber,
5254
"The subscriber provided to subscribe must not be null"
@@ -67,23 +69,40 @@ private[flow] final class StreamPublisher[F[_], A] private (
6769
}
6870

6971
private[flow] object StreamPublisher {
72+
private final class DispatcherStreamPublisher[F[_], A](
73+
stream: Stream[F, A],
74+
dispatcher: Dispatcher[F]
75+
)(implicit
76+
F: Async[F]
77+
) extends StreamPublisher[F, A](stream) {
78+
override protected final def runSubscription(subscribe: F[Unit]): Unit =
79+
dispatcher.unsafeRunAndForget(subscribe)
80+
}
81+
82+
private final class IORuntimeStreamPublisher[A](
83+
stream: Stream[IO, A]
84+
)(implicit
85+
runtime: IORuntime
86+
) extends StreamPublisher[IO, A](stream) {
87+
override protected final def runSubscription(subscribe: IO[Unit]): Unit =
88+
subscribe.unsafeRunAndForget()
89+
}
90+
7091
def apply[F[_], A](
7192
stream: Stream[F, A]
72-
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
93+
)(implicit
94+
F: Async[F]
95+
): Resource[F, StreamPublisher[F, A]] =
7396
Dispatcher.parallel[F](await = false).map { startDispatcher =>
74-
new StreamPublisher(
75-
stream,
76-
runSubscription = startDispatcher.unsafeRunAndForget
77-
)
97+
new DispatcherStreamPublisher(stream, startDispatcher)
7898
}
7999

80100
def unsafe[A](
81101
stream: Stream[IO, A]
82-
)(implicit runtime: IORuntime): StreamPublisher[IO, A] =
83-
new StreamPublisher(
84-
stream,
85-
runSubscription = _.unsafeRunAndForget()
86-
)
102+
)(implicit
103+
runtime: IORuntime
104+
): StreamPublisher[IO, A] =
105+
new IORuntimeStreamPublisher(stream)
87106

88107
private object CanceledStreamPublisherException
89108
extends IllegalStateException(

0 commit comments

Comments
 (0)