Skip to content

Commit 69c32a3

Browse files
committed
Add receive_signal and receive_signal_parameters
Closes #1820
1 parent 61dbd05 commit 69c32a3

File tree

2 files changed

+159
-2
lines changed

2 files changed

+159
-2
lines changed

gio/src/dbus_connection.rs

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use crate::{
66
ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation,
77
DBusSignalFlags, MenuModel,
88
};
9-
use glib::{prelude::*, translate::*, WeakRef};
9+
use futures_channel::mpsc;
10+
use futures_core::{FusedStream, Stream};
11+
use glib::{prelude::*, translate::*, variant::VariantTypeMismatchError, WeakRef};
12+
use pin_project_lite::pin_project;
1013

1114
pub trait DBusMethodCall: Sized {
1215
fn parse_call(
@@ -211,6 +214,77 @@ pub struct DBusSignalRef<'a> {
211214
pub parameters: &'a glib::Variant,
212215
}
213216

217+
pin_project! {
218+
// rustdoc-stripper-ignore-next
219+
/// A subscribed stream.
220+
///
221+
/// A stream which wraps an inner stream of type `S` while holding on to a
222+
/// subscription handle `H` to keep a subscription alive.
223+
#[derive(Debug)]
224+
#[must_use = "streams do nothing unless polled"]
225+
pub struct SubscribedSignalStream<H, S> {
226+
#[pin]
227+
stream: S,
228+
subscription: H,
229+
}
230+
}
231+
232+
impl<S> SubscribedSignalStream<SignalSubscription, S> {
233+
// rustdoc-stripper-ignore-next
234+
/// Downgrade the inner signal subscription to a weak one.
235+
///
236+
/// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
237+
pub fn downgrade(self) -> SubscribedSignalStream<WeakSignalSubscription, S> {
238+
SubscribedSignalStream {
239+
subscription: self.subscription.downgrade(),
240+
stream: self.stream,
241+
}
242+
}
243+
}
244+
245+
impl<S> SubscribedSignalStream<WeakSignalSubscription, S> {
246+
// rustdoc-stripper-ignore-next
247+
/// Upgrade the inner signal subscription to a strong one.
248+
///
249+
/// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
250+
pub fn downgrade(self) -> Option<SubscribedSignalStream<SignalSubscription, S>> {
251+
self.subscription
252+
.upgrade()
253+
.map(|subscription| SubscribedSignalStream {
254+
subscription,
255+
stream: self.stream,
256+
})
257+
}
258+
}
259+
260+
impl<H, S> Stream for SubscribedSignalStream<H, S>
261+
where
262+
S: Stream,
263+
{
264+
type Item = S::Item;
265+
266+
fn poll_next(
267+
self: std::pin::Pin<&mut Self>,
268+
cx: &mut std::task::Context<'_>,
269+
) -> std::task::Poll<Option<Self::Item>> {
270+
let this = self.project();
271+
this.stream.poll_next(cx)
272+
}
273+
274+
fn size_hint(&self) -> (usize, Option<usize>) {
275+
self.stream.size_hint()
276+
}
277+
}
278+
279+
impl<H, S> FusedStream for SubscribedSignalStream<H, S>
280+
where
281+
S: FusedStream,
282+
{
283+
fn is_terminated(&self) -> bool {
284+
self.stream.is_terminated()
285+
}
286+
}
287+
214288
// rustdoc-stripper-ignore-next
215289
/// Build a registered DBus object, by handling different parts of DBus.
216290
#[must_use = "The builder must be built to be used"]
@@ -651,4 +725,86 @@ impl DBusConnection {
651725
);
652726
}
653727
}
728+
729+
// rustdoc-stripper-ignore-next
730+
/// Subscribe to a D-Bus signal and receive signal emissions as a stream.
731+
///
732+
/// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the
733+
/// received signal to the stream's element.
734+
///
735+
/// The returned stream holds a strong reference to this D-Bus connection,
736+
/// and unsubscribes from the signal when dropped.
737+
///
738+
/// To avoid reference cycles you may wish to downgrade the returned
739+
/// stream to hold only weak reference to the connection using
740+
/// [`SubscribedSignalStream::downgrade`].
741+
pub fn receive_signal<T: 'static, F: Fn(DBusSignalRef) -> T + 'static>(
742+
&self,
743+
sender: Option<&str>,
744+
interface_name: Option<&str>,
745+
member: Option<&str>,
746+
object_path: Option<&str>,
747+
arg0: Option<&str>,
748+
flags: DBusSignalFlags,
749+
map_signal: F,
750+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = T> + use<T, F>> {
751+
let (tx, rx) = mpsc::unbounded();
752+
let subscription = self.subscribe_to_signal(
753+
sender,
754+
interface_name,
755+
member,
756+
object_path,
757+
arg0,
758+
flags,
759+
move |signal| {
760+
// Just ignore send errors: if the receiver is dropped, the
761+
// signal subscription is dropped too, so the callback won't
762+
// be invoked anymore.
763+
let _ = tx.unbounded_send(map_signal(signal));
764+
},
765+
);
766+
SubscribedSignalStream {
767+
subscription,
768+
stream: rx,
769+
}
770+
}
771+
772+
// rustdoc-stripper-ignore-next
773+
/// Subscribe to a D-Bus signal and receive signal parameters as a stream.
774+
///
775+
/// See [`Self::signal_subscribe`] for arguments. Decode the parameters
776+
/// as type `T`, and propagate either the decoded parameters, or the type
777+
/// error if the signal parameters had a different type.
778+
///
779+
/// The returned stream holds a strong reference to this D-Bus connection,
780+
/// and unsubscribes from the signal when dropped.
781+
///
782+
/// To avoid reference cycles you may wish to downgrade the returned
783+
/// stream to hold only weak reference to the connection using
784+
/// [`SubscribedSignalStream::downgrade`].
785+
pub fn receive_signal_parameters<T>(
786+
&self,
787+
sender: Option<&str>,
788+
interface_name: Option<&str>,
789+
member: Option<&str>,
790+
object_path: Option<&str>,
791+
arg0: Option<&str>,
792+
flags: DBusSignalFlags,
793+
) -> SubscribedSignalStream<
794+
SignalSubscription,
795+
impl Stream<Item = Result<T, VariantTypeMismatchError>> + use<T>,
796+
>
797+
where
798+
T: FromVariant + 'static,
799+
{
800+
self.receive_signal(
801+
sender,
802+
interface_name,
803+
member,
804+
object_path,
805+
arg0,
806+
flags,
807+
|signal| signal.parameters.try_get(),
808+
)
809+
}
654810
}

gio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub use self::dbus::*;
3333
mod dbus_connection;
3434
pub use self::dbus_connection::{
3535
ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder,
36-
RegistrationId, SignalSubscription, SignalSubscriptionId, WatcherId, WeakSignalSubscription,
36+
RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId,
37+
WeakSignalSubscription,
3738
};
3839
mod dbus_message;
3940
mod dbus_method_invocation;

0 commit comments

Comments
 (0)