diff --git a/build.sbt b/build.sbt index 3399baf2b2..612e46f809 100644 --- a/build.sbt +++ b/build.sbt @@ -389,9 +389,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.scodec" %%% "scodec-bits" % "1.2.4", "org.typelevel" %%% "cats-core" % "2.13.0", "org.typelevel" %%% "cats-effect" % "3.7.0-RC1", + "org.typelevel" %%% "cats-mtl" % "1.6.0", "org.typelevel" %%% "cats-effect-laws" % "3.7.0-RC1" % Test, "org.typelevel" %%% "cats-effect-testkit" % "3.7.0-RC1" % Test, "org.typelevel" %%% "cats-laws" % "2.13.0" % Test, + "org.typelevel" %%% "cats-mtl-laws" % "1.6.0" % Test, "org.typelevel" %%% "discipline-munit" % "2.0.0" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.2.0-RC1" % Test, "org.typelevel" %%% "scalacheck-effect-munit" % "2.1.0-RC1" % Test diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 415d564f64..a27c5f13cf 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -39,8 +39,9 @@ import fs2.concurrent._ import fs2.internal._ import org.typelevel.scalaccompat.annotation._ import Pull.StreamPullOps +import cats.mtl.{LiftKind, LiftValue} -import java.util.concurrent.Flow.{Publisher, Processor, Subscriber} +import java.util.concurrent.Flow.{Processor, Publisher, Subscriber} /** A stream producing output of type `O` and which may evaluate `F` effects. * @@ -5810,9 +5811,46 @@ object Stream extends StreamLowPriority { new Defer[Stream[F, *]] { override def defer[A](fa: => Stream[F, A]): Stream[F, A] = Stream.empty ++ fa } + + implicit def liftKindInstance[F[_]](implicit F: Applicative[F]): LiftKind[F, Stream[F, *]] = + liftKindImpl(F) + + implicit def liftValueFromResourceInstance[F[_]](implicit + F: MonadCancel[F, ?] + ): LiftValue[Resource[F, *], Stream[F, *]] = + liftValueFromResourceImpl(implicitly) } private[fs2] trait StreamLowPriority { implicit def monadInstance[F[_]]: Monad[Stream[F, *]] = new Stream.StreamMonad[F] + + protected[this] def liftKindImpl[F[_]](F: Applicative[F]): LiftKind[F, Stream[F, *]] = + new LiftKind[F, Stream[F, *]] { + val applicativeF: Applicative[F] = F + val applicativeG: Applicative[Stream[F, *]] = monadInstance + def apply[A](fa: F[A]): Stream[F, A] = Stream.eval(fa) + def limitedMapK[A](ga: Stream[F, A])(scope: F ~> F): Stream[F, A] = + ga.translate(scope) + } + + implicit def liftKindComposedInstance[F[_], G[_]](implicit + inner: LiftKind[F, G] + ): LiftKind[F, Stream[G, *]] = + inner.andThen(liftKindImpl(inner.applicativeG)) + + protected[this] def liftValueFromResourceImpl[F[_]]( + applicativeResource: Applicative[Resource[F, *]] + )(implicit F: MonadCancel[F, ?]): LiftValue[Resource[F, *], Stream[F, *]] = + new LiftValue[Resource[F, *], Stream[F, *]] { + val applicativeF: Applicative[Resource[F, *]] = applicativeResource + val applicativeG: Applicative[Stream[F, *]] = monadInstance + def apply[A](fa: Resource[F, A]): Stream[F, A] = Stream.resource(fa) + } + + implicit def liftValueFromResourceComposedInstance[F[_], G[_]](implicit + inner: LiftValue[F, Resource[G, *]], + G: MonadCancel[G, ?] + ): LiftValue[F, Stream[G, *]] = + inner.andThen(liftValueFromResourceImpl(inner.applicativeG)) } diff --git a/core/shared/src/test/scala/fs2/StreamLawsSuite.scala b/core/shared/src/test/scala/fs2/StreamLawsSuite.scala index 61d1829947..8f4731d8b0 100644 --- a/core/shared/src/test/scala/fs2/StreamLawsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamLawsSuite.scala @@ -21,20 +21,39 @@ package fs2 -import cats.Eq -import cats.effect.IO +import cats.{Applicative, Eq, ~>} +import cats.data.{IdT, OptionT} +import cats.effect.{Concurrent, IO, Ref, Resource} import cats.effect.testkit.TestInstances import cats.laws.discipline._ import cats.laws.discipline.arbitrary._ +import cats.mtl.LiftValue +import cats.mtl.laws.discipline.{LiftKindTests, LiftValueTests} +import org.scalacheck.{Arbitrary, Gen} class StreamLawsSuite extends Fs2Suite with TestInstances { implicit val ticker: Ticker = Ticker() - implicit def eqStream[O: Eq]: Eq[Stream[IO, O]] = - Eq.instance((x, y) => - Eq[IO[Vector[Either[Throwable, O]]]] - .eqv(x.attempt.compile.toVector, y.attempt.compile.toVector) - ) + implicit def eqStream[F[_], O](implicit + F: Concurrent[F], + eqFVecEitherThrowO: Eq[F[Vector[Either[Throwable, O]]]] + ): Eq[Stream[F, O]] = + Eq.by((_: Stream[F, O]).attempt.compile.toVector) + + private[this] val counter: IO[Ref[IO, Int]] = IO.ref(0) + + implicit val arbitraryScope: Arbitrary[IO ~> IO] = + Arbitrary { + Gen.const { + new (IO ~> IO) { + def apply[A](fa: IO[A]): IO[A] = + for { + ref <- counter + res <- ref.update(_ + 1) >> fa + } yield res + } + } + } checkAll( "MonadError[Stream[F, *], Throwable]", @@ -50,4 +69,32 @@ class StreamLawsSuite extends Fs2Suite with TestInstances { "Align[Stream[F, *]]", AlignTests[Stream[IO, *]].align[Int, Int, Int, Int] ) + checkAll( + "LiftKind[IO, Stream[IO, *]", + LiftKindTests[IO, Stream[IO, *]].liftKind[Int, Int] + ) + checkAll( + "LiftKind[IO, Stream[OptionT[IO, *], *]", + LiftKindTests[IO, Stream[OptionT[IO, *], *]].liftKind[Int, Int] + ) + checkAll( + "LiftValue[Resource[IO, *], Stream[IO, *]", + LiftValueTests[Resource[IO, *], Stream[IO, *]].liftValue[Int, Int] + ) + locally { + // this is a somewhat silly instance, but we need a + // `LiftValue[X, Resource[IO, *]]` instance where `X` is not `IO` because + // that already has a higher priority implicit instance + implicit val liftIdTResource: LiftValue[IdT[IO, *], Resource[IO, *]] = + new LiftValue[IdT[IO, *], Resource[IO, *]] { + val applicativeF: Applicative[IdT[IO, *]] = implicitly + val applicativeG: Applicative[Resource[IO, *]] = implicitly + def apply[A](fa: IdT[IO, A]): Resource[IO, A] = + Resource.eval(fa.value) + } + checkAll( + "LiftValue[IdT[IO, *], Stream[IO, *]] via Resource[IO, *]", + LiftValueTests[IdT[IO, *], Stream[IO, *]].liftValue[Int, Int] + ) + } }