Skip to content

Commit c9c1696

Browse files
committed
Prefer flatModify to modify(...).flatten
1 parent 4a252d5 commit c9c1696

File tree

3 files changed

+27
-29
lines changed

3 files changed

+27
-29
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ object Channel {
153153

154154
def send(a: A) =
155155
F.deferred[Unit].flatMap { producer =>
156-
F.uncancelable { poll =>
157-
state.modify {
156+
state.flatModifyFull { case (poll, state) =>
157+
state match {
158158
case s @ State(_, _, _, _, closed @ true) =>
159159
(s, Channel.closed[Unit].pure[F])
160160

@@ -169,12 +169,12 @@ object Channel {
169169
State(values, size, None, (a, producer) :: producers, false),
170170
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
171171
)
172-
}.flatten
172+
}
173173
}
174174
}
175175

176176
def trySend(a: A) =
177-
state.modify {
177+
state.flatModify {
178178
case s @ State(_, _, _, _, closed @ true) =>
179179
(s, Channel.closed[Boolean].pure[F])
180180

@@ -186,22 +186,19 @@ object Channel {
186186
)
187187
else
188188
(s, rightFalse.pure[F])
189-
}.flatten
189+
}
190190

191191
def close =
192-
state
193-
.modify {
194-
case s @ State(_, _, _, _, closed @ true) =>
195-
(s, Channel.closed[Unit].pure[F])
192+
state.flatModify {
193+
case s @ State(_, _, _, _, closed @ true) =>
194+
(s, Channel.closed[Unit].pure[F])
196195

197-
case State(values, size, waiting, producers, closed @ false) =>
198-
(
199-
State(values, size, None, producers, true),
200-
notifyStream(waiting).as(rightUnit) <* signalClosure
201-
)
202-
}
203-
.flatten
204-
.uncancelable
196+
case State(values, size, waiting, producers, closed @ false) =>
197+
(
198+
State(values, size, None, producers, true),
199+
notifyStream(waiting).as(rightUnit) <* signalClosure
200+
)
201+
}
205202

206203
def isClosed = closedGate.tryGet.map(_.isDefined)
207204

core/shared/src/main/scala/fs2/concurrent/Signal.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import cats.data.OptionT
2626
import cats.kernel.Eq
2727
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
2828
import cats.effect.std.MapRef
29+
import cats.effect.syntax.all._
2930
import cats.syntax.all._
3031
import cats.{Applicative, Functor, Invariant, Monad}
3132

@@ -271,12 +272,12 @@ object SignallingRef {
271272
def go(id: Long, lastSeen: Long): Stream[F, A] = {
272273
def getNext: F[(A, Long)] =
273274
F.deferred[(A, Long)].flatMap { wait =>
274-
state.modify { case state @ State(value, lastUpdate, listeners) =>
275+
state.flatModify { case state @ State(value, lastUpdate, listeners) =>
275276
if (lastUpdate != lastSeen)
276277
state -> (value -> lastUpdate).pure[F]
277278
else
278279
state.copy(listeners = listeners + (id -> wait)) -> wait.get
279-
}.flatten
280+
}
280281
}
281282

282283
Stream.eval(getNext).flatMap { case (a, lastUpdate) =>
@@ -297,10 +298,10 @@ object SignallingRef {
297298
def update(f: A => A): F[Unit] = modify(a => (f(a), ()))
298299

299300
def modify[B](f: A => (A, B)): F[B] =
300-
state.modify(updateAndNotify(_, f)).flatten
301+
state.flatModify(updateAndNotify(_, f))
301302

302303
def tryModify[B](f: A => (A, B)): F[Option[B]] =
303-
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence)
304+
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable
304305

305306
def tryUpdate(f: A => A): F[Boolean] =
306307
tryModify(a => (f(a), ())).map(_.isDefined)
@@ -530,7 +531,7 @@ object SignallingMapRef {
530531
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
531532
def getNext: F[(Option[V], Long)] =
532533
F.deferred[(Option[V], Long)].flatMap { wait =>
533-
state.modify { state =>
534+
state.flatModify { state =>
534535
val keyState = state.keys.get(k)
535536
val value = keyState.flatMap(_.value)
536537
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
@@ -544,7 +545,7 @@ object SignallingMapRef {
544545
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
545546
state.copy(keys = newKeys) -> wait.get
546547
}
547-
}.flatten
548+
}
548549
}
549550

550551
Stream.eval(getNext).flatMap { case (v, lastUpdate) =>
@@ -580,10 +581,10 @@ object SignallingMapRef {
580581
def update(f: Option[V] => Option[V]): F[Unit] = modify(v => (f(v), ()))
581582

582583
def modify[U](f: Option[V] => (Option[V], U)): F[U] =
583-
state.modify(updateAndNotify(_, k, f)).flatten
584+
state.flatModify(updateAndNotify(_, k, f))
584585

585586
def tryModify[U](f: Option[V] => (Option[V], U)): F[Option[U]] =
586-
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence)
587+
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence).uncancelable
587588

588589
def tryUpdate(f: Option[V] => Option[V]): F[Boolean] =
589590
tryModify(a => (f(a), ())).map(_.isDefined)

core/shared/src/main/scala/fs2/internal/ScopedResource.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private[internal] object ScopedResource {
142142
.flatMap(finalizer => finalizer.map(_(ec)).getOrElse(pru))
143143

144144
def acquired(finalizer: Resource.ExitCase => F[Unit]): F[Either[Throwable, Boolean]] =
145-
state.modify { s =>
145+
state.flatModify { s =>
146146
if (s.isFinished)
147147
// state is closed and there are no leases, finalizer has to be invoked right away
148148
s -> finalizer(Resource.ExitCase.Succeeded).as(false).attempt
@@ -154,7 +154,7 @@ private[internal] object ScopedResource {
154154
Boolean
155155
]).pure[F]
156156
}
157-
}.flatten
157+
}
158158

159159
def lease: F[Option[Lease[F]]] =
160160
state.modify { s =>
@@ -173,14 +173,14 @@ private[internal] object ScopedResource {
173173
}
174174
.flatMap { now =>
175175
if (now.isFinished)
176-
state.modify { s =>
176+
state.flatModify { s =>
177177
// Scope is closed and this is last lease, assure finalizer is removed from the state and run
178178
// previous finalizer shall be always present at this point, this shall invoke it
179179
s.copy(finalizer = None) -> (s.finalizer match {
180180
case Some(ff) => ff(Resource.ExitCase.Succeeded)
181181
case None => pru
182182
})
183-
}.flatten
183+
}
184184
else
185185
pru
186186
}

0 commit comments

Comments
 (0)