17
17
//! See [`SpaceService`] for details.
18
18
19
19
use eyeball:: { SharedObservable , Subscriber } ;
20
+ use futures_util:: pin_mut;
20
21
use matrix_sdk:: Client ;
21
22
use ruma:: OwnedRoomId ;
22
23
use tokio:: task:: JoinHandle ;
24
+ use tokio_stream:: StreamExt ;
23
25
use tracing:: error;
24
26
25
27
pub use crate :: spaces:: { room:: SpaceServiceRoom , room_list:: SpaceServiceRoomList } ;
@@ -42,10 +44,10 @@ impl Drop for SpaceService {
42
44
}
43
45
44
46
impl SpaceService {
45
- pub fn new ( client : Client ) -> Self {
47
+ pub async fn new ( client : Client ) -> Self {
46
48
let joined_spaces = SharedObservable :: new ( Vec :: new ( ) ) ;
47
49
48
- joined_spaces. set ( Self :: joined_spaces_for ( & client) ) ;
50
+ joined_spaces. set ( Self :: joined_spaces_for ( & client) . await ) ;
49
51
50
52
let client_clone = client. clone ( ) ;
51
53
let joined_spaces_clone = joined_spaces. clone ( ) ;
@@ -57,7 +59,7 @@ impl SpaceService {
57
59
loop {
58
60
match all_room_updates_receiver. recv ( ) . await {
59
61
Ok ( _) => {
60
- let new_spaces = Self :: joined_spaces_for ( & client_clone) ;
62
+ let new_spaces = Self :: joined_spaces_for ( & client_clone) . await ;
61
63
if new_spaces != joined_spaces_clone. get ( ) {
62
64
joined_spaces_clone. set ( new_spaces) ;
63
65
}
@@ -84,13 +86,29 @@ impl SpaceService {
84
86
SpaceServiceRoomList :: new ( self . client . clone ( ) , space_id)
85
87
}
86
88
87
- fn joined_spaces_for ( client : & Client ) -> Vec < SpaceServiceRoom > {
88
- client
89
+ async fn joined_spaces_for ( client : & Client ) -> Vec < SpaceServiceRoom > {
90
+ let joined_spaces = client
89
91
. joined_rooms ( )
90
92
. into_iter ( )
91
93
. filter_map ( |room| if room. is_space ( ) { Some ( room) } else { None } )
92
- . map ( SpaceServiceRoom :: new_from_known)
93
- . collect :: < Vec < _ > > ( )
94
+ . collect :: < Vec < _ > > ( ) ;
95
+
96
+ let top_level_spaces: Vec < SpaceServiceRoom > = tokio_stream:: iter ( joined_spaces)
97
+ . then ( |room| async move {
98
+ let ok = if let Ok ( parents) = room. parent_spaces ( ) . await {
99
+ pin_mut ! ( parents) ;
100
+ parents. any ( |p| p. is_ok ( ) ) . await == false
101
+ } else {
102
+ false
103
+ } ;
104
+ ( room, ok)
105
+ } )
106
+ . filter ( |( _, ok) | * ok)
107
+ . map ( |( x, _) | SpaceServiceRoom :: new_from_known ( x) )
108
+ . collect ( )
109
+ . await ;
110
+
111
+ top_level_spaces
94
112
}
95
113
}
96
114
@@ -115,7 +133,7 @@ mod tests {
115
133
let server = MatrixMockServer :: new ( ) . await ;
116
134
let client = server. client_builder ( ) . build ( ) . await ;
117
135
let user_id = client. user_id ( ) . unwrap ( ) ;
118
- let space_service = SpaceService :: new ( client. clone ( ) ) ;
136
+ let space_service = SpaceService :: new ( client. clone ( ) ) . await ;
119
137
let factory = EventFactory :: new ( ) ;
120
138
121
139
server. mock_room_state_encryption ( ) . plain ( ) . mount ( ) . await ;
@@ -181,10 +199,10 @@ mod tests {
181
199
)
182
200
. await ;
183
201
184
- // All joined
202
+ // Only the parent space is returned
185
203
assert_eq ! (
186
204
space_service. joined_spaces( ) . iter( ) . map( |s| s. room_id. to_owned( ) ) . collect:: <Vec <_>>( ) ,
187
- vec![ child_space_id_1 , child_space_id_2 , parent_space_id]
205
+ vec![ parent_space_id]
188
206
) ;
189
207
190
208
let parent_space = client. get_room ( parent_space_id) . unwrap ( ) ;
@@ -246,7 +264,7 @@ mod tests {
246
264
// Build the `SpaceService` and expect the room to show up with no updates
247
265
// pending
248
266
249
- let space_service = SpaceService :: new ( client. clone ( ) ) ;
267
+ let space_service = SpaceService :: new ( client. clone ( ) ) . await ;
250
268
251
269
let joined_spaces_subscriber = space_service. subscribe_to_joined_spaces ( ) ;
252
270
pin_mut ! ( joined_spaces_subscriber) ;
0 commit comments