16
16
//!
17
17
//! See [`SpaceService`] for details.
18
18
19
- use eyeball:: { SharedObservable , Subscriber } ;
19
+ use std:: sync:: Arc ;
20
+
21
+ use eyeball_im:: { ObservableVector , VectorSubscriberBatchedStream } ;
20
22
use futures_util:: pin_mut;
23
+ use imbl:: Vector ;
21
24
use matrix_sdk:: { Client , deserialized_responses:: SyncOrStrippedState , locks:: Mutex } ;
22
25
use matrix_sdk_common:: executor:: { JoinHandle , spawn} ;
23
26
use ruma:: {
@@ -39,7 +42,7 @@ pub mod room_list;
39
42
pub struct SpaceService {
40
43
client : Client ,
41
44
42
- joined_spaces : SharedObservable < Vec < SpaceRoom > > ,
45
+ joined_spaces : Arc < Mutex < ObservableVector < SpaceRoom > > > ,
43
46
44
47
room_update_handle : Mutex < Option < JoinHandle < ( ) > > > ,
45
48
}
@@ -56,15 +59,17 @@ impl SpaceService {
56
59
pub fn new ( client : Client ) -> Self {
57
60
Self {
58
61
client,
59
- joined_spaces : SharedObservable :: new ( Vec :: new ( ) ) ,
62
+ joined_spaces : Arc :: new ( Mutex :: new ( ObservableVector :: new ( ) ) ) ,
60
63
room_update_handle : Mutex :: new ( None ) ,
61
64
}
62
65
}
63
66
64
- pub fn subscribe_to_joined_spaces ( & self ) -> Subscriber < Vec < SpaceRoom > > {
67
+ pub fn subscribe_to_joined_spaces (
68
+ & self ,
69
+ ) -> ( Vector < SpaceRoom > , VectorSubscriberBatchedStream < SpaceRoom > ) {
65
70
if self . room_update_handle . lock ( ) . is_none ( ) {
66
- let client_clone = self . client . clone ( ) ;
67
- let joined_spaces_clone = self . joined_spaces . clone ( ) ;
71
+ let client = self . client . clone ( ) ;
72
+ let joined_spaces = Arc :: clone ( & self . joined_spaces ) ;
68
73
let all_room_updates_receiver = self . client . subscribe_to_all_room_updates ( ) ;
69
74
70
75
* self . room_update_handle . lock ( ) = Some ( spawn ( async move {
@@ -73,10 +78,8 @@ impl SpaceService {
73
78
loop {
74
79
match all_room_updates_receiver. recv ( ) . await {
75
80
Ok ( _) => {
76
- let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
77
- if new_spaces != joined_spaces_clone. get ( ) {
78
- joined_spaces_clone. set ( new_spaces) ;
79
- }
81
+ let new_spaces = Vector :: from ( Self :: joined_spaces_for ( & client) . await ) ;
82
+ Self :: update_joined_spaces_if_needed ( new_spaces, & joined_spaces) ;
80
83
}
81
84
Err ( err) => {
82
85
error ! ( "error when listening to room updates: {err}" ) ;
@@ -86,15 +89,14 @@ impl SpaceService {
86
89
} ) ) ;
87
90
}
88
91
89
- self . joined_spaces . subscribe ( )
92
+ self . joined_spaces . lock ( ) . subscribe ( ) . into_values_and_batched_stream ( )
90
93
}
91
94
92
95
pub async fn joined_spaces ( & self ) -> Vec < SpaceRoom > {
93
96
let spaces = Self :: joined_spaces_for ( & self . client ) . await ;
94
97
95
- if spaces != self . joined_spaces . get ( ) {
96
- self . joined_spaces . set ( spaces. clone ( ) ) ;
97
- }
98
+ self . joined_spaces . lock ( ) . clear ( ) ;
99
+ self . joined_spaces . lock ( ) . append ( Vector :: from ( spaces. clone ( ) ) ) ;
98
100
99
101
spaces
100
102
}
@@ -103,6 +105,18 @@ impl SpaceService {
103
105
SpaceRoomList :: new ( self . client . clone ( ) , space_id)
104
106
}
105
107
108
+ fn update_joined_spaces_if_needed (
109
+ new_spaces : Vector < SpaceRoom > ,
110
+ joined_spaces : & Arc < Mutex < ObservableVector < SpaceRoom > > > ,
111
+ ) {
112
+ let old_spaces = joined_spaces. lock ( ) . clone ( ) ;
113
+
114
+ if new_spaces != old_spaces {
115
+ joined_spaces. lock ( ) . clear ( ) ;
116
+ joined_spaces. lock ( ) . append ( Vector :: from ( new_spaces) ) ;
117
+ }
118
+ }
119
+
106
120
async fn joined_spaces_for ( client : & Client ) -> Vec < SpaceRoom > {
107
121
let joined_spaces = client
108
122
. joined_rooms ( )
@@ -175,6 +189,7 @@ impl SpaceService {
175
189
#[ cfg( test) ]
176
190
mod tests {
177
191
use assert_matches2:: assert_let;
192
+ use eyeball_im:: VectorDiff ;
178
193
use futures_util:: pin_mut;
179
194
use matrix_sdk:: { room:: ParentSpace , test_utils:: mocks:: MatrixMockServer } ;
180
195
use matrix_sdk_test:: {
@@ -340,7 +355,7 @@ mod tests {
340
355
341
356
let space_service = SpaceService :: new ( client. clone ( ) ) ;
342
357
343
- let joined_spaces_subscriber = space_service. subscribe_to_joined_spaces ( ) ;
358
+ let ( _ , joined_spaces_subscriber) = space_service. subscribe_to_joined_spaces ( ) ;
344
359
pin_mut ! ( joined_spaces_subscriber) ;
345
360
assert_pending ! ( joined_spaces_subscriber) ;
346
361
@@ -349,6 +364,17 @@ mod tests {
349
364
vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
350
365
) ;
351
366
367
+ assert_next_eq ! (
368
+ joined_spaces_subscriber,
369
+ vec![ VectorDiff :: Append {
370
+ values: vec![ SpaceRoom :: new_from_known(
371
+ client. get_room( first_space_id) . unwrap( ) ,
372
+ 0
373
+ ) ]
374
+ . into( )
375
+ } ]
376
+ ) ;
377
+
352
378
// Join the second space
353
379
354
380
server
@@ -380,12 +406,25 @@ mod tests {
380
406
]
381
407
) ;
382
408
383
- // The subscriber yields new results when a space is joined
384
409
assert_next_eq ! (
385
410
joined_spaces_subscriber,
386
411
vec![
387
- SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
388
- SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
412
+ VectorDiff :: Clear ,
413
+ VectorDiff :: Append {
414
+ values: vec![
415
+ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
416
+ SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
417
+ ]
418
+ . into( )
419
+ } ,
420
+ VectorDiff :: Clear ,
421
+ VectorDiff :: Append {
422
+ values: vec![
423
+ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
424
+ SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
425
+ ]
426
+ . into( )
427
+ }
389
428
]
390
429
) ;
391
430
@@ -394,7 +433,16 @@ mod tests {
394
433
// and when one is left
395
434
assert_next_eq ! (
396
435
joined_spaces_subscriber,
397
- vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
436
+ vec![
437
+ VectorDiff :: Clear ,
438
+ VectorDiff :: Append {
439
+ values: vec![ SpaceRoom :: new_from_known(
440
+ client. get_room( first_space_id) . unwrap( ) ,
441
+ 0
442
+ ) ]
443
+ . into( )
444
+ } ,
445
+ ]
398
446
) ;
399
447
400
448
// but it doesn't when a non-space room gets joined
0 commit comments