Skip to content

Commit 3b879a1

Browse files
Rimeeeeeemattsse
andauthored
feat: task executor accessible globally (#15360)
Co-authored-by: Matthias Seitz <[email protected]>
1 parent 81a8c27 commit 3b879a1

File tree

1 file changed

+46
-4
lines changed

1 file changed

+46
-4
lines changed

crates/tasks/src/lib.rs

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::{
2727
pin::{pin, Pin},
2828
sync::{
2929
atomic::{AtomicUsize, Ordering},
30-
Arc,
30+
Arc, OnceLock,
3131
},
3232
task::{ready, Context, Poll},
3333
};
@@ -45,6 +45,9 @@ pub mod shutdown;
4545
#[cfg(feature = "rayon")]
4646
pub mod pool;
4747

48+
/// Global [`TaskExecutor`] instance that can be accessed from anywhere.
49+
static GLOBAL_EXECUTOR: OnceLock<TaskExecutor> = OnceLock::new();
50+
4851
/// A type that can spawn tasks.
4952
///
5053
/// The main purpose of this type is to abstract over [`TaskExecutor`] so it's more convenient to
@@ -176,7 +179,7 @@ pub struct TaskManager {
176179
// === impl TaskManager ===
177180

178181
impl TaskManager {
179-
/// Returns a [`TaskManager`] over the currently running Runtime.
182+
/// Returns a new [`TaskManager`] over the currently running Runtime.
180183
///
181184
/// # Panics
182185
///
@@ -187,17 +190,25 @@ impl TaskManager {
187190
}
188191

189192
/// Create a new instance connected to the given handle's tokio runtime.
193+
///
194+
/// This also sets the global [`TaskExecutor`].
190195
pub fn new(handle: Handle) -> Self {
191196
let (panicked_tasks_tx, panicked_tasks_rx) = unbounded_channel();
192197
let (signal, on_shutdown) = signal();
193-
Self {
198+
let manager = Self {
194199
handle,
195200
panicked_tasks_tx,
196201
panicked_tasks_rx,
197202
signal: Some(signal),
198203
on_shutdown,
199204
graceful_tasks: Arc::new(AtomicUsize::new(0)),
200-
}
205+
};
206+
207+
let _ = GLOBAL_EXECUTOR
208+
.set(manager.executor())
209+
.inspect_err(|_| error!("Global executor already set"));
210+
211+
manager
201212
}
202213

203214
/// Returns a new [`TaskExecutor`] that can spawn new tasks onto the tokio runtime this type is
@@ -304,6 +315,23 @@ pub struct TaskExecutor {
304315
// === impl TaskExecutor ===
305316

306317
impl TaskExecutor {
318+
/// Attempts to get the current `TaskExecutor` if one has been initialized.
319+
///
320+
/// Returns an error if no [`TaskExecutor`] has been initialized via [`TaskManager`].
321+
pub fn try_current() -> Result<Self, NoCurrentTaskExecutorError> {
322+
GLOBAL_EXECUTOR.get().cloned().ok_or_else(NoCurrentTaskExecutorError::default)
323+
}
324+
325+
/// Returns the current `TaskExecutor`.
326+
///
327+
/// # Panics
328+
///
329+
/// Panics if no global executor has been initialized. Use [`try_current`](Self::try_current)
330+
/// for a non-panicking version.
331+
pub fn current() -> Self {
332+
Self::try_current().unwrap()
333+
}
334+
307335
/// Returns the [Handle] to the tokio runtime.
308336
pub const fn handle(&self) -> &Handle {
309337
&self.handle
@@ -644,6 +672,12 @@ enum TaskKind {
644672
Blocking,
645673
}
646674

675+
/// Error returned by `try_current` when no task executor has been configured.
676+
#[derive(Debug, Default, thiserror::Error)]
677+
#[error("No current task executor available.")]
678+
#[non_exhaustive]
679+
pub struct NoCurrentTaskExecutorError;
680+
647681
#[cfg(test)]
648682
mod tests {
649683
use super::*;
@@ -783,4 +817,12 @@ mod tests {
783817
manager.graceful_shutdown_with_timeout(timeout);
784818
assert!(!val.load(Ordering::Relaxed));
785819
}
820+
821+
#[test]
822+
fn can_access_global() {
823+
let runtime = tokio::runtime::Runtime::new().unwrap();
824+
let handle = runtime.handle().clone();
825+
let _manager = TaskManager::new(handle);
826+
let _executor = TaskExecutor::try_current().unwrap();
827+
}
786828
}

0 commit comments

Comments
 (0)