Commit 7681b25

Fix UB, panic and compiler error in bevy_tasks (#20352)
# Objective

Fixes several issues with `bevy_tasks` uncovered by @IceSentry and @Ratysz:

+ For web builds, `Task::is_finished` is missing, causing compile failures for cross-platform apps.
+ Scopes running on the single-threaded executor have possible UB due to mistakenly using an `'env` lifetime bound instead of a `'scope` lifetime bound on the scope ref.
+ Scopes running on the single-threaded executor fail to poll tasks spawned on them to completion when all tasks go to sleep at the same time (causing panics and potential UB).

## Solution

- Replace `futures_channel::oneshot` with `async_channel::bounded(1)`, and use `Receiver::is_empty` to implement `Task::is_finished`. The `async-channel` crate was chosen because it allows `Task` to be `Send + Sync + Unpin`, and because it has a special-cased oneshot implementation for bounded channels of size one (inherited from `concurrent-queue`).
- Switch the incorrect `'env` lifetime bound on the scope reference to `'scope`.
- Use reference counting to spin while ticking the executor until all tasks spawned on a scope are complete.

Since these are all fairly small changes, I rolled them together into a single PR. I can split any of the three out if they prove controversial.

## Testing

- Changes are currently untested. I need to check that this fixes the bugs we've been seeing at work, and I'd like to run this through Miri. It's concerning to me that Miri failed to catch the bad lifetime bound on the scope impl.
1 parent 67b4ecd commit 7681b25
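As a rough illustration of the first Solution bullet (a sketch only, not the commit's actual `wasm_task.rs` implementation; the `OneshotTask` type and `wrap` helper are hypothetical names), a size-one `async-channel` channel can back an `is_finished` check through `Receiver::is_empty`:

```rust
use core::future::Future;
use core::pin::Pin;

/// Hypothetical handle: the wrapped future's output is forwarded into a
/// bounded(1) channel, so the receiver doubles as a completion flag.
struct OneshotTask<T> {
    receiver: async_channel::Receiver<T>,
}

impl<T: 'static> OneshotTask<T> {
    /// Wrap a future so its output is sent through a oneshot-style channel.
    /// The returned boxed future is what actually gets spawned on an executor.
    fn wrap(
        future: impl Future<Output = T> + 'static,
    ) -> (Pin<Box<dyn Future<Output = ()>>>, Self) {
        let (sender, receiver) = async_channel::bounded(1);
        let wrapped = Box::pin(async move {
            // If the handle was dropped, the send fails and the result is discarded.
            let _ = sender.send(future.await).await;
        });
        (wrapped, Self { receiver })
    }

    /// The task is finished once its output sits in the channel.
    fn is_finished(&self) -> bool {
        !self.receiver.is_empty()
    }

    /// Wait for and take the output (`None` if the wrapped future was dropped).
    async fn output(self) -> Option<T> {
        self.receiver.recv().await.ok()
    }
}
```

The spawned half can then be handed to whatever executor runs on the target, while the handle only owns the channel receiver, which is `Send + Sync + Unpin` for `Send` outputs.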

File tree

5 files changed: +273, -183 lines

crates/bevy_tasks/Cargo.toml

Lines changed: 3 additions & 2 deletions

@@ -54,8 +54,8 @@ crossbeam-queue = { version = "0.3", default-features = false, features = [
 ] }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
-pin-project = { version = "1" }
-futures-channel = { version = "0.3", default-features = false }
+pin-project = "1"
+async-channel = "2.3.0"
 
 [target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies]
 async-task = { version = "4.4.0", default-features = false, features = [
@@ -72,6 +72,7 @@ atomic-waker = { version = "1", default-features = false, features = [
 futures-lite = { version = "2.0.1", default-features = false, features = [
   "std",
 ] }
+async-channel = "2.3.0"
 
 [lints]
 workspace = true

crates/bevy_tasks/src/lib.rs

Lines changed: 14 additions & 20 deletions

@@ -71,32 +71,35 @@ use alloc::boxed::Box;
 /// An owned and dynamically typed Future used when you can't statically type your result or need to add some indirection.
 pub type BoxedFuture<'a, T> = core::pin::Pin<Box<dyn ConditionalSendFuture<Output = T> + 'a>>;
 
+// Modules
+mod executor;
 pub mod futures;
+mod iter;
+mod slice;
+mod task;
+mod usages;
 
 cfg::async_executor! {
     if {} else {
         mod edge_executor;
     }
 }
 
-mod executor;
-
-mod slice;
+// Exports
+pub use iter::ParallelIterator;
 pub use slice::{ParallelSlice, ParallelSliceMut};
+pub use task::Task;
+pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 
-cfg::web! {
-    if {
-        #[path = "wasm_task.rs"]
-        mod task;
-    } else {
-        mod task;
+pub use futures_lite;
+pub use futures_lite::future::poll_once;
 
+cfg::web! {
+    if {} else {
         pub use usages::tick_global_task_pools_on_main_thread;
     }
 }
 
-pub use task::Task;
-
 cfg::multi_threaded! {
     if {
         mod task_pool;
@@ -111,10 +114,6 @@ cfg::multi_threaded! {
     }
 }
 
-mod usages;
-pub use futures_lite::future::poll_once;
-pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
-
 cfg::switch! {
     cfg::async_io => {
         pub use async_io::block_on;
@@ -147,11 +146,6 @@ cfg::switch! {
     }
 }
 
-mod iter;
-pub use iter::ParallelIterator;
-
-pub use futures_lite;
-
 /// The tasks prelude.
 ///
 /// This includes the most common types in this crate, re-exported for your convenience.

crates/bevy_tasks/src/single_threaded_task_pool.rs

Lines changed: 89 additions & 47 deletions

@@ -1,26 +1,25 @@
 use alloc::{string::String, vec::Vec};
 use bevy_platform::sync::Arc;
-use core::{cell::RefCell, future::Future, marker::PhantomData, mem};
+use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};
 
-use crate::Task;
+use crate::executor::LocalExecutor;
+use crate::{block_on, Task};
 
 crate::cfg::std! {
     if {
         use std::thread_local;
-        use crate::executor::LocalExecutor;
+
+        use crate::executor::LocalExecutor as Executor;
 
         thread_local! {
-            static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
+            static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
         }
-
-        type ScopeResult<T> = alloc::rc::Rc<RefCell<Option<T>>>;
     } else {
-        use bevy_platform::sync::{Mutex, PoisonError};
-        use crate::executor::Executor as LocalExecutor;
 
-        static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
+        // Because we do not have thread-locals without std, we cannot use LocalExecutor here.
+        use crate::executor::Executor;
 
-        type ScopeResult<T> = Arc<Mutex<Option<T>>>;
+        static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
     }
 }
 
@@ -111,7 +110,7 @@ impl TaskPool {
     /// This is similar to `rayon::scope` and `crossbeam::scope`
     pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
     where
-        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
+        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
         T: Send + 'static,
     {
         self.scope_with_executor(false, None, f)
@@ -130,7 +129,7 @@
         f: F,
     ) -> Vec<T>
     where
-        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
+        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
         T: Send + 'static,
     {
         // SAFETY: This safety comment applies to all references transmuted to 'env.
@@ -141,17 +140,22 @@
        // Any usages of the references passed into `Scope` must be accessed through
        // the transmuted reference for the rest of this function.
 
-        let executor = &LocalExecutor::new();
+        let executor = LocalExecutor::new();
+        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
+        let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };
+
+        let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
         // SAFETY: As above, all futures must complete in this function so we can change the lifetime
-        let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) };
+        let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };
 
-        let results: RefCell<Vec<ScopeResult<T>>> = RefCell::new(Vec::new());
+        let pending_tasks: Cell<usize> = Cell::new(0);
         // SAFETY: As above, all futures must complete in this function so we can change the lifetime
-        let results: &'env RefCell<Vec<ScopeResult<T>>> = unsafe { mem::transmute(&results) };
+        let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };
 
         let mut scope = Scope {
-            executor,
-            results,
+            executor_ref,
+            pending_tasks,
+            results_ref,
             scope: PhantomData,
             env: PhantomData,
         };
@@ -161,21 +165,17 @@
 
         f(scope_ref);
 
-        // Loop until all tasks are done
-        while executor.try_tick() {}
+        // Wait until the scope is complete
+        block_on(executor.run(async {
+            while pending_tasks.get() != 0 {
+                futures_lite::future::yield_now().await;
+            }
+        }));
 
-        let results = scope.results.borrow();
         results
-            .iter()
-            .map(|result| crate::cfg::switch! {{
-                crate::cfg::std => {
-                    result.borrow_mut().take().unwrap()
-                }
-                _ => {
-                    let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
-                    lock.take().unwrap()
-                }
-            }})
+            .take()
+            .into_iter()
+            .map(|result| result.unwrap())
             .collect()
     }
 
@@ -239,7 +239,7 @@ impl TaskPool {
     /// ```
     pub fn with_local_executor<F, R>(&self, f: F) -> R
     where
-        F: FnOnce(&LocalExecutor) -> R,
+        F: FnOnce(&Executor) -> R,
     {
         crate::cfg::switch! {{
             crate::cfg::std => {
@@ -257,9 +257,11 @@ impl TaskPool {
 /// For more information, see [`TaskPool::scope`].
 #[derive(Debug)]
 pub struct Scope<'scope, 'env: 'scope, T> {
-    executor: &'scope LocalExecutor<'scope>,
+    executor_ref: &'scope LocalExecutor<'scope>,
+    // The number of pending tasks spawned on the scope
+    pending_tasks: &'scope Cell<usize>,
     // Vector to gather results of all futures spawned during scope run
-    results: &'env RefCell<Vec<ScopeResult<T>>>,
+    results_ref: &'env RefCell<Vec<Option<T>>>,
 
     // make `Scope` invariant over 'scope and 'env
     scope: PhantomData<&'scope mut &'scope ()>,
@@ -295,21 +297,32 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
     ///
     /// For more information, see [`TaskPool::scope`].
     pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
-        let result = ScopeResult::<T>::default();
-        self.results.borrow_mut().push(result.clone());
+        // increment the number of pending tasks
+        let pending_tasks = self.pending_tasks;
+        pending_tasks.update(|i| i + 1);
+
+        // add a spot to keep the result, and record the index
+        let results_ref = self.results_ref;
+        let mut results = results_ref.borrow_mut();
+        let task_number = results.len();
+        results.push(None);
+        drop(results);
+
+        // create the job closure
         let f = async move {
-            let temp_result = f.await;
-
-            crate::cfg::std! {
-                if {
-                    result.borrow_mut().replace(temp_result);
-                } else {
-                    let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
-                    *lock = Some(temp_result);
-                }
-            }
+            let result = f.await;
+
+            // store the result in the allocated slot
+            let mut results = results_ref.borrow_mut();
+            results[task_number] = Some(result);
+            drop(results);
+
+            // decrement the pending tasks count
+            pending_tasks.update(|i| i - 1);
        };
-        self.executor.spawn(f).detach();
+
+        // spawn the job itself
+        self.executor_ref.spawn(f).detach();
     }
 }
 
@@ -328,3 +341,32 @@ crate::cfg::std! {
         impl<T: Sync> MaybeSync for T {}
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::{time, thread};
+
+    use super::*;
+
+    /// This test creates a scope with a single task that goes to sleep for a
+    /// nontrivial amount of time. At one point, the scope would (incorrectly)
+    /// return early under these conditions, causing a crash.
+    ///
+    /// The correct behavior is for the scope to block until the receiver is
+    /// woken by the external thread.
+    #[test]
+    fn scoped_spawn() {
+        let (sender, recever) = async_channel::unbounded();
+        let task_pool = TaskPool {};
+        let thread = thread::spawn(move || {
+            let duration = time::Duration::from_millis(50);
+            thread::sleep(duration);
+            let _ = sender.send(0);
+        });
+        task_pool.scope(|scope| {
+            scope.spawn(async {
+                recever.recv().await
+            });
+        });
+    }
+}
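As a rough standalone illustration of the scope change above (a sketch against the public `async-executor` and `futures-lite` crates rather than Bevy's internal executor wrapper; `run_scope_to_completion` is a hypothetical helper), a pending-task counter keeps the executor running where `while executor.try_tick() {}` would stop as soon as every task went to sleep:

```rust
use std::cell::{Cell, RefCell};

fn run_scope_to_completion() -> Vec<i32> {
    // Declared before the executor so the spawned futures' borrows stay valid.
    let pending = Cell::new(0usize);
    let results = RefCell::new(Vec::new());
    let executor = async_executor::LocalExecutor::new();

    // Spawn a few tasks; each decrements the pending count when it finishes.
    for i in 0..3 {
        pending.set(pending.get() + 1);
        let (pending, results) = (&pending, &results);
        executor
            .spawn(async move {
                // Yield once so the tasks interleave instead of running straight through.
                futures_lite::future::yield_now().await;
                results.borrow_mut().push(i * 10);
                pending.set(pending.get() - 1);
            })
            .detach();
    }

    // `while executor.try_tick() {}` would return here if every task were asleep;
    // instead, keep driving the executor until nothing is pending.
    futures_lite::future::block_on(executor.run(async {
        while pending.get() != 0 {
            futures_lite::future::yield_now().await;
        }
    }));

    results.borrow().clone()
}
```

Here the counter uses `Cell::get`/`Cell::set` instead of `Cell::update`, and the spawned tasks merely yield rather than sleep, but the control flow mirrors the `block_on(executor.run(...))` loop introduced in `scope_with_executor`.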
