Skip to content

Commit 4ab39c9

Browse files
authored
feat(rt): implement stop_future/into_parts (#808)
1 parent 13e17e7 commit 4ab39c9

File tree

4 files changed

+119
-9
lines changed

4 files changed

+119
-9
lines changed

actix-rt/CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
- Minimum supported Rust version (MSRV) is now 1.88.
6+
- Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`.
67

78
## 2.11.0
89

actix-rt/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use tokio::task::JoinHandle;
6868
pub use self::{
6969
arbiter::{Arbiter, ArbiterHandle},
7070
runtime::Runtime,
71-
system::{System, SystemRunner},
71+
system::{System, SystemRunner, SystemStop},
7272
};
7373

7474
pub mod signal {

actix-rt/src/system.rs

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
};
1010

1111
use futures_core::ready;
12-
use tokio::sync::{mpsc, oneshot};
12+
use tokio::sync::{mpsc, watch};
1313

1414
use crate::{arbiter::ArbiterHandle, Arbiter};
1515

@@ -50,7 +50,7 @@ impl System {
5050
where
5151
F: FnOnce() -> tokio::runtime::Runtime,
5252
{
53-
let (stop_tx, stop_rx) = oneshot::channel();
53+
let (stop_tx, stop_rx) = watch::channel(None);
5454
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
5555

5656
let rt = crate::runtime::Runtime::from(runtime_factory());
@@ -176,7 +176,7 @@ impl System {
176176
#[derive(Debug)]
177177
pub struct SystemRunner {
178178
rt: crate::runtime::Runtime,
179-
stop_rx: oneshot::Receiver<i32>,
179+
stop_rx: watch::Receiver<Option<i32>>,
180180
}
181181

182182
#[cfg(not(feature = "io-uring"))]
@@ -196,7 +196,7 @@ impl SystemRunner {
196196
let SystemRunner { rt, stop_rx, .. } = self;
197197

198198
// run loop
199-
rt.block_on(stop_rx).map_err(io::Error::other)
199+
rt.block_on(wait_for_stop(stop_rx))
200200
}
201201

202202
/// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this
@@ -233,6 +233,43 @@ impl SystemRunner {
233233
&self.rt
234234
}
235235

236+
/// Returns a future that resolves with the system's exit code when it is stopped.
237+
///
238+
/// This can be used to react to a system stop signal while running a future with
239+
/// [`SystemRunner::block_on`], such as when coordinating shutdown with `tokio::select!`.
240+
///
241+
/// # Examples
242+
/// ```no_run
243+
/// use std::process::ExitCode;
244+
/// use actix_rt::System;
245+
///
246+
/// let sys = System::new();
247+
/// let stop = sys.stop_future();
248+
///
249+
/// let exit = sys.block_on(async move {
250+
/// actix_rt::spawn(async {
251+
/// System::current().stop_with_code(0);
252+
/// });
253+
///
254+
/// let code = stop.await.unwrap_or(1);
255+
/// ExitCode::from(code as u8)
256+
/// });
257+
///
258+
/// # drop(exit);
259+
/// ```
260+
pub fn stop_future(&self) -> SystemStop {
261+
SystemStop::new(self.stop_rx.clone())
262+
}
263+
264+
/// Splits this runner into its runtime and a future that resolves when the system stops.
265+
///
266+
/// After calling this method, [`SystemRunner::run`] and [`SystemRunner::run_with_code`] can no
267+
/// longer be used.
268+
pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) {
269+
let SystemRunner { rt, stop_rx } = self;
270+
(rt, SystemStop::new(stop_rx))
271+
}
272+
236273
/// Runs the provided future, blocking the current thread until the future completes.
237274
#[track_caller]
238275
#[inline]
@@ -259,11 +296,21 @@ impl SystemRunner {
259296
unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet");
260297
}
261298

299+
/// Returns a future that resolves with the system's exit code when it is stopped.
300+
pub fn stop_future(&self) -> SystemStop {
301+
unimplemented!("SystemRunner::stop_future is not implemented for io-uring feature yet");
302+
}
303+
304+
/// Splits this runner into its runtime and a future that resolves when the system stops.
305+
pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) {
306+
unimplemented!("SystemRunner::into_parts is not implemented for io-uring feature yet");
307+
}
308+
262309
/// Runs the provided future, blocking the current thread until the future completes.
263310
#[inline]
264311
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
265312
tokio_uring::start(async move {
266-
let (stop_tx, stop_rx) = oneshot::channel();
313+
let (stop_tx, stop_rx) = watch::channel(None);
267314
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
268315

269316
let sys_arbiter = Arbiter::in_new_system();
@@ -285,6 +332,40 @@ impl SystemRunner {
285332
}
286333
}
287334

335+
/// Future that resolves with the exit code when a [`System`] is stopped.
336+
#[must_use = "SystemStop does nothing unless polled or awaited."]
337+
pub struct SystemStop {
338+
inner: Pin<Box<dyn Future<Output = io::Result<i32>> + 'static>>,
339+
}
340+
341+
impl SystemStop {
342+
#[cfg_attr(feature = "io-uring", allow(dead_code))]
343+
fn new(stop_rx: watch::Receiver<Option<i32>>) -> Self {
344+
Self {
345+
inner: Box::pin(wait_for_stop(stop_rx)),
346+
}
347+
}
348+
}
349+
350+
impl Future for SystemStop {
351+
type Output = io::Result<i32>;
352+
353+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
354+
self.inner.as_mut().poll(cx)
355+
}
356+
}
357+
358+
#[cfg_attr(feature = "io-uring", allow(dead_code))]
359+
async fn wait_for_stop(mut stop_rx: watch::Receiver<Option<i32>>) -> io::Result<i32> {
360+
loop {
361+
if let Some(code) = *stop_rx.borrow() {
362+
return Ok(code);
363+
}
364+
365+
stop_rx.changed().await.map_err(io::Error::other)?;
366+
}
367+
}
368+
288369
#[derive(Debug)]
289370
pub(crate) enum SystemCommand {
290371
Exit(i32),
@@ -296,15 +377,15 @@ pub(crate) enum SystemCommand {
296377
/// [Arbiter]s and is able to distribute a system-wide stop command.
297378
#[derive(Debug)]
298379
pub(crate) struct SystemController {
299-
stop_tx: Option<oneshot::Sender<i32>>,
380+
stop_tx: Option<watch::Sender<Option<i32>>>,
300381
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
301382
arbiters: HashMap<usize, ArbiterHandle>,
302383
}
303384

304385
impl SystemController {
305386
pub(crate) fn new(
306387
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
307-
stop_tx: oneshot::Sender<i32>,
388+
stop_tx: watch::Sender<Option<i32>>,
308389
) -> Self {
309390
SystemController {
310391
cmd_rx,
@@ -335,7 +416,7 @@ impl Future for SystemController {
335416
// stop event loop
336417
// will only fire once
337418
if let Some(stop_tx) = self.stop_tx.take() {
338-
let _ = stop_tx.send(code);
419+
let _ = stop_tx.send(Some(code));
339420
}
340421
}
341422

actix-rt/tests/tests.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,34 @@ fn run_with_code() {
3434
assert_eq!(exit_code, 42);
3535
}
3636

37+
#[cfg(not(feature = "io-uring"))]
38+
#[test]
39+
fn stop_future_resolves() {
40+
let sys = System::new();
41+
let stop = sys.stop_future();
42+
43+
let exit_code = sys.block_on(async move {
44+
System::current().stop_with_code(7);
45+
stop.await.expect("stop future should resolve")
46+
});
47+
48+
assert_eq!(exit_code, 7);
49+
}
50+
51+
#[cfg(not(feature = "io-uring"))]
52+
#[test]
53+
fn into_parts_stop_future_resolves() {
54+
let sys = System::new();
55+
let (rt, stop) = sys.into_parts();
56+
57+
let exit_code = rt.block_on(async move {
58+
System::current().stop_with_code(9);
59+
stop.await.expect("stop future should resolve")
60+
});
61+
62+
assert_eq!(exit_code, 9);
63+
}
64+
3765
#[test]
3866
fn join_another_arbiter() {
3967
let time = Duration::from_secs(1);

0 commit comments

Comments
 (0)