diff --git a/examples/Cargo.toml b/examples/Cargo.toml index c703b6bf524f..dd14731b3af7 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -36,6 +36,10 @@ path = "gio_futures/main.rs" name = "gio_futures_await" path = "gio_futures_await/main.rs" +[[bin]] +name = "gio_dbus_receive_signals" +path = "gio_dbus_receive_signals/main.rs" + [[bin]] name = "gio_dbus_register_object" path = "gio_dbus_register_object/main.rs" diff --git a/examples/gio_dbus_receive_signals/main.rs b/examples/gio_dbus_receive_signals/main.rs new file mode 100644 index 000000000000..015b954e4b71 --- /dev/null +++ b/examples/gio_dbus_receive_signals/main.rs @@ -0,0 +1,105 @@ +use gio::prelude::*; + +glib::wrapper! { + pub struct SampleApplication(ObjectSubclass) + @extends gio::Application, + @implements gio::ActionGroup, gio::ActionMap; +} + +impl Default for SampleApplication { + fn default() -> Self { + glib::Object::builder() + .property( + "application-id", + "com.github.gtk-rs.examples.ReceiveDBusSignals", + ) + .build() + } +} + +mod imp { + use std::cell::RefCell; + + use futures::{future, StreamExt}; + use gio::prelude::*; + use gio::subclass::prelude::*; + use gio::{bus_get_future, BusType, DBusSignalFlags, WeakSignalSubscription}; + + const DESKTOP_PORTAL_BUSNAME: &str = "org.freedesktop.portal.Desktop"; + const DESKTOP_PORTAL_OBJPATH: &str = "/org/freedesktop/portal/desktop"; + const SETTINGS_PORTAL_IFACE: &str = "org.freedesktop.portal.Settings"; + + #[derive(Default)] + pub struct SampleApplication { + signal_subscription: RefCell>, + } + + #[glib::object_subclass] + impl ObjectSubclass for SampleApplication { + const NAME: &'static str = "SampleApplication"; + + type Type = super::SampleApplication; + + type ParentType = gio::Application; + } + + impl ObjectImpl for SampleApplication {} + + impl ApplicationImpl for SampleApplication { + fn startup(&self) { + self.parent_startup(); + + self.signal_subscription.replace(Some( + self.obj() + .dbus_connection() + .unwrap() + .subscribe_to_signal( + Some(DESKTOP_PORTAL_BUSNAME), + Some(SETTINGS_PORTAL_IFACE), + Some("SettingChanged"), + Some(DESKTOP_PORTAL_OBJPATH), + None, + DBusSignalFlags::NONE, + |signal| { + println!( + "Callback received signal {}.{} from {} at {} with parameters: {}", + signal.interface_name, + signal.signal_name, + signal.object_path, + signal.sender_name, + signal.parameters + ) + }, + ) + .downgrade(), + )); + + glib::spawn_future_local(async move { + let session_bus = bus_get_future(BusType::Session).await.unwrap(); + session_bus + .receive_signal_parameters::<(String, String, glib::Variant)>( + Some(DESKTOP_PORTAL_BUSNAME), + Some(SETTINGS_PORTAL_IFACE), + Some("SettingChanged"), + Some(DESKTOP_PORTAL_OBJPATH), + None, + DBusSignalFlags::NONE, + ) + .for_each(|result| { + let (iface, setting, value) = result.unwrap(); + println!("Setting {iface}.{setting} changed to {value}"); + future::ready(()) + }) + .await + }); + } + + fn activate(&self) {} + } +} + +fn main() -> glib::ExitCode { + let app = SampleApplication::default(); + let _guard = app.hold(); + app.run() +} diff --git a/gio/src/dbus_connection.rs b/gio/src/dbus_connection.rs index 9dc2b734870a..f61d93102e61 100644 --- a/gio/src/dbus_connection.rs +++ b/gio/src/dbus_connection.rs @@ -6,7 +6,10 @@ use crate::{ ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation, DBusSignalFlags, MenuModel, }; -use glib::{prelude::*, translate::*}; +use futures_channel::mpsc; +use futures_core::{FusedStream, Stream}; +use glib::{prelude::*, translate::*, variant::VariantTypeMismatchError, WeakRef}; +use pin_project_lite::pin_project; pub trait DBusMethodCall: Sized { fn parse_call( @@ -117,9 +120,171 @@ pub struct ActionGroupExportId(NonZeroU32); pub struct MenuModelExportId(NonZeroU32); #[derive(Debug, Eq, PartialEq)] pub struct FilterId(NonZeroU32); + #[derive(Debug, Eq, PartialEq)] pub struct SignalSubscriptionId(NonZeroU32); +// rustdoc-stripper-ignore-next +/// A strong subscription to a D-Bus signal. +/// +/// Keep a reference to a D-Bus connection to maintain a subscription on a +/// D-Bus signal even if the connection has no other strong reference. +/// +/// When dropped, unsubscribes from signal on the connection, and then drop the +/// reference on the connection. If no other strong reference on the connection +/// exists the connection is closed and destroyed. +#[derive(Debug)] +pub struct SignalSubscription(DBusConnection, Option); + +impl SignalSubscription { + // rustdoc-stripper-ignore-next + /// Downgrade this signal subscription to a weak one. + #[must_use] + pub fn downgrade(mut self) -> WeakSignalSubscription { + WeakSignalSubscription(self.0.downgrade(), self.1.take()) + } +} + +impl Drop for SignalSubscription { + fn drop(&mut self) { + if let Some(id) = self.1.take() { + #[allow(deprecated)] + self.0.signal_unsubscribe(id); + } + } +} + +// rustdoc-stripper-ignore-next +/// A weak subscription to a D-Bus signal. +/// +/// Like [`SignalSubscription`] but hold only a weak reference to the D-Bus +/// connection the siganl is subscribed on, i.e. maintain the subscription on +/// the D-Bus signal only as long as some strong reference exists on the +/// corresponding D-Bus connection. +/// +/// When dropped, unsubscribes from signal on the connection if it still exists, +/// and then drop the reference on the connection. If no other strong reference +/// on the connection exists the connection is closed and destroyed. +#[derive(Debug)] +pub struct WeakSignalSubscription(WeakRef, Option); + +impl WeakSignalSubscription { + // rustdoc-stripper-ignore-next + /// Upgrade this signal subscription to a strong one. + #[must_use] + pub fn upgrade(mut self) -> Option { + self.0 + .upgrade() + .map(|c| SignalSubscription(c, self.1.take())) + } +} + +impl Drop for WeakSignalSubscription { + fn drop(&mut self) { + if let Some(id) = self.1.take() { + if let Some(connection) = self.0.upgrade() { + #[allow(deprecated)] + connection.signal_unsubscribe(id); + } + } + } +} + +// rustdoc-stripper-ignore-next +/// An emitted D-Bus signal. +#[derive(Debug, Copy, Clone)] +pub struct DBusSignalRef<'a> { + // rustdoc-stripper-ignore-next + /// The connection the signal was emitted on. + pub connection: &'a DBusConnection, + // rustdoc-stripper-ignore-next + /// The bus name of the sender which emitted the signal. + pub sender_name: &'a str, + // rustdoc-stripper-ignore-next + /// The path of the object on `sender` the signal was emitted from. + pub object_path: &'a str, + // rustdoc-stripper-ignore-next + /// The interface the signal belongs to. + pub interface_name: &'a str, + // rustdoc-stripper-ignore-next + /// The name of the emitted signal. + pub signal_name: &'a str, + // rustdoc-stripper-ignore-next + /// Parameters the signal was emitted with. + pub parameters: &'a glib::Variant, +} + +pin_project! { + // rustdoc-stripper-ignore-next + /// A subscribed stream. + /// + /// A stream which wraps an inner stream of type `S` while holding on to a + /// subscription handle `H` to keep a subscription alive. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct SubscribedSignalStream { + #[pin] + stream: S, + subscription: H, + } +} + +impl SubscribedSignalStream { + // rustdoc-stripper-ignore-next + /// Downgrade the inner signal subscription to a weak one. + /// + /// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`]. + pub fn downgrade(self) -> SubscribedSignalStream { + SubscribedSignalStream { + subscription: self.subscription.downgrade(), + stream: self.stream, + } + } +} + +impl SubscribedSignalStream { + // rustdoc-stripper-ignore-next + /// Upgrade the inner signal subscription to a strong one. + /// + /// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`]. + pub fn downgrade(self) -> Option> { + self.subscription + .upgrade() + .map(|subscription| SubscribedSignalStream { + subscription, + stream: self.stream, + }) + } +} + +impl Stream for SubscribedSignalStream +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.stream.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +impl FusedStream for SubscribedSignalStream +where + S: FusedStream, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + // rustdoc-stripper-ignore-next /// Build a registered DBus object, by handling different parts of DBus. #[must_use = "The builder must be built to be used"] @@ -437,8 +602,52 @@ impl DBusConnection { } } + // rustdoc-stripper-ignore-next + /// Subscribe to a D-Bus signal. + /// + /// See [`Self::signal_subscribe`] for arguments. + /// + /// Return a signal subscription which keeps a reference to this D-Bus + /// connection and unsubscribes from the signal when dropped. + /// + /// To avoid reference cycles you may wish to downgrade the returned + /// subscription to a weak one with [`SignalSubscription::downgrade`]. + #[must_use] + pub fn subscribe_to_signal( + &self, + sender: Option<&str>, + interface_name: Option<&str>, + member: Option<&str>, + object_path: Option<&str>, + arg0: Option<&str>, + flags: DBusSignalFlags, + callback: P, + ) -> SignalSubscription { + #[allow(deprecated)] + let id = self.signal_subscribe( + sender, + interface_name, + member, + object_path, + arg0, + flags, + move |connection, sender_name, object_path, interface_name, signal_name, parameters| { + callback(DBusSignalRef { + connection, + sender_name, + object_path, + interface_name, + signal_name, + parameters, + }); + }, + ); + SignalSubscription(self.clone(), Some(id)) + } + #[doc(alias = "g_dbus_connection_signal_subscribe")] #[allow(clippy::too_many_arguments)] + #[deprecated(note = "Prefer subscribe_to_signal")] pub fn signal_subscribe< P: Fn(&DBusConnection, &str, &str, &str, &str, &glib::Variant) + 'static, >( @@ -507,6 +716,7 @@ impl DBusConnection { } #[doc(alias = "g_dbus_connection_signal_unsubscribe")] + #[deprecated(note = "Prefer subscribe_to_signal")] pub fn signal_unsubscribe(&self, subscription_id: SignalSubscriptionId) { unsafe { ffi::g_dbus_connection_signal_unsubscribe( @@ -515,4 +725,86 @@ impl DBusConnection { ); } } + + // rustdoc-stripper-ignore-next + /// Subscribe to a D-Bus signal and receive signal emissions as a stream. + /// + /// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the + /// received signal to the stream's element. + /// + /// The returned stream holds a strong reference to this D-Bus connection, + /// and unsubscribes from the signal when dropped. To avoid reference cycles + /// you may wish to downgrade the returned stream to hold only weak + /// reference to the connection using [`SubscribedSignalStream::downgrade`]. + /// + /// After invoking `map_signal` the stream threads incoming signals through + /// an unbounded channel. Hence, memory consumption will keep increasing + /// as long as the stream consumer does not keep up with signal emissions. + /// If you need to perform expensive processing in response to signals it's + /// therefore recommended to insert an extra buffering and if the buffer + /// overruns, either fail drop the entire stream, or drop individual signal + /// emissions until the buffer has space again. + pub fn receive_signal T + 'static>( + &self, + sender: Option<&str>, + interface_name: Option<&str>, + member: Option<&str>, + object_path: Option<&str>, + arg0: Option<&str>, + flags: DBusSignalFlags, + map_signal: F, + ) -> SubscribedSignalStream + use> { + let (tx, rx) = mpsc::unbounded(); + let subscription = self.subscribe_to_signal( + sender, + interface_name, + member, + object_path, + arg0, + flags, + move |signal| { + // Just ignore send errors: if the receiver is dropped, the + // signal subscription is dropped too, so the callback won't + // be invoked anymore. + let _ = tx.unbounded_send(map_signal(signal)); + }, + ); + SubscribedSignalStream { + subscription, + stream: rx, + } + } + + // rustdoc-stripper-ignore-next + /// Subscribe to a D-Bus signal and receive signal parameters as a stream. + /// + /// Like [`Self::receive_signal`] (which see for more information), but + /// automatically decodes the emitted signal parameters to type `T`. + /// If decoding fails the corresponding variant type error is sent + /// downstream. + pub fn receive_signal_parameters( + &self, + sender: Option<&str>, + interface_name: Option<&str>, + member: Option<&str>, + object_path: Option<&str>, + arg0: Option<&str>, + flags: DBusSignalFlags, + ) -> SubscribedSignalStream< + SignalSubscription, + impl Stream> + use, + > + where + T: FromVariant + 'static, + { + self.receive_signal( + sender, + interface_name, + member, + object_path, + arg0, + flags, + |signal| signal.parameters.try_get(), + ) + } } diff --git a/gio/src/lib.rs b/gio/src/lib.rs index cc6e13f840d8..314feb6740a6 100644 --- a/gio/src/lib.rs +++ b/gio/src/lib.rs @@ -32,8 +32,9 @@ mod dbus; pub use self::dbus::*; mod dbus_connection; pub use self::dbus_connection::{ - ActionGroupExportId, FilterId, MenuModelExportId, RegistrationBuilder, RegistrationId, - SignalSubscriptionId, WatcherId, + ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder, + RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId, + WeakSignalSubscription, }; mod dbus_message; mod dbus_method_invocation;