Skip to content

Commit 8cb6019

Browse files
committed
Port to crossbeam-deque 0.6
1 parent a116575 commit 8cb6019

File tree

4 files changed

+33
-42
lines changed

4 files changed

+33
-42
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ exclude = ["ci"]
2020

2121
[dependencies]
2222
rayon-core = { version = "1.5.0", path = "rayon-core" }
23-
crossbeam-deque = "0.2.0"
23+
crossbeam-deque = "0.6.3"
2424

2525
# This is a public dependency!
2626
[dependencies.either]

rayon-core/Cargo.toml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@ categories = ["concurrency"]
1717
num_cpus = "1.2"
1818
libc = "0.2.16"
1919
lazy_static = "1"
20-
21-
# This is deliberately not the latest version, because we want
22-
# to support older rustc than crossbeam-deque 0.3+ does.
23-
[dependencies.crossbeam-deque]
24-
version = "0.2.0"
20+
crossbeam-deque = "0.6.3"
2521

2622
# Also held back for rustc compatibility
2723
[dependencies.crossbeam]

rayon-core/src/registry.rs

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crossbeam::sync::SegQueue;
2-
use crossbeam_deque::{Deque, Steal, Stealer};
2+
use crossbeam_deque::{self as deque, Pop, Steal, Stealer, Worker};
33
#[cfg(rayon_unstable)]
44
use internal::task::Task;
55
#[cfg(rayon_unstable)]
@@ -102,8 +102,15 @@ impl Registry {
102102
let n_threads = builder.get_num_threads();
103103
let breadth_first = builder.get_breadth_first();
104104

105-
let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
106-
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
105+
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
106+
.map(|_| {
107+
if breadth_first {
108+
deque::fifo()
109+
} else {
110+
deque::lifo()
111+
}
112+
})
113+
.unzip();
107114

108115
let registry = Arc::new(Registry {
109116
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
@@ -127,9 +134,7 @@ impl Registry {
127134
if let Some(stack_size) = builder.get_stack_size() {
128135
b = b.stack_size(stack_size);
129136
}
130-
if let Err(e) =
131-
b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) })
132-
{
137+
if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index) }) {
133138
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
134139
}
135140
}
@@ -467,16 +472,13 @@ impl ThreadInfo {
467472
468473
pub struct WorkerThread {
469474
/// the "worker" half of our local deque
470-
worker: Deque<JobRef>,
475+
worker: Worker<JobRef>,
471476

472477
/// local queue used for `spawn_fifo` indirection
473478
fifo: JobFifo,
474479

475480
index: usize,
476481

477-
/// are these workers configured to steal breadth-first or not?
478-
breadth_first: bool,
479-
480482
/// A weak random number generator.
481483
rng: XorShift64Star,
482484

@@ -535,7 +537,7 @@ impl WorkerThread {
535537

536538
#[inline]
537539
pub fn local_deque_is_empty(&self) -> bool {
538-
self.worker.len() == 0
540+
self.worker.is_empty()
539541
}
540542

541543
/// Attempts to obtain a "local" job -- typically this means
@@ -544,15 +546,11 @@ impl WorkerThread {
544546
/// bottom.
545547
#[inline]
546548
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
547-
if !self.breadth_first {
548-
self.worker.pop()
549-
} else {
550-
loop {
551-
match self.worker.steal() {
552-
Steal::Empty => return None,
553-
Steal::Data(d) => return Some(d),
554-
Steal::Retry => {}
555-
}
549+
loop {
550+
match self.worker.pop() {
551+
Pop::Empty => return None,
552+
Pop::Data(d) => return Some(d),
553+
Pop::Retry => {}
556554
}
557555
}
558556
}
@@ -620,7 +618,7 @@ impl WorkerThread {
620618
/// local work to do.
621619
unsafe fn steal(&self) -> Option<JobRef> {
622620
// we only steal when we don't have any work to do locally
623-
debug_assert!(self.worker.pop().is_none());
621+
debug_assert!(self.local_deque_is_empty());
624622

625623
// otherwise, try to steal
626624
let num_threads = self.registry.thread_infos.len();
@@ -654,16 +652,10 @@ impl WorkerThread {
654652

655653
/// ////////////////////////////////////////////////////////////////////////
656654
657-
unsafe fn main_loop(
658-
worker: Deque<JobRef>,
659-
registry: Arc<Registry>,
660-
index: usize,
661-
breadth_first: bool,
662-
) {
655+
unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
663656
let worker_thread = WorkerThread {
664657
worker: worker,
665658
fifo: JobFifo::new(),
666-
breadth_first: breadth_first,
667659
index: index,
668660
rng: XorShift64Star::new(),
669661
registry: registry.clone(),

src/iter/par_bridge.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crossbeam_deque::{Deque, Steal, Stealer};
1+
use crossbeam_deque::{self as deque, Steal, Stealer, Worker};
22

33
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
44
use std::sync::{Mutex, TryLockError};
@@ -79,10 +79,9 @@ where
7979
C: UnindexedConsumer<Self::Item>,
8080
{
8181
let split_count = AtomicUsize::new(current_num_threads());
82-
let deque = Deque::new();
83-
let stealer = deque.stealer();
82+
let (worker, stealer) = deque::fifo();
8483
let done = AtomicBool::new(false);
85-
let iter = Mutex::new((self.iter, deque));
84+
let iter = Mutex::new((self.iter, worker));
8685

8786
bridge_unindexed(
8887
IterParallelProducer {
@@ -99,7 +98,7 @@ where
9998
struct IterParallelProducer<'a, Iter: Iterator + 'a> {
10099
split_count: &'a AtomicUsize,
101100
done: &'a AtomicBool,
102-
iter: &'a Mutex<(Iter, Deque<Iter::Item>)>,
101+
iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
103102
items: Stealer<Iter::Item>,
104103
}
105104

@@ -167,11 +166,15 @@ where
167166
let count = current_num_threads();
168167
let count = (count * count) * 2;
169168

170-
let (ref mut iter, ref deque) = *guard;
169+
let (ref mut iter, ref worker) = *guard;
171170

172-
while deque.len() < count {
171+
// while worker.len() < count {
172+
// FIXME the new deque doesn't let us count items. We can just
173+
// push a number of items, but that doesn't consider active
174+
// stealers elsewhere.
175+
for _ in 0..count {
173176
if let Some(it) = iter.next() {
174-
deque.push(it);
177+
worker.push(it);
175178
} else {
176179
self.done.store(true, Ordering::SeqCst);
177180
break;

0 commit comments

Comments
 (0)