16
16
//!
17
17
//! See [`SpaceService`] for details.
18
18
19
- use eyeball:: { SharedObservable , Subscriber } ;
19
+ use std:: sync:: Arc ;
20
+
21
+ use eyeball_im:: { ObservableVector , VectorSubscriberBatchedStream } ;
20
22
use futures_util:: pin_mut;
23
+ use imbl:: Vector ;
21
24
use matrix_sdk:: { Client , deserialized_responses:: SyncOrStrippedState , locks:: Mutex } ;
22
25
use matrix_sdk_common:: executor:: { JoinHandle , spawn} ;
23
26
use ruma:: {
@@ -39,7 +42,7 @@ pub mod room_list;
39
42
pub struct SpaceService {
40
43
client : Client ,
41
44
42
- joined_spaces : SharedObservable < Vec < SpaceRoom > > ,
45
+ joined_spaces : Arc < Mutex < ObservableVector < SpaceRoom > > > ,
43
46
44
47
room_update_handle : Mutex < Option < JoinHandle < ( ) > > > ,
45
48
}
@@ -56,15 +59,17 @@ impl SpaceService {
56
59
pub fn new ( client : Client ) -> Self {
57
60
Self {
58
61
client,
59
- joined_spaces : SharedObservable :: new ( Vec :: new ( ) ) ,
62
+ joined_spaces : Arc :: new ( Mutex :: new ( ObservableVector :: new ( ) ) ) ,
60
63
room_update_handle : Mutex :: new ( None ) ,
61
64
}
62
65
}
63
66
64
- pub fn subscribe_to_joined_spaces ( & self ) -> Subscriber < Vec < SpaceRoom > > {
67
+ pub fn subscribe_to_joined_spaces (
68
+ & self ,
69
+ ) -> ( Vector < SpaceRoom > , VectorSubscriberBatchedStream < SpaceRoom > ) {
65
70
if self . room_update_handle . lock ( ) . is_none ( ) {
66
- let client_clone = self . client . clone ( ) ;
67
- let joined_spaces_clone = self . joined_spaces . clone ( ) ;
71
+ let client = self . client . clone ( ) ;
72
+ let joined_spaces = Arc :: clone ( & self . joined_spaces ) ;
68
73
let all_room_updates_receiver = self . client . subscribe_to_all_room_updates ( ) ;
69
74
70
75
* self . room_update_handle . lock ( ) = Some ( spawn ( async move {
@@ -73,10 +78,8 @@ impl SpaceService {
73
78
loop {
74
79
match all_room_updates_receiver. recv ( ) . await {
75
80
Ok ( _) => {
76
- let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
77
- if new_spaces != joined_spaces_clone. get ( ) {
78
- joined_spaces_clone. set ( new_spaces) ;
79
- }
81
+ let new_spaces = Vector :: from ( Self :: joined_spaces_for ( & client) . await ) ;
82
+ Self :: update_joined_spaces_if_needed ( new_spaces, & joined_spaces) ;
80
83
}
81
84
Err ( err) => {
82
85
error ! ( "error when listening to room updates: {err}" ) ;
@@ -86,15 +89,13 @@ impl SpaceService {
86
89
} ) ) ;
87
90
}
88
91
89
- self . joined_spaces . subscribe ( )
92
+ self . joined_spaces . lock ( ) . subscribe ( ) . into_values_and_batched_stream ( )
90
93
}
91
94
92
95
pub async fn joined_spaces ( & self ) -> Vec < SpaceRoom > {
93
96
let spaces = Self :: joined_spaces_for ( & self . client ) . await ;
94
97
95
- if spaces != self . joined_spaces . get ( ) {
96
- self . joined_spaces . set ( spaces. clone ( ) ) ;
97
- }
98
+ Self :: update_joined_spaces_if_needed ( Vector :: from ( spaces. clone ( ) ) , & self . joined_spaces ) ;
98
99
99
100
spaces
100
101
}
@@ -103,6 +104,18 @@ impl SpaceService {
103
104
SpaceRoomList :: new ( self . client . clone ( ) , space_id)
104
105
}
105
106
107
+ fn update_joined_spaces_if_needed (
108
+ new_spaces : Vector < SpaceRoom > ,
109
+ joined_spaces : & Arc < Mutex < ObservableVector < SpaceRoom > > > ,
110
+ ) {
111
+ let old_spaces = joined_spaces. lock ( ) . clone ( ) ;
112
+
113
+ if new_spaces != old_spaces {
114
+ joined_spaces. lock ( ) . clear ( ) ;
115
+ joined_spaces. lock ( ) . append ( new_spaces) ;
116
+ }
117
+ }
118
+
106
119
async fn joined_spaces_for ( client : & Client ) -> Vec < SpaceRoom > {
107
120
let joined_spaces = client
108
121
. joined_rooms ( )
@@ -175,6 +188,7 @@ impl SpaceService {
175
188
#[ cfg( test) ]
176
189
mod tests {
177
190
use assert_matches2:: assert_let;
191
+ use eyeball_im:: VectorDiff ;
178
192
use futures_util:: pin_mut;
179
193
use matrix_sdk:: { room:: ParentSpace , test_utils:: mocks:: MatrixMockServer } ;
180
194
use matrix_sdk_test:: {
@@ -340,7 +354,7 @@ mod tests {
340
354
341
355
let space_service = SpaceService :: new ( client. clone ( ) ) ;
342
356
343
- let joined_spaces_subscriber = space_service. subscribe_to_joined_spaces ( ) ;
357
+ let ( _ , joined_spaces_subscriber) = space_service. subscribe_to_joined_spaces ( ) ;
344
358
pin_mut ! ( joined_spaces_subscriber) ;
345
359
assert_pending ! ( joined_spaces_subscriber) ;
346
360
@@ -349,6 +363,17 @@ mod tests {
349
363
vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
350
364
) ;
351
365
366
+ assert_next_eq ! (
367
+ joined_spaces_subscriber,
368
+ vec![ VectorDiff :: Append {
369
+ values: vec![ SpaceRoom :: new_from_known(
370
+ client. get_room( first_space_id) . unwrap( ) ,
371
+ 0
372
+ ) ]
373
+ . into( )
374
+ } ]
375
+ ) ;
376
+
352
377
// Join the second space
353
378
354
379
server
@@ -380,12 +405,17 @@ mod tests {
380
405
]
381
406
) ;
382
407
383
- // The subscriber yields new results when a space is joined
384
408
assert_next_eq ! (
385
409
joined_spaces_subscriber,
386
410
vec![
387
- SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
388
- SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
411
+ VectorDiff :: Clear ,
412
+ VectorDiff :: Append {
413
+ values: vec![
414
+ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
415
+ SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
416
+ ]
417
+ . into( )
418
+ } ,
389
419
]
390
420
) ;
391
421
@@ -394,7 +424,16 @@ mod tests {
394
424
// and when one is left
395
425
assert_next_eq ! (
396
426
joined_spaces_subscriber,
397
- vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
427
+ vec![
428
+ VectorDiff :: Clear ,
429
+ VectorDiff :: Append {
430
+ values: vec![ SpaceRoom :: new_from_known(
431
+ client. get_room( first_space_id) . unwrap( ) ,
432
+ 0
433
+ ) ]
434
+ . into( )
435
+ } ,
436
+ ]
398
437
) ;
399
438
400
439
// but it doesn't when a non-space room gets joined
0 commit comments