Skip to content

Commit 193f7b2

Browse files
authored
Merge pull request #3142 from armanbilge/update/cats-effect-3.5.0-RC1
Update to Cats Effect v3.5.0-RC2
2 parents a70daca + 56b8602 commit 193f7b2

File tree

12 files changed

+219
-204
lines changed

12 files changed

+219
-204
lines changed

build.sbt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._
22

33
Global / onChangedBuildSource := ReloadOnSourceChanges
44

5-
ThisBuild / tlBaseVersion := "3.6"
5+
ThisBuild / tlBaseVersion := "3.7"
66

77
ThisBuild / organization := "co.fs2"
88
ThisBuild / organizationName := "Functional Streams for Scala"
@@ -178,7 +178,14 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
178178
),
179179
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readBytesFromInputStream"),
180180
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readInputStreamGeneric"),
181-
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>")
181+
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>"),
182+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.io.net.Socket.forAsync"),
183+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
184+
"fs2.io.net.SocketCompanionPlatform#AsyncSocket.this"
185+
),
186+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
187+
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform#AsyncSocket.this"
188+
)
182189
)
183190

184191
lazy val root = tlCrossRootProject
@@ -213,9 +220,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
213220
libraryDependencies ++= Seq(
214221
"org.scodec" %%% "scodec-bits" % "1.1.35",
215222
"org.typelevel" %%% "cats-core" % "2.9.0",
216-
"org.typelevel" %%% "cats-effect" % "3.4.7",
217-
"org.typelevel" %%% "cats-effect-laws" % "3.4.7" % Test,
218-
"org.typelevel" %%% "cats-effect-testkit" % "3.4.7" % Test,
223+
"org.typelevel" %%% "cats-effect" % "3.5.0-RC2",
224+
"org.typelevel" %%% "cats-effect-laws" % "3.5.0-RC2" % Test,
225+
"org.typelevel" %%% "cats-effect-testkit" % "3.5.0-RC2" % Test,
219226
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
220227
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
221228
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,

core/shared/src/main/scala/fs2/concurrent/Channel.scala

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ object Channel {
153153

154154
def send(a: A) =
155155
F.deferred[Unit].flatMap { producer =>
156-
F.uncancelable { poll =>
157-
state.modify {
156+
state.flatModifyFull { case (poll, state) =>
157+
state match {
158158
case s @ State(_, _, _, _, closed @ true) =>
159159
(s, Channel.closed[Unit].pure[F])
160160

@@ -169,12 +169,12 @@ object Channel {
169169
State(values, size, None, (a, producer) :: producers, false),
170170
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
171171
)
172-
}.flatten
172+
}
173173
}
174174
}
175175

176176
def trySend(a: A) =
177-
state.modify {
177+
state.flatModify {
178178
case s @ State(_, _, _, _, closed @ true) =>
179179
(s, Channel.closed[Boolean].pure[F])
180180

@@ -186,22 +186,19 @@ object Channel {
186186
)
187187
else
188188
(s, rightFalse.pure[F])
189-
}.flatten
189+
}
190190

191191
def close =
192-
state
193-
.modify {
194-
case s @ State(_, _, _, _, closed @ true) =>
195-
(s, Channel.closed[Unit].pure[F])
192+
state.flatModify {
193+
case s @ State(_, _, _, _, closed @ true) =>
194+
(s, Channel.closed[Unit].pure[F])
196195

197-
case State(values, size, waiting, producers, closed @ false) =>
198-
(
199-
State(values, size, None, producers, true),
200-
notifyStream(waiting).as(rightUnit) <* signalClosure
201-
)
202-
}
203-
.flatten
204-
.uncancelable
196+
case State(values, size, waiting, producers, closed @ false) =>
197+
(
198+
State(values, size, None, producers, true),
199+
notifyStream(waiting).as(rightUnit) <* signalClosure
200+
)
201+
}
205202

206203
def isClosed = closedGate.tryGet.map(_.isDefined)
207204

core/shared/src/main/scala/fs2/concurrent/Signal.scala

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import cats.data.OptionT
2626
import cats.kernel.Eq
2727
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
2828
import cats.effect.std.MapRef
29+
import cats.effect.syntax.all._
2930
import cats.syntax.all._
3031
import cats.{Applicative, Functor, Invariant, Monad}
3132

@@ -270,14 +271,16 @@ object SignallingRef {
270271
private[this] def getAndDiscreteUpdatesImpl = {
271272
def go(id: Long, lastSeen: Long): Stream[F, A] = {
272273
def getNext: F[(A, Long)] =
273-
F.deferred[(A, Long)].flatMap { wait =>
274-
state.modify { case state @ State(value, lastUpdate, listeners) =>
275-
if (lastUpdate != lastSeen)
276-
state -> (value -> lastUpdate).pure[F]
277-
else
278-
state.copy(listeners = listeners + (id -> wait)) -> wait.get
279-
}.flatten
280-
}
274+
F.deferred[(A, Long)]
275+
.flatMap { wait =>
276+
state.modify { case state @ State(value, lastUpdate, listeners) =>
277+
if (lastUpdate != lastSeen)
278+
state -> (value -> lastUpdate).pure[F]
279+
else
280+
state.copy(listeners = listeners + (id -> wait)) -> wait.get
281+
}
282+
}
283+
.flatten // cancelable
281284

282285
Stream.eval(getNext).flatMap { case (a, lastUpdate) =>
283286
Stream.emit(a) ++ go(id, lastSeen = lastUpdate)
@@ -297,10 +300,10 @@ object SignallingRef {
297300
def update(f: A => A): F[Unit] = modify(a => (f(a), ()))
298301

299302
def modify[B](f: A => (A, B)): F[B] =
300-
state.modify(updateAndNotify(_, f)).flatten
303+
state.flatModify(updateAndNotify(_, f))
301304

302305
def tryModify[B](f: A => (A, B)): F[Option[B]] =
303-
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence)
306+
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable
304307

305308
def tryUpdate(f: A => A): F[Boolean] =
306309
tryModify(a => (f(a), ())).map(_.isDefined)
@@ -529,23 +532,25 @@ object SignallingMapRef {
529532
private[this] def getAndDiscreteUpdatesImpl = {
530533
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
531534
def getNext: F[(Option[V], Long)] =
532-
F.deferred[(Option[V], Long)].flatMap { wait =>
533-
state.modify { state =>
534-
val keyState = state.keys.get(k)
535-
val value = keyState.flatMap(_.value)
536-
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
537-
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)
538-
539-
if (lastUpdate != lastSeen)
540-
state -> (value -> lastUpdate).pure[F]
541-
else {
542-
val newKeys =
543-
state.keys
544-
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
545-
state.copy(keys = newKeys) -> wait.get
535+
F.deferred[(Option[V], Long)]
536+
.flatMap { wait =>
537+
state.modify { state =>
538+
val keyState = state.keys.get(k)
539+
val value = keyState.flatMap(_.value)
540+
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
541+
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)
542+
543+
if (lastUpdate != lastSeen)
544+
state -> (value -> lastUpdate).pure[F]
545+
else {
546+
val newKeys =
547+
state.keys
548+
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
549+
state.copy(keys = newKeys) -> wait.get
550+
}
546551
}
547-
}.flatten
548-
}
552+
}
553+
.flatten // cancelable
549554

550555
Stream.eval(getNext).flatMap { case (v, lastUpdate) =>
551556
Stream.emit(v) ++ go(id, lastSeen = lastUpdate)
@@ -580,10 +585,10 @@ object SignallingMapRef {
580585
def update(f: Option[V] => Option[V]): F[Unit] = modify(v => (f(v), ()))
581586

582587
def modify[U](f: Option[V] => (Option[V], U)): F[U] =
583-
state.modify(updateAndNotify(_, k, f)).flatten
588+
state.flatModify(updateAndNotify(_, k, f))
584589

585590
def tryModify[U](f: Option[V] => (Option[V], U)): F[Option[U]] =
586-
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence)
591+
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence).uncancelable
587592

588593
def tryUpdate(f: Option[V] => Option[V]): F[Boolean] =
589594
tryModify(a => (f(a), ())).map(_.isDefined)

core/shared/src/main/scala/fs2/internal/ScopedResource.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private[internal] object ScopedResource {
142142
.flatMap(finalizer => finalizer.map(_(ec)).getOrElse(pru))
143143

144144
def acquired(finalizer: Resource.ExitCase => F[Unit]): F[Either[Throwable, Boolean]] =
145-
state.modify { s =>
145+
state.flatModify { s =>
146146
if (s.isFinished)
147147
// state is closed and there are no leases, finalizer has to be invoked right away
148148
s -> finalizer(Resource.ExitCase.Succeeded).as(false).attempt
@@ -154,7 +154,7 @@ private[internal] object ScopedResource {
154154
Boolean
155155
]).pure[F]
156156
}
157-
}.flatten
157+
}
158158

159159
def lease: F[Option[Lease[F]]] =
160160
state.modify { s =>
@@ -173,14 +173,14 @@ private[internal] object ScopedResource {
173173
}
174174
.flatMap { now =>
175175
if (now.isFinished)
176-
state.modify { s =>
176+
state.flatModify { s =>
177177
// Scope is closed and this is last lease, assure finalizer is removed from the state and run
178178
// previous finalizer shall be always present at this point, this shall invoke it
179179
s.copy(finalizer = None) -> (s.finalizer match {
180180
case Some(ff) => ff(Resource.ExitCase.Succeeded)
181181
case None => pru
182182
})
183-
}.flatten
183+
}
184184
else
185185
pru
186186
}

0 commit comments

Comments
 (0)