Skip to content

Commit af99fb1

Browse files
committed
Add scope_fifo
1 parent 9d0b315 commit af99fb1

File tree

7 files changed

+271
-74
lines changed

7 files changed

+271
-74
lines changed

rayon-core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ lazy_static = "1"
2323
[dependencies.crossbeam-deque]
2424
version = "0.2.0"
2525

26+
# Also held back for rustc compatibility
27+
[dependencies.crossbeam]
28+
version = "0.3.0"
29+
2630
[dev-dependencies]
2731
rand = "0.5"
2832

rayon-core/src/job.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crossbeam::sync::SegQueue;
12
use latch::Latch;
23
use std::any::Any;
34
use std::cell::UnsafeCell;
@@ -179,3 +180,35 @@ impl<T> JobResult<T> {
179180
}
180181
}
181182
}
183+
184+
/// Indirect queue to provide FIFO job priority.
185+
pub struct JobFifo {
186+
inner: SegQueue<JobRef>,
187+
}
188+
189+
impl JobFifo {
190+
pub fn new() -> Self {
191+
JobFifo {
192+
inner: SegQueue::new(),
193+
}
194+
}
195+
196+
pub unsafe fn push(&self, job_ref: JobRef) -> JobRef {
197+
// A little indirection ensures that spawns are always prioritized in FIFO order. The
198+
// jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
199+
// (FIFO), but either way they will end up popping from the front of this queue.
200+
self.inner.push(job_ref);
201+
JobRef::new(self)
202+
}
203+
}
204+
205+
impl Job for JobFifo {
206+
unsafe fn execute(this: *const Self) {
207+
// We "execute" a queue by executing its first job, FIFO.
208+
(*this)
209+
.inner
210+
.try_pop()
211+
.expect("job in fifo queue")
212+
.execute()
213+
}
214+
}

rayon-core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use std::io;
3131
use std::marker::PhantomData;
3232
use std::str::FromStr;
3333

34+
extern crate crossbeam;
3435
extern crate crossbeam_deque;
3536
#[macro_use]
3637
extern crate lazy_static;
@@ -61,6 +62,7 @@ mod test;
6162
pub mod internal;
6263
pub use join::{join, join_context};
6364
pub use scope::{scope, Scope};
65+
pub use scope::{scope_fifo, ScopeFifo};
6466
pub use spawn::spawn;
6567
pub use thread_pool::current_thread_has_pending_tasks;
6668
pub use thread_pool::current_thread_index;

rayon-core/src/registry.rs

Lines changed: 35 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crossbeam::sync::SegQueue;
12
use crossbeam_deque::{Deque, Steal, Stealer};
23
#[cfg(rayon_unstable)]
34
use internal::task::Task;
@@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
1314
use std::hash::Hasher;
1415
use std::mem;
1516
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
16-
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
17+
use std::sync::{Arc, Once, ONCE_INIT};
1718
use std::thread;
1819
use std::usize;
1920
use unwind;
@@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T
2223

2324
pub struct Registry {
2425
thread_infos: Vec<ThreadInfo>,
25-
state: Mutex<RegistryState>,
2626
sleep: Sleep,
27-
job_uninjector: Stealer<JobRef>,
27+
injected_jobs: SegQueue<JobRef>,
2828
panic_handler: Option<Box<PanicHandler>>,
2929
start_handler: Option<Box<StartHandler>>,
3030
exit_handler: Option<Box<ExitHandler>>,
@@ -45,10 +45,6 @@ pub struct Registry {
4545
terminate_latch: CountLatch,
4646
}
4747

48-
struct RegistryState {
49-
job_injector: Deque<JobRef>,
50-
}
51-
5248
/// ////////////////////////////////////////////////////////////////////////
5349
/// Initialization
5450
@@ -104,16 +100,13 @@ impl Registry {
104100
let n_threads = builder.get_num_threads();
105101
let breadth_first = builder.get_breadth_first();
106102

107-
let inj_worker = Deque::new();
108-
let inj_stealer = inj_worker.stealer();
109103
let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
110104
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
111105

112106
let registry = Arc::new(Registry {
113107
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
114-
state: Mutex::new(RegistryState::new(inj_worker)),
115108
sleep: Sleep::new(),
116-
job_uninjector: inj_stealer,
109+
injected_jobs: SegQueue::new(),
117110
terminate_latch: CountLatch::new(),
118111
panic_handler: builder.take_panic_handler(),
119112
start_handler: builder.take_start_handler(),
@@ -175,6 +168,18 @@ impl Registry {
175168
}
176169
}
177170

171+
/// Returns the current `WorkerThread` if it's part of this `Registry`.
172+
pub fn current_thread(&self) -> Option<&WorkerThread> {
173+
unsafe {
174+
if let Some(worker) = WorkerThread::current().as_ref() {
175+
if worker.registry().id() == self.id() {
176+
return Some(worker);
177+
}
178+
}
179+
None
180+
}
181+
}
182+
178183
/// Returns an opaque identifier for this registry.
179184
pub fn id(&self) -> RegistryId {
180185
// We can rely on `self` not to change since we only ever create
@@ -297,39 +302,31 @@ impl Registry {
297302
log!(InjectJobs {
298303
count: injected_jobs.len()
299304
});
300-
{
301-
let state = self.state.lock().unwrap();
302-
303-
// It should not be possible for `state.terminate` to be true
304-
// here. It is only set to true when the user creates (and
305-
// drops) a `ThreadPool`; and, in that case, they cannot be
306-
// calling `inject()` later, since they dropped their
307-
// `ThreadPool`.
308-
assert!(
309-
!self.terminate_latch.probe(),
310-
"inject() sees state.terminate as true"
311-
);
312-
313-
for &job_ref in injected_jobs {
314-
state.job_injector.push(job_ref);
315-
}
305+
306+
// It should not be possible for `state.terminate` to be true
307+
// here. It is only set to true when the user creates (and
308+
// drops) a `ThreadPool`; and, in that case, they cannot be
309+
// calling `inject()` later, since they dropped their
310+
// `ThreadPool`.
311+
assert!(
312+
!self.terminate_latch.probe(),
313+
"inject() sees state.terminate as true"
314+
);
315+
316+
for &job_ref in injected_jobs {
317+
self.injected_jobs.push(job_ref);
316318
}
317319
self.sleep.tickle(usize::MAX);
318320
}
319321

320322
fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
321-
loop {
322-
match self.job_uninjector.steal() {
323-
Steal::Empty => return None,
324-
Steal::Data(d) => {
325-
log!(UninjectedWork {
326-
worker: worker_index
327-
});
328-
return Some(d);
329-
}
330-
Steal::Retry => {}
331-
}
323+
let job = self.injected_jobs.try_pop();
324+
if job.is_some() {
325+
log!(UninjectedWork {
326+
worker: worker_index
327+
});
332328
}
329+
job
333330
}
334331

335332
/// If already in a worker-thread of this registry, just execute `op`.
@@ -439,14 +436,6 @@ pub struct RegistryId {
439436
addr: usize,
440437
}
441438

442-
impl RegistryState {
443-
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
444-
RegistryState {
445-
job_injector: job_injector,
446-
}
447-
}
448-
}
449-
450439
struct ThreadInfo {
451440
/// Latch set once thread has started and we are entering into the
452441
/// main loop. Used to wait for worker threads to become primed,

0 commit comments

Comments
 (0)