18
18
19
19
use eyeball:: { SharedObservable , Subscriber } ;
20
20
use futures_util:: pin_mut;
21
- use matrix_sdk:: { Client , deserialized_responses:: SyncOrStrippedState } ;
21
+ use matrix_sdk:: { Client , deserialized_responses:: SyncOrStrippedState , locks :: Mutex } ;
22
22
use matrix_sdk_common:: executor:: { JoinHandle , spawn} ;
23
23
use ruma:: {
24
24
OwnedRoomId ,
@@ -41,52 +41,62 @@ pub struct SpaceService {
41
41
42
42
joined_spaces : SharedObservable < Vec < SpaceServiceRoom > > ,
43
43
44
- room_update_handle : JoinHandle < ( ) > ,
44
+ room_update_handle : Mutex < Option < JoinHandle < ( ) > > > ,
45
45
}
46
46
47
47
impl Drop for SpaceService {
48
48
fn drop ( & mut self ) {
49
- self . room_update_handle . abort ( ) ;
49
+ if let Some ( handle) = & * self . room_update_handle . lock ( ) {
50
+ handle. abort ( ) ;
51
+ }
50
52
}
51
53
}
52
54
53
55
impl SpaceService {
54
- pub async fn new ( client : Client ) -> Self {
55
- let joined_spaces = SharedObservable :: new ( Vec :: new ( ) ) ;
56
-
57
- joined_spaces. set ( Self :: joined_spaces_for ( & client) . await ) ;
58
-
59
- let client_clone = client. clone ( ) ;
60
- let joined_spaces_clone = joined_spaces. clone ( ) ;
61
- let all_room_updates_receiver = client. subscribe_to_all_room_updates ( ) ;
62
-
63
- let handle = spawn ( async move {
64
- pin_mut ! ( all_room_updates_receiver) ;
56
+ pub fn new ( client : Client ) -> Self {
57
+ Self {
58
+ client,
59
+ joined_spaces : SharedObservable :: new ( Vec :: new ( ) ) ,
60
+ room_update_handle : Mutex :: new ( None ) ,
61
+ }
62
+ }
65
63
66
- loop {
67
- match all_room_updates_receiver. recv ( ) . await {
68
- Ok ( _) => {
69
- let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
70
- if new_spaces != joined_spaces_clone. get ( ) {
71
- joined_spaces_clone. set ( new_spaces) ;
64
+ pub fn subscribe_to_joined_spaces ( & self ) -> Subscriber < Vec < SpaceServiceRoom > > {
65
+ if self . room_update_handle . lock ( ) . is_none ( ) {
66
+ let client_clone = self . client . clone ( ) ;
67
+ let joined_spaces_clone = self . joined_spaces . clone ( ) ;
68
+ let all_room_updates_receiver = self . client . subscribe_to_all_room_updates ( ) ;
69
+
70
+ * self . room_update_handle . lock ( ) = Some ( spawn ( async move {
71
+ pin_mut ! ( all_room_updates_receiver) ;
72
+
73
+ loop {
74
+ match all_room_updates_receiver. recv ( ) . await {
75
+ Ok ( _) => {
76
+ let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
77
+ if new_spaces != joined_spaces_clone. get ( ) {
78
+ joined_spaces_clone. set ( new_spaces) ;
79
+ }
80
+ }
81
+ Err ( err) => {
82
+ error ! ( "error when listening to room updates: {err}" ) ;
72
83
}
73
- }
74
- Err ( err) => {
75
- error ! ( "error when listening to room updates: {err}" ) ;
76
84
}
77
85
}
78
- }
79
- } ) ;
80
-
81
- Self { client, joined_spaces, room_update_handle : handle }
82
- }
86
+ } ) ) ;
87
+ }
83
88
84
- pub fn subscribe_to_joined_spaces ( & self ) -> Subscriber < Vec < SpaceServiceRoom > > {
85
89
self . joined_spaces . subscribe ( )
86
90
}
87
91
88
- pub fn joined_spaces ( & self ) -> Vec < SpaceServiceRoom > {
89
- self . joined_spaces . get ( )
92
+ pub async fn joined_spaces ( & self ) -> Vec < SpaceServiceRoom > {
93
+ let spaces = Self :: joined_spaces_for ( & self . client ) . await ;
94
+
95
+ if spaces != self . joined_spaces . get ( ) {
96
+ self . joined_spaces . set ( spaces. clone ( ) ) ;
97
+ }
98
+
99
+ spaces
90
100
}
91
101
92
102
pub fn space_room_list ( & self , space_id : OwnedRoomId ) -> SpaceServiceRoomList {
@@ -181,7 +191,7 @@ mod tests {
181
191
let server = MatrixMockServer :: new ( ) . await ;
182
192
let client = server. client_builder ( ) . build ( ) . await ;
183
193
let user_id = client. user_id ( ) . unwrap ( ) ;
184
- let space_service = SpaceService :: new ( client. clone ( ) ) . await ;
194
+ let space_service = SpaceService :: new ( client. clone ( ) ) ;
185
195
let factory = EventFactory :: new ( ) ;
186
196
187
197
server. mock_room_state_encryption ( ) . plain ( ) . mount ( ) . await ;
@@ -249,13 +259,23 @@ mod tests {
249
259
250
260
// Only the parent space is returned
251
261
assert_eq ! (
252
- space_service. joined_spaces( ) . iter( ) . map( |s| s. room_id. to_owned( ) ) . collect:: <Vec <_>>( ) ,
262
+ space_service
263
+ . joined_spaces( )
264
+ . await
265
+ . iter( )
266
+ . map( |s| s. room_id. to_owned( ) )
267
+ . collect:: <Vec <_>>( ) ,
253
268
vec![ parent_space_id]
254
269
) ;
255
270
256
271
// and it has 2 children
257
272
assert_eq ! (
258
- space_service. joined_spaces( ) . iter( ) . map( |s| s. children_count) . collect:: <Vec <_>>( ) ,
273
+ space_service
274
+ . joined_spaces( )
275
+ . await
276
+ . iter( )
277
+ . map( |s| s. children_count)
278
+ . collect:: <Vec <_>>( ) ,
259
279
vec![ 2 ]
260
280
) ;
261
281
@@ -318,14 +338,14 @@ mod tests {
318
338
// Build the `SpaceService` and expect the room to show up with no updates
319
339
// pending
320
340
321
- let space_service = SpaceService :: new ( client. clone ( ) ) . await ;
341
+ let space_service = SpaceService :: new ( client. clone ( ) ) ;
322
342
323
343
let joined_spaces_subscriber = space_service. subscribe_to_joined_spaces ( ) ;
324
344
pin_mut ! ( joined_spaces_subscriber) ;
325
345
assert_pending ! ( joined_spaces_subscriber) ;
326
346
327
347
assert_eq ! (
328
- space_service. joined_spaces( ) ,
348
+ space_service. joined_spaces( ) . await ,
329
349
vec![ SpaceServiceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
330
350
) ;
331
351
@@ -344,7 +364,7 @@ mod tests {
344
364
345
365
// And expect the list to update
346
366
assert_eq ! (
347
- space_service. joined_spaces( ) ,
367
+ space_service. joined_spaces( ) . await ,
348
368
vec![
349
369
SpaceServiceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
350
370
SpaceServiceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 0 )
@@ -380,7 +400,7 @@ mod tests {
380
400
// and the subscriber doesn't yield any updates
381
401
assert_pending ! ( joined_spaces_subscriber) ;
382
402
assert_eq ! (
383
- space_service. joined_spaces( ) ,
403
+ space_service. joined_spaces( ) . await ,
384
404
vec![ SpaceServiceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
385
405
) ;
386
406
}
0 commit comments