Skip to content

Commit 544b24b

Browse files
committed
Move some code out of client.rs
It is by far the biggest module of the matrix-sdk crate.
1 parent 1e13e06 commit 544b24b

File tree

3 files changed

+151
-131
lines changed

3 files changed

+151
-131
lines changed

crates/matrix-sdk/src/client.rs

Lines changed: 7 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ use std::{
2626
use anymap2::any::CloneAnySendSync;
2727
use dashmap::DashMap;
2828
use futures_core::stream::Stream;
29-
use futures_timer::Delay as sleep;
3029
use matrix_sdk_base::{
31-
deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse},
30+
deserialized_responses::SyncResponse,
3231
media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType},
3332
BaseClient, Session, Store,
3433
};
@@ -711,6 +710,12 @@ impl Client {
711710
self
712711
}
713712

713+
pub(crate) async fn notification_handlers(
714+
&self,
715+
) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
716+
self.notification_handlers.read().await
717+
}
718+
714719
/// Get all the rooms the client knows about.
715720
///
716721
/// This will return the list of joined, invited, and left rooms.
@@ -1723,135 +1728,6 @@ impl Client {
17231728
self.send(request, None).await
17241729
}
17251730

1726-
pub(crate) async fn process_sync(
1727-
&self,
1728-
response: sync_events::Response,
1729-
) -> Result<SyncResponse> {
1730-
let response = self.base_client.receive_sync_response(response).await?;
1731-
let SyncResponse {
1732-
next_batch: _,
1733-
rooms,
1734-
presence,
1735-
account_data,
1736-
to_device: _,
1737-
device_lists: _,
1738-
device_one_time_keys_count: _,
1739-
ambiguity_changes: _,
1740-
notifications,
1741-
} = &response;
1742-
1743-
self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?;
1744-
self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?;
1745-
1746-
for (room_id, room_info) in &rooms.join {
1747-
let room = self.get_room(room_id);
1748-
if room.is_none() {
1749-
error!("Can't call event handler, room {} not found", room_id);
1750-
continue;
1751-
}
1752-
1753-
let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } =
1754-
room_info;
1755-
1756-
self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?;
1757-
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
1758-
.await?;
1759-
self.handle_sync_state_events(&room, &state.events).await?;
1760-
self.handle_sync_timeline_events(&room, &timeline.events).await?;
1761-
}
1762-
1763-
for (room_id, room_info) in &rooms.leave {
1764-
let room = self.get_room(room_id);
1765-
if room.is_none() {
1766-
error!("Can't call event handler, room {} not found", room_id);
1767-
continue;
1768-
}
1769-
1770-
let LeftRoom { timeline, state, account_data } = room_info;
1771-
1772-
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
1773-
.await?;
1774-
self.handle_sync_state_events(&room, &state.events).await?;
1775-
self.handle_sync_timeline_events(&room, &timeline.events).await?;
1776-
}
1777-
1778-
for (room_id, room_info) in &rooms.invite {
1779-
let room = self.get_room(room_id);
1780-
if room.is_none() {
1781-
error!("Can't call event handler, room {} not found", room_id);
1782-
continue;
1783-
}
1784-
1785-
// FIXME: Destructure room_info
1786-
self.handle_sync_events(
1787-
EventKind::StrippedState,
1788-
&room,
1789-
&room_info.invite_state.events,
1790-
)
1791-
.await?;
1792-
}
1793-
1794-
// Construct notification event handler futures
1795-
let mut futures = Vec::new();
1796-
for handler in &*self.notification_handlers.read().await {
1797-
for (room_id, room_notifications) in notifications {
1798-
let room = match self.get_room(room_id) {
1799-
Some(room) => room,
1800-
None => {
1801-
warn!("Can't call notification handler, room {} not found", room_id);
1802-
continue;
1803-
}
1804-
};
1805-
1806-
futures.extend(room_notifications.iter().map(|notification| {
1807-
(handler)(notification.clone(), room.clone(), self.clone())
1808-
}));
1809-
}
1810-
}
1811-
1812-
// Run the notification handler futures with the
1813-
// `self.notification_handlers` lock no longer being held, in order.
1814-
for fut in futures {
1815-
fut.await;
1816-
}
1817-
1818-
Ok(response)
1819-
}
1820-
1821-
async fn sync_loop_helper(
1822-
&self,
1823-
sync_settings: &mut crate::config::SyncSettings<'_>,
1824-
) -> Result<SyncResponse> {
1825-
let response = self.sync_once(sync_settings.clone()).await;
1826-
1827-
match response {
1828-
Ok(r) => {
1829-
sync_settings.token = Some(r.next_batch.clone());
1830-
Ok(r)
1831-
}
1832-
Err(e) => {
1833-
error!("Received an invalid response: {}", e);
1834-
sleep::new(Duration::from_secs(1)).await;
1835-
Err(e)
1836-
}
1837-
}
1838-
}
1839-
1840-
async fn delay_sync(last_sync_time: &mut Option<Instant>) {
1841-
let now = Instant::now();
1842-
1843-
// If the last sync happened less than a second ago, sleep for a
1844-
// while to not hammer out requests if the server doesn't respect
1845-
// the sync timeout.
1846-
if let Some(t) = last_sync_time {
1847-
if now - *t <= Duration::from_secs(1) {
1848-
sleep::new(Duration::from_secs(1)).await;
1849-
}
1850-
}
1851-
1852-
*last_sync_time = Some(now);
1853-
}
1854-
18551731
/// Synchronize the client's state with the latest state on the server.
18561732
///
18571733
/// ## Syncing Events

crates/matrix-sdk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ mod http_client;
5555
pub mod room;
5656
/// High-level room API
5757
mod room_member;
58+
mod sync;
5859

5960
#[cfg(feature = "encryption")]
6061
pub mod encryption;

crates/matrix-sdk/src/sync.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
use std::time::Duration;
2+
3+
use futures_timer::Delay as sleep;
4+
use matrix_sdk_base::{
5+
deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse},
6+
instant::Instant,
7+
};
8+
use ruma::api::client::r0::sync::sync_events;
9+
use tracing::{error, warn};
10+
11+
use crate::{event_handler::EventKind, Client, Result};
12+
13+
/// Internal functionality related to getting events from the server (`sync_events` endpoint)
14+
impl Client {
15+
pub(crate) async fn process_sync(
16+
&self,
17+
response: sync_events::Response,
18+
) -> Result<SyncResponse> {
19+
let response = self.base_client.receive_sync_response(response).await?;
20+
let SyncResponse {
21+
next_batch: _,
22+
rooms,
23+
presence,
24+
account_data,
25+
to_device: _,
26+
device_lists: _,
27+
device_one_time_keys_count: _,
28+
ambiguity_changes: _,
29+
notifications,
30+
} = &response;
31+
32+
self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?;
33+
self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?;
34+
35+
for (room_id, room_info) in &rooms.join {
36+
let room = self.get_room(room_id);
37+
if room.is_none() {
38+
error!("Can't call event handler, room {} not found", room_id);
39+
continue;
40+
}
41+
42+
let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } =
43+
room_info;
44+
45+
self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?;
46+
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
47+
.await?;
48+
self.handle_sync_state_events(&room, &state.events).await?;
49+
self.handle_sync_timeline_events(&room, &timeline.events).await?;
50+
}
51+
52+
for (room_id, room_info) in &rooms.leave {
53+
let room = self.get_room(room_id);
54+
if room.is_none() {
55+
error!("Can't call event handler, room {} not found", room_id);
56+
continue;
57+
}
58+
59+
let LeftRoom { timeline, state, account_data } = room_info;
60+
61+
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
62+
.await?;
63+
self.handle_sync_state_events(&room, &state.events).await?;
64+
self.handle_sync_timeline_events(&room, &timeline.events).await?;
65+
}
66+
67+
for (room_id, room_info) in &rooms.invite {
68+
let room = self.get_room(room_id);
69+
if room.is_none() {
70+
error!("Can't call event handler, room {} not found", room_id);
71+
continue;
72+
}
73+
74+
// FIXME: Destructure room_info
75+
self.handle_sync_events(
76+
EventKind::StrippedState,
77+
&room,
78+
&room_info.invite_state.events,
79+
)
80+
.await?;
81+
}
82+
83+
// Construct notification event handler futures
84+
let mut futures = Vec::new();
85+
for handler in &*self.notification_handlers().await {
86+
for (room_id, room_notifications) in notifications {
87+
let room = match self.get_room(room_id) {
88+
Some(room) => room,
89+
None => {
90+
warn!("Can't call notification handler, room {} not found", room_id);
91+
continue;
92+
}
93+
};
94+
95+
futures.extend(room_notifications.iter().map(|notification| {
96+
(handler)(notification.clone(), room.clone(), self.clone())
97+
}));
98+
}
99+
}
100+
101+
// Run the notification handler futures with the
102+
// `self.notification_handlers` lock no longer being held, in order.
103+
for fut in futures {
104+
fut.await;
105+
}
106+
107+
Ok(response)
108+
}
109+
110+
pub(crate) async fn sync_loop_helper(
111+
&self,
112+
sync_settings: &mut crate::config::SyncSettings<'_>,
113+
) -> Result<SyncResponse> {
114+
let response = self.sync_once(sync_settings.clone()).await;
115+
116+
match response {
117+
Ok(r) => {
118+
sync_settings.token = Some(r.next_batch.clone());
119+
Ok(r)
120+
}
121+
Err(e) => {
122+
error!("Received an invalid response: {}", e);
123+
sleep::new(Duration::from_secs(1)).await;
124+
Err(e)
125+
}
126+
}
127+
}
128+
129+
pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
130+
let now = Instant::now();
131+
132+
// If the last sync happened less than a second ago, sleep for a
133+
// while to not hammer out requests if the server doesn't respect
134+
// the sync timeout.
135+
if let Some(t) = last_sync_time {
136+
if now - *t <= Duration::from_secs(1) {
137+
sleep::new(Duration::from_secs(1)).await;
138+
}
139+
}
140+
141+
*last_sync_time = Some(now);
142+
}
143+
}

0 commit comments

Comments
 (0)