Skip to content

Commit 99b1ff5

Browse files
bors[bot]cuviper
andauthored
Merge #945
945: Revert "Use a single termination latch for the registry" r=cuviper a=cuviper This reverts commit 1dfd5d9. This was a mistake -- multiple threads can't wait on the same latch, because the inner `CoreLatch` tracks thread-specific sleep state. Co-authored-by: Josh Stone <[email protected]>
2 parents 76e3a74 + a7017a8 commit 99b1ff5

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

rayon-core/src/latch.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,10 @@ impl CountLatch {
292292
}
293293
}
294294

295-
/// Increments the latch counter by one and returns the previous value.
296295
#[inline]
297-
pub(super) fn increment(&self) -> usize {
296+
pub(super) fn increment(&self) {
298297
debug_assert!(!self.core_latch.probe());
299-
self.counter.fetch_add(1, Ordering::Relaxed)
298+
self.counter.fetch_add(1, Ordering::Relaxed);
300299
}
301300

302301
/// Decrements the latch counter by one. If this is the final
@@ -345,12 +344,10 @@ impl CountLockLatch {
345344
}
346345
}
347346

348-
/// Increments the latch counter by one and returns the previous value.
349347
#[inline]
350-
pub(super) fn increment(&self) -> usize {
348+
pub(super) fn increment(&self) {
351349
let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
352350
debug_assert!(old_counter != 0);
353-
old_counter
354351
}
355352

356353
pub(super) fn wait(&self) {

rayon-core/src/registry.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ pub(super) struct Registry {
152152
// - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
153153
// These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
154154
// and that job will keep the pool alive.
155-
terminate: CountLatch,
155+
terminate_count: AtomicUsize,
156156
}
157157

158158
/// ////////////////////////////////////////////////////////////////////////
@@ -238,7 +238,7 @@ impl Registry {
238238
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
239239
sleep: Sleep::new(logger, n_threads),
240240
injected_jobs: Injector::new(),
241-
terminate: CountLatch::new(),
241+
terminate_count: AtomicUsize::new(1),
242242
panic_handler: builder.take_panic_handler(),
243243
start_handler: builder.take_start_handler(),
244244
exit_handler: builder.take_exit_handler(),
@@ -390,8 +390,9 @@ impl Registry {
390390
// drops) a `ThreadPool`; and, in that case, they cannot be
391391
// calling `inject()` later, since they dropped their
392392
// `ThreadPool`.
393-
debug_assert!(
394-
!self.terminate.as_core_latch().probe(),
393+
debug_assert_ne!(
394+
self.terminate_count.load(Ordering::Acquire),
395+
0,
395396
"inject() sees state.terminate as true"
396397
);
397398

@@ -522,7 +523,7 @@ impl Registry {
522523
/// terminate count and is responsible for invoking `terminate()`
523524
/// when finished.
524525
pub(super) fn increment_terminate_count(&self) {
525-
let previous = self.terminate.increment();
526+
let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
526527
debug_assert!(previous != 0, "registry ref count incremented from zero");
527528
assert!(
528529
previous != std::usize::MAX,
@@ -534,9 +535,9 @@ impl Registry {
534535
/// dropped. The worker threads will gradually terminate, once any
535536
/// extant work is completed.
536537
pub(super) fn terminate(&self) {
537-
if self.terminate.set() {
538-
for i in 0..self.num_threads() {
539-
self.notify_worker_latch_is_set(i);
538+
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
539+
for (i, thread_info) in self.thread_infos.iter().enumerate() {
540+
thread_info.terminate.set_and_tickle_one(self, i);
540541
}
541542
}
542543
}
@@ -562,6 +563,15 @@ struct ThreadInfo {
562563
/// until workers have stopped; only used for tests.
563564
stopped: LockLatch,
564565

566+
/// The latch used to signal that terminated has been requested.
567+
/// This latch is *set* by the `terminate` method on the
568+
/// `Registry`, once the registry's main "terminate" counter
569+
/// reaches zero.
570+
///
571+
/// NB. We use a `CountLatch` here because it has no lifetimes and is
572+
/// meant for async use, but the count never gets higher than one.
573+
terminate: CountLatch,
574+
565575
/// the "stealer" half of the worker's deque
566576
stealer: Stealer<JobRef>,
567577
}
@@ -571,6 +581,7 @@ impl ThreadInfo {
571581
ThreadInfo {
572582
primed: LockLatch::new(),
573583
stopped: LockLatch::new(),
584+
terminate: CountLatch::new(),
574585
stealer,
575586
}
576587
}
@@ -817,12 +828,12 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
817828
}
818829
}
819830

820-
let terminate_latch = registry.terminate.as_core_latch();
831+
let my_terminate_latch = &registry.thread_infos[index].terminate;
821832
worker_thread.log(|| ThreadStart {
822833
worker: index,
823-
terminate_addr: terminate_latch.addr(),
834+
terminate_addr: my_terminate_latch.as_core_latch().addr(),
824835
});
825-
worker_thread.wait_until_cold(terminate_latch);
836+
worker_thread.wait_until(my_terminate_latch);
826837

827838
// Should not be any work left in our queue.
828839
debug_assert!(worker_thread.take_local_job().is_none());

rayon-core/src/scope/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -699,8 +699,7 @@ impl ScopeLatch {
699699
}
700700
}
701701

702-
/// Increments the latch counter by one and returns the previous value.
703-
fn increment(&self) -> usize {
702+
fn increment(&self) {
704703
match self {
705704
ScopeLatch::Stealing { latch, .. } => latch.increment(),
706705
ScopeLatch::Blocking { latch } => latch.increment(),

0 commit comments

Comments
 (0)