1717use std:: {
1818 collections:: { btree_map, BTreeMap } ,
1919 fmt:: { self , Debug } ,
20- future:: Future ,
20+ future:: { ready , Future } ,
2121 pin:: Pin ,
2222 sync:: { Arc , Mutex as StdMutex , RwLock as StdRwLock , Weak } ,
2323} ;
@@ -88,7 +88,8 @@ use crate::{
8888 error:: { HttpError , HttpResult } ,
8989 event_cache:: EventCache ,
9090 event_handler:: {
91- EventHandler , EventHandlerDropGuard , EventHandlerHandle , EventHandlerStore , SyncEvent ,
91+ EventHandler , EventHandlerContext , EventHandlerDropGuard , EventHandlerHandle ,
92+ EventHandlerStore , ObservableEventHandler , SyncEvent ,
9293 } ,
9394 http_client:: HttpClient ,
9495 matrix_auth:: MatrixAuth ,
@@ -776,7 +777,7 @@ impl Client {
776777 /// ```
777778 pub fn add_event_handler < Ev , Ctx , H > ( & self , handler : H ) -> EventHandlerHandle
778779 where
779- Ev : SyncEvent + DeserializeOwned + Send + ' static ,
780+ Ev : SyncEvent + DeserializeOwned + SendOutsideWasm + ' static ,
780781 H : EventHandler < Ev , Ctx > ,
781782 {
782783 self . add_event_handler_impl ( handler, None )
@@ -798,12 +799,96 @@ impl Client {
798799 handler : H ,
799800 ) -> EventHandlerHandle
800801 where
801- Ev : SyncEvent + DeserializeOwned + Send + ' static ,
802+ Ev : SyncEvent + DeserializeOwned + SendOutsideWasm + ' static ,
802803 H : EventHandler < Ev , Ctx > ,
803804 {
804805 self . add_event_handler_impl ( handler, Some ( room_id. to_owned ( ) ) )
805806 }
806807
808+ /// Observe a specific event type.
809+ ///
810+ /// `Ev` represents the kind of event that will be observed. `Ctx`
811+ /// represents the context that will come with the event. It relies on the
812+ /// same mechanism as [`Self::add_event_handler`]. The main difference is
813+ /// that it returns an [`ObservableEventHandler`] and doesn't require a
814+ /// user-defined closure. It is possible to subscribe to the
815+ /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
816+ /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
817+ /// Ctx)`.
818+ ///
819+ /// # Example
820+ ///
821+ /// ```
822+ /// use futures_util::StreamExt as _;
823+ /// use matrix_sdk::{
824+ /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
825+ /// Client, Room,
826+ /// };
827+ ///
828+ /// # async fn example(client: Client) {
829+ /// let observer =
830+ /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
831+ ///
832+ /// let mut subscriber = observer.subscribe();
833+ ///
834+ /// let (message_event, (room, push_actions)) =
835+ /// subscriber.next().await.unwrap();
836+ /// # }
837+ /// ```
838+ ///
839+ /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
840+ pub fn observe_events < Ev , Ctx > ( & self ) -> ObservableEventHandler < ( Ev , Ctx ) >
841+ where
842+ Ev : SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + ' static ,
843+ Ctx : EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + ' static ,
844+ {
845+ self . observe_room_events_impl ( None )
846+ }
847+
848+ /// Observe a specific room, and event type.
849+ ///
850+ /// This method works the same way as
851+ /// [`observe_events`][Self::observe_events], except that the observability
852+ /// will only be applied for events in the room with the specified ID.
853+ /// See that method for more details.
854+ pub fn observe_room_events < Ev , Ctx > (
855+ & self ,
856+ room_id : & RoomId ,
857+ ) -> ObservableEventHandler < ( Ev , Ctx ) >
858+ where
859+ Ev : SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + ' static ,
860+ Ctx : EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + ' static ,
861+ {
862+ self . observe_room_events_impl ( Some ( room_id. to_owned ( ) ) )
863+ }
864+
865+ /// Shared implementation for `Self::observe_events` and
866+ /// `Self::observe_room_events`.
867+ fn observe_room_events_impl < Ev , Ctx > (
868+ & self ,
869+ room_id : Option < OwnedRoomId > ,
870+ ) -> ObservableEventHandler < ( Ev , Ctx ) >
871+ where
872+ Ev : SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + ' static ,
873+ Ctx : EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + ' static ,
874+ {
875+ // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
876+ // new value.
877+ let shared_observable = SharedObservable :: new ( None ) ;
878+
879+ ObservableEventHandler :: new (
880+ shared_observable. clone ( ) ,
881+ self . event_handler_drop_guard ( self . add_event_handler_impl (
882+ move |event : Ev , context : Ctx | {
883+ shared_observable. set ( Some ( ( event, context) ) ) ;
884+
885+ ready ( ( ) )
886+ } ,
887+ room_id,
888+ ) ) ,
889+ )
890+ }
891+
807892 /// Remove the event handler associated with the handle.
808893 ///
809894 /// Note that you **must not** call `remove_event_handler` from the
0 commit comments