15
15
use std:: sync:: Mutex ;
16
16
17
17
use eyeball:: { SharedObservable , Subscriber } ;
18
+ use futures_util:: pin_mut;
18
19
use matrix_sdk:: { Client , Error , paginators:: PaginationToken } ;
19
20
use ruma:: { OwnedRoomId , api:: client:: space:: get_hierarchy} ;
21
+ use tokio:: task:: JoinHandle ;
22
+ use tracing:: error;
20
23
21
24
use crate :: spaces:: SpaceServiceRoom ;
22
25
@@ -36,18 +39,60 @@ pub struct SpaceServiceRoomList {
36
39
pagination_state : SharedObservable < SpaceServiceRoomListPaginationState > ,
37
40
38
41
rooms : SharedObservable < Vec < SpaceServiceRoom > > ,
42
+
43
+ room_update_handle : JoinHandle < ( ) > ,
39
44
}
40
45
46
+ impl Drop for SpaceServiceRoomList {
47
+ fn drop ( & mut self ) {
48
+ self . room_update_handle . abort ( ) ;
49
+ }
50
+ }
41
51
impl SpaceServiceRoomList {
42
52
pub fn new ( client : Client , parent_space_id : OwnedRoomId ) -> Self {
53
+ let rooms = SharedObservable :: new ( Vec :: < SpaceServiceRoom > :: new ( ) ) ;
54
+
55
+ let client_clone = client. clone ( ) ;
56
+ let rooms_clone = rooms. clone ( ) ;
57
+ let all_room_updates_receiver = client. subscribe_to_all_room_updates ( ) ;
58
+
59
+ let handle = tokio:: spawn ( async move {
60
+ pin_mut ! ( all_room_updates_receiver) ;
61
+
62
+ loop {
63
+ match all_room_updates_receiver. recv ( ) . await {
64
+ Ok ( updates) => {
65
+ let mut new_rooms = rooms_clone. get ( ) ;
66
+
67
+ updates. iter_all_room_ids ( ) . for_each ( |updated_room_id| {
68
+ if let Some ( room) =
69
+ new_rooms. iter_mut ( ) . find ( |room| & room. room_id == updated_room_id)
70
+ && let Some ( update_room) = client_clone. get_room ( updated_room_id)
71
+ {
72
+ * room = SpaceServiceRoom :: new_from_known ( update_room) ;
73
+ }
74
+ } ) ;
75
+
76
+ if new_rooms != rooms_clone. get ( ) {
77
+ rooms_clone. set ( new_rooms) ;
78
+ }
79
+ }
80
+ Err ( err) => {
81
+ error ! ( "error when listening to room updates: {err}" ) ;
82
+ }
83
+ }
84
+ }
85
+ } ) ;
86
+
43
87
Self {
44
88
client,
45
89
parent_space_id,
46
90
token : Mutex :: new ( None . into ( ) ) ,
47
91
pagination_state : SharedObservable :: new ( SpaceServiceRoomListPaginationState :: Idle {
48
92
end_reached : false ,
49
93
} ) ,
50
- rooms : SharedObservable :: new ( Vec :: new ( ) ) ,
94
+ rooms,
95
+ room_update_handle : handle,
51
96
}
52
97
}
53
98
@@ -132,13 +177,13 @@ impl SpaceServiceRoomList {
132
177
mod tests {
133
178
use assert_matches2:: assert_matches;
134
179
use futures_util:: pin_mut;
135
- use matrix_sdk:: test_utils:: mocks:: MatrixMockServer ;
136
- use matrix_sdk_test:: async_test;
180
+ use matrix_sdk:: { RoomState , test_utils:: mocks:: MatrixMockServer } ;
181
+ use matrix_sdk_test:: { JoinedRoomBuilder , LeftRoomBuilder , async_test} ;
137
182
use ruma:: {
138
183
room:: { JoinRuleSummary , RoomSummary } ,
139
184
room_id, uint,
140
185
} ;
141
- use stream_assert:: { assert_next_eq, assert_next_matches, assert_pending} ;
186
+ use stream_assert:: { assert_next_eq, assert_next_matches, assert_pending, assert_ready } ;
142
187
143
188
use crate :: spaces:: {
144
189
SpaceService , SpaceServiceRoom , room_list:: SpaceServiceRoomListPaginationState ,
@@ -220,4 +265,58 @@ mod tests {
220
265
]
221
266
) ;
222
267
}
268
+
269
+ #[ async_test]
270
+ async fn test_room_state_updates ( ) {
271
+ let server = MatrixMockServer :: new ( ) . await ;
272
+ let client = server. client_builder ( ) . build ( ) . await ;
273
+ let space_service = SpaceService :: new ( client. clone ( ) ) . await ;
274
+
275
+ let parent_space_id = room_id ! ( "!parent_space:example.org" ) ;
276
+ let child_room_id_1 = room_id ! ( "!1:example.org" ) ;
277
+ let child_room_id_2 = room_id ! ( "!2:example.org" ) ;
278
+
279
+ server
280
+ . mock_get_hierarchy ( )
281
+ . ok_with_room_ids ( vec ! [ child_room_id_1, child_room_id_2] )
282
+ . mount ( )
283
+ . await ;
284
+
285
+ let room_list = space_service. space_room_list ( parent_space_id. to_owned ( ) ) ;
286
+
287
+ room_list. paginate ( ) . await . unwrap ( ) ;
288
+
289
+ // This space contains 2 rooms
290
+ assert_eq ! ( room_list. rooms( ) . first( ) . unwrap( ) . room_id, child_room_id_1) ;
291
+ assert_eq ! ( room_list. rooms( ) . last( ) . unwrap( ) . room_id, child_room_id_2) ;
292
+
293
+ // and we don't know about either of them
294
+ assert_eq ! ( room_list. rooms( ) . first( ) . unwrap( ) . state, None ) ;
295
+ assert_eq ! ( room_list. rooms( ) . last( ) . unwrap( ) . state, None ) ;
296
+
297
+ let rooms_subscriber = room_list. subscribe_to_room_updates ( ) ;
298
+ pin_mut ! ( rooms_subscriber) ;
299
+ assert_pending ! ( rooms_subscriber) ;
300
+
301
+ // Joining one of them though
302
+ server. sync_room ( & client, JoinedRoomBuilder :: new ( child_room_id_1) ) . await ;
303
+
304
+ // Results in an update being pushed through
305
+ assert_ready ! ( rooms_subscriber) ;
306
+ assert_eq ! ( room_list. rooms( ) . first( ) . unwrap( ) . state, Some ( RoomState :: Joined ) ) ;
307
+ assert_eq ! ( room_list. rooms( ) . last( ) . unwrap( ) . state, None ) ;
308
+
309
+ // Same for the second one
310
+ server. sync_room ( & client, JoinedRoomBuilder :: new ( child_room_id_2) ) . await ;
311
+ assert_ready ! ( rooms_subscriber) ;
312
+ assert_eq ! ( room_list. rooms( ) . first( ) . unwrap( ) . state, Some ( RoomState :: Joined ) ) ;
313
+ assert_eq ! ( room_list. rooms( ) . last( ) . unwrap( ) . state, Some ( RoomState :: Joined ) ) ;
314
+
315
+ // And when leaving them
316
+ server. sync_room ( & client, LeftRoomBuilder :: new ( child_room_id_1) ) . await ;
317
+ server. sync_room ( & client, LeftRoomBuilder :: new ( child_room_id_2) ) . await ;
318
+ assert_ready ! ( rooms_subscriber) ;
319
+ assert_eq ! ( room_list. rooms( ) . first( ) . unwrap( ) . state, Some ( RoomState :: Left ) ) ;
320
+ assert_eq ! ( room_list. rooms( ) . last( ) . unwrap( ) . state, Some ( RoomState :: Left ) ) ;
321
+ }
223
322
}
0 commit comments