Skip to content

Commit 2c401e0

Browse files
authored
feat(rt): accept shared tokio runtimes in with_tokio_rt (#811)
1 parent 1e07dce commit 2c401e0

File tree

5 files changed

+159
-14
lines changed

5 files changed

+159
-14
lines changed

actix-rt/CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Minimum supported Rust version (MSRV) is now 1.88.
66
- Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`.
7+
- Allow `{System, Arbiter}::with_tokio_rt` to accept shared Tokio runtimes (e.g. `Arc<tokio::runtime::Runtime>` or `&'static tokio::runtime::Runtime`).
78

89
## 2.11.0
910

actix-rt/src/arbiter.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,17 @@ impl Arbiter {
105105

106106
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
107107
///
108+
/// The closure may return any type that can be converted into [`Runtime`], such as
109+
/// `tokio::runtime::Runtime`, `Arc<tokio::runtime::Runtime>`, or
110+
/// `&'static tokio::runtime::Runtime`.
111+
///
108112
/// [tokio-runtime]: tokio::runtime::Runtime
113+
/// [`Runtime`]: crate::Runtime
109114
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
110-
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
115+
pub fn with_tokio_rt<F, R>(runtime_factory: F) -> Arbiter
111116
where
112-
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
117+
F: FnOnce() -> R + Send + 'static,
118+
R: Into<crate::runtime::Runtime> + Send + 'static,
113119
{
114120
let sys = System::current();
115121
let system_id = sys.id();
@@ -125,7 +131,7 @@ impl Arbiter {
125131
.spawn({
126132
let tx = tx.clone();
127133
move || {
128-
let rt = crate::runtime::Runtime::from(runtime_factory());
134+
let rt = runtime_factory().into();
129135
let hnd = ArbiterHandle::new(tx);
130136

131137
System::set_current(sys);

actix-rt/src/runtime.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
1-
use std::{future::Future, io};
1+
use std::{future::Future, io, sync::Arc};
22

33
use tokio::task::{JoinHandle, LocalSet};
44

5+
#[derive(Debug)]
6+
enum RuntimeInner {
7+
Owned(tokio::runtime::Runtime),
8+
Shared(Arc<tokio::runtime::Runtime>),
9+
Static(&'static tokio::runtime::Runtime),
10+
}
11+
512
/// A Tokio-based runtime proxy.
613
///
714
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
815
/// on submitted futures.
916
#[derive(Debug)]
1017
pub struct Runtime {
1118
local: LocalSet,
12-
rt: tokio::runtime::Runtime,
19+
rt: RuntimeInner,
1320
}
1421

1522
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
@@ -26,11 +33,19 @@ impl Runtime {
2633
let rt = default_tokio_runtime()?;
2734

2835
Ok(Runtime {
29-
rt,
36+
rt: RuntimeInner::Owned(rt),
3037
local: LocalSet::new(),
3138
})
3239
}
3340

41+
fn tokio_runtime_ref(&self) -> &tokio::runtime::Runtime {
42+
match &self.rt {
43+
RuntimeInner::Owned(rt) => rt,
44+
RuntimeInner::Shared(rt) => rt,
45+
RuntimeInner::Static(rt) => rt,
46+
}
47+
}
48+
3449
/// Offload a future onto the single-threaded runtime.
3550
///
3651
/// The returned join handle can be used to await the future's result.
@@ -114,7 +129,7 @@ impl Runtime {
114129
/// of the Actix runtime. This is because Tokio is responsible for driving the Actix system,
115130
/// and blocking tasks could delay or deadlock other tasks in run loop.
116131
pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime {
117-
&self.rt
132+
self.tokio_runtime_ref()
118133
}
119134

120135
/// Runs the provided future, blocking the current thread until the future completes.
@@ -135,15 +150,33 @@ impl Runtime {
135150
where
136151
F: Future,
137152
{
138-
self.local.block_on(&self.rt, f)
153+
self.local.block_on(self.tokio_runtime_ref(), f)
139154
}
140155
}
141156

142157
impl From<tokio::runtime::Runtime> for Runtime {
143158
fn from(rt: tokio::runtime::Runtime) -> Self {
144159
Self {
145160
local: LocalSet::new(),
146-
rt,
161+
rt: RuntimeInner::Owned(rt),
162+
}
163+
}
164+
}
165+
166+
impl From<Arc<tokio::runtime::Runtime>> for Runtime {
167+
fn from(rt: Arc<tokio::runtime::Runtime>) -> Self {
168+
Self {
169+
local: LocalSet::new(),
170+
rt: RuntimeInner::Shared(rt),
171+
}
172+
}
173+
}
174+
175+
impl From<&'static tokio::runtime::Runtime> for Runtime {
176+
fn from(rt: &'static tokio::runtime::Runtime) -> Self {
177+
Self {
178+
local: LocalSet::new(),
179+
rt: RuntimeInner::Static(rt),
147180
}
148181
}
149182
}

actix-rt/src/system.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,21 @@ impl System {
4545

4646
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
4747
///
48+
/// The closure may return any type that can be converted into [`Runtime`], such as
49+
/// `tokio::runtime::Runtime`, `Arc<tokio::runtime::Runtime>`, or
50+
/// `&'static tokio::runtime::Runtime`.
51+
///
4852
/// [tokio-runtime]: tokio::runtime::Runtime
49-
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
53+
/// [`Runtime`]: crate::Runtime
54+
pub fn with_tokio_rt<F, R>(runtime_factory: F) -> SystemRunner
5055
where
51-
F: FnOnce() -> tokio::runtime::Runtime,
56+
F: FnOnce() -> R,
57+
R: Into<crate::runtime::Runtime>,
5258
{
5359
let (stop_tx, stop_rx) = watch::channel(None);
5460
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
5561

56-
let rt = crate::runtime::Runtime::from(runtime_factory());
62+
let rt = runtime_factory().into();
5763
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
5864
let system = System::construct(sys_tx, sys_arbiter.clone());
5965

@@ -85,9 +91,10 @@ impl System {
8591
///
8692
/// [tokio-runtime]: tokio::runtime::Runtime
8793
#[doc(hidden)]
88-
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
94+
pub fn with_tokio_rt<F, R>(_: F) -> SystemRunner
8995
where
90-
F: FnOnce() -> tokio::runtime::Runtime,
96+
F: FnOnce() -> R,
97+
R: Into<crate::runtime::Runtime>,
9198
{
9299
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
93100
}

actix-rt/tests/tests.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,65 @@ fn new_system_with_tokio() {
298298
assert_eq!(rx.recv().unwrap(), 42);
299299
}
300300

301+
#[cfg(not(feature = "io-uring"))]
302+
#[test]
303+
fn new_system_with_shared_tokio_runtime() {
304+
use std::sync::Arc;
305+
306+
let (tx, rx) = channel();
307+
308+
let rt = Arc::new(
309+
tokio::runtime::Builder::new_multi_thread()
310+
.enable_io()
311+
.enable_time()
312+
.worker_threads(2)
313+
.max_blocking_threads(2)
314+
.build()
315+
.unwrap(),
316+
);
317+
318+
let res = System::with_tokio_rt({
319+
let rt = rt.clone();
320+
move || rt
321+
})
322+
.block_on(async {
323+
actix_rt::time::sleep(Duration::from_millis(1)).await;
324+
325+
tokio::task::spawn(async move {
326+
tx.send(7).unwrap();
327+
})
328+
.await
329+
.unwrap();
330+
331+
321usize
332+
});
333+
334+
assert_eq!(res, 321);
335+
assert_eq!(rx.recv().unwrap(), 7);
336+
}
337+
338+
#[cfg(not(feature = "io-uring"))]
339+
#[test]
340+
fn new_system_with_static_tokio_runtime() {
341+
use std::sync::OnceLock;
342+
343+
static TOKIO: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
344+
345+
let res = System::with_tokio_rt(|| -> &'static tokio::runtime::Runtime {
346+
TOKIO.get_or_init(|| {
347+
tokio::runtime::Builder::new_multi_thread()
348+
.enable_io()
349+
.enable_time()
350+
.worker_threads(1)
351+
.build()
352+
.unwrap()
353+
})
354+
})
355+
.block_on(async { 7usize });
356+
357+
assert_eq!(res, 7);
358+
}
359+
301360
#[cfg(not(feature = "io-uring"))]
302361
#[test]
303362
fn new_arbiter_with_tokio() {
@@ -331,6 +390,45 @@ fn new_arbiter_with_tokio() {
331390
assert!(!counter.load(Ordering::SeqCst));
332391
}
333392

393+
#[cfg(not(feature = "io-uring"))]
394+
#[test]
395+
fn new_arbiter_with_shared_tokio_runtime() {
396+
use std::sync::{
397+
atomic::{AtomicBool, Ordering},
398+
Arc,
399+
};
400+
401+
let _ = System::new();
402+
403+
let rt = Arc::new(
404+
tokio::runtime::Builder::new_multi_thread()
405+
.enable_all()
406+
.worker_threads(2)
407+
.build()
408+
.unwrap(),
409+
);
410+
411+
let arb = Arbiter::with_tokio_rt({
412+
let rt = rt.clone();
413+
move || rt
414+
});
415+
416+
let flag = Arc::new(AtomicBool::new(false));
417+
418+
let flag1 = flag.clone();
419+
let did_spawn = arb.spawn(async move {
420+
actix_rt::time::sleep(Duration::from_millis(1)).await;
421+
flag1.store(true, Ordering::SeqCst);
422+
Arbiter::current().stop();
423+
});
424+
425+
assert!(did_spawn);
426+
427+
arb.join().unwrap();
428+
429+
assert!(flag.load(Ordering::SeqCst));
430+
}
431+
334432
#[test]
335433
#[should_panic]
336434
fn no_system_current_panic() {

0 commit comments

Comments
 (0)