Skip to content

Commit 872713c

Browse files
zecakehHywan
andauthored
Add setting to ignore the timeout sync setting on first sync (#5481)
The `timeout` setting on the `/sync` endpoint is the maximum allowed time for the server to send its response, because this is a poll-based API. It means that if there is no new data to show, the server will wait until the end of `timeout` before returning a response. It can be an undesirable behavior when starting a client and informing the user that we are "catching up" while waiting for the first response. By not setting a `timeout` on the first request to `/sync`, the homeserver should reply immediately, whether the response is empty or not. --------- Signed-off-by: Kévin Commaille <[email protected]> Signed-off-by: Ivan Enderlin <[email protected]> Co-authored-by: Ivan Enderlin <[email protected]>
1 parent feb22d4 commit 872713c

File tree

4 files changed

+146
-32
lines changed

4 files changed

+146
-32
lines changed

crates/matrix-sdk/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file.
88

99
### Features
1010

11+
- Add `ignore_timeout_on_first_sync` to the `SyncSettings`, which should allow to have a quicker
12+
first response when using one of the `sync`, `sync_with_callback`, `sync_with_result_callback`
13+
or `sync_stream` methods on `Client`, if the response is empty.
14+
([#5481](https://github.com/matrix-org/matrix-rust-sdk/pull/5481))
1115
- The methods to use the `/v3/sync` endpoint set the `use_state_after` field,
1216
which means that, if the server supports it, the response will contain the
1317
state changes between the last sync and the end of the timeline.

crates/matrix-sdk/src/client/mod.rs

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2533,30 +2533,21 @@ impl Client {
25332533
#[instrument(skip(self, callback))]
25342534
pub async fn sync_with_result_callback<C>(
25352535
&self,
2536-
mut sync_settings: crate::config::SyncSettings,
2536+
sync_settings: crate::config::SyncSettings,
25372537
callback: impl Fn(Result<SyncResponse, Error>) -> C,
25382538
) -> Result<(), Error>
25392539
where
25402540
C: Future<Output = Result<LoopCtrl, Error>>,
25412541
{
2542-
let mut last_sync_time: Option<Instant> = None;
2543-
2544-
if sync_settings.token.is_none() {
2545-
sync_settings.token = self.sync_token().await;
2546-
}
2547-
2548-
loop {
2549-
trace!("Syncing");
2550-
let result = self.sync_loop_helper(&mut sync_settings).await;
2542+
let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
25512543

2544+
while let Some(result) = sync_stream.next().await {
25522545
trace!("Running callback");
25532546
if callback(result).await? == LoopCtrl::Break {
25542547
trace!("Callback told us to stop");
25552548
break;
25562549
}
25572550
trace!("Done running callback");
2558-
2559-
Client::delay_sync(&mut last_sync_time).await
25602551
}
25612552

25622553
Ok(())
@@ -2609,6 +2600,8 @@ impl Client {
26092600
&self,
26102601
mut sync_settings: crate::config::SyncSettings,
26112602
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2603+
let mut is_first_sync = true;
2604+
let mut timeout = None;
26122605
let mut last_sync_time: Option<Instant> = None;
26132606

26142607
if sync_settings.token.is_none() {
@@ -2617,13 +2610,28 @@ impl Client {
26172610

26182611
let parent_span = Span::current();
26192612

2620-
async_stream::stream! {
2613+
async_stream::stream!({
26212614
loop {
2622-
yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2615+
trace!("Syncing");
2616+
2617+
if sync_settings.ignore_timeout_on_first_sync {
2618+
if is_first_sync {
2619+
timeout = sync_settings.timeout.take();
2620+
} else if sync_settings.timeout.is_none() && timeout.is_some() {
2621+
sync_settings.timeout = timeout.take();
2622+
}
2623+
2624+
is_first_sync = false;
2625+
}
2626+
2627+
yield self
2628+
.sync_loop_helper(&mut sync_settings)
2629+
.instrument(parent_span.clone())
2630+
.await;
26232631

26242632
Client::delay_sync(&mut last_sync_time).await
26252633
}
2626-
}
2634+
})
26272635
}
26282636

26292637
/// Get the current, if any, sync token of the client.
@@ -2932,7 +2940,7 @@ pub(crate) mod tests {
29322940
use assert_matches::assert_matches;
29332941
use assert_matches2::assert_let;
29342942
use eyeball::SharedObservable;
2935-
use futures_util::{pin_mut, FutureExt};
2943+
use futures_util::{pin_mut, FutureExt, StreamExt};
29362944
use js_int::{uint, UInt};
29372945
use matrix_sdk_base::{
29382946
store::{MemoryStore, StoreConfig},
@@ -2970,8 +2978,9 @@ pub(crate) mod tests {
29702978

29712979
use super::Client;
29722980
use crate::{
2981+
assert_let_timeout,
29732982
client::{futures::SendMediaUploadRequest, WeakClient},
2974-
config::RequestConfig,
2983+
config::{RequestConfig, SyncSettings},
29752984
futures::SendRequest,
29762985
media::MediaError,
29772986
test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
@@ -3831,4 +3840,53 @@ pub(crate) mod tests {
38313840
assert_eq!(max, uint!(1));
38323841
assert_eq!(current, UInt::new_wrapping(data.len() as u64));
38333842
}
3843+
3844+
#[async_test]
3845+
async fn test_dont_ignore_timeout_on_first_sync() {
3846+
let server = MatrixMockServer::new().await;
3847+
let client = server.client_builder().build().await;
3848+
3849+
server
3850+
.mock_sync()
3851+
.timeout(Some(Duration::from_secs(30)))
3852+
.ok(|_| {})
3853+
.mock_once()
3854+
.named("sync_with_timeout")
3855+
.mount()
3856+
.await;
3857+
3858+
// Call the endpoint once to check the timeout.
3859+
let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
3860+
assert_let_timeout!(Some(Ok(_)) = stream.next());
3861+
}
3862+
3863+
#[async_test]
3864+
async fn test_ignore_timeout_on_first_sync() {
3865+
let server = MatrixMockServer::new().await;
3866+
let client = server.client_builder().build().await;
3867+
3868+
server
3869+
.mock_sync()
3870+
.timeout(None)
3871+
.ok(|_| {})
3872+
.mock_once()
3873+
.named("sync_no_timeout")
3874+
.mount()
3875+
.await;
3876+
server
3877+
.mock_sync()
3878+
.timeout(Some(Duration::from_secs(30)))
3879+
.ok(|_| {})
3880+
.mock_once()
3881+
.named("sync_with_timeout")
3882+
.mount()
3883+
.await;
3884+
3885+
// Call each version of the endpoint once to check the timeouts.
3886+
let mut stream = Box::pin(
3887+
client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
3888+
);
3889+
assert_let_timeout!(Some(Ok(_)) = stream.next());
3890+
assert_let_timeout!(Some(Ok(_)) = stream.next());
3891+
}
38343892
}

crates/matrix-sdk/src/config/sync.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub struct SyncSettings {
2525
// Filter is pretty big at 1000 bytes, box it to reduce stack size
2626
pub(crate) filter: Option<Box<sync_events::v3::Filter>>,
2727
pub(crate) timeout: Option<Duration>,
28+
pub(crate) ignore_timeout_on_first_sync: bool,
2829
pub(crate) token: Option<String>,
2930
pub(crate) full_state: bool,
3031
pub(crate) set_presence: PresenceState,
@@ -39,10 +40,18 @@ impl Default for SyncSettings {
3940
#[cfg(not(tarpaulin_include))]
4041
impl fmt::Debug for SyncSettings {
4142
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42-
let Self { filter, timeout, token: _, full_state, set_presence } = self;
43+
let Self {
44+
filter,
45+
timeout,
46+
ignore_timeout_on_first_sync,
47+
token: _,
48+
full_state,
49+
set_presence,
50+
} = self;
4351
f.debug_struct("SyncSettings")
4452
.maybe_field("filter", filter)
4553
.maybe_field("timeout", timeout)
54+
.field("ignore_timeout_on_first_sync", ignore_timeout_on_first_sync)
4655
.field("full_state", full_state)
4756
.field("set_presence", set_presence)
4857
.finish()
@@ -56,6 +65,7 @@ impl SyncSettings {
5665
Self {
5766
filter: None,
5867
timeout: Some(DEFAULT_SYNC_TIMEOUT),
68+
ignore_timeout_on_first_sync: false,
5969
token: None,
6070
full_state: false,
6171
set_presence: PresenceState::Online,
@@ -85,6 +95,33 @@ impl SyncSettings {
8595
self
8696
}
8797

98+
/// Whether to ignore the `timeout` the first time that the `/sync` endpoint
99+
/// is called.
100+
///
101+
/// If there is no new data to show, the server will wait until the end of
102+
/// `timeout` before returning a response. It can be an undesirable
103+
/// behavior when starting a client and informing the user that we are
104+
/// "catching up" while waiting for the first response.
105+
///
106+
/// By not setting a `timeout` on the first request to `/sync`, the
107+
/// homeserver should reply immediately, whether the response is empty or
108+
/// not.
109+
///
110+
/// Note that this setting is ignored when calling [`Client::sync_once()`],
111+
/// because there is no loop happening.
112+
///
113+
/// # Arguments
114+
///
115+
/// * `ignore` - Whether to ignore the `timeout` the first time that the
116+
/// `/sync` endpoint is called.
117+
///
118+
/// [`Client::sync_once()`]: crate::Client::sync_once
119+
#[must_use]
120+
pub fn ignore_timeout_on_first_sync(mut self, ignore: bool) -> Self {
121+
self.ignore_timeout_on_first_sync = ignore;
122+
self
123+
}
124+
88125
/// Set the sync filter.
89126
/// It can be either the filter ID, or the definition for the filter.
90127
///

crates/matrix-sdk/src/test_utils/mocks/mod.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ use serde::Deserialize;
5050
use serde_json::{from_value, json, Value};
5151
use tokio::sync::oneshot::{self, Receiver};
5252
use wiremock::{
53-
matchers::{body_json, body_partial_json, header, method, path, path_regex, query_param},
53+
matchers::{
54+
body_json, body_partial_json, header, method, path, path_regex, query_param,
55+
query_param_is_missing,
56+
},
5457
Mock, MockBuilder, MockGuard, MockServer, Request, Respond, ResponseTemplate, Times,
5558
};
5659

@@ -342,7 +345,6 @@ impl MatrixMockServer {
342345
mock,
343346
SyncEndpoint { sync_response_builder: self.sync_response_builder.clone() },
344347
)
345-
.expect_default_access_token()
346348
}
347349

348350
/// Creates a prebuilt mock for joining a room.
@@ -2314,7 +2316,30 @@ pub struct SyncEndpoint {
23142316
sync_response_builder: Arc<Mutex<SyncResponseBuilder>>,
23152317
}
23162318

2317-
impl MockEndpoint<'_, SyncEndpoint> {
2319+
impl<'a> MockEndpoint<'a, SyncEndpoint> {
2320+
/// Expect the given timeout, or lack thereof, in the request.
2321+
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
2322+
if let Some(timeout) = timeout {
2323+
self.mock = self.mock.and(query_param("timeout", timeout.as_millis().to_string()));
2324+
} else {
2325+
self.mock = self.mock.and(query_param_is_missing("timeout"));
2326+
}
2327+
2328+
self
2329+
}
2330+
2331+
/// Mocks the sync endpoint, using the given function to generate the
2332+
/// response.
2333+
pub fn ok<F: FnOnce(&mut SyncResponseBuilder)>(self, func: F) -> MatrixMock<'a> {
2334+
let json_response = {
2335+
let mut builder = self.endpoint.sync_response_builder.lock().unwrap();
2336+
func(&mut builder);
2337+
builder.build_json_sync_response()
2338+
};
2339+
2340+
self.respond_with(ResponseTemplate::new(200).set_body_json(json_response))
2341+
}
2342+
23182343
/// Temporarily mocks the sync with the given endpoint and runs a client
23192344
/// sync with it.
23202345
///
@@ -2346,17 +2371,7 @@ impl MockEndpoint<'_, SyncEndpoint> {
23462371
/// # anyhow::Ok(()) });
23472372
/// ```
23482373
pub async fn ok_and_run<F: FnOnce(&mut SyncResponseBuilder)>(self, client: &Client, func: F) {
2349-
let json_response = {
2350-
let mut builder = self.endpoint.sync_response_builder.lock().unwrap();
2351-
func(&mut builder);
2352-
builder.build_json_sync_response()
2353-
};
2354-
2355-
let _scope = self
2356-
.mock
2357-
.respond_with(ResponseTemplate::new(200).set_body_json(json_response))
2358-
.mount_as_scoped(self.server)
2359-
.await;
2374+
let _scope = self.ok(func).mount_as_scoped().await;
23602375

23612376
let _response = client.sync_once(Default::default()).await.unwrap();
23622377
}

0 commit comments

Comments
 (0)