@@ -29,16 +29,22 @@ use std::{
2929 sync:: { Arc , RwLock as StdRwLock } ,
3030} ;
3131
32+ #[ cfg( not( target_arch = "wasm32" ) ) ]
33+ use eyeball_im:: { Vector , VectorDiff } ;
34+ #[ cfg( not( target_arch = "wasm32" ) ) ]
35+ use futures_util:: Stream ;
3236use once_cell:: sync:: OnceCell ;
3337
3438#[ cfg( any( test, feature = "testing" ) ) ]
3539#[ macro_use]
3640pub mod integration_tests;
41+ mod observable_map;
3742mod traits;
3843
3944#[ cfg( feature = "e2e-encryption" ) ]
4045use matrix_sdk_crypto:: store:: { DynCryptoStore , IntoCryptoStore } ;
4146pub use matrix_sdk_store_encryption:: Error as StoreEncryptionError ;
47+ use observable_map:: ObservableMap ;
4248use ruma:: {
4349 events:: {
4450 presence:: PresenceEvent ,
@@ -139,7 +145,7 @@ pub(crate) struct Store {
139145 /// The current sync token that should be used for the next sync call.
140146 pub ( super ) sync_token : Arc < RwLock < Option < String > > > ,
141147 /// All rooms the store knows about.
142- rooms : Arc < StdRwLock < BTreeMap < OwnedRoomId , Room > > > ,
148+ rooms : Arc < StdRwLock < ObservableMap < OwnedRoomId , Room > > > ,
143149 /// A lock to synchronize access to the store, such that data by the sync is
144150 /// never overwritten.
145151 sync_lock : Arc < Mutex < ( ) > > ,
@@ -152,7 +158,7 @@ impl Store {
152158 inner,
153159 session_meta : Default :: default ( ) ,
154160 sync_token : Default :: default ( ) ,
155- rooms : Default :: default ( ) ,
161+ rooms : Arc :: new ( StdRwLock :: new ( ObservableMap :: new ( ) ) ) ,
156162 sync_lock : Default :: default ( ) ,
157163 }
158164 }
@@ -173,15 +179,22 @@ impl Store {
173179 session_meta : SessionMeta ,
174180 roominfo_update_sender : & broadcast:: Sender < RoomInfoUpdate > ,
175181 ) -> Result < ( ) > {
176- for info in self . inner . get_room_infos ( ) . await ? {
177- let room = Room :: restore (
178- & session_meta. user_id ,
179- self . inner . clone ( ) ,
180- info,
181- roominfo_update_sender. clone ( ) ,
182- ) ;
183-
184- self . rooms . write ( ) . unwrap ( ) . insert ( room. room_id ( ) . to_owned ( ) , room) ;
182+ {
183+ let room_infos = self . inner . get_room_infos ( ) . await ?;
184+
185+ let mut rooms = self . rooms . write ( ) . unwrap ( ) ;
186+
187+ for room_info in room_infos {
188+ let new_room = Room :: restore (
189+ & session_meta. user_id ,
190+ self . inner . clone ( ) ,
191+ room_info,
192+ roominfo_update_sender. clone ( ) ,
193+ ) ;
194+ let new_room_id = new_room. room_id ( ) . to_owned ( ) ;
195+
196+ rooms. insert ( new_room_id, new_room) ;
197+ }
185198 }
186199
187200 let token =
@@ -200,7 +213,7 @@ impl Store {
200213
201214 /// Get all the rooms this store knows about.
202215 pub fn rooms ( & self ) -> Vec < Room > {
203- self . rooms . read ( ) . unwrap ( ) . values ( ) . cloned ( ) . collect ( )
216+ self . rooms . read ( ) . unwrap ( ) . iter ( ) . cloned ( ) . collect ( )
204217 }
205218
206219 /// Get all the rooms this store knows about, filtered by state.
@@ -209,18 +222,25 @@ impl Store {
209222 . read ( )
210223 . unwrap ( )
211224 . iter ( )
212- . filter ( |( _ , room) | filter. matches ( room. state ( ) ) )
213- . map ( | ( _ , room ) | room . clone ( ) )
225+ . filter ( |room| filter. matches ( room. state ( ) ) )
226+ . cloned ( )
214227 . collect ( )
215228 }
216229
230+ /// Get a stream of all the rooms changes, in addition to the existing
231+ /// rooms.
232+ #[ cfg( not( target_arch = "wasm32" ) ) ]
233+ pub fn rooms_stream ( & self ) -> ( Vector < Room > , impl Stream < Item = Vec < VectorDiff < Room > > > ) {
234+ self . rooms . read ( ) . unwrap ( ) . stream ( )
235+ }
236+
217237 /// Get the room with the given room id.
218238 pub fn room ( & self , room_id : & RoomId ) -> Option < Room > {
219239 self . rooms . read ( ) . unwrap ( ) . get ( room_id) . cloned ( )
220240 }
221241
222- /// Lookup the Room for the given RoomId, or create one, if it didn't exist
223- /// yet in the store.
242+ /// Lookup the ` Room` for the given ` RoomId` , or create one, if it didn't
243+ /// exist yet in the store
224244 pub fn get_or_create_room (
225245 & self ,
226246 room_id : & RoomId ,
@@ -233,8 +253,7 @@ impl Store {
233253 self . rooms
234254 . write ( )
235255 . unwrap ( )
236- . entry ( room_id. to_owned ( ) )
237- . or_insert_with ( || {
256+ . get_or_create ( room_id, || {
238257 Room :: new ( user_id, self . inner . clone ( ) , room_id, room_type, roominfo_update_sender)
239258 } )
240259 . clone ( )
0 commit comments