@@ -23,7 +23,9 @@ package fs2
2323package interop
2424package flow
2525
26+ import cats .effect .IO
2627import cats .effect .kernel .{Async , Resource }
28+ import cats .effect .unsafe .IORuntime
2729
2830import java .util .concurrent .Flow .{Publisher , Subscriber }
2931
@@ -66,9 +68,10 @@ object syntax {
6668 * Closing the [[Resource ]] means gracefully shutting down all active subscriptions.
6769 * Thus, no more elements will be published.
6870 *
69- * @note This Publisher can be reused for multiple Subscribers,
70- * each subscription will re-run the [[Stream ]] from the beginning.
71+ * @note This [[ Publisher ]] can be reused for multiple [[ Subscribers ]] ,
72+ * each [[ Subscription ]] will re-run the [[Stream ]] from the beginning.
7173 *
74+ * @see [[unsafeToPublisher ]] for an unsafe version that returns a plain [[Publisher ]].
7275 * @see [[subscribe ]] for a simpler version that only requires a [[Subscriber ]].
7376 */
7477 def toPublisher (implicit F : Async [F ]): Resource [F , Publisher [A ]] =
@@ -86,6 +89,25 @@ object syntax {
8689 flow.subscribeStream(stream, subscriber)
8790 }
8891
92+ implicit final class StreamIOOps [A ](private val stream : Stream [IO , A ]) extends AnyVal {
93+
94+ /** Creates a [[Publisher ]] from a [[Stream ]].
95+ *
96+ * The stream is only ran when elements are requested.
97+ *
98+ * @note This [[Publisher ]] can be reused for multiple [[Subscribers ]],
99+ * each [[Subscription ]] will re-run the [[Stream ]] from the beginning.
100+ *
101+ * @see [[toPublisher ]] for a safe version that returns a [[Resource ]].
102+ *
103+ * @param stream The [[Stream ]] to transform.
104+ */
105+ def unsafeToPublisher ()(implicit
106+ runtime : IORuntime
107+ ): Publisher [A ] =
108+ flow.unsafeToPublisher(stream)
109+ }
110+
89111 final class FromPublisherPartiallyApplied [F [_]](private val dummy : Boolean ) extends AnyVal {
90112 def apply [A ](
91113 publisher : Publisher [A ],
0 commit comments