Skip to content

Commit 95e8e53

Browse files
authored
Merge pull request #3920 from djspiewak/backport/mid-3.5
2 parents e1ab2aa + 931cd64 commit 95e8e53

File tree

18 files changed

+271
-55
lines changed

18 files changed

+271
-55
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,11 @@ jobs:
518518
SONATYPE_CREDENTIAL_HOST: ${{ secrets.SONATYPE_CREDENTIAL_HOST }}
519519
run: sbt tlCiRelease
520520

521+
- name: Post release to Discord
522+
env:
523+
DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
524+
run: scripts/post-release-discord ${{ github.ref }}
525+
521526
dependency-submission:
522527
name: Submit Dependencies
523528
if: github.event_name != 'pull_request'

build.sbt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,13 @@ ThisBuild / githubWorkflowBuild := Seq("JVM", "JS", "Native").map { platform =>
220220
)
221221
)
222222

223+
ThisBuild / githubWorkflowPublish +=
224+
WorkflowStep.Run(
225+
List("scripts/post-release-discord ${{ github.ref }}"),
226+
name = Some("Post release to Discord"),
227+
env = Map("DISCORD_WEBHOOK_URL" -> "${{ secrets.DISCORD_WEBHOOK_URL }}")
228+
)
229+
223230
val ciVariants = CI.AllCIs.map(_.command)
224231
val jsCiVariants = CI.AllJSCIs.map(_.command)
225232
ThisBuild / githubWorkflowBuildMatrixAdditions += "ci" -> ciVariants

core/shared/src/main/scala/cats/effect/IOFiber.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,11 @@ private final class IOFiber[A](
241241

242242
case 1 =>
243243
val cur = cur0.asInstanceOf[Error]
244-
runLoop(failed(cur.t, 0), nextCancelation, nextAutoCede)
244+
val ex = cur.t
245+
if (!NonFatal(ex))
246+
onFatalFailure(ex)
247+
248+
runLoop(failed(ex, 0), nextCancelation, nextAutoCede)
245249

246250
case 2 =>
247251
val cur = cur0.asInstanceOf[Delay[Any]]
@@ -315,7 +319,11 @@ private final class IOFiber[A](
315319

316320
case 1 =>
317321
val error = ioe.asInstanceOf[Error]
318-
runLoop(failed(error.t, 0), nextCancelation - 1, nextAutoCede)
322+
val ex = error.t
323+
if (!NonFatal(ex))
324+
onFatalFailure(ex)
325+
326+
runLoop(failed(ex, 0), nextCancelation - 1, nextAutoCede)
319327

320328
case 2 =>
321329
val delay = ioe.asInstanceOf[Delay[Any]]
@@ -382,7 +390,11 @@ private final class IOFiber[A](
382390

383391
case 1 =>
384392
val error = ioe.asInstanceOf[Error]
385-
runLoop(failed(error.t, 0), nextCancelation - 1, nextAutoCede)
393+
val ex = error.t
394+
if (!NonFatal(ex))
395+
onFatalFailure(ex)
396+
397+
runLoop(failed(ex, 0), nextCancelation - 1, nextAutoCede)
386398

387399
case 2 =>
388400
val delay = ioe.asInstanceOf[Delay[Any]]
@@ -434,6 +446,8 @@ private final class IOFiber[A](
434446
case 1 =>
435447
val error = ioa.asInstanceOf[Error]
436448
val t = error.t
449+
if (!NonFatal(t))
450+
onFatalFailure(t)
437451
// We need to augment the exception here because it doesn't get
438452
// forwarded to the `failed` path.
439453
Tracing.augmentThrowable(runtime.enhancedExceptions, t, tracingEvents)

docs/core/starvation-and-tuning.md

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ title: Starvation and Tuning
66
All Cats Effect applications constructed via `IOApp` have an automatic mechanism which periodically checks to see if the application runtime is starving for compute resources. If you ever see warnings which look like the following, they are the result of this mechanism automatically detecting that the responsiveness of your application runtime is below the configured threshold. Note that the timestamp is the time when the starvation was detected, which is not precisely the time when starvation (or the task that is responsible) began.
77

88
```
9-
2023-01-28T00:16:24.101Z [WARNING] Your app's responsiveness to a new asynchronous
9+
2023-01-28T00:16:24.101Z [WARNING] Your app's responsiveness to a new asynchronous
1010
event (such as a new connection, an upstream response, or a timer) was in excess
11-
of 40 milliseconds. Your CPU is probably starving. Consider increasing the
11+
of 40 milliseconds. Your CPU is probably starving. Consider increasing the
1212
granularity of your delays or adding more cedes. This may also be a sign that you
1313
are unintentionally running blocking I/O operations (such as File or InetAddress)
1414
without the blocking combinator.
@@ -27,9 +27,12 @@ import cats.effect._
2727
import cats.syntax.all._
2828

2929
object StarveThyself extends IOApp.Simple {
30-
val run =
30+
val run =
3131
0.until(100).toList parTraverse_ { i =>
32-
IO.println(s"running #$i") >> IO(Thread.sleep(10000))
32+
IO {
33+
println(s"running #$i")
34+
Thread.sleep(10000)
35+
}
3336
}
3437
}
3538
```
@@ -114,7 +117,7 @@ import cats.effect._
114117
import cats.syntax.all._
115118

116119
object StarveThyselfAgain extends IOApp.Simple {
117-
val run =
120+
val run =
118121
0.until(100).toList parTraverse_ { i =>
119122
IO.println(s"running #$i") >> IO(while (true) {})
120123
}
@@ -193,9 +196,9 @@ A quick-and-dirty experimental way that this can be established for your specifi
193196
val expensiveThing: IO[A] = ???
194197

195198
IO.unit.timed flatMap {
196-
case (baseline, _) =>
199+
case (baseline, _) =>
197200
IO.println(s"baseline stage cost is $baseline") >> expensiveThing.timed flatMap {
198-
case (cost, result) =>
201+
case (cost, result) =>
199202
if (cost / baseline > 1024)
200203
IO.println("expensiveThing is very expensive").as(result)
201204
else
@@ -348,6 +351,14 @@ The problem with "going wide" is it restricts the resources available within use
348351

349352
Of course, it's never as simple as doubling the number of vCPUs and halving the number of instances. Scaling is complicated, and you'll likely need to adjust other resources such as memory, connection limits, file handle counts, autoscaling signals, and such. Overall though, a good rule of thumb is to consider 8 vCPUs to be the minimum that should be available to a Cats Effect application at scale. 16 or even 32 vCPUs is likely to improve performance even further, and it is very much worth experimenting with these types of tuning parameters.
350353

354+
#### Not Enough Threads - Running in Kubernetes
355+
356+
One cause of "not enough threads" can be that the application is running inside kubernetes with a `cpu_quota` not configured. When the cpu limit is not configured, the JVM detects the number of available processors as 1, which will severely restrict what the runtime is able to do.
357+
358+
This guide on [containerizing java applications for kubernetes](https://learn.microsoft.com/en-us/azure/developer/java/containers/kubernetes#understand-jvm-available-processors) goes into more detail on the mechanism involved.
359+
360+
**All Cats Effect applications running in kubernetes should have either a `cpu_quota` configured or use the jvm `-XX:ActiveProcessorCount` argument to explicitly tell the jvm how many cores to use.**
361+
351362
### Too Many Threads
352363

353364
In a sense, this scenario is like the correlated inverse of the "Not Enough CPUs" option, and it happens surprisingly frequently in conventional JVM applications. Consider the thread list from the previous section (assuming 8 CPUs):
@@ -397,7 +408,7 @@ This can be accomplished in some cases by using `IO.executionContext` or `IO.exe
397408

398409
- The source of the rogue threads (e.g. another library) must have some initialization mechanism which accepts an `ExecutionContext` or `Executor`
399410
- The source of the rogue threads must not ever *block* on its rogue threads: they must only be used for compute
400-
+ The exception to this is if the library in question is a well-behaved Scala library, often from the Akka ecosystem, which wraps its blocking in `scala.concurrent.blocking(...)`. In this case, it is safe to use the Cats Effect compute pool, and the results will be similar to what happens with `IO.blocking`
411+
- The exception to this is if the library in question is a well-behaved Scala library, often from the Akka ecosystem, which wraps its blocking in `scala.concurrent.blocking(...)`. In this case, it is safe to use the Cats Effect compute pool, and the results will be similar to what happens with `IO.blocking`
401412

402413
Determining both of these factors often takes some investigation, usually of the "thread dumps and async profiler" variety, trying to catch the rogue threads in a blocked state. Alternatively, you can just read the library source code, though this can be very time consuming and error prone.
403414

@@ -446,6 +457,16 @@ The solution is to eliminate this over-provisioning. If a scheduled container is
446457

447458
As a very concrete example of this, if you have a cluster of 16 host instances in your cluster, each of which having 64 CPUs, that gives you a total of 1024 vCPUs to work with. If you configure each application container to use 4 vCPUs, you can support up to 256 application instances simultaneously (without resizing the cluster). Over-provisioning by a factor of 100% would suggest that you can support up to 512 application instances. **Do not do this.** Instead, resize the application instances to use either 8 or 16 vCPUs each. If you take the latter approach, your cluster will support up to 64 application instances simultaneously. This *seems* like a downgrade, but these taller instances should (absent other constraints) support more than 4x more traffic than the smaller instances, meaning that the overall cluster is much more efficient.
448459

460+
#### Kubernetes CPU Pinning
461+
462+
Even if you have followed the above advice and avoided over-provisioning, the Linux kernel scheduler is unfortunately not aware of the Cats Effect scheduler and will likely actively work against the Cats Effect scheduler by moving Cats Effect worker threads between different CPUs, thereby destroying CPU cache-locality. In certain environments we can prevent this by configuring Kubernetes to pin an application to a gviven set of CPUs:
463+
1. Set the [CPU Manager Policy to static](https://kubernetes.io/docs/tasks/administer-cluster/cpu-management-policies/#static-policy)
464+
2. Ensure that your pod is in the [Guaranteed QoS class](https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/#guaranteed)
465+
3. Request an integral number of CPUs for your Cats Effect application
466+
467+
You should be able to see the CPU assignment updates reflected in the kubelet logs.
468+
469+
449470
### Process Contention
450471

451472
All of the advice in this page is targeted towards the (very common) scenario in which your Cats Effect application `java` process is the only meaningfully active process on a given instance. In other words, there are no additional applications on the same server, no databases, nothing. If this is *not* the case, then a lot of the advice about having too many threads applies, but *worse*.
@@ -487,7 +508,7 @@ To entirely disable the checker (**not** recommended in most cases!), adjust you
487508
object MyMain extends IOApp {
488509

489510
// fully disable the checker
490-
override def runtimeConfig =
511+
override def runtimeConfig =
491512
super.runtimeConfig.copy(cpuStarvationCheckInitialDelay = Duration.Inf)
492513

493514
override def run(args: List[String]) = ???
@@ -506,7 +527,7 @@ import scala.concurrent.duration._
506527
object MyOtherMain extends IOApp {
507528

508529
// relax threshold to 500 milliseconds
509-
override def runtimeConfig =
530+
override def runtimeConfig =
510531
super.runtimeConfig.copy(cpuStarvationCheckInterval = 5.seconds)
511532

512533
override def run(args: List[String]) = ???

docs/std/hotswap.md

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,25 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
3737
for {
3838
index <- Ref[IO].of(0)
3939
count <- Ref[IO].of(0)
40-
//Open the initial log file
41-
f <- hs.swap(file("0.log"))
42-
logFile <- Ref[IO].of(f)
40+
// Open the initial log file
41+
_ <- hs.swap(file("0.log"))
4342
} yield new Logger[IO] {
4443
def log(msg: String): IO[Unit] =
4544
count.get.flatMap { currentCount =>
4645
if (msg.length() < n - currentCount)
47-
for {
48-
currentFile <- logFile.get
49-
_ <- write(currentFile, msg)
50-
_ <- count.update(_ + msg.length())
51-
} yield ()
46+
hs.get.use { currentFile =>
47+
write(currentFile, msg) *>
48+
count.update(_ + msg.length())
49+
}
5250
else
5351
for {
54-
//Reset the log length counter
52+
// Reset the log length counter
5553
_ <- count.set(msg.length())
56-
//Increment the counter for the log file name
54+
// Increment the counter for the log file name
5755
idx <- index.updateAndGet(_ + 1)
58-
//Close the old log file and open the new one
59-
f <- hs.swap(file(s"$idx.log"))
60-
_ <- logFile.set(f)
61-
_ <- write(f, msg)
56+
// Close the old log file and open the new one
57+
_ <- hs.swap(file(s"$idx.log"))
58+
_ <- hs.get.use(write(_, msg))
6259
} yield ()
6360
}
6461
}

docs/third-party-resources.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ This page outlines known third-party learning and training material (free and pa
2626
## Videos
2727

2828
These are some selected videos on various topics in Cats Effect:
29-
29+
* [The Case For Effect Systems](https://www.youtube.com/watch?v=qgfCmQ-2tW0), Daniel Spiewak
3030
* [Cats Effect 3](https://www.youtube.com/watch?v=KZtVBtOrP50&t=1s&ab_channel=ScalaintheCity), Daniel Spiewak
3131
* [Concurrency with Cats Effect](https://www.youtube.com/watch?v=Gig-f_HXvLI&ab_channel=FunctionalTV), Michael Pilquist
3232
* [How do Fibers work?](https://www.youtube.com/watch?v=x5_MmZVLiSM&ab_channel=ScalaWorld), Fabio Labella
3333
* [Cancellation in Cats Effect](https://www.youtube.com/watch?v=X9u3rgPz_zE&t=1002s&ab_channel=ScalaintheCity), Daniel Ciocîrlan
3434
* [Intro to Cats Effect](https://www.youtube.com/watch?v=83pXEdCpY4A&ab_channel=thoughtbot), Gavin Biesi
35-
* [The IO Monad for Scala](https://www.youtube.com/watch?v=8_TWM2t97r4&t=811s&ab_channel=ScalaIOFR), Gabriel Volpe
35+
* [The IO Monad for Scala](https://www.youtube.com/watch?v=8_TWM2t97r4&t=811s&ab_channel=ScalaIOFR), Gabriel Volpe

ioapp-tests/src/test/scala/IOAppSpec.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,34 @@ class IOAppSpec extends Specification {
189189
h.stderr() must contain("Boom!")
190190
}
191191

192+
"exit on raising a fatal error with attempt" in {
193+
val h = platform("RaiseFatalErrorAttempt", List.empty)
194+
h.awaitStatus() mustEqual 1
195+
h.stderr() must contain("Boom!")
196+
h.stdout() must not(contain("sadness"))
197+
}
198+
199+
"exit on raising a fatal error with handleError" in {
200+
val h = platform("RaiseFatalErrorHandle", List.empty)
201+
h.awaitStatus() mustEqual 1
202+
h.stderr() must contain("Boom!")
203+
h.stdout() must not(contain("sadness"))
204+
}
205+
206+
"exit on raising a fatal error inside a map" in {
207+
val h = platform("RaiseFatalErrorMap", List.empty)
208+
h.awaitStatus() mustEqual 1
209+
h.stderr() must contain("Boom!")
210+
h.stdout() must not(contain("sadness"))
211+
}
212+
213+
"exit on raising a fatal error inside a flatMap" in {
214+
val h = platform("RaiseFatalErrorFlatMap", List.empty)
215+
h.awaitStatus() mustEqual 1
216+
h.stderr() must contain("Boom!")
217+
h.stdout() must not(contain("sadness"))
218+
}
219+
192220
"warn on global runtime collision" in {
193221
val h = platform("GlobalRacingInit", List.empty)
194222
h.awaitStatus() mustEqual 0
@@ -332,6 +360,7 @@ class IOAppSpec extends Specification {
332360
err must contain(
333361
"[WARNING] A Cats Effect worker thread was detected to be in a blocked state")
334362
}
363+
335364
()
336365
}
337366

kernel/shared/src/main/scala/cats/effect/kernel/Async.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -255,21 +255,25 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
255255
* [[fromFutureCancelable]] for a cancelable version
256256
*/
257257
def fromFuture[A](fut: F[Future[A]]): F[A] =
258-
flatMap(fut) { f =>
259-
flatMap(executionContext) { implicit ec =>
260-
async_[A](cb => f.onComplete(t => cb(t.toEither)))
258+
flatMap(executionContext) { implicit ec =>
259+
uncancelable { poll =>
260+
flatMap(poll(fut)) { f => async_[A](cb => f.onComplete(t => cb(t.toEither))) }
261261
}
262262
}
263263

264264
/**
265265
* Like [[fromFuture]], but is cancelable via the provided finalizer.
266266
*/
267267
def fromFutureCancelable[A](futCancel: F[(Future[A], F[Unit])]): F[A] =
268-
flatMap(futCancel) {
269-
case (fut, fin) =>
270-
flatMap(executionContext) { implicit ec =>
271-
async[A](cb => as(delay(fut.onComplete(t => cb(t.toEither))), Some(fin)))
268+
flatMap(executionContext) { implicit ec =>
269+
uncancelable { poll =>
270+
flatMap(poll(futCancel)) {
271+
case (fut, fin) =>
272+
onCancel(
273+
poll(async[A](cb => as(delay(fut.onComplete(t => cb(t.toEither))), Some(unit)))),
274+
fin)
272275
}
276+
}
273277
}
274278

275279
/**

scripts/make-release-prs.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ cd "$(dirname $0)/.."
77
primary_base="$(pwd)"
88

99
if [[ $# -ne 2 ]] || [[ "$1" == "--help" ]]; then
10-
echo "usage: $0 old-version new-version"
10+
echo "usage: $0 old-tag new-tag"
1111
exit 1
1212
fi
1313

14-
old_version="$1"
15-
new_version="$2"
14+
old_version="${1#v}"
15+
new_version="${2#v}"
1616

1717
minor_base=series/$(echo $new_version | sed -E 's/([0-9]+).([0-9]+).[0-9]+/\1.\2.x/')
1818
major_base=series/$(echo $new_version | sed -E 's/([0-9]+).[0-9]+.[0-9]+/\1.x/')
1919
minor_branch="release/$new_version-minor"
2020
major_branch="release/$new_version-major"
2121

2222
cd "$(mktemp -d)"
23-
git clone git@github.com:typelevel/cats-effect.git
23+
gh repo clone typelevel/cats-effect
2424
cd 'cats-effect'
2525

2626
git checkout -b $minor_branch origin/$minor_base

scripts/make-site-pr.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ cd "$(dirname $0)/.."
77
primary_base="$(pwd)"
88

99
if [[ $# -ne 2 ]] || [[ "$1" == "--help" ]]; then
10-
echo "usage: $0 old-version new-version"
10+
echo "usage: $0 old-tag new-tag"
1111
exit 1
1212
fi
1313

14-
old_version="$1"
15-
new_version="$2"
14+
old_version="${1#v}"
15+
new_version="${2#v}"
1616

1717
cd "$(mktemp -d)"
18-
git clone git@github.com:typelevel/cats-effect.git
18+
gh repo clone typelevel/cats-effect
1919
cd 'cats-effect'
2020

2121
git checkout origin/docs

0 commit comments

Comments
 (0)