Skip to content

Commit 41ad124

Browse files
feat: add warning log for blocking futures (#45)
* feat: add warning when an operation is blocking * feat: add Future type name to the log * docs: add safety comment * feat: print elapsed time in log * fix: don't warn on genservers marked as blocking * Use pin-project-lite instead of declaring unsafe code (#46) * Moved WarnOnBlocking behind an opt-in feature flag * feat: add example showing new feature * Simpler feature flag config * Simpler feature flag config * refactor: minimize the warn-on-block feature mentions * docs: add comment about new feature --------- Co-authored-by: ElFantasma <[email protected]>
1 parent 17ea47d commit 41ad124

File tree

6 files changed

+158
-10
lines changed

6 files changed

+158
-10
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ members = [
1212
"examples/updater",
1313
"examples/updater_threads",
1414
"examples/blocking_genserver",
15+
"examples/busy_genserver_warning",
1516
]
1617

1718
[workspace.dependencies]
18-
spawned-rt = { path = "rt", version = "0.4.0"}
19-
spawned-concurrency = { path = "concurrency", version = "0.4.0"}
19+
spawned-rt = { path = "rt", version = "0.4.0" }
20+
spawned-concurrency = { path = "concurrency", version = "0.4.0" }
2021
tracing = { version = "0.1.41", features = ["log"] }
2122
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
2223

concurrency/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ spawned-rt = { workspace = true }
1010
tracing = { workspace = true }
1111
futures = "0.3.1"
1212
thiserror = "2.0.12"
13+
pin-project-lite = "0.2"
1314

1415
[dev-dependencies]
1516
# This tokio imports are only used in tests, we should not use them in the library code.
@@ -18,3 +19,7 @@ tokio = { version = "1", features = ["full"] }
1819

1920
[lib]
2021
path = "./src/lib.rs"
22+
23+
[features]
24+
# Enable this to log warnings when non-blocking GenServers block the runtime for too much time
25+
warn-on-block = []

concurrency/src/tasks/gen_server.rs

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
22
//! See examples/name_server for a usage example.
3-
use futures::future::FutureExt as _;
4-
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
5-
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
6-
73
use crate::{
84
error::GenServerError,
95
tasks::InitResult::{NoSuccess, Success},
106
};
7+
use futures::future::FutureExt as _;
8+
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
9+
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
1110

1211
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
1312

@@ -36,12 +35,19 @@ impl<G: GenServer> GenServerHandle<G> {
3635
cancellation_token,
3736
};
3837
let handle_clone = handle.clone();
39-
// Ignore the JoinHandle for now. Maybe we'll use it in the future
40-
let _join_handle = rt::spawn(async move {
38+
let inner_future = async move {
4139
if gen_server.run(&handle, &mut rx).await.is_err() {
4240
tracing::trace!("GenServer crashed")
43-
};
44-
});
41+
}
42+
};
43+
44+
#[cfg(feature = "warn-on-block")]
45+
// Optionally warn if the GenServer future blocks for too much time
46+
let inner_future = warn_on_block::WarnOnBlocking::new(inner_future);
47+
48+
// Ignore the JoinHandle for now. Maybe we'll use it in the future
49+
let _join_handle = rt::spawn(inner_future);
50+
4551
handle_clone
4652
}
4753

@@ -294,6 +300,46 @@ pub trait GenServer: Send + Sized {
294300
}
295301
}
296302

303+
#[cfg(feature = "warn-on-block")]
304+
mod warn_on_block {
305+
use super::*;
306+
307+
use std::time::Instant;
308+
use tracing::warn;
309+
310+
pin_project_lite::pin_project! {
311+
pub struct WarnOnBlocking<F: Future>{
312+
#[pin]
313+
inner: F
314+
}
315+
}
316+
317+
impl<F: Future> WarnOnBlocking<F> {
318+
pub fn new(inner: F) -> Self {
319+
Self { inner }
320+
}
321+
}
322+
323+
impl<F: Future> Future for WarnOnBlocking<F> {
324+
type Output = F::Output;
325+
326+
fn poll(
327+
self: std::pin::Pin<&mut Self>,
328+
cx: &mut std::task::Context<'_>,
329+
) -> std::task::Poll<Self::Output> {
330+
let type_id = std::any::type_name::<F>();
331+
let this = self.project();
332+
let now = Instant::now();
333+
let res = this.inner.poll(cx);
334+
let elapsed = now.elapsed();
335+
if elapsed > Duration::from_millis(10) {
336+
warn!(future = ?type_id, elapsed = ?elapsed, "Blocking operation detected");
337+
}
338+
res
339+
}
340+
}
341+
}
342+
297343
#[cfg(test)]
298344
mod tests {
299345

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "busy_genserver_warning"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
spawned-rt = { workspace = true }
8+
spawned-concurrency = { workspace = true }
9+
tracing = { workspace = true }
10+
11+
[[bin]]
12+
name = "busy_genserver_warning"
13+
path = "main.rs"
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use spawned_rt::tasks as rt;
2+
use std::time::Duration;
3+
use std::{process::exit, thread};
4+
use tracing::info;
5+
6+
use spawned_concurrency::tasks::{CallResponse, CastResponse, GenServer, GenServerHandle};
7+
8+
// We test a scenario with a badly behaved task
9+
struct BusyWorker;
10+
11+
impl BusyWorker {
12+
pub fn new() -> Self {
13+
BusyWorker
14+
}
15+
}
16+
17+
#[derive(Clone)]
18+
pub enum InMessage {
19+
GetCount,
20+
Stop,
21+
}
22+
23+
#[derive(Clone)]
24+
pub enum OutMsg {
25+
Count(u64),
26+
}
27+
28+
impl GenServer for BusyWorker {
29+
type CallMsg = InMessage;
30+
type CastMsg = ();
31+
type OutMsg = ();
32+
type Error = ();
33+
34+
async fn handle_call(
35+
&mut self,
36+
_: Self::CallMsg,
37+
_: &GenServerHandle<Self>,
38+
) -> CallResponse<Self> {
39+
CallResponse::Stop(())
40+
}
41+
42+
async fn handle_cast(
43+
&mut self,
44+
_: Self::CastMsg,
45+
handle: &GenServerHandle<Self>,
46+
) -> CastResponse {
47+
info!("sleeping");
48+
thread::sleep(Duration::from_millis(542));
49+
handle.clone().cast(()).await.unwrap();
50+
// This sleep is needed to yield control to the runtime.
51+
// If not, the future never returns and the warning isn't emitted.
52+
rt::sleep(Duration::from_millis(0)).await;
53+
CastResponse::NoReply
54+
}
55+
}
56+
57+
/// Example of a program with a semi-blocking [`GenServer`].
58+
/// As mentioned in the `blocking_genserver` example, tasks that block can block
59+
/// the entire runtime in cooperative multitasking models. This is easy to find
60+
/// in practice, since it appears as if the whole world stopped. However, most
61+
/// of the time, tasks simply take longer than expected, which can lead to
62+
/// service degradation and increased latency. To tackle this, we print a warning
63+
/// whenever we detect tasks that take too long to run.
64+
pub fn main() {
65+
rt::run(async move {
66+
// If we change BusyWorker to start_blocking instead, it won't print the warning
67+
let mut badboy = BusyWorker::new().start();
68+
let _ = badboy.cast(()).await;
69+
70+
rt::sleep(Duration::from_secs(5)).await;
71+
exit(0);
72+
})
73+
}

0 commit comments

Comments
 (0)