Skip to content

Commit b504e46

Browse files
committed
Add StreamPublisher.unsafe
1 parent c59ed56 commit b504e46

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ package fs2
2323
package interop
2424
package flow
2525

26+
import cats.effect.IO
2627
import cats.effect.kernel.{Async, Resource}
2728
import cats.effect.std.Dispatcher
29+
import cats.effect.unsafe.IORuntime
2830

2931
import java.util.Objects.requireNonNull
3032
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
@@ -69,9 +71,20 @@ private[flow] object StreamPublisher {
6971
stream: Stream[F, A]
7072
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
7173
Dispatcher.parallel[F](await = false).map { startDispatcher =>
72-
new StreamPublisher(stream, startDispatcher.unsafeRunAndForget)
74+
new StreamPublisher(
75+
stream,
76+
runSubscription = startDispatcher.unsafeRunAndForget
77+
)
7378
}
7479

80+
def unsafe[A](
81+
stream: Stream[IO, A]
82+
)(implicit runtime: IORuntime): StreamPublisher[IO, A] =
83+
new StreamPublisher(
84+
stream,
85+
runSubscription = _.unsafeRunAndForget()
86+
)
87+
7588
private object CanceledStreamPublisherException
7689
extends IllegalStateException(
7790
"This StreamPublisher is not longer accepting subscribers"

0 commit comments

Comments
 (0)