Skip to content

Commit 1804b43

Browse files
committed
Add flow.unsafeToPublisher
1 parent b504e46 commit 1804b43

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

core/shared/src/main/scala/fs2/interop/flow/package.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
package fs2
2323
package interop
2424

25+
import cats.effect.IO
2526
import cats.effect.kernel.{Async, Resource}
27+
import cats.effect.unsafe.IORuntime
2628

2729
import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}
2830

@@ -137,9 +139,10 @@ package object flow {
137139
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
138140
* Thus, no more elements will be published.
139141
*
140-
* @note This Publisher can be reused for multiple Subscribers,
141-
* each subscription will re-run the [[Stream]] from the beginning.
142+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
143+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
142144
*
145+
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
143146
* @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]].
144147
*
145148
* @param stream The [[Stream]] to transform.
@@ -151,6 +154,24 @@ package object flow {
151154
): Resource[F, Publisher[A]] =
152155
StreamPublisher(stream)
153156

157+
/** Creates a [[Publisher]] from a [[Stream]].
158+
*
159+
* The stream is only ran when elements are requested.
160+
*
161+
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
162+
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
163+
*
164+
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
165+
*
166+
* @param stream The [[Stream]] to transform.
167+
*/
168+
def unsafeToPublisher[A](
169+
stream: Stream[IO, A]
170+
)(implicit
171+
runtime: IORuntime
172+
): Publisher[A] =
173+
StreamPublisher.unsafe(stream)
174+
154175
/** Allows subscribing a [[Subscriber]] to a [[Stream]].
155176
*
156177
* The returned program will run until

0 commit comments

Comments
 (0)