Skip to content

Commit 01cdeb6

Browse files
committed
Change ThreadPoolBuilder to a parameterized ThreadSpawn
1 parent 03578b2 commit 01cdeb6

File tree

7 files changed

+222
-109
lines changed

7 files changed

+222
-109
lines changed

rayon-core/src/lib.rs

Lines changed: 102 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ extern crate rand_xorshift;
4747

4848
#[macro_use]
4949
mod log;
50+
#[macro_use]
51+
mod private;
5052

5153
mod job;
5254
mod join;
@@ -73,6 +75,8 @@ pub use thread_pool::current_thread_has_pending_tasks;
7375
pub use thread_pool::current_thread_index;
7476
pub use thread_pool::ThreadPool;
7577

78+
use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
79+
7680
/// Returns the number of threads in the current registry. If this
7781
/// code is executing within a Rayon thread-pool, then this will be
7882
/// the number of threads for the thread-pool of the current
@@ -125,8 +129,7 @@ enum ErrorKind {
125129
///
126130
/// [`ThreadPool`]: struct.ThreadPool.html
127131
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
128-
#[derive(Default)]
129-
pub struct ThreadPoolBuilder {
132+
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
130133
/// The number of threads in the rayon thread pool.
131134
/// If zero will use the RAYON_NUM_THREADS environment variable.
132135
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
@@ -148,6 +151,9 @@ pub struct ThreadPoolBuilder {
148151
/// Closure invoked on worker thread exit.
149152
exit_handler: Option<Box<ExitHandler>>,
150153

154+
/// Closure invoked to spawn threads.
155+
spawn_handler: S,
156+
151157
/// If false, worker threads will execute spawned jobs in a
152158
/// "depth-first" fashion. If true, they will do a "breadth-first"
153159
/// fashion. Depth-first is the default.
@@ -176,19 +182,69 @@ type StartHandler = Fn(usize) + Send + Sync;
176182
/// Note that this same closure may be invoked multiple times in parallel.
177183
type ExitHandler = Fn(usize) + Send + Sync;
178184

185+
// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
186+
impl Default for ThreadPoolBuilder {
187+
fn default() -> Self {
188+
ThreadPoolBuilder {
189+
num_threads: 0,
190+
panic_handler: None,
191+
get_thread_name: None,
192+
stack_size: None,
193+
start_handler: None,
194+
exit_handler: None,
195+
spawn_handler: DefaultSpawn,
196+
breadth_first: false,
197+
}
198+
}
199+
}
200+
179201
impl ThreadPoolBuilder {
180202
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
181-
pub fn new() -> ThreadPoolBuilder {
182-
ThreadPoolBuilder::default()
203+
pub fn new() -> Self {
204+
Self::default()
183205
}
206+
}
184207

208+
/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
209+
/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
210+
impl<S> ThreadPoolBuilder<S>
211+
where
212+
S: ThreadSpawn,
213+
{
185214
/// Create a new `ThreadPool` initialized using this configuration.
186215
pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
187216
ThreadPool::build(self)
188217
}
189218

219+
/// Initializes the global thread pool. This initialization is
220+
/// **optional**. If you do not call this function, the thread pool
221+
/// will be automatically initialized with the default
222+
/// configuration. Calling `build_global` is not recommended, except
223+
/// in two scenarios:
224+
///
225+
/// - You wish to change the default configuration.
226+
/// - You are running a benchmark, in which case initializing may
227+
/// yield slightly more consistent results, since the worker threads
228+
/// will already be ready to go even in the first iteration. But
229+
/// this cost is minimal.
230+
///
231+
/// Initialization of the global thread pool happens exactly
232+
/// once. Once started, the configuration cannot be
233+
/// changed. Therefore, if you call `build_global` a second time, it
234+
/// will return an error. An `Ok` result indicates that this
235+
/// is the first initialization of the thread pool.
236+
pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
237+
let registry = registry::init_global_registry(self)?;
238+
registry.wait_until_primed();
239+
Ok(())
240+
}
241+
}
242+
243+
impl ThreadPoolBuilder {
190244
/// Create a scoped `ThreadPool` initialized using this configuration.
191245
///
246+
/// This is a convenience function for building a pool using a `crossbeam`
247+
/// scope to spawn threads in a [`spawn_handler`](#method.spawn_handler).
192248
/// The threads in this pool will start by calling `wrapper`, which should
193249
/// do initialization and continue by calling `ThreadBuilder::run()`.
194250
pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
@@ -198,17 +254,19 @@ impl ThreadPoolBuilder {
198254
{
199255
let result = crossbeam_utils::thread::scope(|scope| {
200256
let wrapper = &wrapper;
201-
let pool = self.spawn(|thread| {
202-
let mut builder = scope.builder();
203-
if let Some(name) = thread.name() {
204-
builder = builder.name(name.to_string());
205-
}
206-
if let Some(size) = thread.stack_size() {
207-
builder = builder.stack_size(size);
208-
}
209-
builder.spawn(move |_| wrapper(thread))?;
210-
Ok(())
211-
})?;
257+
let pool = self
258+
.spawn_handler(|thread| {
259+
let mut builder = scope.builder();
260+
if let Some(name) = thread.name() {
261+
builder = builder.name(name.to_string());
262+
}
263+
if let Some(size) = thread.stack_size() {
264+
builder = builder.stack_size(size);
265+
}
266+
builder.spawn(move |_| wrapper(thread))?;
267+
Ok(())
268+
})
269+
.build()?;
212270
Ok(with_pool(&pool))
213271
});
214272

@@ -217,58 +275,37 @@ impl ThreadPoolBuilder {
217275
Err(err) => unwind::resume_unwinding(err),
218276
}
219277
}
278+
}
220279

221-
/// Create a new `ThreadPool` initialized using this configuration and a
222-
/// custom function for spawning threads.
280+
impl<S> ThreadPoolBuilder<S> {
281+
/// Set a custom function for spawning threads.
223282
///
224283
/// Note that the threads will not exit until after the pool is dropped. It
225284
/// is up to the caller to wait for thread termination if that is important
226285
/// for any invariants. For instance, threads created in `crossbeam::scope`
227286
/// will be joined before that scope returns, and this will block indefinitely
228-
/// if the pool is leaked.
229-
pub fn spawn(
230-
self,
231-
spawn: impl FnMut(ThreadBuilder) -> io::Result<()>,
232-
) -> Result<ThreadPool, ThreadPoolBuildError> {
233-
ThreadPool::build_spawn(self, spawn)
234-
}
235-
236-
/// Initializes the global thread pool. This initialization is
237-
/// **optional**. If you do not call this function, the thread pool
238-
/// will be automatically initialized with the default
239-
/// configuration. Calling `build_global` is not recommended, except
240-
/// in two scenarios:
241-
///
242-
/// - You wish to change the default configuration.
243-
/// - You are running a benchmark, in which case initializing may
244-
/// yield slightly more consistent results, since the worker threads
245-
/// will already be ready to go even in the first iteration. But
246-
/// this cost is minimal.
247-
///
248-
/// Initialization of the global thread pool happens exactly
249-
/// once. Once started, the configuration cannot be
250-
/// changed. Therefore, if you call `build_global` a second time, it
251-
/// will return an error. An `Ok` result indicates that this
252-
/// is the first initialization of the thread pool.
253-
pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
254-
let registry = registry::init_global_registry(self)?;
255-
registry.wait_until_primed();
256-
Ok(())
287+
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
288+
/// until the entire process exits!
289+
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
290+
where
291+
F: FnMut(ThreadBuilder) -> io::Result<()>,
292+
{
293+
ThreadPoolBuilder {
294+
spawn_handler: CustomSpawn::new(spawn),
295+
// ..self
296+
num_threads: self.num_threads,
297+
panic_handler: self.panic_handler,
298+
get_thread_name: self.get_thread_name,
299+
stack_size: self.stack_size,
300+
start_handler: self.start_handler,
301+
exit_handler: self.exit_handler,
302+
breadth_first: self.breadth_first,
303+
}
257304
}
258305

259-
/// Initializes the global thread pool using a custom function for spawning
260-
/// threads.
261-
///
262-
/// Note that the global thread pool doesn't terminate until the entire process
263-
/// exits! If this is used with something like `crossbeam::scope` that tries to
264-
/// join threads, that will block indefinitely.
265-
pub fn spawn_global(
266-
self,
267-
spawn: impl FnMut(ThreadBuilder) -> io::Result<()>,
268-
) -> Result<(), ThreadPoolBuildError> {
269-
let registry = registry::spawn_global_registry(self, spawn)?;
270-
registry.wait_until_primed();
271-
Ok(())
306+
/// Returns a reference to the current spawn handler.
307+
fn get_spawn_handler(&mut self) -> &mut S {
308+
&mut self.spawn_handler
272309
}
273310

274311
/// Get the number of threads that will be used for the thread
@@ -339,7 +376,7 @@ impl ThreadPoolBuilder {
339376
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
340377
/// variable. If both variables are specified, `RAYON_NUM_THREADS` will
341378
/// be prefered.
342-
pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
379+
pub fn num_threads(mut self, num_threads: usize) -> Self {
343380
self.num_threads = num_threads;
344381
self
345382
}
@@ -363,7 +400,7 @@ impl ThreadPoolBuilder {
363400
/// If the panic handler itself panics, this will abort the
364401
/// process. To prevent this, wrap the body of your panic handler
365402
/// in a call to `std::panic::catch_unwind()`.
366-
pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
403+
pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
367404
where
368405
H: Fn(Box<Any + Send>) + Send + Sync + 'static,
369406
{
@@ -431,7 +468,7 @@ impl ThreadPoolBuilder {
431468
/// Note that this same closure may be invoked multiple times in parallel.
432469
/// If this closure panics, the panic will be passed to the panic handler.
433470
/// If that handler returns, then startup will continue normally.
434-
pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
471+
pub fn start_handler<H>(mut self, start_handler: H) -> Self
435472
where
436473
H: Fn(usize) + Send + Sync + 'static,
437474
{
@@ -450,7 +487,7 @@ impl ThreadPoolBuilder {
450487
/// Note that this same closure may be invoked multiple times in parallel.
451488
/// If this closure panics, the panic will be passed to the panic handler.
452489
/// If that handler returns, then the thread will exit normally.
453-
pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
490+
pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
454491
where
455492
H: Fn(usize) + Send + Sync + 'static,
456493
{
@@ -566,7 +603,7 @@ pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
566603
config.into_builder().build_global().map_err(Box::from)
567604
}
568605

569-
impl fmt::Debug for ThreadPoolBuilder {
606+
impl<S> fmt::Debug for ThreadPoolBuilder<S> {
570607
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
571608
let ThreadPoolBuilder {
572609
ref num_threads,
@@ -575,6 +612,7 @@ impl fmt::Debug for ThreadPoolBuilder {
575612
ref stack_size,
576613
ref start_handler,
577614
ref exit_handler,
615+
spawn_handler: _,
578616
ref breadth_first,
579617
} = *self;
580618

rayon-core/src/private.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! The public parts of this private module are used to create traits
2+
//! that cannot be implemented outside of our own crate. This way we
3+
//! can feel free to extend those traits without worrying about it
4+
//! being a breaking change for other implementations.
5+
6+
/// If this type is pub but not publicly reachable, third parties
7+
/// can't name it and can't implement traits using it.
8+
#[allow(missing_debug_implementations)]
9+
pub struct PrivateMarker;
10+
11+
macro_rules! private_decl {
12+
() => {
13+
/// This trait is private; this method exists to make it
14+
/// impossible to implement outside the crate.
15+
#[doc(hidden)]
16+
fn __rayon_private__(&self) -> ::private::PrivateMarker;
17+
}
18+
}
19+
20+
macro_rules! private_impl {
21+
() => {
22+
fn __rayon_private__(&self) -> ::private::PrivateMarker {
23+
::private::PrivateMarker
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)