Skip to content

Commit 7332294

Browse files
committed
Some modifies are cancelable 😇
1 parent c9c1696 commit 7332294

File tree

1 file changed

+26
-22
lines changed

1 file changed

+26
-22
lines changed

‎core/shared/src/main/scala/fs2/concurrent/Signal.scala‎

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,16 @@ object SignallingRef {
271271
private[this] def getAndDiscreteUpdatesImpl = {
272272
def go(id: Long, lastSeen: Long): Stream[F, A] = {
273273
def getNext: F[(A, Long)] =
274-
F.deferred[(A, Long)].flatMap { wait =>
275-
state.flatModify { case state @ State(value, lastUpdate, listeners) =>
276-
if (lastUpdate != lastSeen)
277-
state -> (value -> lastUpdate).pure[F]
278-
else
279-
state.copy(listeners = listeners + (id -> wait)) -> wait.get
274+
F.deferred[(A, Long)]
275+
.flatMap { wait =>
276+
state.modify { case state @ State(value, lastUpdate, listeners) =>
277+
if (lastUpdate != lastSeen)
278+
state -> (value -> lastUpdate).pure[F]
279+
else
280+
state.copy(listeners = listeners + (id -> wait)) -> wait.get
281+
}
280282
}
281-
}
283+
.flatten // cancelable
282284

283285
Stream.eval(getNext).flatMap { case (a, lastUpdate) =>
284286
Stream.emit(a) ++ go(id, lastSeen = lastUpdate)
@@ -530,23 +532,25 @@ object SignallingMapRef {
530532
private[this] def getAndDiscreteUpdatesImpl = {
531533
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
532534
def getNext: F[(Option[V], Long)] =
533-
F.deferred[(Option[V], Long)].flatMap { wait =>
534-
state.flatModify { state =>
535-
val keyState = state.keys.get(k)
536-
val value = keyState.flatMap(_.value)
537-
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
538-
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)
539-
540-
if (lastUpdate != lastSeen)
541-
state -> (value -> lastUpdate).pure[F]
542-
else {
543-
val newKeys =
544-
state.keys
545-
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
546-
state.copy(keys = newKeys) -> wait.get
535+
F.deferred[(Option[V], Long)]
536+
.flatMap { wait =>
537+
state.modify { state =>
538+
val keyState = state.keys.get(k)
539+
val value = keyState.flatMap(_.value)
540+
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
541+
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)
542+
543+
if (lastUpdate != lastSeen)
544+
state -> (value -> lastUpdate).pure[F]
545+
else {
546+
val newKeys =
547+
state.keys
548+
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
549+
state.copy(keys = newKeys) -> wait.get
550+
}
547551
}
548552
}
549-
}
553+
.flatten // cancelable
550554

551555
Stream.eval(getNext).flatMap { case (v, lastUpdate) =>
552556
Stream.emit(v) ++ go(id, lastSeen = lastUpdate)

0 commit comments

Comments
 (0)