diff --git a/Cargo.lock b/Cargo.lock index fd47053071b7..8bb9c1251a41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -623,6 +623,7 @@ dependencies = [ "gio", "glib", "glib-build-tools", + "libc", ] [[package]] diff --git a/examples/Cargo.toml b/examples/Cargo.toml index dd14731b3af7..a3de5f78511c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -9,7 +9,8 @@ version.workspace = true futures = "0.3" futures-channel = "0.3" futures-util = "0.3" -glib.workspace = true +libc = "0.2" +glib = { workspace = true, features = ["futures"] } gio.workspace = true [dependencies.async-tls] diff --git a/examples/gio_futures_await/main.rs b/examples/gio_futures_await/main.rs index 855a102ddcac..5967ad1c98a1 100644 --- a/examples/gio_futures_await/main.rs +++ b/examples/gio_futures_await/main.rs @@ -1,30 +1,43 @@ -use std::str; - use futures::prelude::*; use gio::prelude::*; use glib::clone; fn main() { let c = glib::MainContext::default(); - let l = glib::MainLoop::new(Some(&c), false); - let file = gio::File::for_path("Cargo.toml"); + let cancellable = gio::Cancellable::new(); let future = clone!( #[strong] - l, + cancellable, async move { match read_file(file).await { Ok(()) => (), Err(err) => eprintln!("Got error: {err}"), } - l.quit(); + cancellable.cancel(); } ); - c.spawn_local(future); + #[cfg(unix)] + let cancel_future = clone!( + #[strong] + cancellable, + async move { + glib::unix_signal_future(libc::SIGINT).await; + eprintln!("Ctrl+C pressed, operation will be stopped!"); + cancellable.cancel(); + } + ); + #[cfg(not(unix))] + let cancel_future = async move {}; - l.run(); + let _ = c + .block_on(c.spawn_local(gio::CancellableFuture::new( + futures_util::future::join(future, cancel_future), + cancellable, + ))) + .expect("futures must be executed"); } /// Throughout our chained futures, we convert all errors to strings diff --git a/gio/src/cancellable.rs b/gio/src/cancellable.rs index 1423d4b32930..19b9cd5df3ab 100644 --- a/gio/src/cancellable.rs +++ b/gio/src/cancellable.rs @@ -214,15 +214,11 @@ mod tests { fn cancellable_future_delayed() { let ctx = glib::MainContext::new(); let c = Cancellable::new(); - let (tx, rx) = oneshot::channel(); - { + let future = { let c = c.clone(); - ctx.spawn_local(async move { - c.future().await; - tx.send(()).unwrap(); - }); - } + ctx.spawn_local(c.future()) + }; std::thread::spawn(move || c.cancel()).join().unwrap(); - ctx.block_on(rx).unwrap(); + ctx.block_on(future).unwrap(); } } diff --git a/gio/src/cancellable_future.rs b/gio/src/cancellable_future.rs index 56ed941d712f..5a5c1bd07973 100644 --- a/gio/src/cancellable_future.rs +++ b/gio/src/cancellable_future.rs @@ -19,6 +19,11 @@ pin_project! { // rustdoc-stripper-ignore-next /// A future which can be cancelled via [`Cancellable`]. /// + /// It can be used to cancel one or multiple Gio futures that support + /// internal cancellation via [`Cancellable`] by using this future to + /// execute cancellable promises that are created (implicitly or explicitly) + /// via [`GioFuture`]. + /// /// # Examples /// /// ``` @@ -32,14 +37,59 @@ pin_project! { /// c.cancel(); /// /// ``` + /// + /// As said the [`CancellableFuture`] can be used to handle Gio futures, + /// relying on an actual [`Cancellable`] instance or with a new one. + /// + /// ```no_run + /// # use futures::FutureExt; + /// # use gio::prelude::*; + /// # use gio::CancellableFuture; + /// # async { + /// CancellableFuture::new( + /// async { + /// let file = gio::File::for_path("/dev/null"); + /// let file_info = file + /// .query_info_future( + /// gio::FILE_ATTRIBUTE_STANDARD_NAME, + /// gio::FileQueryInfoFlags::NONE, + /// glib::Priority::default(), + /// ) + /// .await?; + /// + /// // Sub-cancellable chains are also working as expected. + /// // The new cancellable will have its own scope, but will also be + /// // cancelled when the parent cancellable is cancelled. + /// let io_stream = CancellableFuture::new( + /// file.open_readwrite_future(glib::Priority::default()), + /// gio::Cancellable::new(), + /// ) + /// .await??; + /// // [...] + /// Ok::(true) + /// }, + /// gio::Cancellable::new(), + /// ) + /// .await + /// # }; + /// ``` pub struct CancellableFuture { #[pin] future: F, - #[pin] - waker_handler_cb: Option, - cancellable: Cancellable, + connection_id: Option, + + waker: std::sync::Arc>>, + } + + impl PinnedDrop for CancellableFuture { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + if let Some(connection_id) = this.connection_id.take() { + this.cancellable.disconnect_cancelled(connection_id); + } + } } } @@ -51,10 +101,22 @@ impl CancellableFuture { /// immediately without making any further progress. In such a case, an error /// will be returned by this future (i.e., [`Cancelled`]). pub fn new(future: F, cancellable: Cancellable) -> Self { + let waker = std::sync::Arc::new(std::sync::Mutex::new(None::)); + let connection_id = cancellable.connect_cancelled(glib::clone!( + #[strong] + waker, + move |_| { + if let Some(waker) = waker.lock().unwrap().take() { + waker.wake(); + } + } + )); + Self { future, - waker_handler_cb: None, cancellable, + connection_id, + waker, } } @@ -87,41 +149,29 @@ where type Output = Result<::Output, Cancelled>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.is_cancelled() { - return Poll::Ready(Err(Cancelled)); + if self.cancellable.is_cancelled() { + // XXX: Whenever we want to break the API, we should return here only + // self.cancellable.set_error_if_cancelled() value. + return std::task::Poll::Ready(Err(Cancelled)); } - let mut this = self.as_mut().project(); + let mut waker = self.waker.lock().unwrap(); + if waker.is_none() { + *waker = Some(cx.waker().clone()); + } + drop(waker); + let this = self.as_mut().project(); match this.future.poll(cx) { Poll::Ready(out) => Poll::Ready(Ok(out)), - - Poll::Pending => { - if let Some(prev_handler) = this.waker_handler_cb.take() { - this.cancellable.disconnect_cancelled(prev_handler); - } - - let canceller_handler_id = this.cancellable.connect_cancelled({ - let w = cx.waker().clone(); - move |_| w.wake() - }); - - match canceller_handler_id { - Some(canceller_handler_id) => { - *this.waker_handler_cb = Some(canceller_handler_id); - Poll::Pending - } - - None => Poll::Ready(Err(Cancelled)), - } - } + Poll::Pending => Poll::Pending, } } } impl From for glib::Error { fn from(_: Cancelled) -> Self { - glib::Error::new(IOErrorEnum::Cancelled, "Task cancelled") + glib::Error::new(IOErrorEnum::Cancelled, "Operation was cancelled") } } @@ -129,7 +179,7 @@ impl std::error::Error for Cancelled {} impl Debug for Cancelled { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Task cancelled") + write!(f, "Operation was cancelled") } } @@ -141,52 +191,276 @@ impl Display for Cancelled { #[cfg(test)] mod tests { - use futures_channel::oneshot; + use std::future::Future; + use std::time::Duration; use super::{Cancellable, CancellableFuture, Cancelled}; - use crate::prelude::*; + use crate::{prelude::*, spawn_blocking}; + + async fn cancel_after_timeout(duration: Duration, cancellable: Cancellable) { + glib::timeout_future_with_priority(glib::Priority::default(), duration).await; + cancellable.cancel(); + } + + async fn cancel_after_sleep_in_thread(duration: Duration, cancellable: Cancellable) { + spawn_blocking(move || { + std::thread::sleep(duration); + cancellable.cancel(); + }) + .await + .unwrap() + } + + fn async_begin) + Send + 'static>( + duration: Duration, + must_be_cancelled_on_begin: bool, + must_be_cancelled_after_sleep: bool, + cancellable: &Cancellable, + callback: P, + ) { + // We do not use std::thread here since we want to simulate what C code normally does. + // Also not using spawn_blocking() directly, since we want to have the full control + // for the test case. + let callback = Box::new(callback); + let task = unsafe { + crate::Task::::new(None::<&glib::Binding>, Some(cancellable), move |t, _| { + let cancellable = t.cancellable().unwrap(); + let ret = t.propagate(); + println!( + "Task callback, returning {:?} - cancelled {}", + ret, + cancellable.is_cancelled() + ); + assert_eq!(cancellable.is_cancelled(), must_be_cancelled_after_sleep); + match ret { + Err(e) => callback(Err(e)), + Ok(_) => callback(Ok(())), + }; + }) + }; + + task.run_in_thread(move |task, _: Option<&glib::Binding>, cancellable| { + let cancellable = cancellable.unwrap(); + let func = || { + println!( + "Task thread started, cancelled {} - want {}", + cancellable.is_cancelled(), + must_be_cancelled_on_begin + ); + assert_eq!(cancellable.is_cancelled(), must_be_cancelled_on_begin); + std::thread::sleep(duration); + assert_eq!(cancellable.is_cancelled(), must_be_cancelled_after_sleep); + println!( + "Task thread done, cancelled {} - want {}", + cancellable.is_cancelled(), + must_be_cancelled_after_sleep + ) + }; + + if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(func)) { + std::panic::resume_unwind(e); + } + + unsafe { + task.return_result(match cancellable.set_error_if_cancelled() { + Err(e) => Err(e), + Ok(_) => Ok(true), + }); + } + }); + } + + fn async_future( + duration: Duration, + must_be_cancelled_on_begin: bool, + must_be_cancelled_after_sleep: bool, + ) -> std::pin::Pin> + 'static>> { + Box::pin(crate::GioFuture::new(&(), move |_, cancellable, send| { + async_begin( + duration, + must_be_cancelled_on_begin, + must_be_cancelled_after_sleep, + cancellable, + move |res| send.resolve(res), + ); + })) + } #[test] fn cancellable_future_ok() { let ctx = glib::MainContext::new(); let c = Cancellable::new(); - let (tx, rx) = oneshot::channel(); - { + let future = { ctx.spawn_local(async { let cancellable_future = CancellableFuture::new(async { 42 }, c); assert!(!cancellable_future.is_cancelled()); let result = cancellable_future.await; assert!(matches!(result, Ok(42))); + }) + }; - tx.send(()).unwrap(); - }); - } - - ctx.block_on(rx).unwrap() + ctx.block_on(future).unwrap() } #[test] fn cancellable_future_cancel() { let ctx = glib::MainContext::new(); let c = Cancellable::new(); - let (tx, rx) = oneshot::channel(); - { + let future = { let c = c.clone(); ctx.spawn_local(async move { let cancellable_future = CancellableFuture::new(std::future::pending::<()>(), c); let result = cancellable_future.await; assert!(matches!(result, Err(Cancelled))); + }) + }; + + std::thread::spawn(move || c.cancel()).join().unwrap(); + + ctx.block_on(future).unwrap(); + } + + #[test] + fn cancellable_future_delayed_cancel_local() { + let ctx = glib::MainContext::new(); + let c = Cancellable::new(); + + let (r1, r2) = ctx + .block_on(ctx.spawn_local({ + futures_util::future::join( + CancellableFuture::new(std::future::pending::<()>(), c.clone()), + cancel_after_timeout(Duration::from_millis(300), c.clone()), + ) + })) + .expect("futures must be executed"); + + assert!(matches!(r1, Err(Cancelled))); + assert!(matches!(r2, ())); + } + + #[test] + fn cancellable_future_delayed_cancel_from_other_thread() { + let ctx = glib::MainContext::new(); + let c = Cancellable::new(); + + let (r1, r2) = ctx + .block_on(ctx.spawn_local({ + futures_util::future::join( + CancellableFuture::new(std::future::pending::<()>(), c.clone()), + cancel_after_sleep_in_thread(Duration::from_millis(300), c.clone()), + ) + })) + .expect("futures must be executed"); + + assert!(matches!(r1, Err(Cancelled))); + assert!(matches!(r2, ())); + } + + #[test] + fn cancellable_future_immediate_cancel_with_gio_future() { + let ctx = glib::MainContext::new(); + let c = Cancellable::new(); - tx.send(()).unwrap(); - }); + async fn async_chain() -> Result<(), glib::Error> { + async_future(Duration::from_millis(250), true, true).await?; + async_future(Duration::from_secs(9999999), true, true).await } - std::thread::spawn(move || c.cancel()).join().unwrap(); + c.cancel(); + + let result = ctx + .block_on(ctx.spawn_local({ + CancellableFuture::new( + futures_util::future::join5( + async_chain(), + async_chain(), + async_chain(), + async_chain(), + CancellableFuture::new(async_chain(), Cancellable::new()), + ), + c.clone(), + ) + })) + .expect("futures must be executed"); + + assert!(matches!(result, Err(Cancelled))); + } + + #[test] + fn cancellable_future_delayed_cancel_with_gio_future() { + let ctx = glib::MainContext::new(); + let c = Cancellable::new(); + + async fn async_chain() -> Result<(), glib::Error> { + async_future(Duration::from_millis(250), false, true).await?; + async_future(Duration::from_secs(9999999), true, true).await + } + + let (result, _, _) = ctx + .block_on(ctx.spawn_local({ + futures_util::future::join3( + CancellableFuture::new( + futures_util::future::join5( + async_chain(), + async_chain(), + async_chain(), + async_chain(), + CancellableFuture::new(async_chain(), Cancellable::new()), + ), + c.clone(), + ), + cancel_after_sleep_in_thread(Duration::from_millis(100), c.clone()), + // Let's wait a bit more to ensure that more events are processed + // by the loop. Not required, but it simulates a more real + // scenario. + glib::timeout_future(Duration::from_millis(350)), + ) + })) + .expect("futures must be executed"); + + assert!(matches!(result, Err(Cancelled))); + } + + #[test] + fn cancellable_future_late_cancel_with_gio_future() { + let ctx = glib::MainContext::new(); + let c = Cancellable::new(); + + async fn async_chain() -> Result<(), glib::Error> { + async_future(Duration::from_millis(100), false, false).await?; + async_future(Duration::from_millis(100), false, false).await + } - ctx.block_on(rx).unwrap(); + let results = ctx + .block_on(ctx.spawn_local(async move { + let ret = CancellableFuture::new( + futures_util::future::join5( + async_chain(), + async_chain(), + async_chain(), + async_chain(), + CancellableFuture::new(async_chain(), Cancellable::new()), + ), + c.clone(), + ) + .await; + + c.cancel(); + ret + })) + .expect("futures must be executed"); + + assert!(results.is_ok()); + + let r1 = results.unwrap(); + assert!(matches!(r1.0, Ok(()))); + assert!(matches!(r1.1, Ok(()))); + assert!(matches!(r1.2, Ok(()))); + assert!(matches!(r1.3, Ok(()))); + assert!(matches!(r1.4.unwrap(), Ok(()))); } } diff --git a/gio/src/task.rs b/gio/src/task.rs index c177461f8c20..91e4bd25e81e 100644 --- a/gio/src/task.rs +++ b/gio/src/task.rs @@ -519,10 +519,10 @@ where T: Send + 'static, F: FnOnce() -> T + Send + 'static, { - // use Cancellable::NONE as source obj to fulfill `Send` requirement - let task = unsafe { Task::::new(Cancellable::NONE, Cancellable::NONE, |_, _| {}) }; + // use a None GBinding as source obj to fulfill `Send` requirement + let task = unsafe { Task::::new(None::<&glib::Binding>, Cancellable::NONE, |_, _| {}) }; let (join, tx) = JoinHandle::new(); - task.run_in_thread(move |task, _: Option<&Cancellable>, _| { + task.run_in_thread(move |task, _: Option<&glib::Binding>, _| { let res = panic::catch_unwind(panic::AssertUnwindSafe(func)); let _ = tx.send(res); unsafe { ffi::g_task_return_pointer(task.to_glib_none().0, ptr::null_mut(), None) }