From 810af8a435b975875605922e12d77d629e191414 Mon Sep 17 00:00:00 2001 From: Justin Reardon Date: Wed, 25 Dec 2024 23:20:37 -0500 Subject: [PATCH 1/7] Fix #3076 parEvalMap resource scoping Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators. --- core/shared/src/main/scala/fs2/Stream.scala | 87 +++++++++++-------- .../src/test/scala/fs2/ParEvalMapSuite.scala | 30 +++++++ 2 files changed, 83 insertions(+), 34 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b70a1372ac..0b052f2c5f 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -238,31 +238,39 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = { assert(pipes.nonEmpty, s"pipes should not be empty") - Stream.force { - for { - // topic: contains the chunk that the pipes are processing at one point. - // until and unless all pipes are finished with it, won't move to next one - topic <- Topic[F2, Chunk[O]] - // Coordination: neither the producer nor any consumer starts - // until and unless all consumers are subscribed to topic. - allReady <- CountDownLatch[F2](pipes.length) - } yield { - val checkIn = allReady.release >> allReady.await - - def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] = - Stream.resource(topic.subscribeAwait(1)).flatMap { sub => - // Wait until all pipes are ready before consuming. - // Crucial: checkin is not passed to the pipe, - // so pipe cannot interrupt it and alter the latch count - Stream.exec(checkIn) ++ pipe(sub.unchunks) - } + underlying.uncons.flatMap { + case Some((hd, tl)) => + for { + // topic: contains the chunk that the pipes are processing at one point. + // until and unless all pipes are finished with it, won't move to next one + topic <- Pull.eval(Topic[F2, Chunk[O]]) + // Coordination: neither the producer nor any consumer starts + // until and unless all consumers are subscribed to topic. + allReady <- Pull.eval(CountDownLatch[F2](pipes.length)) + + checkIn = allReady.release >> allReady.await + + dump = (pipe: Pipe[F2, O, O2]) => + Stream.resource(topic.subscribeAwait(1)).flatMap { sub => + // Wait until all pipes are ready before consuming. + // Crucial: checkin is not passed to the pipe, + // so pipe cannot interrupt it and alter the latch count + Stream.exec(checkIn) ++ pipe(sub.unchunks) + } - val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded - // Wait until all pipes are checked in before pulling - val pump = Stream.exec(allReady.await) ++ topic.publish(chunks) - dumpAll.concurrently(pump) - } - } + dumpAll: Stream[F2, O2] <- + Pull.extendScopeTo(Stream(pipes: _*).map(dump).parJoinUnbounded) + + chunksStream = Stream.chunk(hd).append(tl.stream).chunks + + // Wait until all pipes are checked in before pulling + pump = Stream.exec(allReady.await) ++ topic.publish(chunksStream) + + _ <- dumpAll.concurrently(pump).underlying + } yield () + + case None => Pull.done + }.stream } /** Behaves like the identity function, but requests `n` elements at a time from the input. @@ -2366,17 +2374,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } } - val background = - Stream.exec(semaphore.acquire) ++ - interruptWhen(stop.get.map(_.asRight[Throwable])) - .foreach(forkOnElem) - .onFinalizeCase { - case ExitCase.Succeeded => releaseAndCheckCompletion - case _ => stop.complete(()) *> releaseAndCheckCompletion - } + underlying.uncons.flatMap { + case Some((hd, tl)) => + for { + foreground <- Pull.extendScopeTo( + channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get) + ) + background = Stream + .exec(semaphore.acquire) ++ + Stream + .chunk(hd) + .append(tl.stream) + .interruptWhen(stop.get.map(_.asRight[Throwable])) + .foreach(forkOnElem) + .onFinalizeCase { + case ExitCase.Succeeded => releaseAndCheckCompletion + case _ => stop.complete(()) *> releaseAndCheckCompletion + } + _ <- foreground.concurrently(background).underlying + } yield () - val foreground = channel.stream.evalMap(_.rethrow) - foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) + case None => Pull.done + }.stream } Stream.force(action) diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala index d394969623..3588434175 100644 --- a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala +++ b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala @@ -293,4 +293,34 @@ class ParEvalMapSuite extends Fs2Suite { .timeout(2.seconds) } } + + group("issue-3076, parEvalMap* runs resource finaliser before usage") { + test("parEvalMap") { + Deferred[IO, Unit] + .flatMap { d => + Stream + .bracket(IO.unit)(_ => d.complete(()).void) + .parEvalMap(2)(_ => IO.sleep(1.second)) + .evalMap(_ => IO.sleep(1.second) >> d.complete(())) + .timeout(5.seconds) + .compile + .last + } + .assertEquals(Some(true)) + } + + test("broadcastThrough") { + Deferred[IO, Unit] + .flatMap { d => + Stream + .bracket(IO.unit)(_ => d.complete(()).void) + .broadcastThrough(identity[Stream[IO, Unit]]) + .evalMap(_ => IO.sleep(1.second) >> d.complete(())) + .timeout(5.seconds) + .compile + .last + } + .assertEquals(Some(true)) + } + } } From cc55983834c90e0fcb5cef6eb9d6f50fa382f64c Mon Sep 17 00:00:00 2001 From: Justin Reardon Date: Thu, 26 Dec 2024 09:04:49 -0500 Subject: [PATCH 2/7] Abstract scope extension with extendScopeThrough Make extendScopeTo cancellation safe (see #3474) --- core/shared/src/main/scala/fs2/Pull.scala | 5 +- core/shared/src/main/scala/fs2/Stream.scala | 166 +++++++++--------- .../scala/fs2/StreamCombinatorsSuite.scala | 14 ++ 3 files changed, 99 insertions(+), 86 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index c583263b15..6a241b8fb5 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -464,10 +464,7 @@ object Pull extends PullLowPriority { def extendScopeTo[F[_], O]( s: Stream[F, O] )(implicit F: MonadError[F, Throwable]): Pull[F, Nothing, Stream[F, O]] = - for { - scope <- Pull.getScope[F] - lease <- Pull.eval(scope.lease) - } yield s.onFinalize(lease.cancel.redeemWith(F.raiseError(_), _ => F.unit)) + Pull.getScope[F].map(scope => Stream.bracket(scope.lease)(_.cancel.rethrow) *> s) /** Repeatedly uses the output of the pull as input for the next step of the * pull. Halts when a step terminates with `None` or `Pull.raiseError`. diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 0b052f2c5f..f65d72e428 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -238,19 +238,19 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = { assert(pipes.nonEmpty, s"pipes should not be empty") - underlying.uncons.flatMap { - case Some((hd, tl)) => + extendScopeThrough { source => + Stream.force { for { // topic: contains the chunk that the pipes are processing at one point. // until and unless all pipes are finished with it, won't move to next one - topic <- Pull.eval(Topic[F2, Chunk[O]]) + topic <- Topic[F2, Chunk[O]] // Coordination: neither the producer nor any consumer starts // until and unless all consumers are subscribed to topic. - allReady <- Pull.eval(CountDownLatch[F2](pipes.length)) - - checkIn = allReady.release >> allReady.await + allReady <- CountDownLatch[F2](pipes.length) + } yield { + val checkIn = allReady.release >> allReady.await - dump = (pipe: Pipe[F2, O, O2]) => + def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] = Stream.resource(topic.subscribeAwait(1)).flatMap { sub => // Wait until all pipes are ready before consuming. // Crucial: checkin is not passed to the pipe, @@ -258,19 +258,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.exec(checkIn) ++ pipe(sub.unchunks) } - dumpAll: Stream[F2, O2] <- - Pull.extendScopeTo(Stream(pipes: _*).map(dump).parJoinUnbounded) - - chunksStream = Stream.chunk(hd).append(tl.stream).chunks - + val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded // Wait until all pipes are checked in before pulling - pump = Stream.exec(allReady.await) ++ topic.publish(chunksStream) - - _ <- dumpAll.concurrently(pump).underlying - } yield () - - case None => Pull.done - }.stream + val pump = Stream.exec(allReady.await) ++ topic.publish(source.chunks) + dumpAll.concurrently(pump) + } + } + } } /** Behaves like the identity function, but requests `n` elements at a time from the input. @@ -2331,75 +2325,65 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, channel: F2[Channel[F2, F2[Either[Throwable, O2]]]], isOrdered: Boolean, f: O => F2[O2] - )(implicit F: Concurrent[F2]): Stream[F2, O2] = { - val action = - ( - Semaphore[F2](concurrency), - channel, - Deferred[F2, Unit], - Deferred[F2, Unit] - ).mapN { (semaphore, channel, stop, end) => - def initFork(release: F2[Unit]): F2[Either[Throwable, O2] => F2[Unit]] = { - def ordered: F2[Either[Throwable, O2] => F2[Unit]] = { - def send(v: Deferred[F2, Either[Throwable, O2]]) = - (el: Either[Throwable, O2]) => v.complete(el).void - - Deferred[F2, Either[Throwable, O2]] - .flatTap(value => channel.send(release *> value.get)) - .map(send) - } - - def unordered: Either[Throwable, O2] => F2[Unit] = - (el: Either[Throwable, O2]) => release <* channel.send(F.pure(el)) + )(implicit F: Concurrent[F2]): Stream[F2, O2] = + extendScopeThrough { source => + Stream.force { + ( + Semaphore[F2](concurrency), + channel, + Deferred[F2, Unit], + Deferred[F2, Unit] + ).mapN { (semaphore, channel, stop, end) => + def initFork(release: F2[Unit]): F2[Either[Throwable, O2] => F2[Unit]] = { + def ordered: F2[Either[Throwable, O2] => F2[Unit]] = { + def send(v: Deferred[F2, Either[Throwable, O2]]) = + (el: Either[Throwable, O2]) => v.complete(el).void + + Deferred[F2, Either[Throwable, O2]] + .flatTap(value => channel.send(release *> value.get)) + .map(send) + } - if (isOrdered) ordered else F.pure(unordered) - } + def unordered: Either[Throwable, O2] => F2[Unit] = + (el: Either[Throwable, O2]) => release <* channel.send(F.pure(el)) - val releaseAndCheckCompletion = - semaphore.release *> - semaphore.available.flatMap { - case `concurrency` => channel.close *> end.complete(()).void - case _ => F.unit - } + if (isOrdered) ordered else F.pure(unordered) + } - def forkOnElem(el: O): F2[Unit] = - F.uncancelable { poll => - poll(semaphore.acquire) <* - Deferred[F2, Unit].flatMap { pushed => - val init = initFork(pushed.complete(()).void) - poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => - val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.start(stop.get.race(action) *> releaseAndCheckCompletion) - } + val releaseAndCheckCompletion = + semaphore.release *> + semaphore.available.flatMap { + case `concurrency` => channel.close *> end.complete(()).void + case _ => F.unit } - } - underlying.uncons.flatMap { - case Some((hd, tl)) => - for { - foreground <- Pull.extendScopeTo( - channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get) - ) - background = Stream - .exec(semaphore.acquire) ++ - Stream - .chunk(hd) - .append(tl.stream) - .interruptWhen(stop.get.map(_.asRight[Throwable])) - .foreach(forkOnElem) - .onFinalizeCase { - case ExitCase.Succeeded => releaseAndCheckCompletion - case _ => stop.complete(()) *> releaseAndCheckCompletion + def forkOnElem(el: O): F2[Unit] = + F.uncancelable { poll => + poll(semaphore.acquire) <* + Deferred[F2, Unit].flatMap { pushed => + val init = initFork(pushed.complete(()).void) + poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => + val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get + F.start(stop.get.race(action) *> releaseAndCheckCompletion) } - _ <- foreground.concurrently(background).underlying - } yield () + } + } - case None => Pull.done - }.stream - } + val background = + Stream.exec(semaphore.acquire) ++ + source + .interruptWhen(stop.get.map(_.asRight[Throwable])) + .foreach(forkOnElem) + .onFinalizeCase { + case ExitCase.Succeeded => releaseAndCheckCompletion + case _ => stop.complete(()) *> releaseAndCheckCompletion + } - Stream.force(action) - } + val foreground = channel.stream.evalMap(_.rethrow) + foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) + } + } + } /** Concurrent zip. * @@ -2474,12 +2458,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def prefetchN[F2[x] >: F[x]: Concurrent]( n: Int - ): Stream[F2, O] = + ): Stream[F2, O] = extendScopeThrough { source => Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan => chan.stream.unchunks.concurrently { - chunks.through(chan.sendAll) + source.chunks.through(chan.sendAll) } } + } /** Prints each element of this stream to standard out, converting each element to a `String` via `Show`. */ def printlns[F2[x] >: F[x], O2 >: O](implicit @@ -2940,6 +2925,23 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, )(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] = f(this, s2) + /** Transforms this stream, explicitly extending the current scope through the given pipe. + * + * Use this when implementing a pipe where the resulting stream is not directly constructed from + * the source stream, e.g. when sending the source stream through a Channel and returning the + * channel's stream. + */ + def extendScopeThrough[F2[x] >: F[x], O2]( + f: Stream[F, O] => Stream[F2, O2] + )(implicit F: MonadError[F2, Throwable]): Stream[F2, O2] = + this.pull.peek + .flatMap { + case Some((_, tl)) => Pull.extendScopeTo(f(tl)) + case None => Pull.extendScopeTo(f(Stream.empty)) + } + .flatMap(_.underlying) + .stream + /** Fails this stream with a `TimeoutException` if it does not complete within given `timeout`. */ def timeout[F2[x] >: F[x]: Temporal]( timeout: FiniteDuration diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index d7cc21d93b..fe27fc5492 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -1328,6 +1328,20 @@ class StreamCombinatorsSuite extends Fs2Suite { ) .assertEquals(4.seconds) } + + test("scope propagation") { + Deferred[IO, Unit] + .flatMap { d => + Stream + .bracket(IO.unit)(_ => d.complete(()).void) + .prefetch + .evalMap(_ => IO.sleep(1.second) >> d.complete(())) + .timeout(5.seconds) + .compile + .last + } + .assertEquals(Some(true)) + } } test("range") { From c16682fa49b8acc1aca014efe369dc953ef93a53 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Tue, 31 Dec 2024 16:31:33 +0100 Subject: [PATCH 3/7] extendScopeThrough - walk the Pulls and bind acquisitions to scope --- core/shared/src/main/scala/fs2/Pull.scala | 90 +++++++++++++++++++ core/shared/src/main/scala/fs2/Stream.scala | 26 +++--- .../scala/fs2/StreamCombinatorsSuite.scala | 16 ++++ 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 6a241b8fb5..58cdeba603 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -295,6 +295,7 @@ sealed abstract class Pull[+F[_], +O, +R] { _ => this, (l, _) => Pull.eval(l.cancel).rethrow ) + } object Pull extends PullLowPriority { @@ -466,6 +467,95 @@ object Pull extends PullLowPriority { )(implicit F: MonadError[F, Throwable]): Pull[F, Nothing, Stream[F, O]] = Pull.getScope[F].map(scope => Stream.bracket(scope.lease)(_.cancel.rethrow) *> s) + private class ScopedBind[+F[_], +O, X, +R]( + bind: Bind[F, O, X, R], + scope: Scope[F] + )(implicit F: MonadThrow[F]) + extends Bind[F, O, X, R]( + bindAcquireToScope(bind.step, scope) + ) { + def cont(r: Terminal[X]): Pull[F, O, R] = + bindAcquireToScope[F, O, R](bind.cont(r), scope) + } + + private[fs2] def bindAcquireToScope[F[_], O, R]( + pull: Pull[F, O, R], + scope: Scope[F] + )(implicit F: MonadThrow[F]): Pull[F, O, R] = + pull match { + case p: Pull.FlatMapOutput[F, O, p] => + bindAcquireToScope(p.stream, scope).flatMapOutput(o => + bindAcquireToScope(p.fun(o), scope) + ) + case p: Pull.Acquire[F, r] => + println(s" - Acquire: $pull") + + Pull + .eval( + scope.acquireResource( + poll => poll(p.resource), + (r: r, e: Resource.ExitCase) => { + println("--- releasing") + p.release(r, e) + } + ) + ) + .flatMap { + case Outcome.Succeeded(Left(id)) => + Pull.raiseError(new RuntimeException(s"what to do with id? $id")) + case Outcome.Succeeded(Right(r)) => + println(s"--- acquired: $r") + Pull.pure(r) + case Outcome.Errored(e) => Pull.raiseError(e) + case Outcome.Canceled() => Pull.raiseError(new InterruptedException) + } + + case p: Pull.Bind[F, O, x, R] => + println(s" - Bind: $pull") + new ScopedBind(p, scope) + case p: Pull.InScope[F, O] => + println(s" - InScope: $pull") + Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption) + case p: Pull.StepLeg[F, O] => + println(s" - StepLeg: $pull") + Pull.StepLeg( + bindAcquireToScope(p.stream, scope), + p.scope + ) + case p: Pull.Uncons[F, ?] => + println(s" - Uncons: $pull") + Pull.Uncons( + bindAcquireToScope(p.stream, scope) + ) + case p: Pull.Translate[g, F, O] => + println(s" - Translate: $pull") + p + case p: Pull.InterruptWhen[F] => + println(s" - InterruptWhen: $pull") + p + case p: Pull.CloseScope => + println(s" - CloseScope: $pull") + p + case p: Pull.GetScope[F] => + println(s" - GetScope: $pull") + p + case p: Pull.Eval[F, ?] => + println(s" - Eval: $pull") + p + case p: Pull.Fail => + println(s" - Fail: $pull") + p + case p: Pull.Succeeded[?] => + println(s" - Succeeded: $pull") + p + case p: Pull.Interrupted => + println(s" - Interrupted: $pull") + p + case p: Pull.Output[?] => + println(s" - Output: $pull") + p + } + /** Repeatedly uses the output of the pull as input for the next step of the * pull. Halts when a step terminates with `None` or `Pull.raiseError`. */ diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index f65d72e428..1e3ce90798 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -238,7 +238,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = { assert(pipes.nonEmpty, s"pipes should not be empty") - extendScopeThrough { source => + extendScopeThrough[F2, O2] { source => Stream.force { for { // topic: contains the chunk that the pipes are processing at one point. @@ -2326,7 +2326,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, isOrdered: Boolean, f: O => F2[O2] )(implicit F: Concurrent[F2]): Stream[F2, O2] = - extendScopeThrough { source => + extendScopeThrough[F2, O2] { source => Stream.force { ( Semaphore[F2](concurrency), @@ -2458,7 +2458,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def prefetchN[F2[x] >: F[x]: Concurrent]( n: Int - ): Stream[F2, O] = extendScopeThrough { source => + ): Stream[F2, O] = extendScopeThrough[F2, O] { source => Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan => chan.stream.unchunks.concurrently { source.chunks.through(chan.sendAll) @@ -2932,15 +2932,19 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * channel's stream. */ def extendScopeThrough[F2[x] >: F[x], O2]( - f: Stream[F, O] => Stream[F2, O2] + f: Stream[F2, O] => Stream[F2, O2] )(implicit F: MonadError[F2, Throwable]): Stream[F2, O2] = - this.pull.peek - .flatMap { - case Some((_, tl)) => Pull.extendScopeTo(f(tl)) - case None => Pull.extendScopeTo(f(Stream.empty)) - } - .flatMap(_.underlying) - .stream + this.pull.peek.flatMap { + case Some((_, stream)) => + Pull + .getScope[F2] + .flatMap { scope => + f( + Pull.bindAcquireToScope(stream.underlying, scope).stream + ).underlying + } + case None => Pull.done + }.stream /** Fails this stream with a `TimeoutException` if it does not complete within given `timeout`. */ def timeout[F2[x] >: F[x]: Temporal]( diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index fe27fc5492..0968d66fb1 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -1342,6 +1342,22 @@ class StreamCombinatorsSuite extends Fs2Suite { } .assertEquals(Some(true)) } + + test("scope propagation, multiple pulls") { + Stream(1, 2, 3, 4, 5, 6) + .flatMap(i => Stream.bracket(Deferred[IO, Int])(_.complete(i).void)) + .parEvalMap(2)(d => IO.sleep(1.second).as(d)) + .parEvalMap(2)(d => IO.sleep(1.second).as(d)) + .parEvalMap(2)(d => IO.sleep(1.second) >> d.complete(0)) + .evalMap(completed => + IO.raiseUnless(completed)(new RuntimeException("resource released prematurely")) + ) + .timeout(15.seconds) + .compile + .last + .assertEquals(Some(())) + } + } test("range") { From 8a12409fe99b5090c064b453ebc756d5c81c3255 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Tue, 31 Dec 2024 17:06:20 +0100 Subject: [PATCH 4/7] remove printlns --- core/shared/src/main/scala/fs2/Pull.scala | 70 +++++++---------------- 1 file changed, 20 insertions(+), 50 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 58cdeba603..af0d6a4912 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -483,77 +483,47 @@ object Pull extends PullLowPriority { scope: Scope[F] )(implicit F: MonadThrow[F]): Pull[F, O, R] = pull match { - case p: Pull.FlatMapOutput[F, O, p] => - bindAcquireToScope(p.stream, scope).flatMapOutput(o => - bindAcquireToScope(p.fun(o), scope) - ) - case p: Pull.Acquire[F, r] => - println(s" - Acquire: $pull") - + case p: Pull.FlatMapOutput[?, ?, ?] => + bindAcquireToScope(p.stream, scope).flatMapOutput(o => bindAcquireToScope(p.fun(o), scope)) + case p: Pull.Acquire[?, ?] => Pull .eval( scope.acquireResource( poll => poll(p.resource), - (r: r, e: Resource.ExitCase) => { - println("--- releasing") - p.release(r, e) - } + p.release ) ) .flatMap { case Outcome.Succeeded(Left(id)) => Pull.raiseError(new RuntimeException(s"what to do with id? $id")) case Outcome.Succeeded(Right(r)) => - println(s"--- acquired: $r") Pull.pure(r) case Outcome.Errored(e) => Pull.raiseError(e) case Outcome.Canceled() => Pull.raiseError(new InterruptedException) } - case p: Pull.Bind[F, O, x, R] => - println(s" - Bind: $pull") + case p: Pull.Bind[?, ?, ?, ?] => new ScopedBind(p, scope) - case p: Pull.InScope[F, O] => - println(s" - InScope: $pull") + case p: Pull.InScope[?, ?] => Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption) - case p: Pull.StepLeg[F, O] => - println(s" - StepLeg: $pull") + case Pull.StepLeg(stream, stepScope) => Pull.StepLeg( - bindAcquireToScope(p.stream, scope), - p.scope + bindAcquireToScope(stream.asInstanceOf[Pull[F, O, Unit]], scope), + stepScope ) - case p: Pull.Uncons[F, ?] => - println(s" - Uncons: $pull") + case Pull.Uncons(stream) => Pull.Uncons( - bindAcquireToScope(p.stream, scope) + bindAcquireToScope(stream.asInstanceOf[Pull[F, O, Unit]], scope) ) - case p: Pull.Translate[g, F, O] => - println(s" - Translate: $pull") - p - case p: Pull.InterruptWhen[F] => - println(s" - InterruptWhen: $pull") - p - case p: Pull.CloseScope => - println(s" - CloseScope: $pull") - p - case p: Pull.GetScope[F] => - println(s" - GetScope: $pull") - p - case p: Pull.Eval[F, ?] => - println(s" - Eval: $pull") - p - case p: Pull.Fail => - println(s" - Fail: $pull") - p - case p: Pull.Succeeded[?] => - println(s" - Succeeded: $pull") - p - case p: Pull.Interrupted => - println(s" - Interrupted: $pull") - p - case p: Pull.Output[?] => - println(s" - Output: $pull") - p + case p: Pull.Translate[?, ?, ?] => p + case p: Pull.InterruptWhen[?] => p + case p: Pull.CloseScope => p + case p: Pull.GetScope[?] => p + case p: Pull.Eval[?, ?] => p + case p: Pull.Fail => p + case p: Pull.Succeeded[?] => p + case p: Pull.Interrupted => p + case p: Pull.Output[?] => p } /** Repeatedly uses the output of the pull as input for the next step of the From 8c3a6ac89f053800b97926a76a14a8fac2c2279a Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Tue, 31 Dec 2024 17:43:37 +0100 Subject: [PATCH 5/7] fix scalac warnings --- core/shared/src/main/scala/fs2/Pull.scala | 45 ++++++++++++----------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index af0d6a4912..21e6f888b2 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -483,13 +483,15 @@ object Pull extends PullLowPriority { scope: Scope[F] )(implicit F: MonadThrow[F]): Pull[F, O, R] = pull match { - case p: Pull.FlatMapOutput[?, ?, ?] => + case p: Pull.FlatMapOutput[F, O, p] @unchecked => bindAcquireToScope(p.stream, scope).flatMapOutput(o => bindAcquireToScope(p.fun(o), scope)) - case p: Pull.Acquire[?, ?] => + case p: Pull.Acquire[F, r] @unchecked => Pull .eval( scope.acquireResource( - poll => poll(p.resource), + poll => + if (p.cancelable) poll(p.resource) + else p.resource, p.release ) ) @@ -501,29 +503,30 @@ object Pull extends PullLowPriority { case Outcome.Errored(e) => Pull.raiseError(e) case Outcome.Canceled() => Pull.raiseError(new InterruptedException) } - - case p: Pull.Bind[?, ?, ?, ?] => - new ScopedBind(p, scope) - case p: Pull.InScope[?, ?] => + case p: Pull.Bind[F, O, x, R] @unchecked => + new ScopedBind[F, O, x, R](p, scope) + case p: Pull.InScope[F, O] @unchecked => Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption) - case Pull.StepLeg(stream, stepScope) => + case p: Pull.StepLeg[F, O] @unchecked => Pull.StepLeg( - bindAcquireToScope(stream.asInstanceOf[Pull[F, O, Unit]], scope), - stepScope + bindAcquireToScope(p.stream, scope), + p.scope ) - case Pull.Uncons(stream) => + case p: Pull.Uncons[F, O] @unchecked => Pull.Uncons( - bindAcquireToScope(stream.asInstanceOf[Pull[F, O, Unit]], scope) + bindAcquireToScope(p.stream, scope) ) - case p: Pull.Translate[?, ?, ?] => p - case p: Pull.InterruptWhen[?] => p - case p: Pull.CloseScope => p - case p: Pull.GetScope[?] => p - case p: Pull.Eval[?, ?] => p - case p: Pull.Fail => p - case p: Pull.Succeeded[?] => p - case p: Pull.Interrupted => p - case p: Pull.Output[?] => p + case p: Pull.AlgEffect[F, R] @unchecked => + p // workaround for Scala 3 'Pull.CloseScope case is unreachable' + case p: Pull.Translate[g, F, O] @unchecked => p +// case p: Pull.InterruptWhen[?] => p +// case p: Pull.CloseScope => p +// case p: Pull.GetScope[?] => p +// case p: Pull.Eval[?, ?] => p + case p: Pull.Fail => p + case p: Pull.Succeeded[R] @unchecked => p + case p: Pull.Interrupted => p + case p: Pull.Output[O] @unchecked => p } /** Repeatedly uses the output of the pull as input for the next step of the From 72a7724f9bdca2c815e40458c78d9865ff88e954 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Tue, 31 Dec 2024 19:09:37 +0100 Subject: [PATCH 6/7] fix scala 2.12 crash --- core/shared/src/main/scala/fs2/Pull.scala | 27 ++++++++++++----------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 21e6f888b2..bc433adce8 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -483,7 +483,7 @@ object Pull extends PullLowPriority { scope: Scope[F] )(implicit F: MonadThrow[F]): Pull[F, O, R] = pull match { - case p: Pull.FlatMapOutput[F, O, p] @unchecked => + case p: Pull.FlatMapOutput[?, ?, ?] => bindAcquireToScope(p.stream, scope).flatMapOutput(o => bindAcquireToScope(p.fun(o), scope)) case p: Pull.Acquire[F, r] @unchecked => Pull @@ -505,7 +505,7 @@ object Pull extends PullLowPriority { } case p: Pull.Bind[F, O, x, R] @unchecked => new ScopedBind[F, O, x, R](p, scope) - case p: Pull.InScope[F, O] @unchecked => + case p: Pull.InScope[?, ?] => Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption) case p: Pull.StepLeg[F, O] @unchecked => Pull.StepLeg( @@ -516,17 +516,18 @@ object Pull extends PullLowPriority { Pull.Uncons( bindAcquireToScope(p.stream, scope) ) - case p: Pull.AlgEffect[F, R] @unchecked => - p // workaround for Scala 3 'Pull.CloseScope case is unreachable' - case p: Pull.Translate[g, F, O] @unchecked => p -// case p: Pull.InterruptWhen[?] => p -// case p: Pull.CloseScope => p -// case p: Pull.GetScope[?] => p -// case p: Pull.Eval[?, ?] => p - case p: Pull.Fail => p - case p: Pull.Succeeded[R] @unchecked => p - case p: Pull.Interrupted => p - case p: Pull.Output[O] @unchecked => p + case other => other +// case p: Pull.AlgEffect[F, R] @unchecked => +// p // workaround for Scala 3 'Pull.CloseScope case is unreachable' +// case p: Pull.Translate[g, F, O] @unchecked => p +//// case p: Pull.InterruptWhen[?] => p +//// case p: Pull.CloseScope => p +//// case p: Pull.GetScope[?] => p +//// case p: Pull.Eval[?, ?] => p +// case p: Pull.Fail => p +// case p: Pull.Succeeded[R] @unchecked => p +// case p: Pull.Interrupted => p +// case p: Pull.Output[O] @unchecked => p } /** Repeatedly uses the output of the pull as input for the next step of the From d8e160ef47532b4f6bc78939b96bf98c13d7d8c4 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Tue, 31 Dec 2024 19:13:02 +0100 Subject: [PATCH 7/7] scalafmt --- core/shared/src/main/scala/fs2/Pull.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index bc433adce8..3cbe70d112 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -505,7 +505,7 @@ object Pull extends PullLowPriority { } case p: Pull.Bind[F, O, x, R] @unchecked => new ScopedBind[F, O, x, R](p, scope) - case p: Pull.InScope[?, ?] => + case p: Pull.InScope[?, ?] => Pull.InScope(bindAcquireToScope(p.stream, scope), p.useInterruption) case p: Pull.StepLeg[F, O] @unchecked => Pull.StepLeg(