From bb5b410ff820b65f7a44133a4c9d4d50e0c96ccb Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 14 Sep 2023 09:07:18 +0200 Subject: [PATCH 01/11] Investigating --- core/shared/src/main/scala/fs2/Stream.scala | 38 +++++++++++++++---- .../src/test/scala/fs2/StreamSuite.scala | 9 +++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 81c35651a1..7433a2183f 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2251,8 +2251,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val releaseAndCheckCompletion = semaphore.release *> semaphore.available.flatMap { - case `concurrency` => channel.close *> end.complete(()).void - case _ => F.unit + case `concurrency` => + println("DEBUG: inside releaseAndCheckCompletion") + channel.close.void // *> end.complete(()).void + case _ => F.unit } def forkOnElem(el: O): F2[Unit] = @@ -2260,7 +2262,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, poll(semaphore.acquire) <* Deferred[F2, Unit].flatMap { pushed => val init = initFork(pushed.complete(()).void) + println("DEBUG: Inside forkOnElem") poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => + println("DEBUG: Inside forkOnElem and inside onCancel") val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get F.start(stop.get.race(action) *> releaseAndCheckCompletion) } @@ -2272,12 +2276,26 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, interruptWhen(stop.get.map(_.asRight[Throwable])) .foreach(forkOnElem) .onFinalizeCase { - case ExitCase.Succeeded => releaseAndCheckCompletion - case _ => stop.complete(()) *> releaseAndCheckCompletion + case ExitCase.Succeeded => + println("DEBUG: inside background SUCCESS case") + releaseAndCheckCompletion + case _ => + println("DEBUG: inside background OTHER case") + stop.complete(()) *> releaseAndCheckCompletion } - val foreground = channel.stream.evalMap(_.rethrow) - foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) + val foreground = channel.stream + .evalTap { x => + println("DEBUG: Inside foreground evalTap") + x + } + .evalMap(_.rethrow) + foreground + .concurrently(background) + .onFinalize { + println("DEBUG: Inside the foreground's onFinalize"); + stop.complete(()) *> end.complete(()) *> end.get + } } Stream.force(action) @@ -3843,15 +3861,19 @@ object Stream extends StreamLowPriority { def resourceWeak[F[_], O](r: Resource[F, O])(implicit F: MonadCancel[F, _]): Stream[F, O] = r match { case Resource.Allocate(resource) => + println("DEBUG: inside resourceWeak Allocate case") Stream .bracketFullWeak(resource) { case ((_, release), exit) => + println("DEBUG: inside resourceWeak Allocate case CALLING release") release(exit) } .mapNoScope(_._1) case Resource.Bind(source, f) => resourceWeak(source).flatMap(o => resourceWeak(f(o))) - case Resource.Eval(fo) => Stream.eval(fo) - case Resource.Pure(o) => Stream.emit(o) + case Resource.Eval(fo) => + Stream.eval(fo) + case Resource.Pure(o) => + Stream.emit(o) } /** Same as [[resourceWeak]], but expressed as a FunctionK. */ diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index d1fb1ae82d..f8c46f1170 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1042,4 +1042,13 @@ class StreamSuite extends Fs2Suite { } } + test("parEvalMap works correctly") { + Stream + .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) + // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .parEvalMap(2)(_ => IO.println("use")) + .compile + .drain + } + } From 9a2f274cd974645c38c716de90469c3b4630217f Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 14 Sep 2023 09:15:58 +0200 Subject: [PATCH 02/11] Refactor --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7433a2183f..7d68dede2a 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2253,7 +2253,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, semaphore.available.flatMap { case `concurrency` => println("DEBUG: inside releaseAndCheckCompletion") - channel.close.void // *> end.complete(()).void + channel.close.void *> end.complete(()).void case _ => F.unit } @@ -2294,7 +2294,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .concurrently(background) .onFinalize { println("DEBUG: Inside the foreground's onFinalize"); - stop.complete(()) *> end.complete(()) *> end.get + stop.complete(()) *> end.get } } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index f8c46f1170..bcded93f29 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1045,8 +1045,8 @@ class StreamSuite extends Fs2Suite { test("parEvalMap works correctly") { Stream .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) - // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) - .parEvalMap(2)(_ => IO.println("use")) + .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + // .parEvalMap(2)(_ => IO.println("use")) .compile .drain } From 02ebfad3cdccb27f655d389e046bb62a134d763e Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 14 Sep 2023 09:55:22 +0200 Subject: [PATCH 03/11] Refactor --- core/shared/src/main/scala/fs2/Stream.scala | 49 ++++++++++--------- .../src/test/scala/fs2/StreamSuite.scala | 4 +- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7d68dede2a..ec2150bc87 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2252,7 +2252,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, semaphore.release *> semaphore.available.flatMap { case `concurrency` => - println("DEBUG: inside releaseAndCheckCompletion") + F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion")) channel.close.void *> end.complete(()).void case _ => F.unit } @@ -2262,12 +2262,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, poll(semaphore.acquire) <* Deferred[F2, Unit].flatMap { pushed => val init = initFork(pushed.complete(()).void) - println("DEBUG: Inside forkOnElem") - poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => - println("DEBUG: Inside forkOnElem and inside onCancel") - val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.start(stop.get.race(action) *> releaseAndCheckCompletion) - } + F.unit.map(_ => println("DEBUG: Inside forkOnElem")) *> + poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => + val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get + F.unit.map(_ => println("DEBUG: Inside forkOnElem and inside onCancel")) *> + F.start(stop.get.race(action) *> releaseAndCheckCompletion) + } } } @@ -2277,24 +2277,24 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .foreach(forkOnElem) .onFinalizeCase { case ExitCase.Succeeded => - println("DEBUG: inside background SUCCESS case") - releaseAndCheckCompletion + F.unit.map(_ => println("DEBUG: inside background SUCCESS case")) *> + releaseAndCheckCompletion case _ => - println("DEBUG: inside background OTHER case") - stop.complete(()) *> releaseAndCheckCompletion + F.unit.map(_ => println("DEBUG: inside background OTHER case")) *> + stop.complete(()) *> releaseAndCheckCompletion } val foreground = channel.stream .evalTap { x => - println("DEBUG: Inside foreground evalTap") - x + F.unit.map(_ => println("DEBUG: Inside foreground evalTap")) *> + x } .evalMap(_.rethrow) foreground .concurrently(background) .onFinalize { - println("DEBUG: Inside the foreground's onFinalize"); - stop.complete(()) *> end.get + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get } } @@ -3861,17 +3861,18 @@ object Stream extends StreamLowPriority { def resourceWeak[F[_], O](r: Resource[F, O])(implicit F: MonadCancel[F, _]): Stream[F, O] = r match { case Resource.Allocate(resource) => - println("DEBUG: inside resourceWeak Allocate case") - Stream - .bracketFullWeak(resource) { case ((_, release), exit) => - println("DEBUG: inside resourceWeak Allocate case CALLING release") - release(exit) - } - .mapNoScope(_._1) + Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Allocate case"))) *> + Stream + .bracketFullWeak(resource) { case ((_, release), exit) => + F.unit.map(_ => + println("DEBUG: inside resourceWeak Allocate case CALLING release") + ) *> + release(exit) + } + .mapNoScope(_._1) case Resource.Bind(source, f) => resourceWeak(source).flatMap(o => resourceWeak(f(o))) - case Resource.Eval(fo) => - Stream.eval(fo) + case Resource.Eval(fo) => Stream.eval(fo) case Resource.Pure(o) => Stream.emit(o) } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index bcded93f29..f8c46f1170 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1045,8 +1045,8 @@ class StreamSuite extends Fs2Suite { test("parEvalMap works correctly") { Stream .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) - .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) - // .parEvalMap(2)(_ => IO.println("use")) + // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .parEvalMap(2)(_ => IO.println("use")) .compile .drain } From e4d2c3912140f85d0e2b50fbdb0b937af602e036 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 14 Sep 2023 11:32:31 +0200 Subject: [PATCH 04/11] Refactor --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index ec2150bc87..98aa131872 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2252,8 +2252,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, semaphore.release *> semaphore.available.flatMap { case `concurrency` => - F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion")) - channel.close.void *> end.complete(()).void + F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion")) *> + channel.close.void *> end.complete(()).void case _ => F.unit } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index f8c46f1170..bcded93f29 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1045,8 +1045,8 @@ class StreamSuite extends Fs2Suite { test("parEvalMap works correctly") { Stream .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) - // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) - .parEvalMap(2)(_ => IO.println("use")) + .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + // .parEvalMap(2)(_ => IO.println("use")) .compile .drain } From fd6b1d05b6800ee63f6b1dee0167657cd52782d8 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Fri, 15 Sep 2023 10:44:49 +0200 Subject: [PATCH 05/11] Testing --- core/shared/src/main/scala/fs2/Stream.scala | 99 +++++++++++++++++-- .../src/test/scala/fs2/StreamSuite.scala | 18 +++- 2 files changed, 107 insertions(+), 10 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 98aa131872..497ac776cd 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -545,18 +545,26 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, interrupt <- F.deferred[Unit] backResult <- F.deferred[Either[Throwable, Unit]] } yield { - def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) + def watch[A](str: Stream[F2, A]) = Stream.eval( + F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch")) + ) *> str.interruptWhen(interrupt.get.attempt) val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase { // Pass the result of backstream completion in the backResult deferred. // IF result of back-stream was failed, interrupt fore. Otherwise, let it be case Outcome.Errored(t) => backResult.complete(Left(t)) >> interrupt.complete(()).void - case _ => backResult.complete(Right(())).void + case _ => + backResult.complete(Right(())).void *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack SUCCESS case") + ) }.voidError // stop background process but await for it to finalise with a result // We use F.fromEither to bring errors from the back into the fore - val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) + val stopBack: F2[Unit] = + interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' stopBack") + ) (Stream.bracket(compileBack.start)(_ => stopBack), watch(this)) } @@ -2253,7 +2261,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, semaphore.available.flatMap { case `concurrency` => F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion")) *> - channel.close.void *> end.complete(()).void + channel.close *> end.complete(()).void case _ => F.unit } @@ -2266,7 +2274,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get F.unit.map(_ => println("DEBUG: Inside forkOnElem and inside onCancel")) *> - F.start(stop.get.race(action) *> releaseAndCheckCompletion) + // F.start(stop.get.race(action) *> releaseAndCheckCompletion) + F.start( + stop.get.race(action) *> F.unit.map(_ => + println("DEBUG: Inside forkOnElem and inside onCancel after action") + ) *> releaseAndCheckCompletion + ) } } } @@ -2290,12 +2303,78 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, x } .evalMap(_.rethrow) + + /* foreground .concurrently(background) .onFinalize { F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> stop.complete(()) *> end.get } + */ + + /* + val z = for { + _ <- Stream.resource(background.compile.resource.drain) + forg <- Stream.resource( + foreground + .onFinalize { + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get + } + .compile + .resource + .lastOrError + ) + } yield forg + */ + /* + val z = for { + forg <- Stream.resource( + foreground + .onFinalize { + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get + } + .compile + .resource + .lastOrError + ) + _ <- Stream.resource(background.compile.resource.drain) + } yield forg + */ + + /* + val e = Stream + .resource(background.compile.resource.drain) + .flatMap { _ => + val x = Stream.eval( + foreground + .onFinalize { + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get + } + ) + x + } + */ + + // This one runs but still closes somehow the resource :( + val z = for { + _ <- Resource.eval(background.compile.drain) + forg <- Resource.eval { + foreground + .onFinalize { + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get + } + .compile + .lastOrError + } + } yield forg + + Stream.resource(z) + } Stream.force(action) @@ -3871,10 +3950,14 @@ object Stream extends StreamLowPriority { } .mapNoScope(_._1) case Resource.Bind(source, f) => - resourceWeak(source).flatMap(o => resourceWeak(f(o))) - case Resource.Eval(fo) => Stream.eval(fo) + Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Bind case"))) *> + resourceWeak(source).flatMap(o => resourceWeak(f(o))) + case Resource.Eval(fo) => + Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Eval case"))) *> + Stream.eval(fo) case Resource.Pure(o) => - Stream.emit(o) + Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Pure case"))) *> + Stream.emit(o) } /** Same as [[resourceWeak]], but expressed as a FunctionK. */ diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index bcded93f29..a13b23a6a3 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1043,9 +1043,23 @@ class StreamSuite extends Fs2Suite { } test("parEvalMap works correctly") { + /* Stream - .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) - .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .resource( + Resource.make(IO.println("acquire"))(_ => IO.println("release")) + ) + */ + Stream + .resource( + Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => + IO.println("Closing Resource") *> r.set(false) + ) + ) + // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .parEvalMap(2)(ref => + ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> IO.sleep(1.second) >> + ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) + ) // .parEvalMap(2)(_ => IO.println("use")) .compile .drain From 08c334033435bbcb791e8fdb5ae3e0ce61a01a53 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Fri, 15 Sep 2023 16:24:49 +0200 Subject: [PATCH 06/11] Testing --- core/shared/src/main/scala/fs2/Stream.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 497ac776cd..5031c85fcc 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2360,8 +2360,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ // This one runs but still closes somehow the resource :( + /* val z = for { - _ <- Resource.eval(background.compile.drain) + b <- Resource.eval(background.compile.drain) forg <- Resource.eval { foreground .onFinalize { @@ -2372,8 +2373,22 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .lastOrError } } yield forg + */ + + // This one runs as well but still closes somehow the resource :( + val y = Resource.eval(background.compile.drain).background.use { _ => + Resource.eval { + foreground + .onFinalize { + F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> + stop.complete(()) *> end.get + } + .compile + .lastOrError + } + } - Stream.resource(z) + Stream.resource(y) } From 39e1472cceeb3e95d90bc860322a15b0f0a6ba3a Mon Sep 17 00:00:00 2001 From: tothpeti Date: Tue, 19 Sep 2023 17:55:36 +0200 Subject: [PATCH 07/11] Testing --- core/shared/src/main/scala/fs2/Stream.scala | 40 +++++++++---------- .../src/test/scala/fs2/StreamSuite.scala | 17 +++++++- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 5031c85fcc..cf08378618 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -545,9 +545,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, interrupt <- F.deferred[Unit] backResult <- F.deferred[Either[Throwable, Unit]] } yield { - def watch[A](str: Stream[F2, A]) = Stream.eval( - F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch")) - ) *> str.interruptWhen(interrupt.get.attempt) + def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) <* Stream.eval( + F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch AFTER interruptWhen")) + ) val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase { // Pass the result of backstream completion in the backResult deferred. @@ -2277,9 +2277,11 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, // F.start(stop.get.race(action) *> releaseAndCheckCompletion) F.start( stop.get.race(action) *> F.unit.map(_ => - println("DEBUG: Inside forkOnElem and inside onCancel after action") + println( + "DEBUG: Inside forkOnElem and inside F.start after action invocation" + ) ) *> releaseAndCheckCompletion - ) + ) *> F.unit.map(_ => println("DEBUG: F.start invoked")) } } } @@ -2304,6 +2306,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } .evalMap(_.rethrow) + foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) + /* foreground .concurrently(background) @@ -2373,7 +2377,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .lastOrError } } yield forg - */ // This one runs as well but still closes somehow the resource :( val y = Resource.eval(background.compile.drain).background.use { _ => @@ -2389,6 +2392,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } Stream.resource(y) + */ } @@ -3955,24 +3959,18 @@ object Stream extends StreamLowPriority { def resourceWeak[F[_], O](r: Resource[F, O])(implicit F: MonadCancel[F, _]): Stream[F, O] = r match { case Resource.Allocate(resource) => - Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Allocate case"))) *> - Stream - .bracketFullWeak(resource) { case ((_, release), exit) => - F.unit.map(_ => - println("DEBUG: inside resourceWeak Allocate case CALLING release") - ) *> - release(exit) - } - .mapNoScope(_._1) + Stream + .bracketFullWeak(resource) { case ((_, release), exit) => + F.unit.map(_ => println("DEBUG: inside resourceWeak Allocate case CALLING release")) *> + release(exit) + } + .mapNoScope(_._1) case Resource.Bind(source, f) => - Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Bind case"))) *> - resourceWeak(source).flatMap(o => resourceWeak(f(o))) + resourceWeak(source).flatMap(o => resourceWeak(f(o))) case Resource.Eval(fo) => - Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Eval case"))) *> - Stream.eval(fo) + Stream.eval(fo) case Resource.Pure(o) => - Stream.eval(F.unit.map(_ => println("DEBUG: inside resourceWeak Pure case"))) *> - Stream.emit(o) + Stream.emit(o) } /** Same as [[resourceWeak]], but expressed as a FunctionK. */ diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index a13b23a6a3..e4179f6d45 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1049,18 +1049,31 @@ class StreamSuite extends Fs2Suite { Resource.make(IO.println("acquire"))(_ => IO.println("release")) ) */ + /* Stream .resource( Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => IO.println("Closing Resource") *> r.set(false) ) ) - // .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) .parEvalMap(2)(ref => ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> IO.sleep(1.second) >> ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) ) - // .parEvalMap(2)(_ => IO.println("use")) + .compile + .drain + */ + + Stream + .resource( + Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => + IO.println("Closing Resource") *> r.set(false) + ) + ) + .parEvalMap(2) { ref => + IO.sleep(1 second) >> + ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) + } .compile .drain } From 329c9f9e7cef863a20e9dc1c476b13b2efc595c2 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Wed, 20 Sep 2023 19:15:11 +0200 Subject: [PATCH 08/11] Potential fix --- core/shared/src/main/scala/fs2/Stream.scala | 3 +++ core/shared/src/test/scala/fs2/StreamSuite.scala | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index cf08378618..313bbf5947 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2275,6 +2275,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get F.unit.map(_ => println("DEBUG: Inside forkOnElem and inside onCancel")) *> // F.start(stop.get.race(action) *> releaseAndCheckCompletion) + stop.get.race(action) *> F.unit.map(_ => println("DEBUG: Inside forkOnElem and after action invocation")) *> releaseAndCheckCompletion + /* F.start( stop.get.race(action) *> F.unit.map(_ => println( @@ -2282,6 +2284,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ) ) *> releaseAndCheckCompletion ) *> F.unit.map(_ => println("DEBUG: F.start invoked")) + */ } } } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index e4179f6d45..8412e4f000 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1064,6 +1064,7 @@ class StreamSuite extends Fs2Suite { .drain */ + /* Stream .resource( Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => @@ -1071,11 +1072,19 @@ class StreamSuite extends Fs2Suite { ) ) .parEvalMap(2) { ref => + ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> IO.sleep(1 second) >> ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) } .compile .drain + */ + Stream + .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) + .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .compile + .drain + } } From a1a11c211b5fc6b86341231ca11a5e06fc5ee26f Mon Sep 17 00:00:00 2001 From: tothpeti Date: Wed, 20 Sep 2023 19:44:05 +0200 Subject: [PATCH 09/11] Revert potential fix --- core/shared/src/main/scala/fs2/Stream.scala | 4 +++- core/shared/src/test/scala/fs2/StreamSuite.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 313bbf5947..376cbf9b7f 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2275,7 +2275,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get F.unit.map(_ => println("DEBUG: Inside forkOnElem and inside onCancel")) *> // F.start(stop.get.race(action) *> releaseAndCheckCompletion) - stop.get.race(action) *> F.unit.map(_ => println("DEBUG: Inside forkOnElem and after action invocation")) *> releaseAndCheckCompletion + F.start( + stop.get.race(action) *> F.unit.map(_ => println("DEBUG: Inside forkOnElem and after action invocation")) *> releaseAndCheckCompletion + ) /* F.start( stop.get.race(action) *> F.unit.map(_ => diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 8412e4f000..9ee17eb5aa 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1079,11 +1079,19 @@ class StreamSuite extends Fs2Suite { .compile .drain */ + /* Stream .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) .compile .drain + */ + + Stream.range(0, 60) + .covary[IO] + .parEvalMap(60)(_ => IO.sleep(1.second)) + .compile + .drain } From e00eb08cdc43865ec19cdeebbeda393836f43c41 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Mon, 11 Dec 2023 11:04:20 +0100 Subject: [PATCH 10/11] Adding new logs --- core/shared/src/main/scala/fs2/Stream.scala | 145 +++++------------- .../src/test/scala/fs2/StreamSuite.scala | 6 +- 2 files changed, 41 insertions(+), 110 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 376cbf9b7f..9a7daa618e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -534,7 +534,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def concurrently[F2[x] >: F[x], O2]( that: Stream[F2, O2] )(implicit F: Concurrent[F2]): Stream[F2, O] = - concurrentlyAux(that).flatMap { case (startBack, fore) => startBack >> fore } + Stream.eval(F.unit.map(_ => println("DEBUG: Invoked concurrently"))) *> concurrentlyAux(that) + .flatMap { case (startBack, fore) => + startBack >> fore + } private def concurrentlyAux[F2[x] >: F[x], O2]( that: Stream[F2, O2] @@ -549,24 +552,43 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch AFTER interruptWhen")) ) - val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase { + val compileBack: F2[Unit] = F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack started (starting resource)") + ) *> watch(that).compile.drain.guaranteeCase { // Pass the result of backstream completion in the backResult deferred. // IF result of back-stream was failed, interrupt fore. Otherwise, let it be - case Outcome.Errored(t) => backResult.complete(Left(t)) >> interrupt.complete(()).void - case _ => + case Outcome.Errored(t) => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack ERRORED case") + ) *> backResult.complete(Left(t)) >> interrupt.complete(()).void + case Outcome.Canceled() => backResult.complete(Right(())).void *> F.unit.map(_ => - println("DEBUG: Inside 'concurrently' compileBack SUCCESS case") + println("DEBUG: Inside 'concurrently' compileBack CANCELED case") ) + case Outcome.Succeeded(fa) => + F.unit.map(_ => + println(s"DEBUG: Inside 'concurrently' compileBack SUCCESS case with value: $fa") + ) *> + backResult.complete(Right(())).void *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack SUCCESS case") + ) }.voidError // stop background process but await for it to finalise with a result // We use F.fromEither to bring errors from the back into the fore val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) *> F.unit.map(_ => - println("DEBUG: Inside 'concurrently' stopBack") + println("DEBUG: Inside 'concurrently' AFTER stopBack completed") ) - (Stream.bracket(compileBack.start)(_ => stopBack), watch(this)) + ( + Stream.bracket(compileBack.start)(_ => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)") + ) *> stopBack + ), + watch(this) + ) } Stream.eval(fstream) @@ -2260,9 +2282,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, semaphore.release *> semaphore.available.flatMap { case `concurrency` => - F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion")) *> + F.unit.map(_ => + println("DEBUG: inside releaseAndCheckCompletion 'concurrency' case ") + ) *> channel.close *> end.complete(()).void - case _ => F.unit + case _ => + F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion 'other' case ")) *> + F.unit } def forkOnElem(el: O): F2[Unit] = @@ -2273,20 +2299,14 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, F.unit.map(_ => println("DEBUG: Inside forkOnElem")) *> poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.unit.map(_ => println("DEBUG: Inside forkOnElem and inside onCancel")) *> + F.unit + .map(_ => println("DEBUG: Inside forkOnElem and BEFORE action invocation")) *> // F.start(stop.get.race(action) *> releaseAndCheckCompletion) - F.start( - stop.get.race(action) *> F.unit.map(_ => println("DEBUG: Inside forkOnElem and after action invocation")) *> releaseAndCheckCompletion - ) - /* F.start( stop.get.race(action) *> F.unit.map(_ => - println( - "DEBUG: Inside forkOnElem and inside F.start after action invocation" - ) + println("DEBUG: Inside forkOnElem when stop vs action race ends") ) *> releaseAndCheckCompletion - ) *> F.unit.map(_ => println("DEBUG: F.start invoked")) - */ + ) } } } @@ -2312,93 +2332,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, .evalMap(_.rethrow) foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) - - /* - foreground - .concurrently(background) - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - */ - - /* - val z = for { - _ <- Stream.resource(background.compile.resource.drain) - forg <- Stream.resource( - foreground - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - .compile - .resource - .lastOrError - ) - } yield forg - */ - /* - val z = for { - forg <- Stream.resource( - foreground - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - .compile - .resource - .lastOrError - ) - _ <- Stream.resource(background.compile.resource.drain) - } yield forg - */ - - /* - val e = Stream - .resource(background.compile.resource.drain) - .flatMap { _ => - val x = Stream.eval( - foreground - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - ) - x - } - */ - - // This one runs but still closes somehow the resource :( - /* - val z = for { - b <- Resource.eval(background.compile.drain) - forg <- Resource.eval { - foreground - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - .compile - .lastOrError - } - } yield forg - - // This one runs as well but still closes somehow the resource :( - val y = Resource.eval(background.compile.drain).background.use { _ => - Resource.eval { - foreground - .onFinalize { - F.unit.map(_ => println("DEBUG: Inside the foreground's onFinalize")) *> - stop.complete(()) *> end.get - } - .compile - .lastOrError - } - } - - Stream.resource(y) - */ - } Stream.force(action) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 9ee17eb5aa..246ddafdf2 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1064,7 +1064,6 @@ class StreamSuite extends Fs2Suite { .drain */ - /* Stream .resource( Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => @@ -1073,25 +1072,24 @@ class StreamSuite extends Fs2Suite { ) .parEvalMap(2) { ref => ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> - IO.sleep(1 second) >> + IO.sleep(1 second) >> ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) } .compile .drain - */ /* Stream .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) .compile .drain - */ Stream.range(0, 60) .covary[IO] .parEvalMap(60)(_ => IO.sleep(1.second)) .compile .drain + */ } From 793b7f9452a519f23e4c27088151f61a81cfab25 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Tue, 12 Dec 2023 09:56:58 +0100 Subject: [PATCH 11/11] Trying out replacing Stream.bracket with Stream.resource --- core/shared/src/main/scala/fs2/Stream.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9a7daa618e..3fe6fb9ad1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -549,7 +549,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, backResult <- F.deferred[Either[Throwable, Unit]] } yield { def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) <* Stream.eval( - F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch AFTER interruptWhen")) + F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch is invoked")) ) val compileBack: F2[Unit] = F.unit.map(_ => @@ -581,6 +581,17 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, println("DEBUG: Inside 'concurrently' AFTER stopBack completed") ) + val tmp = Stream.resource( + Resource.make(compileBack.start)(_ => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)") + ) *> stopBack + ) + ) + + (tmp, watch(this)) + + /* ( Stream.bracket(compileBack.start)(_ => F.unit.map(_ => @@ -589,6 +600,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ), watch(this) ) + */ } Stream.eval(fstream)