From 345f90228675415c456967eca0a30d718350b94f Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 1 Apr 2025 19:27:25 +0200 Subject: [PATCH] Document new interop with reactive streams The documentation still mentions only the deprecated reactive streams interop features. Now interop with Java Flow can be done directly on the `Stream` class and companion object. This should be reflected in the documentation. --- site/guide.md | 53 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/site/guide.md b/site/guide.md index eabaf14dd9..771ce18ab6 100644 --- a/site/guide.md +++ b/site/guide.md @@ -21,7 +21,7 @@ This is the official FS2 guide. It gives an overview of the library and its feat * [Exercises (concurrency)](#exercises-concurrency) * [Interruption](#interruption) * [Talking to the external world](#talking-to-the-external-world) -* [Reactive streams](#reactive-streams) +* [Java Flow and reactive streams](#java-flow-and-reactive-streams) * [Learning more](#learning-more) * [Appendixes](#appendixes) @@ -782,43 +782,72 @@ def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] = { See [`Queue`](https://github.com/typelevel/cats-effect/blob/series/3.x/std/shared/src/main/scala/cats/effect/std/Queue.scala) for more useful methods. Most concurrent queues in cats effect support tracking their size, which is handy for implementing size-based throttling of the producer. -### Reactive streams +### Java Flow and reactive streams -The [reactive streams initiative](http://www.reactive-streams.org/) is complicated, mutable and unsafe - it is not something that is desired for use over fs2. +The [reactive streams initiative](http://www.reactive-streams.org/) and its successor `java.util.concurrent.Flow` are complicated, mutable and unsafe - it is not something that is desired for use over fs2. But there are times when we need use fs2 in conjunction with a different streaming library, and this is where reactive streams shines. -Any reactive streams system can interoperate with any other reactive streams system by exposing an `org.reactivestreams.Publisher` or an `org.reactivestreams.Subscriber`. +Any reactive streams system can interoperate with any other reactive streams system by exposing a `Publisher` or a `Subscriber`. -The `reactive-streams` library provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries. +FS2 provides instances of reactive streams compliant publishers and subscribers to ease interoperability with other streaming libraries. -#### Usage +#### Usage with Java Flow You may require the following imports: ```scala mdoc:reset import fs2._ -import fs2.interop.reactivestreams._ import cats.effect.{IO, Resource} +import java.util.concurrent.Flow.Publisher +``` + +To convert a `Stream` into a downstream `java.util.concurrent.Flow.Publisher`: + +```scala mdoc +val stream = Stream(1, 2, 3).covary[IO] +stream.toPublisherResource +``` + +To convert an upstream `java.util.concurrent.Publisher` into a `Stream`: + +```scala mdoc +val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource +publisher.use { p => + Stream.fromPublisher[IO](p, chunkSize = 10).compile.toList +} +``` + +#### Usage with reactive streams + +If your are integrating a library that is still using `org.reactivestreams.Publisher`, you can use `org.reactivestreams.FlowAdapters` to convert to Java Flow. + +You may require the following imports: + +```scala mdoc:reset +import fs2._ +import cats.effect.{IO, Resource} +import org.reactivestreams.{FlowAdapters, Publisher} ``` To convert a `Stream` into a downstream unicast `org.reactivestreams.Publisher`: ```scala mdoc val stream = Stream(1, 2, 3).covary[IO] -stream.toUnicastPublisher +stream.toPublisherResource.map(FlowAdapters.toPublisher[Int]) ``` To convert an upstream `org.reactivestreams.Publisher` into a `Stream`: ```scala mdoc -val publisher: Resource[IO, StreamUnicastPublisher[IO, Int]] = Stream(1, 2, 3).covary[IO].toUnicastPublisher +val publisher: Resource[IO, Publisher[Int]] = Stream(1, 2, 3).covary[IO].toPublisherResource.map(FlowAdapters.toPublisher[Int]) publisher.use { p => - p.toStream[IO].compile.toList + Stream + .fromPublisher[IO](FlowAdapters.toFlowPublisher(p), chunkSize = 10) + .compile + .toList } ``` -A unicast publisher must have a single subscriber only. - ### Learning more Want to learn more?