Skip to content

Commit 9282073

Browse files
committed
Add receive_signal and receive_signal_parameters
Closes #1820
1 parent 61dbd05 commit 9282073

File tree

2 files changed

+164
-2
lines changed

2 files changed

+164
-2
lines changed

gio/src/dbus_connection.rs

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ use crate::{
66
ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation,
77
DBusSignalFlags, MenuModel,
88
};
9-
use glib::{prelude::*, translate::*, WeakRef};
9+
use futures_channel::mpsc;
10+
use futures_core::{FusedStream, Stream};
11+
use futures_util::SinkExt;
12+
use glib::{prelude::*, translate::*, variant::VariantTypeMismatchError, WeakRef};
13+
use pin_project_lite::pin_project;
1014

1115
pub trait DBusMethodCall: Sized {
1216
fn parse_call(
@@ -211,6 +215,77 @@ pub struct DBusSignalRef<'a> {
211215
pub parameters: &'a glib::Variant,
212216
}
213217

218+
pin_project! {
219+
// rustdoc-stripper-ignore-next
220+
/// A subscribed stream.
221+
///
222+
/// A stream which wraps an inner stream of type `S` while holding on to a
223+
/// subscription handle `H` to keep a subscription alive.
224+
#[derive(Debug)]
225+
#[must_use = "streams do nothing unless polled"]
226+
pub struct SubscribedSignalStream<H, S> {
227+
#[pin]
228+
stream: S,
229+
subscription: H,
230+
}
231+
}
232+
233+
impl<S> SubscribedSignalStream<SignalSubscription, S> {
234+
// rustdoc-stripper-ignore-next
235+
/// Downgrade the inner signal subscription to a weak one.
236+
///
237+
/// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
238+
pub fn downgrade(self) -> SubscribedSignalStream<WeakSignalSubscription, S> {
239+
SubscribedSignalStream {
240+
subscription: self.subscription.downgrade(),
241+
stream: self.stream,
242+
}
243+
}
244+
}
245+
246+
impl<S> SubscribedSignalStream<WeakSignalSubscription, S> {
247+
// rustdoc-stripper-ignore-next
248+
/// Upgrade the inner signal subscription to a strong one.
249+
///
250+
/// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
251+
pub fn downgrade(self) -> Option<SubscribedSignalStream<SignalSubscription, S>> {
252+
self.subscription
253+
.upgrade()
254+
.map(|subscription| SubscribedSignalStream {
255+
subscription,
256+
stream: self.stream,
257+
})
258+
}
259+
}
260+
261+
impl<H, S> Stream for SubscribedSignalStream<H, S>
262+
where
263+
S: Stream,
264+
{
265+
type Item = S::Item;
266+
267+
fn poll_next(
268+
self: std::pin::Pin<&mut Self>,
269+
cx: &mut std::task::Context<'_>,
270+
) -> std::task::Poll<Option<Self::Item>> {
271+
let this = self.project();
272+
this.stream.poll_next(cx)
273+
}
274+
275+
fn size_hint(&self) -> (usize, Option<usize>) {
276+
self.stream.size_hint()
277+
}
278+
}
279+
280+
impl<H, S> FusedStream for SubscribedSignalStream<H, S>
281+
where
282+
S: FusedStream,
283+
{
284+
fn is_terminated(&self) -> bool {
285+
self.stream.is_terminated()
286+
}
287+
}
288+
214289
// rustdoc-stripper-ignore-next
215290
/// Build a registered DBus object, by handling different parts of DBus.
216291
#[must_use = "The builder must be built to be used"]
@@ -651,4 +726,90 @@ impl DBusConnection {
651726
);
652727
}
653728
}
729+
730+
// rustdoc-stripper-ignore-next
731+
/// Subscribe to a D-Bus signal and receive signal emissions as a stream.
732+
///
733+
/// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the
734+
/// received signal to the stream's element. If it returns `None` the
735+
/// signal emission is not propagated downstream.
736+
///
737+
/// The returned stream holds a strong reference to this D-Bus connection,
738+
/// and unsubscribes from the signal when dropped.
739+
///
740+
/// To avoid reference cycles you may wish to downgrade the returned
741+
/// stream to hold only weak reference to the connection using
742+
/// [`SubscribedSignalStream::downgrade`].
743+
pub fn receive_signal<T: Send + 'static, F: Fn(DBusSignalRef) -> Option<T> + 'static>(
744+
&self,
745+
sender: Option<&str>,
746+
interface_name: Option<&str>,
747+
member: Option<&str>,
748+
object_path: Option<&str>,
749+
arg0: Option<&str>,
750+
flags: DBusSignalFlags,
751+
map_signal: F,
752+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = T> + use<T, F>> {
753+
let (tx, rx) = mpsc::channel(1);
754+
let subscription = self.subscribe_to_signal(
755+
sender,
756+
interface_name,
757+
member,
758+
object_path,
759+
arg0,
760+
flags,
761+
move |signal| {
762+
let mut tx = tx.clone();
763+
if let Some(item) = map_signal(signal) {
764+
glib::spawn_future(async move {
765+
// We really don't care for the error here.
766+
let _: Result<(), mpsc::SendError> = tx.send(item).await;
767+
});
768+
}
769+
},
770+
);
771+
SubscribedSignalStream {
772+
subscription,
773+
stream: rx,
774+
}
775+
}
776+
777+
// rustdoc-stripper-ignore-next
778+
/// Subscribe to a D-Bus signal and receive signal parameters as a stream.
779+
///
780+
/// See [`Self::signal_subscribe`] for arguments. Decode the parameters
781+
/// as type `T`, and propagate either the decoded parameters, or the type
782+
/// error if the signal parameters had a different type.
783+
///
784+
/// The returned stream holds a strong reference to this D-Bus connection,
785+
/// and unsubscribes from the signal when dropped.
786+
///
787+
/// To avoid reference cycles you may wish to downgrade the returned
788+
/// stream to hold only weak reference to the connection using
789+
/// [`SubscribedSignalStream::downgrade`].
790+
pub fn receive_signal_parameters<T>(
791+
&self,
792+
sender: Option<&str>,
793+
interface_name: Option<&str>,
794+
member: Option<&str>,
795+
object_path: Option<&str>,
796+
arg0: Option<&str>,
797+
flags: DBusSignalFlags,
798+
) -> SubscribedSignalStream<
799+
SignalSubscription,
800+
impl Stream<Item = Result<T, VariantTypeMismatchError>> + use<T>,
801+
>
802+
where
803+
T: FromVariant + Send + 'static,
804+
{
805+
self.receive_signal(
806+
sender,
807+
interface_name,
808+
member,
809+
object_path,
810+
arg0,
811+
flags,
812+
|signal| Some(signal.parameters.try_get()),
813+
)
814+
}
654815
}

gio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub use self::dbus::*;
3333
mod dbus_connection;
3434
pub use self::dbus_connection::{
3535
ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder,
36-
RegistrationId, SignalSubscription, SignalSubscriptionId, WatcherId, WeakSignalSubscription,
36+
RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId,
37+
WeakSignalSubscription,
3738
};
3839
mod dbus_message;
3940
mod dbus_method_invocation;

0 commit comments

Comments
 (0)