Skip to content

Commit b5e91af

Browse files
committed
Wait for Dispatcher in StreamPublisher
1 parent 5c766b9 commit b5e91af

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private (
7272
private[flow] object StreamPublisher {
7373
private final class DispatcherStreamPublisher[F[_], A](
7474
stream: Stream[F, A],
75-
dispatcher: Dispatcher[F]
75+
startDispatcher: Dispatcher[F]
7676
)(implicit
7777
F: Async[F]
7878
) extends StreamPublisher[F, A](stream) {
7979
override protected final def runSubscription(subscribe: F[Unit]): Unit =
80-
dispatcher.unsafeRunAndForget(subscribe)
80+
startDispatcher.unsafeRunAndForget(subscribe)
8181
}
8282

8383
private final class IORuntimeStreamPublisher[A](
@@ -94,7 +94,7 @@ private[flow] object StreamPublisher {
9494
)(implicit
9595
F: Async[F]
9696
): Resource[F, StreamPublisher[F, A]] =
97-
Dispatcher.parallel[F](await = false).map { startDispatcher =>
97+
Dispatcher.parallel[F](await = true).map { startDispatcher =>
9898
new DispatcherStreamPublisher(stream, startDispatcher)
9999
}
100100

core/shared/src/main/scala/fs2/interop/flow/package.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ package object flow {
136136
/** Creates a [[Publisher]] from a [[Stream]].
137137
*
138138
* The stream is only ran when elements are requested.
139-
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
139+
* Closing the [[Resource]] means not accepting new subscriptions,
140+
* but waiting for all active ones to finish consuming.
141+
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
140142
* Thus, no more elements will be published.
141143
*
142144
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],

core/shared/src/main/scala/fs2/interop/flow/syntax.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ object syntax {
6767
/** Creates a [[Publisher]] from a [[Stream]].
6868
*
6969
* The stream is only ran when elements are requested.
70-
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
70+
* Closing the [[Resource]] means not accepting new subscriptions,
71+
* but waiting for all active ones to finish consuming.
72+
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
7173
* Thus, no more elements will be published.
7274
*
7375
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],

0 commit comments

Comments
 (0)