Skip to content

Commit c31cb16

Browse files
committed
Add receive_signal and receive_signal_parameters
Closes #1820
1 parent 61dbd05 commit c31cb16

File tree

2 files changed

+162
-2
lines changed

2 files changed

+162
-2
lines changed

gio/src/dbus_connection.rs

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use crate::{
66
ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation,
77
DBusSignalFlags, MenuModel,
88
};
9-
use glib::{prelude::*, translate::*, WeakRef};
9+
use futures_channel::mpsc;
10+
use futures_core::{FusedStream, Stream};
11+
use glib::{prelude::*, translate::*, variant::VariantTypeMismatchError, WeakRef};
12+
use pin_project_lite::pin_project;
1013

1114
pub trait DBusMethodCall: Sized {
1215
fn parse_call(
@@ -211,6 +214,77 @@ pub struct DBusSignalRef<'a> {
211214
pub parameters: &'a glib::Variant,
212215
}
213216

217+
pin_project! {
218+
// rustdoc-stripper-ignore-next
219+
/// A subscribed stream.
220+
///
221+
/// A stream which wraps an inner stream of type `S` while holding on to a
222+
/// subscription handle `H` to keep a subscription alive.
223+
#[derive(Debug)]
224+
#[must_use = "streams do nothing unless polled"]
225+
pub struct SubscribedSignalStream<H, S> {
226+
#[pin]
227+
stream: S,
228+
subscription: H,
229+
}
230+
}
231+
232+
impl<S> SubscribedSignalStream<SignalSubscription, S> {
233+
// rustdoc-stripper-ignore-next
234+
/// Downgrade the inner signal subscription to a weak one.
235+
///
236+
/// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
237+
pub fn downgrade(self) -> SubscribedSignalStream<WeakSignalSubscription, S> {
238+
SubscribedSignalStream {
239+
subscription: self.subscription.downgrade(),
240+
stream: self.stream,
241+
}
242+
}
243+
}
244+
245+
impl<S> SubscribedSignalStream<WeakSignalSubscription, S> {
246+
// rustdoc-stripper-ignore-next
247+
/// Upgrade the inner signal subscription to a strong one.
248+
///
249+
/// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
250+
pub fn downgrade(self) -> Option<SubscribedSignalStream<SignalSubscription, S>> {
251+
self.subscription
252+
.upgrade()
253+
.map(|subscription| SubscribedSignalStream {
254+
subscription,
255+
stream: self.stream,
256+
})
257+
}
258+
}
259+
260+
impl<H, S> Stream for SubscribedSignalStream<H, S>
261+
where
262+
S: Stream,
263+
{
264+
type Item = S::Item;
265+
266+
fn poll_next(
267+
self: std::pin::Pin<&mut Self>,
268+
cx: &mut std::task::Context<'_>,
269+
) -> std::task::Poll<Option<Self::Item>> {
270+
let this = self.project();
271+
this.stream.poll_next(cx)
272+
}
273+
274+
fn size_hint(&self) -> (usize, Option<usize>) {
275+
self.stream.size_hint()
276+
}
277+
}
278+
279+
impl<H, S> FusedStream for SubscribedSignalStream<H, S>
280+
where
281+
S: FusedStream,
282+
{
283+
fn is_terminated(&self) -> bool {
284+
self.stream.is_terminated()
285+
}
286+
}
287+
214288
// rustdoc-stripper-ignore-next
215289
/// Build a registered DBus object, by handling different parts of DBus.
216290
#[must_use = "The builder must be built to be used"]
@@ -651,4 +725,89 @@ impl DBusConnection {
651725
);
652726
}
653727
}
728+
729+
// rustdoc-stripper-ignore-next
730+
/// Subscribe to a D-Bus signal and receive signal emissions as a stream.
731+
///
732+
/// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the
733+
/// received signal to the stream's element. If it returns `None` the
734+
/// signal emission is not propagated downstream.
735+
///
736+
/// The returned stream holds a strong reference to this D-Bus connection,
737+
/// and unsubscribes from the signal when dropped.
738+
///
739+
/// To avoid reference cycles you may wish to downgrade the returned
740+
/// stream to hold only weak reference to the connection using
741+
/// [`SubscribedSignalStream::downgrade`].
742+
pub fn receive_signal<T: 'static, F: Fn(DBusSignalRef) -> Option<T> + 'static>(
743+
&self,
744+
sender: Option<&str>,
745+
interface_name: Option<&str>,
746+
member: Option<&str>,
747+
object_path: Option<&str>,
748+
arg0: Option<&str>,
749+
flags: DBusSignalFlags,
750+
map_signal: F,
751+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = T> + use<T, F>> {
752+
let (tx, rx) = mpsc::unbounded();
753+
let subscription = self.subscribe_to_signal(
754+
sender,
755+
interface_name,
756+
member,
757+
object_path,
758+
arg0,
759+
flags,
760+
move |signal| {
761+
if let Some(item) = map_signal(signal) {
762+
// Just ignore send errors: if the receiver is dropped, the
763+
// signal subscription is dropped too, so the callback won't
764+
// be invoked anymore.
765+
let _ = tx.unbounded_send(item);
766+
}
767+
},
768+
);
769+
SubscribedSignalStream {
770+
subscription,
771+
stream: rx,
772+
}
773+
}
774+
775+
// rustdoc-stripper-ignore-next
776+
/// Subscribe to a D-Bus signal and receive signal parameters as a stream.
777+
///
778+
/// See [`Self::signal_subscribe`] for arguments. Decode the parameters
779+
/// as type `T`, and propagate either the decoded parameters, or the type
780+
/// error if the signal parameters had a different type.
781+
///
782+
/// The returned stream holds a strong reference to this D-Bus connection,
783+
/// and unsubscribes from the signal when dropped.
784+
///
785+
/// To avoid reference cycles you may wish to downgrade the returned
786+
/// stream to hold only weak reference to the connection using
787+
/// [`SubscribedSignalStream::downgrade`].
788+
pub fn receive_signal_parameters<T>(
789+
&self,
790+
sender: Option<&str>,
791+
interface_name: Option<&str>,
792+
member: Option<&str>,
793+
object_path: Option<&str>,
794+
arg0: Option<&str>,
795+
flags: DBusSignalFlags,
796+
) -> SubscribedSignalStream<
797+
SignalSubscription,
798+
impl Stream<Item = Result<T, VariantTypeMismatchError>> + use<T>,
799+
>
800+
where
801+
T: FromVariant + 'static,
802+
{
803+
self.receive_signal(
804+
sender,
805+
interface_name,
806+
member,
807+
object_path,
808+
arg0,
809+
flags,
810+
|signal| Some(signal.parameters.try_get()),
811+
)
812+
}
654813
}

gio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub use self::dbus::*;
3333
mod dbus_connection;
3434
pub use self::dbus_connection::{
3535
ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder,
36-
RegistrationId, SignalSubscription, SignalSubscriptionId, WatcherId, WeakSignalSubscription,
36+
RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId,
37+
WeakSignalSubscription,
3738
};
3839
mod dbus_message;
3940
mod dbus_method_invocation;

0 commit comments

Comments
 (0)