Skip to content

Commit 76e3a74

Browse files
bors[bot]cuviper
andauthored
Merge #942 #943
942: Remove `mem::transmute` calls from job handling r=cuviper a=cuviper Where `JobRef` was transmuting the `execute` function pointer to hide its argument type, we can instead pass the opaque `*const ()` and let the callee cast the pointer type. Where `HeapJob` was transmuting its own `Box<Self>`, we can instead use `Box::into_raw` and `Box::from_raw`. It can also call its function right out of the recovered `Box`. 943: Use a single termination latch for the registry r=cuviper a=cuviper Co-authored-by: Josh Stone <[email protected]>
3 parents 6afb2d0 + ae75d26 + 1dfd5d9 commit 76e3a74

File tree

5 files changed

+46
-58
lines changed

5 files changed

+46
-58
lines changed

rayon-core/src/job.rs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub(super) trait Job {
2020
/// Unsafe: this may be called from a different thread than the one
2121
/// which scheduled the job, so the implementer must ensure the
2222
/// appropriate traits are met, whether `Send`, `Sync`, or both.
23-
unsafe fn execute(this: *const Self);
23+
unsafe fn execute(this: *const ());
2424
}
2525

2626
/// Effectively a Job trait object. Each JobRef **must** be executed
@@ -45,17 +45,15 @@ impl JobRef {
4545
where
4646
T: Job,
4747
{
48-
let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
49-
5048
// erase types:
5149
JobRef {
5250
pointer: data as *const (),
53-
execute_fn: mem::transmute(fn_ptr),
51+
execute_fn: <T as Job>::execute,
5452
}
5553
}
5654

5755
#[inline]
58-
pub(super) unsafe fn execute(&self) {
56+
pub(super) unsafe fn execute(self) {
5957
(self.execute_fn)(self.pointer)
6058
}
6159
}
@@ -108,18 +106,11 @@ where
108106
F: FnOnce(bool) -> R + Send,
109107
R: Send,
110108
{
111-
unsafe fn execute(this: *const Self) {
112-
fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
113-
move || func(true)
114-
}
115-
116-
let this = &*this;
109+
unsafe fn execute(this: *const ()) {
110+
let this = &*(this as *const Self);
117111
let abort = unwind::AbortIfPanic;
118112
let func = (*this.func.get()).take().unwrap();
119-
(*this.result.get()) = match unwind::halt_unwinding(call(func)) {
120-
Ok(x) => JobResult::Ok(x),
121-
Err(x) => JobResult::Panic(x),
122-
};
113+
(*this.result.get()) = JobResult::call(func);
123114
this.latch.set();
124115
mem::forget(abort);
125116
}
@@ -135,40 +126,43 @@ pub(super) struct HeapJob<BODY>
135126
where
136127
BODY: FnOnce() + Send,
137128
{
138-
job: UnsafeCell<Option<BODY>>,
129+
job: BODY,
139130
}
140131

141132
impl<BODY> HeapJob<BODY>
142133
where
143134
BODY: FnOnce() + Send,
144135
{
145-
pub(super) fn new(func: BODY) -> Self {
146-
HeapJob {
147-
job: UnsafeCell::new(Some(func)),
148-
}
136+
pub(super) fn new(job: BODY) -> Self {
137+
HeapJob { job }
149138
}
150139

151140
/// Creates a `JobRef` from this job -- note that this hides all
152141
/// lifetimes, so it is up to you to ensure that this JobRef
153142
/// doesn't outlive any data that it closes over.
154-
pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
155-
let this: *const Self = mem::transmute(self);
156-
JobRef::new(this)
143+
pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
144+
JobRef::new(Box::into_raw(self))
157145
}
158146
}
159147

160148
impl<BODY> Job for HeapJob<BODY>
161149
where
162150
BODY: FnOnce() + Send,
163151
{
164-
unsafe fn execute(this: *const Self) {
165-
let this: Box<Self> = mem::transmute(this);
166-
let job = (*this.job.get()).take().unwrap();
167-
job();
152+
unsafe fn execute(this: *const ()) {
153+
let this = Box::from_raw(this as *mut Self);
154+
(this.job)();
168155
}
169156
}
170157

171158
impl<T> JobResult<T> {
159+
fn call(func: impl FnOnce(bool) -> T) -> Self {
160+
match unwind::halt_unwinding(|| func(true)) {
161+
Ok(x) => JobResult::Ok(x),
162+
Err(x) => JobResult::Panic(x),
163+
}
164+
}
165+
172166
/// Convert the `JobResult` for a job that has finished (and hence
173167
/// its JobResult is populated) into its return value.
174168
///
@@ -204,10 +198,11 @@ impl JobFifo {
204198
}
205199

206200
impl Job for JobFifo {
207-
unsafe fn execute(this: *const Self) {
201+
unsafe fn execute(this: *const ()) {
208202
// We "execute" a queue by executing its first job, FIFO.
203+
let this = &*(this as *const Self);
209204
loop {
210-
match (*this).inner.steal() {
205+
match this.inner.steal() {
211206
Steal::Success(job_ref) => break job_ref.execute(),
212207
Steal::Empty => panic!("FIFO is empty"),
213208
Steal::Retry => {}

rayon-core/src/latch.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,11 @@ impl CountLatch {
292292
}
293293
}
294294

295+
/// Increments the latch counter by one and returns the previous value.
295296
#[inline]
296-
pub(super) fn increment(&self) {
297+
pub(super) fn increment(&self) -> usize {
297298
debug_assert!(!self.core_latch.probe());
298-
self.counter.fetch_add(1, Ordering::Relaxed);
299+
self.counter.fetch_add(1, Ordering::Relaxed)
299300
}
300301

301302
/// Decrements the latch counter by one. If this is the final
@@ -344,10 +345,12 @@ impl CountLockLatch {
344345
}
345346
}
346347

348+
/// Increments the latch counter by one and returns the previous value.
347349
#[inline]
348-
pub(super) fn increment(&self) {
350+
pub(super) fn increment(&self) -> usize {
349351
let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
350352
debug_assert!(old_counter != 0);
353+
old_counter
351354
}
352355

353356
pub(super) fn wait(&self) {

rayon-core/src/registry.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ pub(super) struct Registry {
152152
// - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
153153
// These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
154154
// and that job will keep the pool alive.
155-
terminate_count: AtomicUsize,
155+
terminate: CountLatch,
156156
}
157157

158158
/// ////////////////////////////////////////////////////////////////////////
@@ -238,7 +238,7 @@ impl Registry {
238238
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
239239
sleep: Sleep::new(logger, n_threads),
240240
injected_jobs: Injector::new(),
241-
terminate_count: AtomicUsize::new(1),
241+
terminate: CountLatch::new(),
242242
panic_handler: builder.take_panic_handler(),
243243
start_handler: builder.take_start_handler(),
244244
exit_handler: builder.take_exit_handler(),
@@ -390,9 +390,8 @@ impl Registry {
390390
// drops) a `ThreadPool`; and, in that case, they cannot be
391391
// calling `inject()` later, since they dropped their
392392
// `ThreadPool`.
393-
debug_assert_ne!(
394-
self.terminate_count.load(Ordering::Acquire),
395-
0,
393+
debug_assert!(
394+
!self.terminate.as_core_latch().probe(),
396395
"inject() sees state.terminate as true"
397396
);
398397

@@ -523,7 +522,7 @@ impl Registry {
523522
/// terminate count and is responsible for invoking `terminate()`
524523
/// when finished.
525524
pub(super) fn increment_terminate_count(&self) {
526-
let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
525+
let previous = self.terminate.increment();
527526
debug_assert!(previous != 0, "registry ref count incremented from zero");
528527
assert!(
529528
previous != std::usize::MAX,
@@ -535,9 +534,9 @@ impl Registry {
535534
/// dropped. The worker threads will gradually terminate, once any
536535
/// extant work is completed.
537536
pub(super) fn terminate(&self) {
538-
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
539-
for (i, thread_info) in self.thread_infos.iter().enumerate() {
540-
thread_info.terminate.set_and_tickle_one(self, i);
537+
if self.terminate.set() {
538+
for i in 0..self.num_threads() {
539+
self.notify_worker_latch_is_set(i);
541540
}
542541
}
543542
}
@@ -563,15 +562,6 @@ struct ThreadInfo {
563562
/// until workers have stopped; only used for tests.
564563
stopped: LockLatch,
565564

566-
/// The latch used to signal that terminated has been requested.
567-
/// This latch is *set* by the `terminate` method on the
568-
/// `Registry`, once the registry's main "terminate" counter
569-
/// reaches zero.
570-
///
571-
/// NB. We use a `CountLatch` here because it has no lifetimes and is
572-
/// meant for async use, but the count never gets higher than one.
573-
terminate: CountLatch,
574-
575565
/// the "stealer" half of the worker's deque
576566
stealer: Stealer<JobRef>,
577567
}
@@ -581,7 +571,6 @@ impl ThreadInfo {
581571
ThreadInfo {
582572
primed: LockLatch::new(),
583573
stopped: LockLatch::new(),
584-
terminate: CountLatch::new(),
585574
stealer,
586575
}
587576
}
@@ -828,12 +817,12 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
828817
}
829818
}
830819

831-
let my_terminate_latch = &registry.thread_infos[index].terminate;
820+
let terminate_latch = registry.terminate.as_core_latch();
832821
worker_thread.log(|| ThreadStart {
833822
worker: index,
834-
terminate_addr: my_terminate_latch.as_core_latch().addr(),
823+
terminate_addr: terminate_latch.addr(),
835824
});
836-
worker_thread.wait_until(my_terminate_latch);
825+
worker_thread.wait_until_cold(terminate_latch);
837826

838827
// Should not be any work left in our queue.
839828
debug_assert!(worker_thread.take_local_job().is_none());

rayon-core/src/scope/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ impl<'scope> Scope<'scope> {
543543
let job_ref = Box::new(HeapJob::new(move || {
544544
self.base.execute_job(move || body(self))
545545
}))
546-
.as_job_ref();
546+
.into_job_ref();
547547

548548
// Since `Scope` implements `Sync`, we can't be sure that we're still in a
549549
// thread of this pool, so we can't just push to the local worker thread.
@@ -584,7 +584,7 @@ impl<'scope> ScopeFifo<'scope> {
584584
let job_ref = Box::new(HeapJob::new(move || {
585585
self.base.execute_job(move || body(self))
586586
}))
587-
.as_job_ref();
587+
.into_job_ref();
588588

589589
// If we're in the pool, use our scope's private fifo for this thread to execute
590590
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
@@ -699,7 +699,8 @@ impl ScopeLatch {
699699
}
700700
}
701701

702-
fn increment(&self) {
702+
/// Increments the latch counter by one and returns the previous value.
703+
fn increment(&self) -> usize {
703704
match self {
704705
ScopeLatch::Stealing { latch, .. } => latch.increment(),
705706
ScopeLatch::Blocking { latch } => latch.increment(),

rayon-core/src/spawn/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ where
103103
registry.terminate(); // (*) permit registry to terminate now
104104
}
105105
}))
106-
.as_job_ref()
106+
.into_job_ref()
107107
}
108108

109109
/// Fires off a task into the Rayon threadpool in the "static" or

0 commit comments

Comments
 (0)