Skip to content

Commit 12148df

Browse files
authored
Merge pull request gtk-rs#772 from BiagioFesta/wip/bfesta/cancellable-future
gio: `CancellableFuture` ergonomic for cancelling tasks
2 parents 61a082e + cc317a9 commit 12148df

File tree

6 files changed

+266
-0
lines changed

6 files changed

+266
-0
lines changed

examples/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,7 @@ path = "gio_futures_await/main.rs"
4040
[[bin]]
4141
name = "gio_task"
4242
path = "gio_task/main.rs"
43+
44+
[[bin]]
45+
name = "gio_cancellable_future"
46+
path = "gio_cancellable_future/main.rs"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# GIO Futures
2+
3+
This example reads our `Cargo.toml` by executing GIO futures.
4+
5+
Run it by executing:
6+
7+
```bash
8+
cargo run --bin gio_cancellable_future
9+
```
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::future::pending;
2+
use std::thread;
3+
use std::time::Duration;
4+
5+
use gio::prelude::*;
6+
7+
use futures::prelude::*;
8+
9+
/// A very long task. This task actually never ends.
10+
async fn a_very_long_task() {
11+
println!("Very long task started");
12+
pending().await
13+
}
14+
15+
fn main() {
16+
const TIMEOUT: Duration = Duration::from_secs(3);
17+
18+
let main_ctx = glib::MainContext::default();
19+
let main_loop = glib::MainLoop::new(Some(&main_ctx), false);
20+
let cancellable = gio::Cancellable::new();
21+
22+
{
23+
let main_loop = main_loop.clone();
24+
25+
// We wrap `a_very_long_task` inside a `CancellableFuture` controlled by `cancellable`.
26+
// The task is cancelled when `.cancel()` is invoked.
27+
let cancellable_task = gio::CancellableFuture::new(a_very_long_task(), cancellable.clone())
28+
.map(move |res| {
29+
if let Err(error) = res {
30+
println!("{:?}", error);
31+
}
32+
33+
main_loop.quit();
34+
});
35+
36+
main_ctx.spawn_local(cancellable_task);
37+
}
38+
39+
// We simulate a timeout here.
40+
// After `TIMEOUT` we cancel the pending task.
41+
thread::spawn(move || {
42+
thread::sleep(TIMEOUT);
43+
44+
println!(
45+
"Timeout ({:?}) elapsed! Cancelling pending task...",
46+
TIMEOUT
47+
);
48+
49+
cancellable.cancel();
50+
});
51+
52+
main_loop.run();
53+
}

gio/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ futures-util = { version = "0.3", default-features = false }
4545
ffi = { package = "gio-sys", path = "sys" }
4646
glib = { path = "../glib" }
4747
thiserror = "1"
48+
pin-project-lite = "0.2"
4849

4950
[dev-dependencies]
51+
futures = "0.3"
5052
futures-util = { version = "0.3", features = ["io"] }
5153
gir-format-check = "^0.1"
5254
serial_test = "0.9"

gio/src/cancellable_future.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Take a look at the license at the top of the repository in the LICENSE file.
2+
3+
use crate::cancellable::CancellableExtManual;
4+
use crate::cancellable::CancelledHandlerId;
5+
use crate::prelude::CancellableExt;
6+
use crate::Cancellable;
7+
use crate::IOErrorEnum;
8+
use pin_project_lite::pin_project;
9+
use std::fmt::Debug;
10+
use std::fmt::Display;
11+
use std::future::Future;
12+
use std::pin::Pin;
13+
use std::task::Context;
14+
use std::task::Poll;
15+
16+
// rustdoc-stripper-ignore-next
17+
/// Indicator that the [`CancellableFuture`] was cancelled.
18+
pub struct Cancelled;
19+
20+
pin_project! {
21+
// rustdoc-stripper-ignore-next
22+
/// A future which can be cancelled via [`Cancellable`].
23+
///
24+
/// # Examples
25+
///
26+
/// ```
27+
/// # use futures::FutureExt;
28+
/// # use gio::traits::CancellableExt;
29+
/// # use gio::CancellableFuture;
30+
/// let l = glib::MainLoop::new(None, false);
31+
/// let c = gio::Cancellable::new();
32+
///
33+
/// l.context().spawn_local(CancellableFuture::new(async { 42 }, c.clone()).map(|_| ()));
34+
/// c.cancel();
35+
///
36+
/// ```
37+
pub struct CancellableFuture<F> {
38+
#[pin]
39+
future: F,
40+
41+
#[pin]
42+
waker_handler_cb: Option<CancelledHandlerId>,
43+
44+
cancellable: Cancellable,
45+
}
46+
}
47+
48+
impl<F> CancellableFuture<F> {
49+
// rustdoc-stripper-ignore-next
50+
/// Creates a new `CancellableFuture` using a [`Cancellable`].
51+
///
52+
/// When [`cancel`](CancellableExt::cancel) is called, the future will complete
53+
/// immediately without making any further progress. In such a case, an error
54+
/// will be returned by this future (i.e., [`Cancelled`]).
55+
pub fn new(future: F, cancellable: Cancellable) -> Self {
56+
Self {
57+
future,
58+
waker_handler_cb: None,
59+
cancellable,
60+
}
61+
}
62+
63+
// rustdoc-stripper-ignore-next
64+
/// Checks whether the future has been cancelled.
65+
///
66+
/// This is a shortcut for `self.cancellable().is_cancelled()`
67+
///
68+
/// Note that all this method indicates is whether [`cancel`](CancellableExt::cancel)
69+
/// was called. This means that it will return true even if:
70+
/// * `cancel` was called after the future had completed.
71+
/// * `cancel` was called while the future was being polled.
72+
#[inline]
73+
pub fn is_cancelled(&self) -> bool {
74+
self.cancellable.is_cancelled()
75+
}
76+
77+
// rustdoc-stripper-ignore-next
78+
/// Returns the inner [`Cancellable`] associated during creation.
79+
#[inline]
80+
pub fn cancellable(&self) -> &Cancellable {
81+
&self.cancellable
82+
}
83+
}
84+
85+
impl<F> Future for CancellableFuture<F>
86+
where
87+
F: Future,
88+
{
89+
type Output = Result<<F as Future>::Output, Cancelled>;
90+
91+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92+
if self.is_cancelled() {
93+
return Poll::Ready(Err(Cancelled));
94+
}
95+
96+
let mut this = self.as_mut().project();
97+
98+
match this.future.poll(cx) {
99+
Poll::Ready(out) => Poll::Ready(Ok(out)),
100+
101+
Poll::Pending => {
102+
if let Some(prev_handler) = this.waker_handler_cb.take() {
103+
this.cancellable.disconnect_cancelled(prev_handler);
104+
}
105+
106+
let canceller_handler_id = this.cancellable.connect_cancelled({
107+
let w = cx.waker().clone();
108+
move |_| w.wake()
109+
});
110+
111+
match canceller_handler_id {
112+
Some(canceller_handler_id) => {
113+
*this.waker_handler_cb = Some(canceller_handler_id);
114+
Poll::Pending
115+
}
116+
117+
None => Poll::Ready(Err(Cancelled)),
118+
}
119+
}
120+
}
121+
}
122+
}
123+
124+
impl From<Cancelled> for glib::Error {
125+
fn from(_: Cancelled) -> Self {
126+
glib::Error::new(IOErrorEnum::Cancelled, "Task cancelled")
127+
}
128+
}
129+
130+
impl std::error::Error for Cancelled {}
131+
132+
impl Debug for Cancelled {
133+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134+
write!(f, "Task cancelled")
135+
}
136+
}
137+
138+
impl Display for Cancelled {
139+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140+
Debug::fmt(self, f)
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::Cancellable;
147+
use super::CancellableExt;
148+
use super::CancellableFuture;
149+
use super::Cancelled;
150+
use futures_channel::oneshot;
151+
152+
#[test]
153+
fn cancellable_future_ok() {
154+
let ctx = glib::MainContext::new();
155+
let c = Cancellable::new();
156+
let (tx, rx) = oneshot::channel();
157+
158+
{
159+
ctx.spawn_local(async {
160+
let cancellable_future = CancellableFuture::new(async { 42 }, c);
161+
assert!(!cancellable_future.is_cancelled());
162+
163+
let result = cancellable_future.await;
164+
assert!(matches!(result, Ok(42)));
165+
166+
tx.send(()).unwrap();
167+
});
168+
}
169+
170+
ctx.block_on(rx).unwrap()
171+
}
172+
173+
#[test]
174+
fn cancellable_future_cancel() {
175+
let ctx = glib::MainContext::new();
176+
let c = Cancellable::new();
177+
let (tx, rx) = oneshot::channel();
178+
179+
{
180+
let c = c.clone();
181+
ctx.spawn_local(async move {
182+
let cancellable_future = CancellableFuture::new(std::future::pending::<()>(), c);
183+
184+
let result = cancellable_future.await;
185+
assert!(matches!(result, Err(Cancelled)));
186+
187+
tx.send(()).unwrap();
188+
});
189+
}
190+
191+
std::thread::spawn(move || c.cancel()).join().unwrap();
192+
193+
ctx.block_on(rx).unwrap();
194+
}
195+
}

gio/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub use action_entry::{ActionEntry, ActionEntryBuilder};
2020
pub use application::{ApplicationBusyGuard, ApplicationHoldGuard};
2121
mod async_initable;
2222
mod cancellable;
23+
mod cancellable_future;
24+
pub use crate::cancellable_future::CancellableFuture;
25+
pub use crate::cancellable_future::Cancelled;
2326
mod converter;
2427
mod data_input_stream;
2528
mod dbus;

0 commit comments

Comments
 (0)