Skip to content
Merged
54 changes: 51 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import cats.effect.std.MapRef
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Invariant, Monad}

import cats.arrow.FunctionK
import scala.collection.immutable.LongMap
import fs2.concurrent.SignallingRef.TransformedSignallingRef
import fs2.concurrent.Signal.TransformedSignal
import cats.data.State

/** Pure holder of a single value of type `A` that can be read in the effect `F`. */
trait Signal[F[_], A] { outer =>
Expand Down Expand Up @@ -135,6 +138,11 @@ trait Signal[F[_], A] { outer =>
*/
def waitUntil(p: A => Boolean)(implicit F: Concurrent[F]): F[Unit] =
discrete.forall(a => !p(a)).compile.drain

def mapK[G[_]](
f: FunctionK[F, G]
): Signal[G, A] =
new TransformedSignal(this, f)
}

object Signal extends SignalInstances {
Expand Down Expand Up @@ -162,6 +170,16 @@ object Signal extends SignalInstances {
def get: F[B] = Functor[F].map(fa.get)(f)
}

final private class TransformedSignal[F[_], G[_], A](
underlying: Signal[F, A],
trans: FunctionK[F, G]
) extends Signal[G, A] {
override def get: G[A] = trans(underlying.get)
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
}

implicit class SignalOps[F[_], A](val self: Signal[F, A]) extends AnyVal {

/** Converts this signal to signal of `B` by applying `f`.
Expand Down Expand Up @@ -196,7 +214,12 @@ object Signal extends SignalInstances {
* function, in the presence of `discrete`, can return `false` and
* need looping even without any other writers.
*/
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A]
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A] {
def mapK[G[_]](
f: FunctionK[F, G]
)(implicit G: Functor[G], dummy: DummyImplicit): SignallingRef[G, A] =
new TransformedSignallingRef(this, f)
}

object SignallingRef {

Expand All @@ -222,6 +245,7 @@ object SignallingRef {
*
* @see [[of]]
*/

def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]

/** Alias for `of`. */
Expand Down Expand Up @@ -341,7 +365,31 @@ object SignallingRef {
ref: SignallingRef[F, A]
)(get: A => B, set: A => B => A)(implicit F: Functor[F]): SignallingRef[F, B] =
new LensSignallingRef(ref)(get, set)

final private class TransformedSignallingRef[F[_], G[_], A](
underlying: SignallingRef[F, A],
trans: FunctionK[F, G]
)(implicit G: Functor[G])
extends SignallingRef[G, A] {

// --- Ref methods: these are lifted using trans, just like in TransformedRef2
override def get: G[A] = trans(underlying.get)
override def set(a: A): G[Unit] = trans(underlying.set(a))
override def getAndSet(a: A): G[A] = trans(underlying.getAndSet(a))
override def tryUpdate(f: A => A): G[Boolean] = trans(underlying.tryUpdate(f))
override def tryModify[B](f: A => (A, B)): G[Option[B]] = trans(underlying.tryModify(f))
override def update(f: A => A): G[Unit] = trans(underlying.update(f))
override def modify[B](f: A => (A, B)): G[B] = trans(underlying.modify(f))
override def tryModifyState[B](state: State[A, B]): G[Option[B]] =
trans(underlying.tryModifyState(state))
override def modifyState[B](state: State[A, B]): G[B] = trans(underlying.modifyState(state))
override def access: G[(A, A => G[Boolean])] =
G.compose[(A, *)].compose[A => *].map(trans(underlying.access))(trans(_))

// --- Signal-specific methods
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
}
private final class LensSignallingRef[F[_], A, B](underlying: SignallingRef[F, A])(
lensGet: A => B,
lensSet: A => B => A
Expand Down