Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import cats.effect.std.MapRef
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Invariant, Monad}

import cats.arrow.FunctionK
import scala.collection.immutable.LongMap
import fs2.concurrent.SignallingRef.TransformedSignallingRef
import fs2.concurrent.Signal.TransformedSignal
import cats.data.State

/** Pure holder of a single value of type `A` that can be read in the effect `F`. */
trait Signal[F[_], A] { outer =>
Expand Down Expand Up @@ -135,6 +138,11 @@ trait Signal[F[_], A] { outer =>
*/
def waitUntil(p: A => Boolean)(implicit F: Concurrent[F]): F[Unit] =
discrete.forall(a => !p(a)).compile.drain

def mapK[G[_]](
f: FunctionK[F, G]
): Signal[G, A] =
new TransformedSignal(this, f)
}

object Signal extends SignalInstances {
Expand Down Expand Up @@ -162,6 +170,16 @@ object Signal extends SignalInstances {
def get: F[B] = Functor[F].map(fa.get)(f)
}

final private class TransformedSignal[F[_], G[_], A](
underlying: Signal[F, A],
trans: FunctionK[F, G]
) extends Signal[G, A] {
override def get: G[A] = trans(underlying.get)
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
}

implicit class SignalOps[F[_], A](val self: Signal[F, A]) extends AnyVal {

/** Converts this signal to signal of `B` by applying `f`.
Expand Down Expand Up @@ -196,7 +214,12 @@ object Signal extends SignalInstances {
* function, in the presence of `discrete`, can return `false` and
* need looping even without any other writers.
*/
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A]
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A] {
def mapK[G[_]](
f: FunctionK[F, G]
)(implicit G: Functor[G], dummy: DummyImplicit): SignallingRef[G, A] =
new TransformedSignallingRef(this, f)
}

object SignallingRef {

Expand All @@ -222,6 +245,7 @@ object SignallingRef {
*
* @see [[of]]
*/

def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]

/** Alias for `of`. */
Expand Down Expand Up @@ -341,7 +365,31 @@ object SignallingRef {
ref: SignallingRef[F, A]
)(get: A => B, set: A => B => A)(implicit F: Functor[F]): SignallingRef[F, B] =
new LensSignallingRef(ref)(get, set)

final private class TransformedSignallingRef[F[_], G[_], A](
underlying: SignallingRef[F, A],
trans: FunctionK[F, G]
)(implicit G: Functor[G])
extends SignallingRef[G, A] {

// --- Ref methods: these are lifted using trans, just like in TransformedRef2
override def get: G[A] = trans(underlying.get)
override def set(a: A): G[Unit] = trans(underlying.set(a))
override def getAndSet(a: A): G[A] = trans(underlying.getAndSet(a))
override def tryUpdate(f: A => A): G[Boolean] = trans(underlying.tryUpdate(f))
override def tryModify[B](f: A => (A, B)): G[Option[B]] = trans(underlying.tryModify(f))
override def update(f: A => A): G[Unit] = trans(underlying.update(f))
override def modify[B](f: A => (A, B)): G[B] = trans(underlying.modify(f))
override def tryModifyState[B](state: State[A, B]): G[Option[B]] =
trans(underlying.tryModifyState(state))
override def modifyState[B](state: State[A, B]): G[B] = trans(underlying.modifyState(state))
override def access: G[(A, A => G[Boolean])] =
G.compose[(A, *)].compose[A => *].map(trans(underlying.access))(trans(_))

// --- Signal-specific methods
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
}
private final class LensSignallingRef[F[_], A, B](underlying: SignallingRef[F, A])(
lensGet: A => B,
lensSet: A => B => A
Expand Down
14 changes: 14 additions & 0 deletions core/shared/src/test/scala/fs2/concurrent/SignalSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package concurrent
import cats.effect.IO
import cats.effect.kernel.Ref
import cats.syntax.all._
import cats.arrow.FunctionK
import cats.effect.testkit.TestControl
// import cats.laws.discipline.{ApplicativeTests, FunctorTests}
import scala.concurrent.duration._
Expand Down Expand Up @@ -320,6 +321,19 @@ class SignalSuite extends Fs2Suite {
TestControl.executeEmbed(prog).assertEquals(expected)
}

test("SignallingRef#mapK returns a SignallingRef") {
for {
s <- SignallingRef[IO, Int](0)
nt = new FunctionK[IO, IO] {
def apply[A](fa: IO[A]): IO[A] = fa
}
transformed: SignallingRef[IO, Int] = s.mapK(nt)
} yield assert(
transformed.isInstanceOf[SignallingRef[IO, Int]],
s"Expected transformed to be a SignallingRef but got: ${transformed.getClass.getName}"
)
}

// TODO - Port laws tests once we have a compatible version of cats-laws
// /**
// * This is unsafe because the Signal created cannot have multiple consumers
Expand Down