Skip to content

Commit f635475

Browse files
committed
Add receive_signal and receive_signal_parameters
Closes #1820
1 parent 61dbd05 commit f635475

File tree

2 files changed

+159
-2
lines changed

2 files changed

+159
-2
lines changed

gio/src/dbus_connection.rs

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use crate::{
66
ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation,
77
DBusSignalFlags, MenuModel,
88
};
9-
use glib::{prelude::*, translate::*, WeakRef};
9+
use futures_channel::mpsc;
10+
use futures_core::{FusedStream, Stream};
11+
use glib::{prelude::*, translate::*, variant::VariantTypeMismatchError, WeakRef};
12+
use pin_project_lite::pin_project;
1013

1114
pub trait DBusMethodCall: Sized {
1215
fn parse_call(
@@ -211,6 +214,77 @@ pub struct DBusSignalRef<'a> {
211214
pub parameters: &'a glib::Variant,
212215
}
213216

217+
pin_project! {
218+
// rustdoc-stripper-ignore-next
219+
/// A subscribed stream.
220+
///
221+
/// A stream which wraps an inner stream of type `S` while holding on to a
222+
/// subscription handle `H` to keep a subscription alive.
223+
#[derive(Debug)]
224+
#[must_use = "streams do nothing unless polled"]
225+
pub struct SubscribedSignalStream<H, S> {
226+
#[pin]
227+
stream: S,
228+
subscription: H,
229+
}
230+
}
231+
232+
impl<S> SubscribedSignalStream<SignalSubscription, S> {
233+
// rustdoc-stripper-ignore-next
234+
/// Downgrade the inner signal subscription to a weak one.
235+
///
236+
/// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
237+
pub fn downgrade(self) -> SubscribedSignalStream<WeakSignalSubscription, S> {
238+
SubscribedSignalStream {
239+
subscription: self.subscription.downgrade(),
240+
stream: self.stream,
241+
}
242+
}
243+
}
244+
245+
impl<S> SubscribedSignalStream<WeakSignalSubscription, S> {
246+
// rustdoc-stripper-ignore-next
247+
/// Upgrade the inner signal subscription to a strong one.
248+
///
249+
/// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
250+
pub fn downgrade(self) -> Option<SubscribedSignalStream<SignalSubscription, S>> {
251+
self.subscription
252+
.upgrade()
253+
.map(|subscription| SubscribedSignalStream {
254+
subscription,
255+
stream: self.stream,
256+
})
257+
}
258+
}
259+
260+
impl<H, S> Stream for SubscribedSignalStream<H, S>
261+
where
262+
S: Stream,
263+
{
264+
type Item = S::Item;
265+
266+
fn poll_next(
267+
self: std::pin::Pin<&mut Self>,
268+
cx: &mut std::task::Context<'_>,
269+
) -> std::task::Poll<Option<Self::Item>> {
270+
let this = self.project();
271+
this.stream.poll_next(cx)
272+
}
273+
274+
fn size_hint(&self) -> (usize, Option<usize>) {
275+
self.stream.size_hint()
276+
}
277+
}
278+
279+
impl<H, S> FusedStream for SubscribedSignalStream<H, S>
280+
where
281+
S: FusedStream,
282+
{
283+
fn is_terminated(&self) -> bool {
284+
self.stream.is_terminated()
285+
}
286+
}
287+
214288
// rustdoc-stripper-ignore-next
215289
/// Build a registered DBus object, by handling different parts of DBus.
216290
#[must_use = "The builder must be built to be used"]
@@ -651,4 +725,86 @@ impl DBusConnection {
651725
);
652726
}
653727
}
728+
729+
// rustdoc-stripper-ignore-next
730+
/// Subscribe to a D-Bus signal and receive signal emissions as a stream.
731+
///
732+
/// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the
733+
/// received signal to the stream's element.
734+
///
735+
/// The returned stream holds a strong reference to this D-Bus connection,
736+
/// and unsubscribes from the signal when dropped. To avoid reference cycles
737+
/// you may wish to downgrade the returned stream to hold only weak
738+
/// reference to the connection using [`SubscribedSignalStream::downgrade`].
739+
///
740+
/// After invoking `map_signal` the stream threads incoming signals through
741+
/// an unbounded channel. Hence, memory consumption will keep increasing
742+
/// as long as the stream consumer does not keep up with signal emissions.
743+
/// If you need to perform expensive processing in response to signals it's
744+
/// therefore recommended to insert an extra buffering and if the buffer
745+
/// overruns, either fail drop the entire stream, or drop individual signal
746+
/// emissions until the buffer has space again.
747+
pub fn receive_signal<T: 'static, F: Fn(DBusSignalRef) -> T + 'static>(
748+
&self,
749+
sender: Option<&str>,
750+
interface_name: Option<&str>,
751+
member: Option<&str>,
752+
object_path: Option<&str>,
753+
arg0: Option<&str>,
754+
flags: DBusSignalFlags,
755+
map_signal: F,
756+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = T> + use<T, F>> {
757+
let (tx, rx) = mpsc::unbounded();
758+
let subscription = self.subscribe_to_signal(
759+
sender,
760+
interface_name,
761+
member,
762+
object_path,
763+
arg0,
764+
flags,
765+
move |signal| {
766+
// Just ignore send errors: if the receiver is dropped, the
767+
// signal subscription is dropped too, so the callback won't
768+
// be invoked anymore.
769+
let _ = tx.unbounded_send(map_signal(signal));
770+
},
771+
);
772+
SubscribedSignalStream {
773+
subscription,
774+
stream: rx,
775+
}
776+
}
777+
778+
// rustdoc-stripper-ignore-next
779+
/// Subscribe to a D-Bus signal and receive signal parameters as a stream.
780+
///
781+
/// Like [`Self::receive_signal`] (which see for more information), but
782+
/// automatically decodes the emitted signal parameters to type `T`.
783+
/// If decoding fails the corresponding variant type error is sent
784+
/// downstream.
785+
pub fn receive_signal_parameters<T>(
786+
&self,
787+
sender: Option<&str>,
788+
interface_name: Option<&str>,
789+
member: Option<&str>,
790+
object_path: Option<&str>,
791+
arg0: Option<&str>,
792+
flags: DBusSignalFlags,
793+
) -> SubscribedSignalStream<
794+
SignalSubscription,
795+
impl Stream<Item = Result<T, VariantTypeMismatchError>> + use<T>,
796+
>
797+
where
798+
T: FromVariant + 'static,
799+
{
800+
self.receive_signal(
801+
sender,
802+
interface_name,
803+
member,
804+
object_path,
805+
arg0,
806+
flags,
807+
|signal| signal.parameters.try_get(),
808+
)
809+
}
654810
}

gio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub use self::dbus::*;
3333
mod dbus_connection;
3434
pub use self::dbus_connection::{
3535
ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder,
36-
RegistrationId, SignalSubscription, SignalSubscriptionId, WatcherId, WeakSignalSubscription,
36+
RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId,
37+
WeakSignalSubscription,
3738
};
3839
mod dbus_message;
3940
mod dbus_method_invocation;

0 commit comments

Comments
 (0)