Skip to content

Commit 9e62075

Browse files
committed
Add a fallback when threading is unsupported
1 parent b6cdc9d commit 9e62075

File tree

2 files changed

+63
-17
lines changed

2 files changed

+63
-17
lines changed

rayon-core/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,10 @@ impl ThreadPoolBuildError {
707707
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
708708
ThreadPoolBuildError { kind }
709709
}
710+
711+
fn is_unsupported(&self) -> bool {
712+
matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
713+
}
710714
}
711715

712716
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =

rayon-core/src/registry.rs

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl ThreadBuilder {
5050
/// Executes the main loop for this thread. This will not return until the
5151
/// thread pool is dropped.
5252
pub fn run(self) {
53-
unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
53+
unsafe { main_loop(self) }
5454
}
5555
}
5656

@@ -164,7 +164,7 @@ static THE_REGISTRY_SET: Once = Once::new();
164164
/// initialization has not already occurred, use the default
165165
/// configuration.
166166
pub(super) fn global_registry() -> &'static Arc<Registry> {
167-
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
167+
set_global_registry(default_global_registry)
168168
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
169169
.expect("The global thread pool has not been initialized.")
170170
}
@@ -198,6 +198,46 @@ where
198198
result
199199
}
200200

201+
fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
202+
let result = Registry::new(ThreadPoolBuilder::new());
203+
204+
// If we're running in an environment that doesn't support threads at all, we can fall back to
205+
// using the current thread alone. This is crude, and probably won't work for non-blocking
206+
// calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
207+
//
208+
// Notably, this allows current WebAssembly targets to work even though their threading support
209+
// is stubbed out, and we won't have to change anything if they do add real threading.
210+
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
211+
if unsupported && WorkerThread::current().is_null() {
212+
let builder = ThreadPoolBuilder::new()
213+
.num_threads(1)
214+
.spawn_handler(|thread| {
215+
// Rather than starting a new thread, we're just taking over the current thread
216+
// *without* running the main loop, so we can still return from here.
217+
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
218+
let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
219+
let registry = &*worker_thread.registry;
220+
let index = worker_thread.index;
221+
222+
unsafe {
223+
WorkerThread::set_current(worker_thread);
224+
225+
// let registry know we are ready to do work
226+
Latch::set(&registry.thread_infos[index].primed);
227+
}
228+
229+
Ok(())
230+
});
231+
232+
let fallback_result = Registry::new(builder);
233+
if fallback_result.is_ok() {
234+
return fallback_result;
235+
}
236+
}
237+
238+
result
239+
}
240+
201241
struct Terminator<'a>(&'a Arc<Registry>);
202242

203243
impl<'a> Drop for Terminator<'a> {
@@ -655,6 +695,19 @@ thread_local! {
655695
static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
656696
}
657697

698+
impl From<ThreadBuilder> for WorkerThread {
699+
fn from(thread: ThreadBuilder) -> Self {
700+
Self {
701+
worker: thread.worker,
702+
stealer: thread.stealer,
703+
fifo: JobFifo::new(),
704+
index: thread.index,
705+
rng: XorShift64Star::new(),
706+
registry: thread.registry,
707+
}
708+
}
709+
}
710+
658711
impl Drop for WorkerThread {
659712
fn drop(&mut self) {
660713
// Undo `set_current`
@@ -851,22 +904,11 @@ impl WorkerThread {
851904

852905
/// ////////////////////////////////////////////////////////////////////////
853906
854-
unsafe fn main_loop(
855-
worker: Worker<JobRef>,
856-
stealer: Stealer<JobRef>,
857-
registry: Arc<Registry>,
858-
index: usize,
859-
) {
860-
let worker_thread = &WorkerThread {
861-
worker,
862-
stealer,
863-
fifo: JobFifo::new(),
864-
index,
865-
rng: XorShift64Star::new(),
866-
registry,
867-
};
907+
unsafe fn main_loop(thread: ThreadBuilder) {
908+
let worker_thread = &WorkerThread::from(thread);
868909
WorkerThread::set_current(worker_thread);
869910
let registry = &*worker_thread.registry;
911+
let index = worker_thread.index;
870912

871913
// let registry know we are ready to do work
872914
Latch::set(&registry.thread_infos[index].primed);
@@ -924,7 +966,7 @@ where
924966
// invalidated until we return.
925967
op(&*owner_thread, false)
926968
} else {
927-
global_registry().in_worker_cold(op)
969+
global_registry().in_worker(op)
928970
}
929971
}
930972
}

0 commit comments

Comments
 (0)