@@ -6,7 +6,10 @@ use crate::{
66 ffi, ActionGroup , DBusConnection , DBusInterfaceInfo , DBusMessage , DBusMethodInvocation ,
77 DBusSignalFlags , MenuModel ,
88} ;
9- use glib:: { prelude:: * , translate:: * , WeakRef } ;
9+ use futures_channel:: mpsc;
10+ use futures_core:: { FusedStream , Stream } ;
11+ use glib:: { prelude:: * , translate:: * , variant:: VariantTypeMismatchError , WeakRef } ;
12+ use pin_project_lite:: pin_project;
1013
1114pub trait DBusMethodCall : Sized {
1215 fn parse_call (
@@ -211,6 +214,77 @@ pub struct DBusSignalRef<'a> {
211214 pub parameters : & ' a glib:: Variant ,
212215}
213216
217+ pin_project ! {
218+ // rustdoc-stripper-ignore-next
219+ /// A subscribed stream.
220+ ///
221+ /// A stream which wraps an inner stream of type `S` while holding on to a
222+ /// subscription handle `H` to keep a subscription alive.
223+ #[ derive( Debug ) ]
224+ #[ must_use = "streams do nothing unless polled" ]
225+ pub struct SubscribedSignalStream <H , S > {
226+ #[ pin]
227+ stream: S ,
228+ subscription: H ,
229+ }
230+ }
231+
232+ impl < S > SubscribedSignalStream < SignalSubscription , S > {
233+ // rustdoc-stripper-ignore-next
234+ /// Downgrade the inner signal subscription to a weak one.
235+ ///
236+ /// See [`SignalSubscription::downgrade`] and [`WeakSignalSubscription`].
237+ pub fn downgrade ( self ) -> SubscribedSignalStream < WeakSignalSubscription , S > {
238+ SubscribedSignalStream {
239+ subscription : self . subscription . downgrade ( ) ,
240+ stream : self . stream ,
241+ }
242+ }
243+ }
244+
245+ impl < S > SubscribedSignalStream < WeakSignalSubscription , S > {
246+ // rustdoc-stripper-ignore-next
247+ /// Upgrade the inner signal subscription to a strong one.
248+ ///
249+ /// See [`WeakSignalSubscription::upgrade`] and [`SignalSubscription`].
250+ pub fn downgrade ( self ) -> Option < SubscribedSignalStream < SignalSubscription , S > > {
251+ self . subscription
252+ . upgrade ( )
253+ . map ( |subscription| SubscribedSignalStream {
254+ subscription,
255+ stream : self . stream ,
256+ } )
257+ }
258+ }
259+
260+ impl < H , S > Stream for SubscribedSignalStream < H , S >
261+ where
262+ S : Stream ,
263+ {
264+ type Item = S :: Item ;
265+
266+ fn poll_next (
267+ self : std:: pin:: Pin < & mut Self > ,
268+ cx : & mut std:: task:: Context < ' _ > ,
269+ ) -> std:: task:: Poll < Option < Self :: Item > > {
270+ let this = self . project ( ) ;
271+ this. stream . poll_next ( cx)
272+ }
273+
274+ fn size_hint ( & self ) -> ( usize , Option < usize > ) {
275+ self . stream . size_hint ( )
276+ }
277+ }
278+
279+ impl < H , S > FusedStream for SubscribedSignalStream < H , S >
280+ where
281+ S : FusedStream ,
282+ {
283+ fn is_terminated ( & self ) -> bool {
284+ self . stream . is_terminated ( )
285+ }
286+ }
287+
214288// rustdoc-stripper-ignore-next
215289/// Build a registered DBus object, by handling different parts of DBus.
216290#[ must_use = "The builder must be built to be used" ]
@@ -651,4 +725,86 @@ impl DBusConnection {
651725 ) ;
652726 }
653727 }
728+
729+ // rustdoc-stripper-ignore-next
730+ /// Subscribe to a D-Bus signal and receive signal emissions as a stream.
731+ ///
732+ /// See [`Self::signal_subscribe`] for arguments. `map_signal` maps the
733+ /// received signal to the stream's element.
734+ ///
735+ /// The returned stream holds a strong reference to this D-Bus connection,
736+ /// and unsubscribes from the signal when dropped. To avoid reference cycles
737+ /// you may wish to downgrade the returned stream to hold only weak
738+ /// reference to the connection using [`SubscribedSignalStream::downgrade`].
739+ ///
740+ /// After invoking `map_signal` the stream threads incoming signals through
741+ /// an unbounded channel. Hence, memory consumption will keep increasing
742+ /// as long as the stream consumer does not keep up with signal emissions.
743+ /// If you need to perform expensive processing in response to signals it's
744+ /// therefore recommended to insert an extra buffering and if the buffer
745+ /// overruns, either fail drop the entire stream, or drop individual signal
746+ /// emissions until the buffer has space again.
747+ pub fn receive_signal < T : ' static , F : Fn ( DBusSignalRef ) -> T + ' static > (
748+ & self ,
749+ sender : Option < & str > ,
750+ interface_name : Option < & str > ,
751+ member : Option < & str > ,
752+ object_path : Option < & str > ,
753+ arg0 : Option < & str > ,
754+ flags : DBusSignalFlags ,
755+ map_signal : F ,
756+ ) -> SubscribedSignalStream < SignalSubscription , impl Stream < Item = T > + use < T , F > > {
757+ let ( tx, rx) = mpsc:: unbounded ( ) ;
758+ let subscription = self . subscribe_to_signal (
759+ sender,
760+ interface_name,
761+ member,
762+ object_path,
763+ arg0,
764+ flags,
765+ move |signal| {
766+ // Just ignore send errors: if the receiver is dropped, the
767+ // signal subscription is dropped too, so the callback won't
768+ // be invoked anymore.
769+ let _ = tx. unbounded_send ( map_signal ( signal) ) ;
770+ } ,
771+ ) ;
772+ SubscribedSignalStream {
773+ subscription,
774+ stream : rx,
775+ }
776+ }
777+
778+ // rustdoc-stripper-ignore-next
779+ /// Subscribe to a D-Bus signal and receive signal parameters as a stream.
780+ ///
781+ /// Like [`Self::receive_signal`] (which see for more information), but
782+ /// automatically decodes the emitted signal parameters to type `T`.
783+ /// If decoding fails the corresponding variant type error is sent
784+ /// downstream.
785+ pub fn receive_signal_parameters < T > (
786+ & self ,
787+ sender : Option < & str > ,
788+ interface_name : Option < & str > ,
789+ member : Option < & str > ,
790+ object_path : Option < & str > ,
791+ arg0 : Option < & str > ,
792+ flags : DBusSignalFlags ,
793+ ) -> SubscribedSignalStream <
794+ SignalSubscription ,
795+ impl Stream < Item = Result < T , VariantTypeMismatchError > > + use < T > ,
796+ >
797+ where
798+ T : FromVariant + ' static ,
799+ {
800+ self . receive_signal (
801+ sender,
802+ interface_name,
803+ member,
804+ object_path,
805+ arg0,
806+ flags,
807+ |signal| signal. parameters . try_get ( ) ,
808+ )
809+ }
654810}
0 commit comments