Skip to content

Commit bab1d0e

Browse files
authored
Merge branch 'main' into update/sbt-doctest-0.12.1
2 parents 07e825f + 179dc20 commit bab1d0e

File tree

3 files changed

+178
-42
lines changed

3 files changed

+178
-42
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,39 +2000,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
20002000
Stream.force(fstream)
20012001
}
20022002

2003-
/** Interleaves the two inputs nondeterministically. The output stream
2004-
* halts after BOTH `s1` and `s2` terminate normally, or in the event
2005-
* of an uncaught failure on either `s1` or `s2`. Has the property that
2006-
* `merge(Stream.empty, s) == s` and `merge(raiseError(e), s)` will
2007-
* eventually terminate with `raiseError(e)`, possibly after emitting some
2008-
* elements of `s` first.
2009-
*
2010-
* The implementation always tries to pull one chunk from each side
2011-
* before waiting for it to be consumed by resulting stream.
2012-
* As such, there may be up to two chunks (one from each stream)
2013-
* waiting to be processed while the resulting stream
2014-
* is processing elements.
2015-
*
2016-
* Also note that if either side produces empty chunk,
2017-
* the processing on that side continues,
2018-
* w/o downstream requiring to consume result.
2003+
/** Implementation of [[merge]] that additionally allows specifying how the output stream is combined.
2004+
* This can be used to control how chunks are emitted downstream. See [[mergeAndAwaitDownstream]] for example.
20192005
*
2020-
* If either side does not emit anything (i.e. as result of drain) that side
2021-
* will continue to run even when the resulting stream did not ask for more data.
2022-
*
2023-
* Note that even when this is equivalent to `Stream(this, that).parJoinUnbounded`,
2024-
* this implementation is little more efficient
2025-
*
2026-
* @example {{{
2027-
* scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global
2028-
* scala> val s1 = Stream.awakeEvery[IO](500.millis).scan(0)((acc, _) => acc + 1)
2029-
* scala> val s = s1.merge(Stream.sleep_[IO](250.millis) ++ s1)
2030-
* scala> s.take(6).compile.toVector.unsafeRunSync()
2031-
* res0: Vector[Int] = Vector(0, 0, 1, 1, 2, 2)
2032-
* }}}
2006+
* @param f The function that combines the output stream and a finalizer for the chunk.
2007+
* This way we can control when to pull the next chunk from upstream.
20332008
*/
2034-
def merge[F2[x] >: F[x], O2 >: O](
2009+
private def merge_[F2[x] >: F[x], O2 >: O](
20352010
that: Stream[F2, O2]
2011+
)(
2012+
f: (Stream[F2, O2], F2[Unit]) => Stream[F2, O2]
20362013
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
20372014
Stream.force {
20382015
// `State` describes the state of an upstream stream (`this` and `that` are both upstream streams)
@@ -2063,12 +2040,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
20632040
case (Some(r1), Some(r2)) => CompositeFailure.fromResults(r1, r2)
20642041
}
20652042
def run(s: Stream[F2, O2]): F2[Unit] =
2066-
// `guard` ensures we do not pull another chunk until the previous one has been consumed downstream.
2043+
// `guard` ensures we do not pull another chunk until the previous one has been produced for downstream.
20672044
Semaphore[F2](1).flatMap { guard =>
2068-
def sendChunk(chk: Chunk[O2]): F2[Unit] = {
2069-
val outStr = Stream.chunk(chk).onFinalize(guard.release)
2070-
output.send(outStr) >> guard.acquire
2071-
}
2045+
def sendChunk(chk: Chunk[O2]): F2[Unit] =
2046+
output.send(f(Stream.chunk(chk), guard.release)) >> guard.acquire
20722047

20732048
(Stream.exec(guard.acquire) ++ s.chunks.foreach(sendChunk))
20742049
// Stop when the other upstream has errored or the downstream has completed.
@@ -2103,6 +2078,65 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
21032078
}
21042079
}
21052080

2081+
/** Like [[merge]], but ensures that each chunk is fully consumed downstream before pulling the next chunk from the same side.
2082+
* This loses the equivalence with `Stream(this, that).parJoinUnbounded` but can be useful when we need to never read ahead from
2083+
* the merged streams.
2084+
*
2085+
* @note Pay attention to possible deadlocks of "this" or "that" when using this function, notably in parallel processing
2086+
* as the next chunk will not be pulled until the current chunk is fully processed (i.e. the scope of the chunk is released).
2087+
*
2088+
* @example {{{
2089+
* scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global
2090+
* scala> import cats.effect._
2091+
* scala> Ref.of[IO, Int](0).flatMap{ ref =>
2092+
* | fs2.Stream.never[IO].mergeAndAwaitDownstream(fs2.Stream.repeatEval(ref.get)).evalMap(value => {
2093+
* | IO.sleep(1.second) >> ref.set(value + 1) as value
2094+
* | }).take(6).compile.toVector
2095+
* | }.unsafeRunSync()
2096+
* res0: Vector[Int] = Vector(0, 1, 2, 3, 4, 5)
2097+
* }}}
2098+
*/
2099+
def mergeAndAwaitDownstream[F2[x] >: F[x], O2 >: O](
2100+
that: Stream[F2, O2]
2101+
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
2102+
merge_(that) { case (s, fin) => s.onFinalize(fin) }
2103+
2104+
/** Interleaves the two inputs nondeterministically. The output stream
2105+
* halts after BOTH `s1` and `s2` terminate normally, or in the event
2106+
* of an uncaught failure on either `s1` or `s2`. Has the property that
2107+
* `merge(Stream.empty, s) == s` and `merge(raiseError(e), s)` will
2108+
* eventually terminate with `raiseError(e)`, possibly after emitting some
2109+
* elements of `s` first.
2110+
*
2111+
* The implementation always tries to pull one chunk from each side
2112+
* before waiting for it to be consumed by resulting stream.
2113+
* As such, there may be up to two chunks (one from each stream)
2114+
* waiting to be processed while the resulting stream
2115+
* is processing elements.
2116+
*
2117+
* Also note that if either side produces an empty chunk,
2118+
* the processing on that side continues,
2119+
* without requiring downstream to consume the result.
2120+
*
2121+
* If either side does not emit anything (i.e. as result of drain) that side
2122+
* will continue to run even when the resulting stream did not ask for more data.
2123+
*
2124+
* Note that even when this is equivalent to `Stream(this, that).parJoinUnbounded`,
2125+
* this implementation is a little more efficient
2126+
*
2127+
* @example {{{
2128+
* scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global
2129+
* scala> val s1 = Stream.awakeEvery[IO](500.millis).scan(0)((acc, _) => acc + 1)
2130+
* scala> val s = s1.merge(Stream.sleep_[IO](250.millis) ++ s1)
2131+
* scala> s.take(6).compile.toVector.unsafeRunSync()
2132+
* res0: Vector[Int] = Vector(0, 0, 1, 1, 2, 2)
2133+
* }}}
2134+
*/
2135+
def merge[F2[x] >: F[x], O2 >: O](
2136+
that: Stream[F2, O2]
2137+
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
2138+
merge_(that) { case (s, fin) => Stream.exec(fin) ++ s }
2139+
21062140
/** Like `merge`, but halts as soon as _either_ branch halts. */
21072141
def mergeHaltBoth[F2[x] >: F[x]: Concurrent, O2 >: O](
21082142
that: Stream[F2, O2]

core/shared/src/test/scala/fs2/StreamMergeSuite.scala

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
package fs2
2323

24-
import scala.concurrent.duration._
25-
24+
import scala.concurrent.duration.*
2625
import cats.effect.IO
2726
import cats.effect.kernel.{Deferred, Ref}
27+
import cats.effect.testkit.TestControl
2828
import org.scalacheck.effect.PropF.forAllF
2929

3030
class StreamMergeSuite extends Fs2Suite {
@@ -224,7 +224,7 @@ class StreamMergeSuite extends Fs2Suite {
224224
}
225225
}
226226

227-
test("merge not emit ahead") {
227+
test("merge not emit ahead more than 1 chunk") {
228228
forAllF { (v: Int) =>
229229
Ref
230230
.of[IO, Int](v)
@@ -236,9 +236,100 @@ class StreamMergeSuite extends Fs2Suite {
236236
.repeatEval(ref.get)
237237
.merge(Stream.never[IO])
238238
.evalMap(sleepAndSet)
239-
.take(2)
240-
.assertEmits(List(v, v + 1))
239+
.take(6)
240+
.assertEmits(List(v, v, v + 1, v + 1, v + 2, v + 2))
241+
}
242+
}
243+
}
244+
245+
test("mergeAndAwaitDownstream not emit ahead") {
246+
forAllF { (v: Int) =>
247+
Ref
248+
.of[IO, Int](v)
249+
.flatMap { ref =>
250+
def sleepAndSet(value: Int): IO[Int] =
251+
IO.sleep(100.milliseconds) >> ref.set(value + 1) >> IO(value)
252+
253+
Stream
254+
.repeatEval(ref.get)
255+
.mergeAndAwaitDownstream(Stream.never[IO])
256+
.evalMap(sleepAndSet)
257+
.take(3)
258+
.assertEmits(List(v, v + 1, v + 2))
241259
}
242260
}
243261
}
262+
263+
test("merge produces when concurrently handled") {
264+
265+
// Create stream for each int that comes in,
266+
// then run them in parallel
267+
// where each stream emits the int value and then waits (simulating work that never ends, or that takes a long time).
268+
def run(source: Stream[IO, Int]): IO[Vector[Int]] =
269+
source
270+
.map { a =>
271+
Stream.emit(a) ++
272+
Stream.never[IO]
273+
}
274+
.parJoinUnbounded
275+
.timeoutOnPullTo(200.millis, Stream.empty)
276+
.compile
277+
.toVector
278+
279+
TestControl
280+
.executeEmbed(
281+
run(
282+
(Stream.emit(1) ++ Stream.sleep_[IO](50.millis) ++ Stream.emit(2)).merge(
283+
Stream.never[IO]
284+
)
285+
)
286+
)
287+
.assertEquals(Vector(1, 2))
288+
}
289+
290+
test("issue #3598") {
291+
292+
sealed trait Data
293+
294+
case class Item(value: Int) extends Data
295+
case object Tick1 extends Data
296+
case object Tick2 extends Data
297+
298+
def splitHead[F[_], O](in: fs2.Stream[F, O]): fs2.Stream[F, (O, fs2.Stream[F, O])] =
299+
in.pull.uncons1.flatMap {
300+
case Some((head, tail)) => fs2.Pull.output(Chunk((head, tail)))
301+
case None => fs2.Pull.done
302+
}.stream
303+
304+
val source =
305+
Stream.emits(1 to 2).evalMap(i => IO(Item(i)).delayBy(100.millis)) ++ Stream.never[IO]
306+
307+
val timer = fs2.Stream.awakeEvery[IO](50.millis).map(_ => Tick1)
308+
val timer2 = fs2.Stream.awakeEvery[IO](50.millis).map(_ => Tick2)
309+
310+
val sources = timer2.mergeHaltBoth(source.mergeHaltBoth(timer))
311+
312+
val program =
313+
splitHead(sources)
314+
.flatMap { case (head, tail) =>
315+
splitHead(tail)
316+
.flatMap { case (head2, tail) =>
317+
Stream.emit(head) ++ Stream.emit(head2) ++ tail
318+
}
319+
.parEvalMap(3) { i =>
320+
IO(i)
321+
}
322+
}
323+
.interruptAfter(230.millis)
324+
.compile
325+
.toVector
326+
327+
TestControl
328+
.executeEmbed(program)
329+
.assert { data =>
330+
data.count(_.isInstanceOf[Item]) == 2 &&
331+
data.count(_.isInstanceOf[Tick1.type]) == 4 &&
332+
data.count(_.isInstanceOf[Tick2.type]) == 4
333+
}
334+
}
244335
}

core/shared/src/test/scala/fs2/TimedPullsSuite.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,19 @@ class TimedPullsSuite extends Fs2Suite {
313313
}
314314

315315
test("After the first uncons, timeouts start immediately") {
316+
// How often the main stream generates data.
317+
// This is only started after the first uncons.
316318
val emissionTime = 100.millis
317-
val timeout = 200.millis
319+
320+
// Timeout which is registered before the first uncons, it is registered immediately
321+
// But we do not expect it to trigger.
322+
// This has to be longer than emissionTime, otherwise the first uncons would always timeout.
323+
val initialTimeout = 200.millis
324+
325+
// Timeout registered after the first uncons, this one should be fired
326+
val timeout = 50.millis
327+
328+
// Time we wait before doing uncons.
318329
val timedPullPause = Pull.eval(IO.sleep(150.millis))
319330

320331
val prog =
@@ -323,7 +334,7 @@ class TimedPullsSuite extends Fs2Suite {
323334
.repeatN(2)
324335
.pull
325336
.timed { tp =>
326-
tp.timeout(timeout) >>
337+
tp.timeout(initialTimeout) >>
327338
// If the first timeout started immediately, this pause
328339
// before uncons would cause a timeout to be emitted
329340
timedPullPause >>

0 commit comments

Comments
 (0)