Skip to content

Commit 8ccc028

Browse files
committed
Add spawn_fifo
1 parent af99fb1 commit 8ccc028

File tree

5 files changed

+125
-7
lines changed

5 files changed

+125
-7
lines changed

rayon-core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub mod internal;
6363
pub use join::{join, join_context};
6464
pub use scope::{scope, Scope};
6565
pub use scope::{scope_fifo, ScopeFifo};
66-
pub use spawn::spawn;
66+
pub use spawn::{spawn, spawn_fifo};
6767
pub use thread_pool::current_thread_has_pending_tasks;
6868
pub use thread_pool::current_thread_index;
6969
pub use thread_pool::ThreadPool;

rayon-core/src/registry.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crossbeam_deque::{Deque, Steal, Stealer};
44
use internal::task::Task;
55
#[cfg(rayon_unstable)]
66
use job::Job;
7-
use job::{JobRef, StackJob};
7+
use job::{JobFifo, JobRef, StackJob};
88
use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
99
use log::Event::*;
1010
use sleep::Sleep;
@@ -467,6 +467,9 @@ pub struct WorkerThread {
467467
/// the "worker" half of our local deque
468468
worker: Deque<JobRef>,
469469

470+
/// local queue used for `spawn_fifo` indirection
471+
fifo: JobFifo,
472+
470473
index: usize,
471474

472475
/// are these workers configured to steal breadth-first or not?
@@ -523,6 +526,11 @@ impl WorkerThread {
523526
self.registry.sleep.tickle(self.index);
524527
}
525528

529+
#[inline]
530+
pub unsafe fn push_fifo(&self, job: JobRef) {
531+
self.push(self.fifo.push(job));
532+
}
533+
526534
#[inline]
527535
pub fn local_deque_is_empty(&self) -> bool {
528536
self.worker.len() == 0
@@ -652,6 +660,7 @@ unsafe fn main_loop(
652660
) {
653661
let worker_thread = WorkerThread {
654662
worker: worker,
663+
fifo: JobFifo::new(),
655664
breadth_first: breadth_first,
656665
index: index,
657666
rng: XorShift64Star::new(),

rayon-core/src/spawn/mod.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,29 @@ where
6161
///
6262
/// Not a public API, but used elsewhere in Rayon.
6363
pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
64+
where
65+
F: FnOnce() + Send + 'static,
66+
{
67+
// We assert that this does not hold any references (we know
68+
// this because of the `'static` bound in the inferface);
69+
// moreover, we assert that the code below is not supposed to
70+
// be able to panic, and hence the data won't leak but will be
71+
// enqueued into some deque for later execution.
72+
let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
73+
let job_ref = spawn_job(func, registry);
74+
registry.inject_or_push(job_ref);
75+
mem::forget(abort_guard);
76+
}
77+
78+
unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef
6479
where
6580
F: FnOnce() + Send + 'static,
6681
{
6782
// Ensure that registry cannot terminate until this job has
6883
// executed. This ref is decremented at the (*) below.
6984
registry.increment_terminate_count();
7085

71-
let async_job = Box::new(HeapJob::new({
86+
Box::new(HeapJob::new({
7287
let registry = registry.clone();
7388
move || {
7489
match unwind::halt_unwinding(func) {
@@ -79,16 +94,42 @@ where
7994
}
8095
registry.terminate(); // (*) permit registry to terminate now
8196
}
82-
}));
97+
}))
98+
.as_job_ref()
99+
}
83100

101+
/// TODO: like `spawn`, but FIFO
102+
pub fn spawn_fifo<F>(func: F)
103+
where
104+
F: FnOnce() + Send + 'static,
105+
{
106+
// We assert that current registry has not terminated.
107+
unsafe { spawn_fifo_in(func, &Registry::current()) }
108+
}
109+
110+
/// Spawn an asynchronous FIFO job in `registry.`
111+
///
112+
/// Unsafe because `registry` must not yet have terminated.
113+
///
114+
/// Not a public API, but used elsewhere in Rayon.
115+
pub unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>)
116+
where
117+
F: FnOnce() + Send + 'static,
118+
{
84119
// We assert that this does not hold any references (we know
85120
// this because of the `'static` bound in the inferface);
86121
// moreover, we assert that the code below is not supposed to
87122
// be able to panic, and hence the data won't leak but will be
88123
// enqueued into some deque for later execution.
89124
let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
90-
let job_ref = HeapJob::as_job_ref(async_job);
91-
registry.inject_or_push(job_ref);
125+
let job_ref = spawn_job(func, registry);
126+
127+
// If we're in the pool, use our thread's private fifo for this thread to execute
128+
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
129+
match registry.current_thread() {
130+
Some(worker) => worker.push_fifo(job_ref),
131+
None => registry.inject(&[job_ref]),
132+
}
92133
mem::forget(abort_guard);
93134
}
94135

rayon-core/src/spawn/test.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::any::Any;
33
use std::sync::mpsc::channel;
44
use std::sync::Mutex;
55

6-
use super::spawn;
6+
use super::{spawn, spawn_fifo};
77
use ThreadPoolBuilder;
88

99
#[test]
@@ -141,3 +141,62 @@ fn custom_panic_handler_and_nested_spawn() {
141141
}
142142
}
143143
}
144+
145+
macro_rules! test_order {
146+
($outer_spawn:ident, $inner_spawn:ident) => {{
147+
let builder = ThreadPoolBuilder::new().num_threads(1);
148+
let pool = builder.build().unwrap();
149+
let (tx, rx) = channel();
150+
pool.install(move || {
151+
for i in 0..10 {
152+
let tx = tx.clone();
153+
$outer_spawn(move || {
154+
for j in 0..10 {
155+
let tx = tx.clone();
156+
$inner_spawn(move || {
157+
tx.send(i * 10 + j).unwrap();
158+
});
159+
}
160+
});
161+
}
162+
});
163+
rx.iter().collect::<Vec<i32>>()
164+
}};
165+
}
166+
167+
#[test]
168+
fn lifo_order() {
169+
// In the absense of stealing, `spawn()` jobs on a thread will run in LIFO order.
170+
let vec = test_order!(spawn, spawn);
171+
let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
172+
assert_eq!(vec, expected);
173+
}
174+
175+
#[test]
176+
fn fifo_order() {
177+
// In the absense of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
178+
let vec = test_order!(spawn_fifo, spawn_fifo);
179+
let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
180+
assert_eq!(vec, expected);
181+
}
182+
183+
#[test]
184+
fn lifo_fifo_order() {
185+
// LIFO on the outside, FIFO on the inside
186+
let vec = test_order!(spawn, spawn_fifo);
187+
let expected: Vec<i32> = (0..10)
188+
.rev()
189+
.flat_map(|i| (0..10).map(move |j| i * 10 + j))
190+
.collect();
191+
assert_eq!(vec, expected);
192+
}
193+
194+
#[test]
195+
fn fifo_lifo_order() {
196+
// FIFO on the outside, LIFO on the inside
197+
let vec = test_order!(spawn_fifo, spawn);
198+
let expected: Vec<i32> = (0..10)
199+
.flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
200+
.collect();
201+
assert_eq!(vec, expected);
202+
}

rayon-core/src/thread_pool/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,15 @@ impl ThreadPool {
269269
// We assert that `self.registry` has not terminated.
270270
unsafe { spawn::spawn_in(op, &self.registry) }
271271
}
272+
273+
/// TODO: like `spawn`, but FIFO
274+
pub fn spawn_fifo<OP>(&self, op: OP)
275+
where
276+
OP: FnOnce() + Send + 'static,
277+
{
278+
// We assert that `self.registry` has not terminated.
279+
unsafe { spawn::spawn_in(op, &self.registry) }
280+
}
272281
}
273282

274283
impl Drop for ThreadPool {

0 commit comments

Comments
 (0)