Skip to content

Commit bc81b8a

Browse files
jefftthds
andauthored
feat(console): add warning for tasks that never yield (#439)
feat(console): add warning for tasks that never yield (#439) Adds a new warning for tasks that have never yielded. This is more complicated than it sounds, because under the previous behavior, tasks are only linted when they are created or updated. A task that never yields doesn't get updated, so if it gets linted before the "never yielded" threshold is reached (defaults to 1 second), the warning returns false and the task won't get linted again. To solve this problem, warnings can now return a response that requests to be rechecked later. In practice, this later is on the next update cycle, at which point the task may again request to be rechecked later. For this warning, the overhead will likely be only a single recheck after task creation (and the fact that we need to store those task IDs between updates). We will have to consider the performance impact of any new warnings which may request rechecks as they are added. Co-authored-by: Hayden Stainsby <[email protected]>
1 parent 7d7e198 commit bc81b8a

File tree

4 files changed

+180
-28
lines changed

4 files changed

+180
-28
lines changed

console-subscriber/examples/app.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ OPTIONS:
1111
blocks Includes a (misbehaving) blocking task
1212
burn Includes a (misbehaving) task that spins CPU with self-wakes
1313
coma Includes a (misbehaving) task that forgets to register a waker
14+
noyield Includes a (misbehaving) task that spawns tasks that never yield
1415
"#;
1516

1617
#[tokio::main]
@@ -38,6 +39,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3839
.spawn(burn(1, 10))
3940
.unwrap();
4041
}
42+
"noyield" => {
43+
tokio::task::Builder::new()
44+
.name("noyield")
45+
.spawn(no_yield(20))
46+
.unwrap();
47+
}
4148
"help" | "-h" => {
4249
eprintln!("{}", HELP);
4350
return Ok(());
@@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) {
114121
}
115122
}
116123
}
124+
125+
#[tracing::instrument]
126+
async fn no_yield(seconds: u64) {
127+
loop {
128+
let handle = tokio::task::Builder::new()
129+
.name("greedy")
130+
.spawn(async move {
131+
std::thread::sleep(Duration::from_secs(seconds));
132+
})
133+
.expect("Couldn't spawn greedy task");
134+
135+
_ = handle.await;
136+
}
137+
}

tokio-console/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> {
6565
.with_task_linters(vec![
6666
warnings::Linter::new(warnings::SelfWakePercent::default()),
6767
warnings::Linter::new(warnings::LostWaker),
68+
warnings::Linter::new(warnings::NeverYielded::default()),
6869
])
6970
.with_retain_for(retain_for);
7071
let mut input = input::EventStream::new();

tokio-console/src/state/tasks.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ use crate::{
99
},
1010
util::Percentage,
1111
view,
12-
warnings::Linter,
12+
warnings::{Lint, Linter},
1313
};
1414
use console_api as proto;
1515
use ratatui::{style::Color, text::Span};
1616
use std::{
1717
cell::RefCell,
18-
collections::HashMap,
18+
collections::{HashMap, HashSet},
1919
convert::{TryFrom, TryInto},
2020
rc::{Rc, Weak},
2121
time::{Duration, SystemTime},
@@ -24,6 +24,7 @@ use std::{
2424
#[derive(Default, Debug)]
2525
pub(crate) struct TasksState {
2626
tasks: Store<Task>,
27+
pending_lint: HashSet<Id<Task>>,
2728
pub(crate) linters: Vec<Linter<Task>>,
2829
dropped_events: u64,
2930
}
@@ -145,6 +146,9 @@ impl TasksState {
145146
let mut stats_update = update.stats_update;
146147
let linters = &self.linters;
147148

149+
// Gathers the tasks that need to be linted again on the next update cycle
150+
let mut next_pending_lint = HashSet::new();
151+
148152
self.tasks
149153
.insert_with(visibility, update.new_tasks, |ids, mut task| {
150154
if task.id.is_none() {
@@ -217,15 +221,30 @@ impl TasksState {
217221
warnings: Vec::new(),
218222
location,
219223
};
220-
task.lint(linters);
224+
if let TaskLintResult::RequiresRecheck = task.lint(linters) {
225+
next_pending_lint.insert(task.id);
226+
}
221227
Some((id, task))
222228
});
223229

224230
for (stats, mut task) in self.tasks.updated(stats_update) {
225231
tracing::trace!(?task, ?stats, "processing stats update for");
226232
task.stats = stats.into();
227-
task.lint(linters);
233+
match task.lint(linters) {
234+
TaskLintResult::RequiresRecheck => next_pending_lint.insert(task.id),
235+
// Avoid linting this task again this cycle
236+
_ => self.pending_lint.remove(&task.id),
237+
};
238+
}
239+
240+
for id in &self.pending_lint {
241+
if let Some(task) = self.tasks.get(*id) {
242+
if let TaskLintResult::RequiresRecheck = task.borrow_mut().lint(linters) {
243+
next_pending_lint.insert(*id);
244+
}
245+
}
228246
}
247+
self.pending_lint = next_pending_lint;
229248

230249
self.dropped_events += update.dropped_events;
231250
}
@@ -430,22 +449,37 @@ impl Task {
430449
&self.warnings[..]
431450
}
432451

433-
fn lint(&mut self, linters: &[Linter<Task>]) {
452+
fn lint(&mut self, linters: &[Linter<Task>]) -> TaskLintResult {
434453
self.warnings.clear();
454+
let mut recheck = false;
435455
for lint in linters {
436456
tracing::debug!(?lint, task = ?self, "checking...");
437-
if let Some(warning) = lint.check(self) {
438-
tracing::info!(?warning, task = ?self, "found a warning!");
439-
self.warnings.push(warning)
457+
match lint.check(self) {
458+
Lint::Warning(warning) => {
459+
tracing::info!(?warning, task = ?self, "found a warning!");
460+
self.warnings.push(warning);
461+
}
462+
Lint::Ok => {}
463+
Lint::Recheck => recheck = true,
440464
}
441465
}
466+
if recheck {
467+
TaskLintResult::RequiresRecheck
468+
} else {
469+
TaskLintResult::Linted
470+
}
442471
}
443472

444473
pub(crate) fn location(&self) -> &str {
445474
&self.location
446475
}
447476
}
448477

478+
enum TaskLintResult {
479+
Linted,
480+
RequiresRecheck,
481+
}
482+
449483
impl From<proto::tasks::Stats> for TaskStats {
450484
fn from(pb: proto::tasks::Stats) -> Self {
451485
let created_at = pb

tokio-console/src/warnings.rs

Lines changed: 116 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
use crate::state::tasks::Task;
2-
use std::{fmt::Debug, rc::Rc};
1+
use crate::state::tasks::{Task, TaskState};
2+
use std::{
3+
fmt::Debug,
4+
rc::Rc,
5+
time::{Duration, SystemTime},
6+
};
37

48
/// A warning for a particular type of monitored entity (e.g. task or resource).
59
///
610
/// This trait implements the logic for detecting a particular warning, and
711
/// generating a warning message describing it. The [`Linter`] type wraps an
812
/// instance of this trait to track active instances of the warning.
913
pub trait Warn<T>: Debug {
10-
/// Returns `true` if the warning applies to `val`.
11-
fn check(&self, val: &T) -> bool;
14+
/// Returns if the warning applies to `val`.
15+
fn check(&self, val: &T) -> Warning;
1216

1317
/// Formats a description of the warning detected for a *specific* `val`.
1418
///
@@ -46,6 +50,19 @@ pub trait Warn<T>: Debug {
4650
fn summary(&self) -> &str;
4751
}
4852

53+
/// A result for a warning check
54+
pub enum Warning {
55+
/// No warning for this entity.
56+
Ok,
57+
58+
/// A warning has been detected for this entity.
59+
Warn,
60+
61+
/// The warning should be rechecked as the conditions to allow for checking
62+
/// are not satisfied yet
63+
Recheck,
64+
}
65+
4966
#[derive(Debug)]
5067
pub(crate) struct Linter<T>(Rc<dyn Warn<T>>);
5168

@@ -57,17 +74,12 @@ impl<T> Linter<T> {
5774
Self(Rc::new(warning))
5875
}
5976

60-
/// Checks if the warning applies to a particular entity, returning a clone
61-
/// of `Self` if it does.
62-
///
63-
/// The cloned instance of `Self` should be held by the entity that
64-
/// generated the warning, so that it can be formatted. Holding the clone of
65-
/// `Self` will increment the warning count for that entity.
66-
pub(crate) fn check(&self, val: &T) -> Option<Self> {
67-
if self.0.check(val) {
68-
Some(Self(self.0.clone()))
69-
} else {
70-
None
77+
/// Checks if the warning applies to a particular entity
78+
pub(crate) fn check(&self, val: &T) -> Lint<T> {
79+
match self.0.check(val) {
80+
Warning::Ok => Lint::Ok,
81+
Warning::Warn => Lint::Warning(Self(self.0.clone())),
82+
Warning::Recheck => Lint::Recheck,
7183
}
7284
}
7385

@@ -78,7 +90,7 @@ impl<T> Linter<T> {
7890

7991
pub(crate) fn format(&self, val: &T) -> String {
8092
debug_assert!(
81-
self.0.check(val),
93+
matches!(self.0.check(val), Warning::Warn),
8294
"tried to format a warning for a {} that did not have that warning!",
8395
std::any::type_name::<T>()
8496
);
@@ -90,6 +102,21 @@ impl<T> Linter<T> {
90102
}
91103
}
92104

105+
/// A result for a linter check
106+
pub(crate) enum Lint<T> {
107+
/// No warning applies to the entity
108+
Ok,
109+
110+
/// The cloned instance of `Self` should be held by the entity that
111+
/// generated the warning, so that it can be formatted. Holding the clone of
112+
/// `Self` will increment the warning count for that entity.
113+
Warning(Linter<T>),
114+
115+
/// The lint should be rechecked as the conditions to allow for checking are
116+
/// not satisfied yet
117+
Recheck,
118+
}
119+
93120
#[derive(Clone, Debug)]
94121
pub(crate) struct SelfWakePercent {
95122
min_percent: u64,
@@ -120,9 +147,13 @@ impl Warn<Task> for SelfWakePercent {
120147
self.description.as_str()
121148
}
122149

123-
fn check(&self, task: &Task) -> bool {
150+
fn check(&self, task: &Task) -> Warning {
124151
let self_wakes = task.self_wake_percent();
125-
self_wakes > self.min_percent
152+
if self_wakes > self.min_percent {
153+
Warning::Warn
154+
} else {
155+
Warning::Ok
156+
}
126157
}
127158

128159
fn format(&self, task: &Task) -> String {
@@ -142,11 +173,76 @@ impl Warn<Task> for LostWaker {
142173
"tasks have lost their waker"
143174
}
144175

145-
fn check(&self, task: &Task) -> bool {
146-
!task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened()
176+
fn check(&self, task: &Task) -> Warning {
177+
if !task.is_completed()
178+
&& task.waker_count() == 0
179+
&& !task.is_running()
180+
&& !task.is_awakened()
181+
{
182+
Warning::Warn
183+
} else {
184+
Warning::Ok
185+
}
147186
}
148187

149188
fn format(&self, _: &Task) -> String {
150189
"This task has lost its waker, and will never be woken again.".into()
151190
}
152191
}
192+
193+
/// Warning for if a task has never yielded
194+
#[derive(Clone, Debug)]
195+
pub(crate) struct NeverYielded {
196+
min_duration: Duration,
197+
description: String,
198+
}
199+
200+
impl NeverYielded {
201+
pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1);
202+
pub(crate) fn new(min_duration: Duration) -> Self {
203+
Self {
204+
min_duration,
205+
description: format!(
206+
"tasks have never yielded (threshold {}ms)",
207+
min_duration.as_millis()
208+
),
209+
}
210+
}
211+
}
212+
213+
impl Default for NeverYielded {
214+
fn default() -> Self {
215+
Self::new(Self::DEFAULT_DURATION)
216+
}
217+
}
218+
219+
impl Warn<Task> for NeverYielded {
220+
fn summary(&self) -> &str {
221+
self.description.as_str()
222+
}
223+
224+
fn check(&self, task: &Task) -> Warning {
225+
// Don't fire warning for tasks that are waiting to run
226+
if task.state() != TaskState::Running {
227+
return Warning::Ok;
228+
}
229+
230+
if task.total_polls() > 1 {
231+
return Warning::Ok;
232+
}
233+
234+
// Avoid short-lived task false positives
235+
if task.busy(SystemTime::now()) >= self.min_duration {
236+
Warning::Warn
237+
} else {
238+
Warning::Recheck
239+
}
240+
}
241+
242+
fn format(&self, task: &Task) -> String {
243+
format!(
244+
"This task has never yielded ({:?})",
245+
task.busy(SystemTime::now()),
246+
)
247+
}
248+
}

0 commit comments

Comments
 (0)