Skip to content

Commit ce061a5

Browse files
authored
Merge pull request #3125 from armanbilge/feature/signal-get-and-discrete-updates
Add `Signal#getAndDiscreteUpdates`
2 parents 325c74f + b3be9fd commit ce061a5

File tree

2 files changed

+93
-10
lines changed

2 files changed

+93
-10
lines changed

core/shared/src/main/scala/fs2/concurrent/Signal.scala

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ package concurrent
2424

2525
import cats.data.OptionT
2626
import cats.kernel.Eq
27-
import cats.effect.kernel.{Concurrent, Deferred, Ref}
27+
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
2828
import cats.effect.std.MapRef
2929
import cats.syntax.all._
3030
import cats.{Applicative, Functor, Invariant, Monad}
@@ -34,7 +34,7 @@ import scala.collection.immutable.LongMap
3434
/** Pure holder of a single value of type `A` that can be read in the effect `F`. */
3535
trait Signal[F[_], A] { outer =>
3636

37-
/** Returns a stream of the updates to this signal.
37+
/** Returns a stream of the current value and subsequent updates to this signal.
3838
*
3939
* Even if you are pulling as fast as possible, updates that are very close together may
4040
* result in only the last update appearing in the stream. In general, when you pull
@@ -48,17 +48,33 @@ trait Signal[F[_], A] { outer =>
4848
*/
4949
def continuous: Stream[F, A]
5050

51-
/** Asynchronously gets the current value of this `Signal`.
51+
/** Gets the current value of this `Signal`.
5252
*/
5353
def get: F[A]
5454

55+
/** Returns the current value of this `Signal` and a `Stream` to subscribe to
56+
* subsequent updates, with the same semantics as [[discrete]]. The updates
57+
* stream should be compiled at most once.
58+
*/
59+
def getAndDiscreteUpdates(implicit F: Concurrent[F]): Resource[F, (A, Stream[F, A])] =
60+
discrete.pull.uncons1
61+
.flatMap(Pull.outputOption1(_))
62+
.streamNoScope
63+
.compile
64+
.resource
65+
.onlyOrError
66+
5567
/** Returns a signal derived from this one, that drops update events that did not change the value.
5668
*/
5769
def changes(implicit eqA: Eq[A]): Signal[F, A] =
5870
new Signal[F, A] {
5971
def discrete = outer.discrete.changes
6072
def continuous = outer.continuous
6173
def get = outer.get
74+
override def getAndDiscreteUpdates(implicit F: Concurrent[F]) =
75+
outer.getAndDiscreteUpdates.map { case (got, updates) =>
76+
(got, updates.dropWhile(_ === got).changes)
77+
}
6278
}
6379

6480
/** Returns when the condition becomes true, semantically blocking
@@ -126,13 +142,23 @@ object Signal extends SignalInstances {
126142
new Signal[F, A] {
127143
def get: F[A] = F.pure(a)
128144
def continuous: Stream[Pure, A] = Stream.constant(a)
145+
override def getAndDiscreteUpdates(implicit
146+
ev: Concurrent[F]
147+
): Resource[F, (A, Stream[F, A])] =
148+
Resource.pure((a, Stream.never(F)))
129149
def discrete: Stream[F, A] = Stream(a) ++ Stream.never
130150
}
131151

132152
def mapped[F[_]: Functor, A, B](fa: Signal[F, A])(f: A => B): Signal[F, B] =
133153
new Signal[F, B] {
134154
def continuous: Stream[F, B] = fa.continuous.map(f)
135155
def discrete: Stream[F, B] = fa.discrete.map(f)
156+
override def getAndDiscreteUpdates(implicit
157+
ev: Concurrent[F]
158+
): Resource[F, (B, Stream[F, B])] =
159+
fa.getAndDiscreteUpdates(ev).map { case (a, updates) =>
160+
(f(a), updates.map(f))
161+
}
136162
def get: F[B] = Functor[F].map(fa.get)(f)
137163
}
138164

@@ -232,7 +258,16 @@ object SignallingRef {
232258

233259
def continuous: Stream[F, A] = Stream.repeatEval(get)
234260

235-
def discrete: Stream[F, A] = {
261+
def discrete: Stream[F, A] =
262+
Stream.resource(getAndDiscreteUpdates).flatMap { case (a, updates) =>
263+
Stream.emit(a) ++ updates
264+
}
265+
266+
override def getAndDiscreteUpdates(implicit
267+
ev: Concurrent[F]
268+
): Resource[F, (A, Stream[F, A])] =
269+
getAndDiscreteUpdatesImpl
270+
private[this] def getAndDiscreteUpdatesImpl = {
236271
def go(id: Long, lastSeen: Long): Stream[F, A] = {
237272
def getNext: F[(A, Long)] =
238273
F.deferred[(A, Long)].flatMap { wait =>
@@ -252,9 +287,8 @@ object SignallingRef {
252287
def cleanup(id: Long): F[Unit] =
253288
state.update(s => s.copy(listeners = s.listeners - id))
254289

255-
Stream.eval(state.get).flatMap { state =>
256-
Stream.emit(state.value) ++
257-
Stream.bracket(newId)(cleanup).flatMap(go(_, state.lastUpdate))
290+
Resource.eval(state.get).map { s =>
291+
(s.value, Stream.bracket(newId)(cleanup).flatMap(go(_, s.lastUpdate)))
258292
}
259293
}
260294

@@ -318,6 +352,11 @@ object SignallingRef {
318352

319353
def get: F[B] = F.map(underlying.get)(a => lensGet(a))
320354

355+
override def getAndDiscreteUpdates(implicit ev: Concurrent[F]): Resource[F, (B, Stream[F, B])] =
356+
underlying.getAndDiscreteUpdates.map { case (a, updates) =>
357+
(lensGet(a), updates.map(lensGet))
358+
}
359+
321360
def set(b: B): F[Unit] = underlying.update(a => lensModify(a)(_ => b))
322361

323362
override def getAndSet(b: B): F[B] =
@@ -369,6 +408,12 @@ object SignallingRef {
369408
def get: F[B] = fa.get.map(f)
370409
def discrete: Stream[F, B] = fa.discrete.map(f)
371410
def continuous: Stream[F, B] = fa.continuous.map(f)
411+
override def getAndDiscreteUpdates(implicit
412+
ev: Concurrent[F]
413+
): Resource[F, (B, Stream[F, B])] =
414+
fa.getAndDiscreteUpdates(ev).map { case (a, updates) =>
415+
(f(a), updates.map(f))
416+
}
372417
def set(b: B): F[Unit] = fa.set(g(b))
373418
def access: F[(B, B => F[Boolean])] =
374419
fa.access.map { case (getter, setter) =>
@@ -471,7 +516,17 @@ object SignallingMapRef {
471516

472517
def continuous: Stream[F, Option[V]] = Stream.repeatEval(get)
473518

474-
def discrete: Stream[F, Option[V]] = {
519+
def discrete: Stream[F, Option[V]] =
520+
Stream.resource(getAndDiscreteUpdates).flatMap { case (a, updates) =>
521+
Stream.emit(a) ++ updates
522+
}
523+
524+
override def getAndDiscreteUpdates(implicit
525+
ev: Concurrent[F]
526+
): Resource[F, (Option[V], Stream[F, Option[V]])] =
527+
getAndDiscreteUpdatesImpl
528+
529+
private[this] def getAndDiscreteUpdatesImpl = {
475530
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
476531
def getNext: F[(Option[V], Long)] =
477532
F.deferred[(Option[V], Long)].flatMap { wait =>
@@ -510,11 +565,13 @@ object SignallingMapRef {
510565
}
511566
}
512567

513-
Stream.eval(state.get).flatMap { state =>
514-
Stream.emit(state.keys.get(k).flatMap(_.value)) ++
568+
Resource.eval(state.get).map { state =>
569+
(
570+
state.keys.get(k).flatMap(_.value),
515571
Stream
516572
.bracket(newId)(cleanup)
517573
.flatMap(go(_, state.keys.get(k).fold(-1L)(_.lastUpdate)))
574+
)
518575
}
519576
}
520577

@@ -597,6 +654,15 @@ private[concurrent] trait SignalInstances extends SignalLowPriorityInstances {
597654
def continuous: Stream[F, B] = Stream.repeatEval(get)
598655

599656
def get: F[B] = ff.get.ap(fa.get)
657+
658+
override def getAndDiscreteUpdates(implicit
659+
ev: Concurrent[F]
660+
): Resource[F, (B, Stream[F, B])] = getAndDiscreteUpdatesImpl
661+
662+
private[this] def getAndDiscreteUpdatesImpl =
663+
(ff.getAndDiscreteUpdates, fa.getAndDiscreteUpdates).mapN { case ((f, fs), (a, as)) =>
664+
(f(a), nondeterministicZip(fs, as).map { case (f, a) => f(a) })
665+
}
600666
}
601667
}
602668
}

core/shared/src/test/scala/fs2/concurrent/SignalSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,23 @@ class SignalSuite extends Fs2Suite {
266266
.intercept[NoSuchElementException]
267267
}
268268

269+
test("hold consistent with getAndDiscreteUpdates") {
270+
forAllF { (init: Int, stream: Stream[Pure, Int]) =>
271+
TestControl.executeEmbed {
272+
stream.evalMap(IO.sleep(1.second).as(_)).holdResource(init).use { sig =>
273+
sig.getAndDiscreteUpdates.use { case (got, updates) =>
274+
IO(assertEquals(got, init)) *>
275+
updates
276+
.interruptAfter(Long.MaxValue.nanos)
277+
.compile
278+
.toVector
279+
.assertEquals(stream.compile.toVector)
280+
}
281+
}
282+
}
283+
}
284+
}
285+
269286
test("waitUntil") {
270287
val target = 5
271288
val expected = 1

0 commit comments

Comments
 (0)