Skip to content

Commit 45d074b

Browse files
committed
Add a reactive version of the joined_spaces method
1 parent 257c48d commit 45d074b

File tree

2 files changed

+160
-9
lines changed

2 files changed

+160
-9
lines changed

bindings/matrix-sdk-ffi/src/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,11 @@ impl Client {
12611261
SyncServiceBuilder::new((*self.inner).clone(), self.utd_hook_manager.get().cloned())
12621262
}
12631263

1264-
pub fn spaces_service(&self) -> Arc<SpaceService> {
1264+
pub async fn spaces_service(&self) -> Arc<SpaceService> {
1265+
// This method doesn't need to be async but if its not the FFI layer panics
1266+
// with "there is no no reactor running, must be called from the context
1267+
// of a Tokio 1.x runtime" error because the undelying constructor spawns
1268+
// an async task.
12651269
let inner = UISpaceService::new((*self.inner).clone());
12661270
Arc::new(SpaceService::new(inner))
12671271
}

crates/matrix-sdk-ui/src/spaces/mod.rs

Lines changed: 155 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
//!
1717
//! See [`SpaceService`] for details.
1818
19+
use eyeball::{SharedObservable, Subscriber};
1920
use matrix_sdk::Client;
2021
use ruma::OwnedRoomId;
22+
use tokio::task::JoinHandle;
23+
use tracing::error;
2124

2225
pub use crate::spaces::{room::SpaceServiceRoom, room_list::SpaceServiceRoomList};
2326

@@ -26,37 +29,87 @@ pub mod room_list;
2629

2730
pub struct SpaceService {
2831
client: Client,
32+
33+
joined_spaces: SharedObservable<Vec<SpaceServiceRoom>>,
34+
35+
room_update_handle: JoinHandle<()>,
36+
}
37+
38+
impl Drop for SpaceService {
39+
fn drop(&mut self) {
40+
self.room_update_handle.abort();
41+
}
2942
}
3043

3144
impl SpaceService {
3245
pub fn new(client: Client) -> Self {
33-
Self { client }
46+
let joined_spaces = SharedObservable::new(Vec::new());
47+
48+
joined_spaces.set(Self::joined_spaces_for(&client));
49+
50+
let client_clone = client.clone();
51+
let joined_spaces_clone = joined_spaces.clone();
52+
let all_room_updates_receiver = client.subscribe_to_all_room_updates();
53+
54+
let handle = tokio::spawn(async move {
55+
pin_mut!(all_room_updates_receiver);
56+
57+
loop {
58+
match all_room_updates_receiver.recv().await {
59+
Ok(_) => {
60+
let new_spaces = Self::joined_spaces_for(&client_clone);
61+
if new_spaces != joined_spaces_clone.get() {
62+
joined_spaces_clone.set(new_spaces);
63+
}
64+
}
65+
Err(err) => {
66+
error!("error when listening to room updates: {err}");
67+
}
68+
}
69+
}
70+
});
71+
72+
Self { client, joined_spaces, room_update_handle: handle }
73+
}
74+
75+
pub fn subscribe_to_joined_spaces(&self) -> Subscriber<Vec<SpaceServiceRoom>> {
76+
self.joined_spaces.subscribe()
3477
}
3578

3679
pub fn joined_spaces(&self) -> Vec<SpaceServiceRoom> {
37-
self.client
80+
self.joined_spaces.get()
81+
}
82+
83+
pub fn space_room_list(&self, space_id: OwnedRoomId) -> SpaceServiceRoomList {
84+
SpaceServiceRoomList::new(self.client.clone(), space_id)
85+
}
86+
87+
fn joined_spaces_for(client: &Client) -> Vec<SpaceServiceRoom> {
88+
client
3889
.joined_rooms()
3990
.into_iter()
4091
.filter_map(|room| if room.is_space() { Some(room) } else { None })
4192
.map(SpaceServiceRoom::new_from_known)
4293
.collect::<Vec<_>>()
4394
}
44-
45-
pub fn space_room_list(&self, space_id: OwnedRoomId) -> SpaceServiceRoomList {
46-
SpaceServiceRoomList::new(self.client.clone(), space_id)
47-
}
4895
}
4996

5097
#[cfg(test)]
5198
mod tests {
5299
use assert_matches2::assert_let;
53100
use matrix_sdk::{room::ParentSpace, test_utils::mocks::MatrixMockServer};
54-
use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
101+
use matrix_sdk_test::{
102+
JoinedRoomBuilder, LeftRoomBuilder, async_test, event_factory::EventFactory,
103+
};
55104
use ruma::{RoomVersionId, room::RoomType, room_id};
56105
use tokio_stream::StreamExt;
57106

58107
use super::*;
59108

109+
use futures_util::pin_mut;
110+
111+
use stream_assert::{assert_next_eq, assert_pending};
112+
60113
#[async_test]
61114
async fn test_spaces_hierarchy() {
62115
let server = MatrixMockServer::new().await;
@@ -137,7 +190,7 @@ mod tests {
137190
let parent_space = client.get_room(parent_space_id).unwrap();
138191
assert!(parent_space.is_space());
139192

140-
// Then the parent space and the two child spaces are linked
193+
// And the parent space and the two child spaces are linked
141194

142195
let spaces: Vec<ParentSpace> = client
143196
.get_room(child_space_id_1)
@@ -165,4 +218,98 @@ mod tests {
165218
assert_let!(ParentSpace::Reciprocal(parent) = spaces.last().unwrap());
166219
assert_eq!(parent.room_id(), parent_space.room_id());
167220
}
221+
222+
#[async_test]
223+
async fn test_joined_spaces_updates() {
224+
let server = MatrixMockServer::new().await;
225+
let client = server.client_builder().build().await;
226+
let user_id = client.user_id().unwrap();
227+
let factory = EventFactory::new();
228+
229+
server.mock_room_state_encryption().plain().mount().await;
230+
231+
let first_space_id = room_id!("!first_space:example.org");
232+
let second_space_id = room_id!("!second_space:example.org");
233+
234+
// Join the first space
235+
server
236+
.sync_room(
237+
&client,
238+
JoinedRoomBuilder::new(first_space_id).add_state_event(factory.create(
239+
user_id,
240+
RoomVersionId::V1,
241+
Some(RoomType::Space),
242+
)),
243+
)
244+
.await;
245+
246+
// Build the `SpaceService` and expect the room to show up with no updates
247+
// pending
248+
249+
let space_service = SpaceService::new(client.clone());
250+
251+
let joined_spaces_subscriber = space_service.subscribe_to_joined_spaces();
252+
pin_mut!(joined_spaces_subscriber);
253+
assert_pending!(joined_spaces_subscriber);
254+
255+
assert_eq!(
256+
space_service.joined_spaces(),
257+
vec![SpaceServiceRoom::new_from_known(client.get_room(first_space_id).unwrap())]
258+
);
259+
260+
// Join the second space
261+
262+
server
263+
.sync_room(
264+
&client,
265+
JoinedRoomBuilder::new(second_space_id).add_state_event(factory.create(
266+
user_id,
267+
RoomVersionId::V1,
268+
Some(RoomType::Space),
269+
)),
270+
)
271+
.await;
272+
273+
// And expect the list to update
274+
assert_eq!(
275+
space_service.joined_spaces(),
276+
vec![
277+
SpaceServiceRoom::new_from_known(client.get_room(first_space_id).unwrap()),
278+
SpaceServiceRoom::new_from_known(client.get_room(second_space_id).unwrap())
279+
]
280+
);
281+
282+
// The subscriber yields new results when a space is joined
283+
assert_next_eq!(
284+
joined_spaces_subscriber,
285+
vec![
286+
SpaceServiceRoom::new_from_known(client.get_room(first_space_id).unwrap()),
287+
SpaceServiceRoom::new_from_known(client.get_room(second_space_id).unwrap())
288+
]
289+
);
290+
291+
server.sync_room(&client, LeftRoomBuilder::new(second_space_id)).await;
292+
293+
// and when one is left
294+
assert_next_eq!(
295+
joined_spaces_subscriber,
296+
vec![SpaceServiceRoom::new_from_known(client.get_room(first_space_id).unwrap())]
297+
);
298+
299+
// but it doesn't when a non-space room gets joined
300+
server
301+
.sync_room(
302+
&client,
303+
JoinedRoomBuilder::new(room_id!("!room:example.org"))
304+
.add_state_event(factory.create(user_id, RoomVersionId::V1, None)),
305+
)
306+
.await;
307+
308+
// and the subscriber doesn't yield any updates
309+
assert_pending!(joined_spaces_subscriber);
310+
assert_eq!(
311+
space_service.joined_spaces(),
312+
vec![SpaceServiceRoom::new_from_known(client.get_room(first_space_id).unwrap())]
313+
);
314+
}
168315
}

0 commit comments

Comments
 (0)