Skip to content

Commit c59ed56

Browse files
committed
Generalize the runSubscription mechanism of StreamPublisher
1 parent 53ba75c commit c59ed56

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import scala.util.control.NoStackTrace
4141
*/
4242
private[flow] final class StreamPublisher[F[_], A] private (
4343
stream: Stream[F, A],
44-
startDispatcher: Dispatcher[F]
44+
runSubscription: F[Unit] => Unit
4545
)(implicit F: Async[F])
4646
extends Publisher[A] {
4747
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
@@ -50,7 +50,7 @@ private[flow] final class StreamPublisher[F[_], A] private (
5050
"The subscriber provided to subscribe must not be null"
5151
)
5252
try
53-
startDispatcher.unsafeRunAndForget(
53+
runSubscription(
5454
StreamSubscription.subscribe(stream, subscriber)
5555
)
5656
catch {
@@ -69,7 +69,7 @@ private[flow] object StreamPublisher {
6969
stream: Stream[F, A]
7070
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
7171
Dispatcher.parallel[F](await = false).map { startDispatcher =>
72-
new StreamPublisher(stream, startDispatcher)
72+
new StreamPublisher(stream, startDispatcher.unsafeRunAndForget)
7373
}
7474

7575
private object CanceledStreamPublisherException

0 commit comments

Comments
 (0)