16
16
//!
17
17
//! See [`SpaceService`] for details.
18
18
19
- use eyeball:: { SharedObservable , Subscriber } ;
19
+ use std:: sync:: Arc ;
20
+
21
+ use eyeball_im:: { ObservableVector , VectorDiff , VectorSubscriber , VectorSubscriberBatchedStream } ;
22
+ use futures_core:: Stream ;
20
23
use futures_util:: pin_mut;
24
+ use imbl:: Vector ;
21
25
use matrix_sdk:: { Client , deserialized_responses:: SyncOrStrippedState , locks:: Mutex } ;
22
26
use matrix_sdk_common:: executor:: { JoinHandle , spawn} ;
23
27
use ruma:: {
@@ -39,7 +43,7 @@ pub mod room_list;
39
43
pub struct SpaceService {
40
44
client : Client ,
41
45
42
- joined_spaces : SharedObservable < Vec < SpaceRoom > > ,
46
+ joined_spaces : Arc < Mutex < ObservableVector < SpaceRoom > > > ,
43
47
44
48
room_update_handle : Mutex < Option < JoinHandle < ( ) > > > ,
45
49
}
@@ -56,15 +60,17 @@ impl SpaceService {
56
60
pub fn new ( client : Client ) -> Self {
57
61
Self {
58
62
client,
59
- joined_spaces : SharedObservable :: new ( Vec :: new ( ) ) ,
63
+ joined_spaces : Arc :: new ( Mutex :: new ( ObservableVector :: new ( ) ) ) ,
60
64
room_update_handle : Mutex :: new ( None ) ,
61
65
}
62
66
}
63
67
64
- pub fn subscribe_to_joined_spaces ( & self ) -> Subscriber < Vec < SpaceRoom > > {
68
+ pub fn subscribe_to_joined_spaces (
69
+ & self ,
70
+ ) -> ( Vector < SpaceRoom > , VectorSubscriberBatchedStream < SpaceRoom > ) {
65
71
if self . room_update_handle . lock ( ) . is_none ( ) {
66
- let client_clone = self . client . clone ( ) ;
67
- let joined_spaces_clone = self . joined_spaces . clone ( ) ;
72
+ let client = self . client . clone ( ) ;
73
+ let joined_spaces = Arc :: clone ( & self . joined_spaces ) ;
68
74
let all_room_updates_receiver = self . client . subscribe_to_all_room_updates ( ) ;
69
75
70
76
* self . room_update_handle . lock ( ) = Some ( spawn ( async move {
@@ -73,10 +79,8 @@ impl SpaceService {
73
79
loop {
74
80
match all_room_updates_receiver. recv ( ) . await {
75
81
Ok ( _) => {
76
- let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
77
- if new_spaces != joined_spaces_clone. get ( ) {
78
- joined_spaces_clone. set ( new_spaces) ;
79
- }
82
+ let new_spaces = Vector :: from ( Self :: joined_spaces_for ( & client) . await ) ;
83
+ Self :: update_joined_spaces_if_needed ( new_spaces, & joined_spaces) ;
80
84
}
81
85
Err ( err) => {
82
86
error ! ( "error when listening to room updates: {err}" ) ;
@@ -86,15 +90,14 @@ impl SpaceService {
86
90
} ) ) ;
87
91
}
88
92
89
- self . joined_spaces . subscribe ( )
93
+ self . joined_spaces . lock ( ) . subscribe ( ) . into_values_and_batched_stream ( )
90
94
}
91
95
92
96
pub async fn joined_spaces ( & self ) -> Vec < SpaceRoom > {
93
97
let spaces = Self :: joined_spaces_for ( & self . client ) . await ;
94
98
95
- if spaces != self . joined_spaces . get ( ) {
96
- self . joined_spaces . set ( spaces. clone ( ) ) ;
97
- }
99
+ self . joined_spaces . lock ( ) . clear ( ) ;
100
+ self . joined_spaces . lock ( ) . append ( Vector :: from ( spaces. clone ( ) ) ) ;
98
101
99
102
spaces
100
103
}
@@ -103,6 +106,18 @@ impl SpaceService {
103
106
SpaceRoomList :: new ( self . client . clone ( ) , space_id)
104
107
}
105
108
109
+ fn update_joined_spaces_if_needed (
110
+ new_spaces : Vector < SpaceRoom > ,
111
+ joined_spaces : & Arc < Mutex < ObservableVector < SpaceRoom > > > ,
112
+ ) {
113
+ let old_spaces = joined_spaces. lock ( ) . clone ( ) ;
114
+
115
+ if new_spaces != old_spaces {
116
+ joined_spaces. lock ( ) . clear ( ) ;
117
+ joined_spaces. lock ( ) . append ( Vector :: from ( new_spaces) ) ;
118
+ }
119
+ }
120
+
106
121
async fn joined_spaces_for ( client : & Client ) -> Vec < SpaceRoom > {
107
122
let joined_spaces = client
108
123
. joined_rooms ( )
@@ -340,7 +355,7 @@ mod tests {
340
355
341
356
let space_service = SpaceService :: new ( client. clone ( ) ) ;
342
357
343
- let joined_spaces_subscriber = space_service. subscribe_to_joined_spaces ( ) ;
358
+ let ( _ , joined_spaces_subscriber) = space_service. subscribe_to_joined_spaces ( ) ;
344
359
pin_mut ! ( joined_spaces_subscriber) ;
345
360
assert_pending ! ( joined_spaces_subscriber) ;
346
361
@@ -349,6 +364,17 @@ mod tests {
349
364
vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
350
365
) ;
351
366
367
+ assert_next_eq ! (
368
+ joined_spaces_subscriber,
369
+ vec![ VectorDiff :: Append {
370
+ values: vec![ SpaceRoom :: new_from_known(
371
+ client. get_room( first_space_id) . unwrap( ) ,
372
+ 0
373
+ ) ]
374
+ . into( )
375
+ } ]
376
+ ) ;
377
+
352
378
// Join the second space
353
379
354
380
server
@@ -380,12 +406,25 @@ mod tests {
380
406
]
381
407
) ;
382
408
383
- // The subscriber yields new results when a space is joined
384
409
assert_next_eq ! (
385
410
joined_spaces_subscriber,
386
411
vec![
387
- SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
388
- SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
412
+ VectorDiff :: Clear ,
413
+ VectorDiff :: Append {
414
+ values: vec![
415
+ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
416
+ SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
417
+ ]
418
+ . into( )
419
+ } ,
420
+ VectorDiff :: Clear ,
421
+ VectorDiff :: Append {
422
+ values: vec![
423
+ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ,
424
+ SpaceRoom :: new_from_known( client. get_room( second_space_id) . unwrap( ) , 1 )
425
+ ]
426
+ . into( )
427
+ }
389
428
]
390
429
) ;
391
430
@@ -394,7 +433,16 @@ mod tests {
394
433
// and when one is left
395
434
assert_next_eq ! (
396
435
joined_spaces_subscriber,
397
- vec![ SpaceRoom :: new_from_known( client. get_room( first_space_id) . unwrap( ) , 0 ) ]
436
+ vec![
437
+ VectorDiff :: Clear ,
438
+ VectorDiff :: Append {
439
+ values: vec![ SpaceRoom :: new_from_known(
440
+ client. get_room( first_space_id) . unwrap( ) ,
441
+ 0
442
+ ) ]
443
+ . into( )
444
+ } ,
445
+ ]
398
446
) ;
399
447
400
448
// but it doesn't when a non-space room gets joined
0 commit comments