Commit 20b5df9

task: fix LocalSet having a single shared task budget (tokio-rs#2462)
## Motivation

Currently, an issue exists where a `LocalSet` has a single cooperative task budget that's shared across all futures spawned on the `LocalSet` _and_ by any future passed to `LocalSet::run_until` or `LocalSet::block_on`. Because these methods will poll the `run_until` future before polling spawned tasks, it is possible for that task to _always_ deterministically starve the entire `LocalSet` so that no local tasks can proceed. When the completion of that future _itself_ depends on other tasks on the `LocalSet`, this will then result in a deadlock, as in issue tokio-rs#2460.

A detailed description of why this is the case, taken from [this comment][1]:

`LocalSet` wraps each time a local task is run in `budget`:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L406

This is identical to what tokio's other schedulers do when running tasks, and in theory should give each task its own budget every time it's polled.

_However_, `LocalSet` is different from other schedulers. Unlike the runtime schedulers, a `LocalSet` is itself a future that's run on another scheduler, in `block_on`. `block_on` _also_ sets a budget:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/runtime/basic_scheduler.rs#L131

The docs for `budget` state that:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/coop.rs#L73

This means that inside of a `LocalSet`, the calls to `budget` are no-ops. Instead, each future polled by the `LocalSet` is subtracting from a single global budget.

`LocalSet`'s `RunUntil` future polls the provided future before polling any other tasks spawned on the local set:
https://github.com/tokio-rs/tokio/blob/947045b9445f15fb9314ba0892efa2251076ae73/tokio/src/task/local.rs#L525-L535

In this case, the provided future is `JoinAll`. Unfortunately, every time a `JoinAll` is polled, it polls _every_ joined future that has not yet completed. When the number of futures in the `JoinAll` is >= 128, this means that the `JoinAll` immediately exhausts the task budget. This would, in theory, be a _good_ thing --- if the `JoinAll` had a huge number of `JoinHandle`s in it and none of them are ready, it would limit the time we spend polling those join handles.

However, because the `LocalSet` _actually_ has a single shared task budget, this means polling the `JoinAll` _always_ exhausts the entire budget. There is now no budget remaining to poll any other tasks spawned on the `LocalSet`, and they are never able to complete.

[1]: tokio-rs#2460 (comment)

## Solution

This branch solves this issue by resetting the task budget when polling a `LocalSet`. I've added a new function to `coop` for resetting the task budget to `UNCONSTRAINED` for the duration of a closure, and thus allowing the `budget` calls in `LocalSet` to _actually_ create a new budget for each spawned local task. Additionally, I've changed `LocalSet` to _also_ ensure that a separate task budget is applied to any future passed to `block_on`/`run_until`.

Additionally, I've added a test reproducing the issue described in tokio-rs#2460. This test fails prior to this change, and passes after it.

Fixes tokio-rs#2460

Signed-off-by: Eliza Weisman <[email protected]>
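
The starvation described in the Motivation can be reproduced with a small standalone model. The sketch below is not tokio's code: `HITS`, `BUDGET`, and `UNCONSTRAINED` mirror names that appear in the diff, the value 128 is taken from the commit message, and `proceed` and `shared_budget_left_for_tasks` are hypothetical helpers introduced only for illustration. It assumes that a nested `budget` call is a no-op whenever a budget is already active, as the commit message states.

```rust
use std::cell::Cell;

// Stand-ins for the constants in tokio's `coop` module (values assumed).
const UNCONSTRAINED: usize = usize::MAX;
const BUDGET: usize = 128;

thread_local! {
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

/// Simplified model of `budget`: a fresh budget is installed only when
/// no budget is currently active; otherwise the call is a no-op.
fn budget<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        if hits.get() != UNCONSTRAINED {
            return f();
        }
        hits.set(BUDGET);
        let out = f();
        hits.set(UNCONSTRAINED);
        out
    })
}

/// Hypothetical helper: spend one unit of budget.
/// Returns `false` once the active budget is exhausted.
fn proceed() -> bool {
    HITS.with(|hits| match hits.get() {
        UNCONSTRAINED => true,
        0 => false,
        n => {
            hits.set(n - 1);
            true
        }
    })
}

/// Hypothetical helper: how many units remain for a spawned task after
/// the `run_until` future has polled `join_all_polls` join handles.
fn shared_budget_left_for_tasks(join_all_polls: usize) -> usize {
    // Outer budget, as set by `block_on`.
    budget(|| {
        // The `JoinAll` polls every pending handle first.
        for _ in 0..join_all_polls {
            proceed();
        }
        // The per-task `budget` call is nested, so it does nothing here.
        budget(|| {
            let mut units = 0;
            while proceed() {
                units += 1;
            }
            units
        })
    })
}

fn main() {
    println!("left after 100 polls: {}", shared_budget_left_for_tasks(100));
    println!("left after 128 polls: {}", shared_budget_left_for_tasks(128));
}
```

In this model, once the outer future performs 128 or more polls, the spawned task finds no budget left, which corresponds to the deadlock condition reported in tokio-rs#2460.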
1 parent fa9743f commit 20b5df9

4 files changed: +147 -55 lines changed

tokio/src/coop.rs

Lines changed: 41 additions & 8 deletions
@@ -85,15 +85,11 @@ where
             return f();
         }

-        struct Guard<'a>(&'a Cell<usize>);
-        impl<'a> Drop for Guard<'a> {
-            fn drop(&mut self) {
-                self.0.set(UNCONSTRAINED);
-            }
-        }
-
         hits.set(BUDGET);
-        let _guard = Guard(hits);
+        let _guard = ResetGuard {
+            hits,
+            prev: UNCONSTRAINED,
+        };
         f()
     })
 }
@@ -114,6 +110,32 @@ cfg_blocking_impl! {
     }
 }

+cfg_rt_core! {
+    cfg_rt_util! {
+        /// Run the given closure with a new task budget, resetting the previous
+        /// budget when the closure finishes.
+        ///
+        /// This is intended for internal use by `LocalSet` and (potentially) other
+        /// similar schedulers which are themselves futures, and need a fresh budget
+        /// for each of their children.
+        #[inline(always)]
+        pub(crate) fn reset<F, R>(f: F) -> R
+        where
+            F: FnOnce() -> R,
+        {
+            HITS.with(move |hits| {
+                let prev = hits.get();
+                hits.set(UNCONSTRAINED);
+                let _guard = ResetGuard {
+                    hits,
+                    prev,
+                };
+                f()
+            })
+        }
+    }
+}
+
 /// Invoke `f` with a subset of the remaining budget.
 ///
 /// This is useful if you have sub-futures that you need to poll, but that you want to restrict
@@ -289,6 +311,11 @@ pin_project_lite::pin_project! {
     }
 }

+struct ResetGuard<'a> {
+    hits: &'a Cell<usize>,
+    prev: usize,
+}
+
 impl<F: Future> Future for CoopFuture<F> {
     type Output = F::Output;
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -327,6 +354,12 @@ cfg_sync! {
     impl<F> CoopFutureExt for F where F: Future {}
 }

+impl<'a> Drop for ResetGuard<'a> {
+    fn drop(&mut self) {
+        self.hits.set(self.prev);
+    }
+}
+
 #[cfg(all(test, not(loom)))]
 mod test {
     use super::*;
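
The effect of `reset` together with `ResetGuard` can be checked in isolation. The following is a minimal sketch, not the crate's actual module: it reuses the names `HITS`, `BUDGET`, `UNCONSTRAINED`, `ResetGuard`, `budget`, and `reset` from the diff, assumes a budget of 128 as stated in the commit message, and introduces hypothetical helpers `remaining`, `spend`, and `demo` purely for illustration.

```rust
use std::cell::Cell;

// Stand-ins for the constants in tokio's `coop` module (values assumed).
const UNCONSTRAINED: usize = usize::MAX;
const BUDGET: usize = 128;

thread_local! {
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

/// Restores the previous budget value when dropped, including on unwind.
struct ResetGuard<'a> {
    hits: &'a Cell<usize>,
    prev: usize,
}

impl Drop for ResetGuard<'_> {
    fn drop(&mut self) {
        self.hits.set(self.prev);
    }
}

/// Installs a fresh budget unless one is already active.
fn budget<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        if hits.get() != UNCONSTRAINED {
            return f();
        }
        hits.set(BUDGET);
        let _guard = ResetGuard { hits, prev: UNCONSTRAINED };
        f()
    })
}

/// Clears the active budget for the duration of `f`, then restores it.
fn reset<R>(f: impl FnOnce() -> R) -> R {
    HITS.with(|hits| {
        let prev = hits.get();
        hits.set(UNCONSTRAINED);
        let _guard = ResetGuard { hits, prev };
        f()
    })
}

/// Hypothetical helper: read the current budget value.
fn remaining() -> usize {
    HITS.with(|hits| hits.get())
}

/// Hypothetical helper: spend `n` units of an active budget.
fn spend(n: usize) {
    HITS.with(|hits| {
        let current = hits.get();
        if current != UNCONSTRAINED {
            hits.set(current.saturating_sub(n));
        }
    })
}

/// Returns (budget seen by a child task, budget seen by the parent afterwards).
fn demo() -> (usize, usize) {
    budget(|| {
        spend(BUDGET); // the parent exhausts its own budget
        let child = reset(|| budget(remaining));
        (child, remaining())
    })
}

fn main() {
    let (child, parent) = demo();
    println!("child budget: {child}, parent budget afterwards: {parent}");
}
```

Because the guard stores `prev` rather than always writing `UNCONSTRAINED`, the parent's exhausted budget is put back once the child finishes, while the child still starts from a full budget.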

tokio/src/macros/cfg.rs

Lines changed: 17 additions & 0 deletions
@@ -35,6 +35,23 @@ macro_rules! cfg_blocking_impl {
     }
 }

+/// Enables blocking API internals
+macro_rules! cfg_blocking_impl_or_task {
+    ($($item:item)*) => {
+        $(
+            #[cfg(any(
+                feature = "blocking",
+                feature = "fs",
+                feature = "dns",
+                feature = "io-std",
+                feature = "rt-threaded",
+                feature = "task",
+            ))]
+            $item
+        )*
+    }
+}
+
 /// Enables enter::block_on
 macro_rules! cfg_block_on {
     ($($item:item)*) => {

tokio/src/task/local.rs

Lines changed: 35 additions & 26 deletions
@@ -454,20 +454,24 @@ impl Future for LocalSet {
         // Register the waker before starting to work
         self.context.shared.waker.register_by_ref(cx.waker());

-        if self.with(|| self.tick()) {
-            // If `tick` returns true, we need to notify the local future again:
-            // there are still tasks remaining in the run queue.
-            cx.waker().wake_by_ref();
-            Poll::Pending
-        } else if self.context.tasks.borrow().owned.is_empty() {
-            // If the scheduler has no remaining futures, we're done!
-            Poll::Ready(())
-        } else {
-            // There are still futures in the local set, but we've polled all the
-            // futures in the run queue. Therefore, we can just return Pending
-            // since the remaining futures will be woken from somewhere else.
-            Poll::Pending
-        }
+        // Reset any previous task budget while polling tasks spawned on the
+        // `LocalSet`, ensuring that each has its own separate budget.
+        crate::coop::reset(|| {
+            if self.with(|| self.tick()) {
+                // If `tick` returns true, we need to notify the local future again:
+                // there are still tasks remaining in the run queue.
+                cx.waker().wake_by_ref();
+                Poll::Pending
+            } else if self.context.tasks.borrow().owned.is_empty() {
+                // If the scheduler has no remaining futures, we're done!
+                Poll::Ready(())
+            } else {
+                // There are still futures in the local set, but we've polled all the
+                // futures in the run queue. Therefore, we can just return Pending
+                // since the remaining futures will be woken from somewhere else.
+                Poll::Pending
+            }
+        })
     }
 }

473477

@@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> {
                 .register_by_ref(cx.waker());

             let _no_blocking = crate::runtime::enter::disallow_blocking();
-
-            if let Poll::Ready(output) = me.future.poll(cx) {
-                return Poll::Ready(output);
-            }
-
-            if me.local_set.tick() {
-                // If `tick` returns `true`, we need to notify the local future again:
-                // there are still tasks remaining in the run queue.
-                cx.waker().wake_by_ref();
-            }
-
-            Poll::Pending
+            // Reset any previous task budget so that the future passed to
+            // `run_until` and any tasks spawned on the `LocalSet` have their
+            // own budgets.
+            crate::coop::reset(|| {
+                let f = me.future;
+                if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
+                    return Poll::Ready(output);
+                }
+
+                if me.local_set.tick() {
+                    // If `tick` returns `true`, we need to notify the local future again:
+                    // there are still tasks remaining in the run queue.
+                    cx.waker().wake_by_ref();
+                }
+
+                Poll::Pending
+            })
         })
     }
 }

tokio/tests/task_local_set.rs

Lines changed: 54 additions & 21 deletions
@@ -312,28 +312,17 @@ fn drop_cancels_tasks() {
     assert_eq!(1, Rc::strong_count(&rc1));
 }

-#[test]
-fn drop_cancels_remote_tasks() {
-    // This test reproduces issue #1885.
+/// Runs a test function in a separate thread, and panics if the test does not
+/// complete within the specified timeout, or if the test function panics.
+///
+/// This is intended for running tests whose failure mode is a hang or infinite
+/// loop that cannot be detected otherwise.
+fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
     use std::sync::mpsc::RecvTimeoutError;

     let (done_tx, done_rx) = std::sync::mpsc::channel();
     let thread = std::thread::spawn(move || {
-        let (tx, mut rx) = mpsc::channel::<()>(1024);
-
-        let mut rt = rt();
-
-        let local = LocalSet::new();
-        local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
-        local.block_on(&mut rt, async {
-            time::delay_for(Duration::from_millis(1)).await;
-        });
-
-        drop(tx);
-
-        // This enters an infinite loop if the remote notified tasks are not
-        // properly cancelled.
-        drop(local);
+        f();

         // Send a message on the channel so that the test thread can
         // determine if we have entered an infinite loop:
@@ -349,10 +338,11 @@ fn drop_cancels_remote_tasks() {
     //
     // Note that it should definitely complete in under a minute, but just
     // in case CI is slow, we'll give it a long timeout.
-    match done_rx.recv_timeout(Duration::from_secs(60)) {
+    match done_rx.recv_timeout(timeout) {
         Err(RecvTimeoutError::Timeout) => panic!(
-            "test did not complete within 60 seconds, \
-             we have (probably) entered an infinite loop!"
+            "test did not complete within {:?} seconds, \
+             we have (probably) entered an infinite loop!",
+            timeout,
         ),
         // Did the test thread panic? We'll find out for sure when we `join`
         // with it.
@@ -366,6 +356,49 @@ fn drop_cancels_remote_tasks() {
     thread.join().expect("test thread should not panic!")
 }

+#[test]
+fn drop_cancels_remote_tasks() {
+    // This test reproduces issue #1885.
+    with_timeout(Duration::from_secs(60), || {
+        let (tx, mut rx) = mpsc::channel::<()>(1024);
+
+        let mut rt = rt();
+
+        let local = LocalSet::new();
+        local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
+        local.block_on(&mut rt, async {
+            time::delay_for(Duration::from_millis(1)).await;
+        });
+
+        drop(tx);
+
+        // This enters an infinite loop if the remote notified tasks are not
+        // properly cancelled.
+        drop(local);
+    });
+}
+
+#[test]
+fn local_tasks_wake_join_all() {
+    // This test reproduces issue #2460.
+    with_timeout(Duration::from_secs(60), || {
+        use futures::future::join_all;
+        use tokio::task::LocalSet;
+
+        let mut rt = rt();
+        let set = LocalSet::new();
+        let mut handles = Vec::new();
+
+        for _ in 1..=128 {
+            handles.push(set.spawn_local(async move {
+                tokio::task::spawn_local(async move {}).await.unwrap();
+            }));
+        }
+
+        rt.block_on(set.run_until(join_all(handles)));
+    });
+}
+
 #[tokio::test]
 async fn local_tasks_are_polled_after_tick() {
     // Reproduces issues #1899 and #1900
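
The hang-detection approach behind `with_timeout` can be tried outside the test suite. The sketch below is a hypothetical, std-only variant named `run_with_timeout`; unlike the helper in the diff, it returns a `Result` instead of panicking, and the parts of the original helper that the diff does not show are not reproduced here.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Runs `f` on a separate thread and reports whether it finished in time.
/// Hypothetical variant of the test helper: errors are returned, not panicked.
fn run_with_timeout(
    timeout: Duration,
    f: impl FnOnce() + Send + 'static,
) -> Result<(), &'static str> {
    let (done_tx, done_rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        f();
        // Signal completion; the receiver may already be gone after a timeout.
        let _ = done_tx.send(());
    });

    match done_rx.recv_timeout(timeout) {
        // The closure finished; joining confirms the thread exited cleanly.
        Ok(()) => handle.join().map_err(|_| "worker thread panicked"),
        // No message arrived in time: the closure is probably stuck.
        Err(RecvTimeoutError::Timeout) => Err("timed out"),
        // The sender was dropped without sending, so the closure panicked.
        Err(RecvTimeoutError::Disconnected) => {
            let _ = handle.join();
            Err("worker thread panicked")
        }
    }
}

fn main() {
    let quick = run_with_timeout(Duration::from_secs(5), || {});
    let slow = run_with_timeout(Duration::from_millis(50), || {
        thread::sleep(Duration::from_secs(1));
    });
    println!("quick: {quick:?}, slow: {slow:?}");
}
```

Running the closure on its own thread is what makes a deadlock observable: the calling thread stays free to notice that no completion message arrived, which a test blocked inside the runtime could not do.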
