Skip to content

Commit 26994db

Browse files
committed
wip: expiriment with stocastic sharing
1 parent 6fe60dd commit 26994db

File tree

4 files changed

+24
-127
lines changed

4 files changed

+24
-127
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async-task = "4.7.1"
1616
atomic-wait = "1.1.0"
1717
crossbeam-queue = "0.3.12"
1818
crossbeam-utils = "0.8.21"
19+
fastrand = "2.3.0"
1920
shuttle = { version = "0.8.0", optional = true }
2021
tracing = { version = "0.1.41", features = ["release_max_level_off"] }
2122
tracing-subscriber = "0.3.19"

benches/bevy_tasks.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ mod overhead {
5050
for i in 0..80 {
5151
black_box(i);
5252
}
53-
// std::thread::sleep(Duration::from_nanos(100));
5453
black_box(value);
5554
}
5655

@@ -97,7 +96,7 @@ mod overhead {
9796

9897
bencher.bench_local(|| {
9998
THREAD_POOL.with_worker(|worker| {
100-
forte_chunks::<100, _, _>(worker, &mut vec, &|c| {
99+
forte_chunks::<8, _, _>(worker, &mut vec, &|c| {
101100
c.iter_mut().for_each(work);
102101
});
103102
})

src/thread_pool.rs

Lines changed: 21 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use alloc::boxed::Box;
44
use alloc::format;
5-
use alloc::string::ToString;
65
use alloc::vec::Vec;
76
use core::cell::Cell;
87
use core::cmp;
@@ -48,8 +47,6 @@ pub struct ThreadPool {
4847
state: Mutex<ThreadPoolState>,
4948
/// A queue used for cooperatively sharing jobs between workers.
5049
shared_jobs: SegQueue<JobRef>,
51-
/// A condvar that is used to signal a new worker taking a lease on a seat.
52-
start_heartbeat: Condvar,
5350
}
5451

5552
/// The internal state of a thread pool.
@@ -76,27 +73,25 @@ impl ThreadPoolState {
7673
return Lease {
7774
thread_pool,
7875
index,
79-
seat_data: seat.data,
76+
sleep_controller: seat.sleep_controller,
8077
};
8178
}
8279
}
8380

8481
// If none are available, add a new seat.
8582
let index = self.seats.len();
86-
let seat_data = Box::leak(Box::new(SeatData {
87-
#[cfg(not(feature = "shuttle"))]
88-
heartbeat: AtomicBool::new(true).into(),
89-
sleep_controller: SleepController::default(),
90-
}));
83+
let sleep_controller = Box::leak(Box::new(
84+
SleepController::default()
85+
));
9186
let seat = Seat {
9287
occupied: true,
93-
data: seat_data,
88+
sleep_controller
9489
};
9590
self.seats.push(seat);
9691
Lease {
9792
thread_pool,
9893
index,
99-
seat_data,
94+
sleep_controller
10095
}
10196
}
10297

@@ -117,28 +112,25 @@ impl ThreadPoolState {
117112
leases.push(Lease {
118113
thread_pool,
119114
index,
120-
seat_data: seat.data,
115+
sleep_controller: seat.sleep_controller
121116
});
122117
}
123118
}
124119

125120
// Then create new seats as needed.
126121
while leases.len() != num {
127122
let index = self.seats.len();
128-
let seat_data = Box::leak(Box::new(SeatData {
129-
#[cfg(not(feature = "shuttle"))]
130-
heartbeat: AtomicBool::new(true).into(),
131-
sleep_controller: SleepController::default(),
132-
}));
123+
let sleep_controller = Box::leak(Box::new(SleepController::default()
124+
));
133125
let seat = Seat {
134126
occupied: true,
135-
data: seat_data,
127+
sleep_controller,
136128
};
137129
self.seats.push(seat);
138130
leases.push(Lease {
139131
thread_pool,
140132
index,
141-
seat_data,
133+
sleep_controller,
142134
});
143135
}
144136

@@ -149,16 +141,7 @@ impl ThreadPoolState {
149141
#[derive(Clone)]
150142
struct Seat {
151143
occupied: bool,
152-
data: &'static SeatData,
153-
}
154-
155-
/// A public interface that can be claimed and used by a worker.
156-
struct SeatData {
157-
/// The heartbeat signal sent to the worker.
158-
#[cfg(not(feature = "shuttle"))]
159-
heartbeat: CachePadded<AtomicBool>,
160-
/// Allows other threads to wake the worker.
161-
sleep_controller: SleepController,
144+
sleep_controller: &'static SleepController,
162145
}
163146

164147
/// A lease represents ownership of one of a "seats" in a thread pool, and
@@ -169,7 +152,7 @@ pub struct Lease {
169152
/// The index of the claimed seat.
170153
index: usize,
171154
/// The seat being claimed by this lease.
172-
seat_data: &'static SeatData,
155+
sleep_controller: &'static SleepController,
173156
}
174157

175158
impl Drop for Lease {
@@ -183,9 +166,6 @@ impl Drop for Lease {
183166
struct ManagedThreads {
184167
/// Stores thread controls for workers spawned by the pool.
185168
workers: Vec<ManagedWorker>,
186-
/// Stores thread controls for the heartbeat thread.
187-
#[cfg(not(feature = "shuttle"))]
188-
heartbeat: Option<ThreadControl>,
189169
}
190170

191171
/// Represents a worker thread that is managed by the pool, as opposed to
@@ -217,20 +197,16 @@ impl ThreadPool {
217197
seats: Vec::new(),
218198
managed_threads: ManagedThreads {
219199
workers: Vec::new(),
220-
#[cfg(not(feature = "shuttle"))]
221-
heartbeat: None,
222200
},
223201
}),
224202
shared_jobs: SegQueue::new(),
225-
start_heartbeat: Condvar::new(),
226203
}
227204
}
228205

229206
/// Claims a lease on the thread pool which can be occupied by a worker
230207
/// (using [`Worker::occupy`]), allowing a thread to participate in the pool.
231208
#[cold]
232209
pub fn claim_lease(&'static self) -> Lease {
233-
self.start_heartbeat.notify_one();
234210
let mut state = self.state.lock().unwrap();
235211
state.claim_lease(self)
236212
}
@@ -293,7 +269,7 @@ impl ThreadPool {
293269
/// See [`ThreadPool::resize`] for more information about resizing.
294270
pub fn resize_to_available(&'static self) -> usize {
295271
let available = available_parallelism().map(NonZero::get).unwrap_or(1);
296-
let available = available.saturating_sub(2);
272+
let available = available.saturating_sub(1); // Remove one, because the main thread participates in the pool
297273
self.resize_to(available)
298274
}
299275

@@ -384,22 +360,6 @@ impl ThreadPool {
384360
new_size = current_size + new_leases.len(); // Scale back the new size to what we can actually spawn.
385361
trace!("acquired leases for {} new threads", new_size);
386362

387-
// When not in shuttle, start the heartbeat thread if scaling up from zero.
388-
#[cfg(not(feature = "shuttle"))]
389-
if new_size > 0 && current_size == 0 {
390-
debug!("spawning heartbeat runner");
391-
let halt = Arc::new(AtomicBool::new(false));
392-
let heartbeat_halt = halt.clone();
393-
let handle = ThreadBuilder::new()
394-
.name("heartbeat".to_string())
395-
.spawn(move || {
396-
heartbeat_loop(self, heartbeat_halt);
397-
})
398-
.unwrap();
399-
let control = ThreadControl { halt, handle };
400-
state.managed_threads.heartbeat = Some(control);
401-
}
402-
403363
let barrier = Arc::new(Barrier::new(new_leases.len() + 1));
404364

405365
// Spawn the new workers.
@@ -432,23 +392,15 @@ impl ThreadPool {
432392
// Pull the workers we intend to halt out of the thread manager.
433393
let terminating_workers = state.managed_threads.workers.split_off(new_size);
434394

435-
// Halt the heartbeat thread when scaling to zero.
436-
#[cfg(not(feature = "shuttle"))]
437-
let heartbeat_control = if new_size == 0 {
438-
state.managed_threads.heartbeat.take()
439-
} else {
440-
None
441-
};
442-
443395
// Terminate and wake the workers.
444396
for worker in &terminating_workers {
445397
// Tell the worker to halt.
446398
worker.control.halt.store(true, Ordering::Relaxed);
447399
// Wake the worker up.
448-
state.seats[worker.index].data.sleep_controller.wake();
400+
state.seats[worker.index].sleep_controller.wake();
449401
}
450402

451-
// Drop the lock on the state so as not to block the workers or heartbeat.
403+
// Drop the lock on the state so as not to block the workers.
452404
drop(state);
453405

454406
// Determine our seat index.
@@ -462,14 +414,6 @@ impl ThreadPool {
462414
let _ = worker.control.handle.join();
463415
}
464416
}
465-
466-
// If we took control of the heartbeat, halt it after the workers.
467-
#[cfg(not(feature = "shuttle"))]
468-
if let Some(control) = heartbeat_control {
469-
control.halt.store(true, Ordering::Relaxed);
470-
self.start_heartbeat.notify_one();
471-
let _ = control.handle.join();
472-
}
473417
}
474418
}
475419

@@ -837,21 +781,14 @@ impl Worker {
837781
// Try to promote the oldest task in the queue.
838782
#[inline(always)]
839783
fn promote(&self) {
840-
// Check for a heartbeat, potentially promoting the job we just pushed
841-
// to a shared job.
842784
#[cfg(not(feature = "shuttle"))]
843-
let heartbeat = self.lease.seat_data.heartbeat.load(Ordering::Relaxed);
785+
let heartbeat = fastrand::f32() <= 0.0003;
844786

845787
#[cfg(feature = "shuttle")]
846788
let heartbeat = thread_rng().gen_bool(0.5);
847789

848790
if heartbeat && let Some(job_ref) = self.queue.pop_oldest() {
849791
self.promote_cold(job_ref);
850-
#[cfg(not(feature = "shuttle"))]
851-
self.lease
852-
.seat_data
853-
.heartbeat
854-
.store(false, Ordering::Relaxed);
855792
}
856793
}
857794

@@ -864,6 +801,7 @@ impl Worker {
864801
// Push the job onto the shared queue.
865802
self.lease.thread_pool.shared_jobs.push(job_ref);
866803

804+
867805
// Try to wake a worker to work on it.
868806
let seats = self.lease.thread_pool.state.lock().unwrap().seats.clone();
869807
let num_seats = seats.len();
@@ -874,7 +812,7 @@ impl Worker {
874812
continue;
875813
}
876814
if seats[i].occupied {
877-
let ready = seats[i].data.sleep_controller.wake();
815+
let ready = seats[i].sleep_controller.wake();
878816
if ready {
879817
return;
880818
}
@@ -885,7 +823,7 @@ impl Worker {
885823
/// Create a new latch owned by the worker.
886824
#[inline(always)]
887825
pub fn new_latch(&self) -> Latch {
888-
Latch::new(&self.lease.seat_data.sleep_controller)
826+
Latch::new(&self.lease.sleep_controller)
889827
}
890828

891829
/// Runs jobs until the provided latch is set.
@@ -1440,56 +1378,14 @@ fn managed_worker(lease: Lease, halt: Arc<AtomicBool>, barrier: Arc<Barrier>) {
14401378
if let Some((job, migrated)) = worker.find_work() {
14411379
worker.execute(job, migrated);
14421380
} else {
1443-
worker.lease.seat_data.sleep_controller.sleep();
1381+
worker.lease.sleep_controller.sleep();
14441382
}
14451383
}
14461384
});
14471385

14481386
trace!("exiting managed worker");
14491387
}
14501388

1451-
// -----------------------------------------------------------------------------
1452-
// Heartbeat sender loop
1453-
1454-
/// This is the main loop for the heartbeat thread. It's in charge of
1455-
/// periodically sending a "heartbeat" signal to each worker. By default, each
1456-
/// worker receives a heartbeat about once every 100 μs.
1457-
///
1458-
/// Workers use the heartbeat signal to amortize the cost of promoting local
1459-
/// jobs to shared jobs (which allows other works to claim them) and to reduce
1460-
/// lock contention.
1461-
///
1462-
/// This is never runs when testing in shuttle.
1463-
#[cfg(not(feature = "shuttle"))]
1464-
fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) {
1465-
trace!("starting managed heartbeat thread");
1466-
1467-
let mut seats = thread_pool.state.lock().unwrap().seats.clone();
1468-
let mut index = 0;
1469-
1470-
while !halt.load(Ordering::Relaxed) {
1471-
let num_seats = seats.len();
1472-
let (back, front) = seats.split_at(index);
1473-
if let Some((offset, seat)) = Iterator::chain(front.iter(), back.iter())
1474-
.enumerate()
1475-
.find(|(_, seat)| seat.occupied)
1476-
{
1477-
index = (index + offset + 1) % num_seats;
1478-
seat.data.heartbeat.store(true, Ordering::Relaxed);
1479-
std::thread::yield_now();
1480-
seats = thread_pool.state.lock().unwrap().seats.clone();
1481-
} else {
1482-
let state = thread_pool.state.lock().unwrap();
1483-
seats = thread_pool
1484-
.start_heartbeat
1485-
.wait(state)
1486-
.unwrap()
1487-
.seats
1488-
.clone();
1489-
}
1490-
}
1491-
}
1492-
14931389
// -----------------------------------------------------------------------------
14941390
// Tests
14951391

0 commit comments

Comments
 (0)