Skip to content

Commit 0913fcb

Browse files
committed
Add receive_signal and receive_signal_parameters
Closes #1820
1 parent 61dbd05 commit 0913fcb

File tree

2 files changed

+199
-1
lines changed

2 files changed

+199
-1
lines changed

gio/src/dbus_connection.rs

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ use crate::{
66
ffi, ActionGroup, DBusConnection, DBusInterfaceInfo, DBusMessage, DBusMethodInvocation,
77
DBusSignalFlags, MenuModel,
88
};
9+
use futures_channel::mpsc;
10+
use futures_core::{FusedStream, Stream};
11+
use futures_util::SinkExt;
912
use glib::{prelude::*, translate::*, WeakRef};
13+
use pin_project_lite::pin_project;
1014

1115
pub trait DBusMethodCall: Sized {
1216
fn parse_call(
@@ -211,6 +215,112 @@ pub struct DBusSignalRef<'a> {
211215
pub parameters: &'a glib::Variant,
212216
}
213217

218+
impl DBusSignalRef<'_> {
219+
pub fn to_owned(&self) -> OwnedDBusSignal {
220+
OwnedDBusSignal {
221+
connection: self.connection.clone(),
222+
sender_name: self.sender_name.to_owned(),
223+
object_path: self.object_path.to_owned(),
224+
interface_name: self.interface_name.to_owned(),
225+
signal_name: self.signal_name.to_owned(),
226+
parameters: self.parameters.clone(),
227+
}
228+
}
229+
}
230+
231+
#[derive(Debug, Clone)]
232+
pub struct OwnedDBusSignal {
233+
// rustdoc-stripper-ignore-next
234+
/// The connection the signal was emitted on.
235+
pub connection: DBusConnection,
236+
// rustdoc-stripper-ignore-next
237+
/// The bus name of the sender which emitted the signal.
238+
pub sender_name: String,
239+
// rustdoc-stripper-ignore-next
240+
/// The path of the object on `sender` the signal was emitted from.
241+
pub object_path: String,
242+
// rustdoc-stripper-ignore-next
243+
/// The interface the signal belongs to.
244+
pub interface_name: String,
245+
// rustdoc-stripper-ignore-next
246+
/// The name of the emitted signal.
247+
pub signal_name: String,
248+
// rustdoc-stripper-ignore-next
249+
/// Parameters the signal was emitted with.
250+
pub parameters: glib::Variant,
251+
}
252+
253+
pin_project! {
254+
// rustdoc-stripper-ignore-next
255+
/// A subscribed stream.
256+
///
257+
/// A stream which wraps an inner stream of type `S` while holding on to a
258+
/// subscription handle `H` to keep a subscription alive.
259+
#[derive(Debug)]
260+
#[must_use = "streams do nothing unless polled"]
261+
pub struct SubscribedSignalStream<H, S> {
262+
#[pin]
263+
stream: S,
264+
subscription: H,
265+
}
266+
}
267+
268+
impl<S> SubscribedSignalStream<SignalSubscription, S> {
269+
// rustdoc-stripper-ignore-next
270+
/// Downgrade the inner signal subscription to a weak one.
271+
///
272+
/// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
273+
pub fn downgrade(self) -> SubscribedSignalStream<WeakSignalSubscription, S> {
274+
SubscribedSignalStream {
275+
subscription: self.subscription.downgrade(),
276+
stream: self.stream,
277+
}
278+
}
279+
}
280+
281+
impl<S> SubscribedSignalStream<WeakSignalSubscription, S> {
282+
// rustdoc-stripper-ignore-next
283+
/// Upgrade the inner signal subscription to a strong one.
284+
///
285+
/// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
286+
pub fn downgrade(self) -> Option<SubscribedSignalStream<SignalSubscription, S>> {
287+
self.subscription
288+
.upgrade()
289+
.map(|subscription| SubscribedSignalStream {
290+
subscription,
291+
stream: self.stream,
292+
})
293+
}
294+
}
295+
296+
impl<H, S> Stream for SubscribedSignalStream<H, S>
297+
where
298+
S: Stream,
299+
{
300+
type Item = S::Item;
301+
302+
fn poll_next(
303+
self: std::pin::Pin<&mut Self>,
304+
cx: &mut std::task::Context<'_>,
305+
) -> std::task::Poll<Option<Self::Item>> {
306+
let this = self.project();
307+
this.stream.poll_next(cx)
308+
}
309+
310+
fn size_hint(&self) -> (usize, Option<usize>) {
311+
self.stream.size_hint()
312+
}
313+
}
314+
315+
impl<H, S> FusedStream for SubscribedSignalStream<H, S>
316+
where
317+
S: FusedStream,
318+
{
319+
fn is_terminated(&self) -> bool {
320+
self.stream.is_terminated()
321+
}
322+
}
323+
214324
// rustdoc-stripper-ignore-next
215325
/// Build a registered DBus object, by handling different parts of DBus.
216326
#[must_use = "The builder must be built to be used"]
@@ -651,4 +761,91 @@ impl DBusConnection {
651761
);
652762
}
653763
}
764+
765+
// rustdoc-stripper-ignore-next
766+
/// Subscribe to a D-Bus signal and receive signal emissions as a stream.
767+
///
768+
/// See [`Self::signal_subscribe`] for arguments.
769+
///
770+
/// The returned stream holds a strong reference to this D-Bus connection,
771+
/// and unsubscribes from the signal when dropped.
772+
///
773+
/// To avoid reference cycles you may wish to downgrade the returned
774+
/// stream to hold only weak reference to the connection using
775+
/// [`SubscribedSignalStream::downgrade`].
776+
pub fn receive_signal(
777+
&self,
778+
sender: Option<&str>,
779+
interface_name: Option<&str>,
780+
member: Option<&str>,
781+
object_path: Option<&str>,
782+
arg0: Option<&str>,
783+
flags: DBusSignalFlags,
784+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = OwnedDBusSignal> + use<>>
785+
{
786+
let (tx, rx) = mpsc::channel(1);
787+
let subscription = self.subscribe_to_signal(
788+
sender,
789+
interface_name,
790+
member,
791+
object_path,
792+
arg0,
793+
flags,
794+
move |signal| {
795+
let mut tx = tx.clone();
796+
let signal = signal.to_owned();
797+
glib::spawn_future(async move {
798+
// We really don't care for the error here.
799+
let _: Result<(), mpsc::SendError> = tx.send(signal).await;
800+
});
801+
},
802+
);
803+
SubscribedSignalStream {
804+
subscription,
805+
stream: rx,
806+
}
807+
}
808+
809+
// rustdoc-stripper-ignore-next
810+
/// Subscribe to a D-Bus signal and receive its parameters as a stream.
811+
///
812+
/// See [`Self::signal_subscribe`] for arguments.
813+
///
814+
/// The returned stream holds a strong reference to this D-Bus connection,
815+
/// and unsubscribes from the signal when dropped.
816+
///
817+
/// To avoid reference cycles you may wish to downgrade the returned
818+
/// stream to hold only weak reference to the connection using
819+
/// [`SubscribedSignalStream::downgrade`].
820+
pub fn receive_signal_parameters(
821+
&self,
822+
sender: Option<&str>,
823+
interface_name: Option<&str>,
824+
member: Option<&str>,
825+
object_path: Option<&str>,
826+
arg0: Option<&str>,
827+
flags: DBusSignalFlags,
828+
) -> SubscribedSignalStream<SignalSubscription, impl Stream<Item = glib::Variant> + use<>> {
829+
let (tx, rx) = mpsc::channel(1);
830+
let subscription = self.subscribe_to_signal(
831+
sender,
832+
interface_name,
833+
member,
834+
object_path,
835+
arg0,
836+
flags,
837+
move |signal| {
838+
let mut tx = tx.clone();
839+
let parameters = signal.parameters.clone();
840+
glib::spawn_future(async move {
841+
// We really don't care for the error here.
842+
let _: Result<(), mpsc::SendError> = tx.send(parameters).await;
843+
});
844+
},
845+
);
846+
SubscribedSignalStream {
847+
subscription,
848+
stream: rx,
849+
}
850+
}
654851
}

gio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub use self::dbus::*;
3333
mod dbus_connection;
3434
pub use self::dbus_connection::{
3535
ActionGroupExportId, DBusSignalRef, FilterId, MenuModelExportId, RegistrationBuilder,
36-
RegistrationId, SignalSubscription, SignalSubscriptionId, WatcherId, WeakSignalSubscription,
36+
RegistrationId, SignalSubscription, SignalSubscriptionId, SubscribedSignalStream, WatcherId,
37+
WeakSignalSubscription,
3738
};
3839
mod dbus_message;
3940
mod dbus_method_invocation;

0 commit comments

Comments
 (0)