Skip to content

Commit 9dde99f

Browse files
authored
Cleanup the multithreaded executor (#12969)
# Objective Improve the code quality of the multithreaded executor. ## Solution * Remove some unused variables. * Use `Mutex::get_mut` where applicable instead of locking. * Use a `startup_systems` FixedBitset to pre-compute the starting systems instead of building it bit-by-bit on startup. * Instead of using `FixedBitset::clear` and `FixedBitset::union_with`, use `FixedBitset::clone_from` instead, which does only a single copy and will not allocate if the target bitset has a large enough allocation. * Replace the `Mutex` around `Conditions` with `SyncUnsafeCell`, and add a `Context::try_lock` that forces it to be fetched synchronously alongside the executor lock. This might produce minimal performance gains, but the focus here is on the code quality improvements.
1 parent a5fa32e commit 9dde99f

File tree

2 files changed

+42
-75
lines changed

2 files changed

+42
-75
lines changed

crates/bevy_ecs/src/schedule/executor/multi_threaded.rs

Lines changed: 39 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
any::Any,
3-
sync::{Arc, Mutex},
3+
sync::{Arc, Mutex, MutexGuard},
44
};
55

66
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
@@ -30,7 +30,7 @@ use super::__rust_begin_short_backtrace;
3030
struct Environment<'env, 'sys> {
3131
executor: &'env MultiThreadedExecutor,
3232
systems: &'sys [SyncUnsafeCell<BoxedSystem>],
33-
conditions: Mutex<Conditions<'sys>>,
33+
conditions: SyncUnsafeCell<Conditions<'sys>>,
3434
world_cell: UnsafeWorldCell<'env>,
3535
}
3636

@@ -50,7 +50,7 @@ impl<'env, 'sys> Environment<'env, 'sys> {
5050
Environment {
5151
executor,
5252
systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
53-
conditions: Mutex::new(Conditions {
53+
conditions: SyncUnsafeCell::new(Conditions {
5454
system_conditions: &mut schedule.system_conditions,
5555
set_conditions: &mut schedule.set_conditions,
5656
sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
@@ -77,7 +77,6 @@ struct SystemTaskMetadata {
7777
/// The result of running a system that is sent across a channel.
7878
struct SystemResult {
7979
system_index: usize,
80-
success: bool,
8180
}
8281

8382
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
@@ -90,6 +89,7 @@ pub struct MultiThreadedExecutor {
9089
apply_final_deferred: bool,
9190
/// When set, tells the executor that a thread has panicked.
9291
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
92+
starting_systems: FixedBitSet,
9393
/// Cached tracing span
9494
#[cfg(feature = "trace")]
9595
executor_span: Span,
@@ -105,12 +105,8 @@ pub struct ExecutorState {
105105
local_thread_running: bool,
106106
/// Returns `true` if an exclusive system is running.
107107
exclusive_running: bool,
108-
/// The number of systems expected to run.
109-
num_systems: usize,
110108
/// The number of systems that are running.
111109
num_running_systems: usize,
112-
/// The number of systems that have completed.
113-
num_completed_systems: usize,
114110
/// The number of dependencies each system has that have not completed.
115111
num_dependencies_remaining: Vec<usize>,
116112
/// System sets whose conditions have been evaluated.
@@ -127,8 +123,6 @@ pub struct ExecutorState {
127123
completed_systems: FixedBitSet,
128124
/// Systems that have run but have not had their buffers applied.
129125
unapplied_systems: FixedBitSet,
130-
/// When set, stops the executor from running any more systems.
131-
stop_spawning: bool,
132126
}
133127

134128
/// References to data required by the executor.
@@ -159,6 +153,7 @@ impl SystemExecutor for MultiThreadedExecutor {
159153
let set_count = schedule.set_ids.len();
160154

161155
self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
156+
self.starting_systems = FixedBitSet::with_capacity(sys_count);
162157
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
163158
state.ready_systems = FixedBitSet::with_capacity(sys_count);
164159
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
@@ -175,6 +170,9 @@ impl SystemExecutor for MultiThreadedExecutor {
175170
is_send: schedule.systems[index].is_send(),
176171
is_exclusive: schedule.systems[index].is_exclusive(),
177172
});
173+
if schedule.system_dependencies[index] == 0 {
174+
self.starting_systems.insert(index);
175+
}
178176
}
179177

180178
state.num_dependencies_remaining = Vec::with_capacity(sys_count);
@@ -188,23 +186,14 @@ impl SystemExecutor for MultiThreadedExecutor {
188186
) {
189187
let state = self.state.get_mut().unwrap();
190188
// reset counts
191-
state.num_systems = schedule.systems.len();
192-
if state.num_systems == 0 {
189+
if schedule.systems.is_empty() {
193190
return;
194191
}
195192
state.num_running_systems = 0;
196-
state.num_completed_systems = 0;
197-
state.num_dependencies_remaining.clear();
198193
state
199194
.num_dependencies_remaining
200-
.extend_from_slice(&schedule.system_dependencies);
201-
202-
for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate()
203-
{
204-
if *dependencies == 0 {
205-
state.ready_systems.insert(system_index);
206-
}
207-
}
195+
.clone_from(&schedule.system_dependencies);
196+
state.ready_systems.clone_from(&self.starting_systems);
208197

209198
// If stepping is enabled, make sure we skip those systems that should
210199
// not be run.
@@ -213,13 +202,12 @@ impl SystemExecutor for MultiThreadedExecutor {
213202
debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
214203
// mark skipped systems as completed
215204
state.completed_systems |= skipped_systems;
216-
state.num_completed_systems = state.completed_systems.count_ones(..);
217205

218206
// signal the dependencies for each of the skipped systems, as
219207
// though they had run
220208
for system_index in skipped_systems.ones() {
221209
state.signal_dependents(system_index);
222-
state.ready_systems.set(system_index, false);
210+
state.ready_systems.remove(system_index);
223211
}
224212
}
225213

@@ -251,15 +239,14 @@ impl SystemExecutor for MultiThreadedExecutor {
251239
// Commands should be applied while on the scope's thread, not the executor's thread
252240
let res = apply_deferred(&state.unapplied_systems, systems, world);
253241
if let Err(payload) = res {
254-
let mut panic_payload = self.panic_payload.lock().unwrap();
242+
let panic_payload = self.panic_payload.get_mut().unwrap();
255243
*panic_payload = Some(payload);
256244
}
257245
state.unapplied_systems.clear();
258-
debug_assert!(state.unapplied_systems.is_clear());
259246
}
260247

261248
// check to see if there was a panic
262-
let mut payload = self.panic_payload.lock().unwrap();
249+
let payload = self.panic_payload.get_mut().unwrap();
263250
if let Some(payload) = payload.take() {
264251
std::panic::resume_unwind(payload);
265252
}
@@ -288,10 +275,7 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
288275
self.environment
289276
.executor
290277
.system_completion
291-
.push(SystemResult {
292-
system_index,
293-
success: res.is_ok(),
294-
})
278+
.push(SystemResult { system_index })
295279
.unwrap_or_else(|error| unreachable!("{}", error));
296280
if let Err(payload) = res {
297281
eprintln!("Encountered a panic in system `{}`!", &*system.name());
@@ -304,17 +288,25 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
304288
self.tick_executor();
305289
}
306290

291+
fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
292+
let guard = self.environment.executor.state.try_lock().ok()?;
293+
// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
294+
// is synchronized by the lock on the executor state.
295+
let conditions = unsafe { &mut *self.environment.conditions.get() };
296+
Some((conditions, guard))
297+
}
298+
307299
fn tick_executor(&self) {
308300
// Ensure that the executor handles any events pushed to the system_completion queue by this thread.
309301
// If this thread acquires the lock, the executor runs after the push() and they are processed.
310302
// If this thread does not acquire the lock, then the is_empty() check on the other thread runs
311303
// after the lock is released, which is after try_lock() failed, which is after the push()
312304
// on this thread, so the is_empty() check will see the new events and loop.
313305
loop {
314-
let Ok(mut guard) = self.environment.executor.state.try_lock() else {
306+
let Some((conditions, mut guard)) = self.try_lock() else {
315307
return;
316308
};
317-
guard.tick(self);
309+
guard.tick(self, conditions);
318310
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
319311
drop(guard);
320312
if self.environment.executor.system_completion.is_empty() {
@@ -332,6 +324,7 @@ impl MultiThreadedExecutor {
332324
Self {
333325
state: Mutex::new(ExecutorState::new()),
334326
system_completion: ConcurrentQueue::unbounded(),
327+
starting_systems: FixedBitSet::new(),
335328
apply_final_deferred: true,
336329
panic_payload: Mutex::new(None),
337330
#[cfg(feature = "trace")]
@@ -344,9 +337,7 @@ impl ExecutorState {
344337
fn new() -> Self {
345338
Self {
346339
system_task_metadata: Vec::new(),
347-
num_systems: 0,
348340
num_running_systems: 0,
349-
num_completed_systems: 0,
350341
num_dependencies_remaining: Vec::new(),
351342
active_access: default(),
352343
local_thread_running: false,
@@ -358,11 +349,10 @@ impl ExecutorState {
358349
skipped_systems: FixedBitSet::new(),
359350
completed_systems: FixedBitSet::new(),
360351
unapplied_systems: FixedBitSet::new(),
361-
stop_spawning: false,
362352
}
363353
}
364354

365-
fn tick(&mut self, context: &Context) {
355+
fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
366356
#[cfg(feature = "trace")]
367357
let _span = context.environment.executor.executor_span.enter();
368358

@@ -376,7 +366,7 @@ impl ExecutorState {
376366
// - `finish_system_and_handle_dependents` has updated the currently running systems.
377367
// - `rebuild_active_access` locks access for all currently running systems.
378368
unsafe {
379-
self.spawn_system_tasks(context);
369+
self.spawn_system_tasks(context, conditions);
380370
}
381371
}
382372

@@ -385,17 +375,11 @@ impl ExecutorState {
385375
/// have been mutably borrowed (such as the systems currently running).
386376
/// - `world_cell` must have permission to access all world data (not counting
387377
/// any world data that is claimed by systems currently running on this executor).
388-
unsafe fn spawn_system_tasks(&mut self, context: &Context) {
378+
unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
389379
if self.exclusive_running {
390380
return;
391381
}
392382

393-
let mut conditions = context
394-
.environment
395-
.conditions
396-
.try_lock()
397-
.expect("Conditions should only be locked while owning the executor state");
398-
399383
// can't borrow since loop mutably borrows `self`
400384
let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);
401385

@@ -405,19 +389,18 @@ impl ExecutorState {
405389
while check_for_new_ready_systems {
406390
check_for_new_ready_systems = false;
407391

408-
ready_systems.clear();
409-
ready_systems.union_with(&self.ready_systems);
392+
ready_systems.clone_from(&self.ready_systems);
410393

411394
for system_index in ready_systems.ones() {
412-
assert!(!self.running_systems.contains(system_index));
395+
debug_assert!(!self.running_systems.contains(system_index));
413396
// SAFETY: Caller assured that these systems are not running.
414397
// Therefore, no other reference to this system exists and there is no aliasing.
415398
let system = unsafe { &mut *context.environment.systems[system_index].get() };
416399

417400
if !self.can_run(
418401
system_index,
419402
system,
420-
&mut conditions,
403+
conditions,
421404
context.environment.world_cell,
422405
) {
423406
// NOTE: exclusive systems with ambiguities are susceptible to
@@ -427,7 +410,7 @@ impl ExecutorState {
427410
continue;
428411
}
429412

430-
self.ready_systems.set(system_index, false);
413+
self.ready_systems.remove(system_index);
431414

432415
// SAFETY: `can_run` returned true, which means that:
433416
// - It must have called `update_archetype_component_access` for each run condition.
@@ -436,7 +419,7 @@ impl ExecutorState {
436419
!self.should_run(
437420
system_index,
438421
system,
439-
&mut conditions,
422+
conditions,
440423
context.environment.world_cell,
441424
)
442425
} {
@@ -523,11 +506,9 @@ impl ExecutorState {
523506
return false;
524507
}
525508

526-
// PERF: use an optimized clear() + extend() operation
527-
let meta_access =
528-
&mut self.system_task_metadata[system_index].archetype_component_access;
529-
meta_access.clear();
530-
meta_access.extend(system.archetype_component_access());
509+
self.system_task_metadata[system_index]
510+
.archetype_component_access
511+
.clone_from(system.archetype_component_access());
531512
}
532513

533514
true
@@ -666,10 +647,7 @@ impl ExecutorState {
666647
}
667648

668649
fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
669-
let SystemResult {
670-
system_index,
671-
success,
672-
} = result;
650+
let SystemResult { system_index, .. } = result;
673651

674652
if self.system_task_metadata[system_index].is_exclusive {
675653
self.exclusive_running = false;
@@ -681,20 +659,14 @@ impl ExecutorState {
681659

682660
debug_assert!(self.num_running_systems >= 1);
683661
self.num_running_systems -= 1;
684-
self.num_completed_systems += 1;
685-
self.running_systems.set(system_index, false);
662+
self.running_systems.remove(system_index);
686663
self.completed_systems.insert(system_index);
687664
self.unapplied_systems.insert(system_index);
688665

689666
self.signal_dependents(system_index);
690-
691-
if !success {
692-
self.stop_spawning_systems();
693-
}
694667
}
695668

696669
fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
697-
self.num_completed_systems += 1;
698670
self.completed_systems.insert(system_index);
699671
self.signal_dependents(system_index);
700672
}
@@ -710,13 +682,6 @@ impl ExecutorState {
710682
}
711683
}
712684

713-
fn stop_spawning_systems(&mut self) {
714-
if !self.stop_spawning {
715-
self.num_systems = self.num_completed_systems + self.num_running_systems;
716-
self.stop_spawning = true;
717-
}
718-
}
719-
720685
fn rebuild_active_access(&mut self) {
721686
self.active_access.clear();
722687
for index in self.running_systems.ones() {

crates/bevy_utils/src/syncunsafecell.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,14 @@ impl<T> SyncUnsafeCell<[T]> {
101101
/// assert_eq!(slice_cell.len(), 3);
102102
/// ```
103103
pub fn as_slice_of_cells(&self) -> &[SyncUnsafeCell<T>] {
104+
let self_ptr: *const SyncUnsafeCell<[T]> = ptr::from_ref(self);
105+
let slice_ptr = self_ptr as *const [SyncUnsafeCell<T>];
104106
// SAFETY: `UnsafeCell<T>` and `SyncUnsafeCell<T>` have #[repr(transparent)]
105107
// therefore:
106108
// - `SyncUnsafeCell<T>` has the same layout as `T`
107109
// - `SyncUnsafeCell<[T]>` has the same layout as `[T]`
108110
// - `SyncUnsafeCell<[T]>` has the same layout as `[SyncUnsafeCell<T>]`
109-
unsafe { &*(ptr::from_ref(self) as *const [SyncUnsafeCell<T>]) }
111+
unsafe { &*slice_ptr }
110112
}
111113
}
112114

0 commit comments

Comments
 (0)