Skip to content

Commit f45098a

Browse files
committed
send queue: flag unrecoverable errors, and don't block the queue on those
1 parent d146ba8 commit f45098a

File tree

10 files changed

+378
-83
lines changed

10 files changed

+378
-83
lines changed

bindings/matrix-sdk-ffi/src/timeline/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,16 @@ pub enum EventSendState {
852852
NotSentYet,
853853
/// The local event has been sent to the server, but unsuccessfully: The
854854
/// sending has failed.
855-
SendingFailed { error: String },
855+
SendingFailed {
856+
/// Stringified error message.
857+
error: String,
858+
/// Whether the error is considered recoverable or not.
859+
///
860+
/// An error that's recoverable will disable the room's send queue,
861+
/// while an unrecoverable error will be parked, until the user
862+
/// decides to cancel sending it.
863+
is_recoverable: bool,
864+
},
856865
/// The local event has been sent successfully to the server.
857866
Sent { event_id: String },
858867
}
@@ -863,7 +872,9 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
863872

864873
match value {
865874
NotSentYet => Self::NotSentYet,
866-
SendingFailed { error } => Self::SendingFailed { error: error.to_string() },
875+
SendingFailed { error, is_recoverable } => {
876+
Self::SendingFailed { error: error.to_string(), is_recoverable: *is_recoverable }
877+
}
867878
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
868879
}
869880
}

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,15 @@ impl TimelineBuilder {
318318
}
319319
}
320320

321-
RoomSendQueueUpdate::SendError { transaction_id, error } => {
321+
RoomSendQueueUpdate::SendError {
322+
transaction_id,
323+
error,
324+
is_recoverable,
325+
} => {
322326
timeline
323327
.update_event_send_state(
324328
&transaction_id,
325-
EventSendState::SendingFailed { error },
329+
EventSendState::SendingFailed { error, is_recoverable },
326330
)
327331
.await;
328332
}

crates/matrix-sdk-ui/src/timeline/event_item/local.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ pub enum EventSendState {
5555
SendingFailed {
5656
/// Details about how sending the event failed.
5757
error: Arc<Error>,
58+
/// Whether the error is considered recoverable or not.
59+
///
60+
/// An error that's recoverable will disable the room's send queue,
61+
/// while an unrecoverable error will be parked, until the user
62+
/// decides to cancel sending it.
63+
is_recoverable: bool,
5864
},
5965
/// The local event has been sent successfully to the server.
6066
Sent {

crates/matrix-sdk-ui/src/timeline/tests/echo.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,20 @@ async fn test_remote_echo_full_trip() {
6363
.inner
6464
.update_event_send_state(
6565
&txn_id,
66-
EventSendState::SendingFailed { error: Arc::new(some_io_error) },
66+
EventSendState::SendingFailed {
67+
error: Arc::new(some_io_error),
68+
is_recoverable: true,
69+
},
6770
)
6871
.await;
6972

7073
let item = assert_next_matches!(stream, VectorDiff::Set { value, index: 1 } => value);
7174
let event_item = item.as_event().unwrap();
7275
assert!(event_item.is_local_echo());
73-
assert_matches!(event_item.send_state(), Some(EventSendState::SendingFailed { .. }));
76+
assert_matches!(
77+
event_item.send_state(),
78+
Some(EventSendState::SendingFailed { is_recoverable: true, .. })
79+
);
7480
assert_eq!(item.unique_id(), id);
7581
}
7682

crates/matrix-sdk-ui/tests/integration/timeline/echo.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,6 @@ async fn test_retry_failed() {
138138

139139
mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
140140
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
141-
server.reset().await;
142-
143-
mock_encryption_state(&server, false).await;
144141

145142
client.send_queue().set_enabled(true);
146143

@@ -149,20 +146,43 @@ async fn test_retry_failed() {
149146
let (_, mut timeline_stream) =
150147
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
151148

149+
// When trying to send an event, return with a 500 error, which is interpreted
150+
// as a transient error.
151+
server.reset().await;
152+
mock_encryption_state(&server, false).await;
153+
let scoped_faulty_send = Mock::given(method("PUT"))
154+
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
155+
.and(header("authorization", "Bearer 1234"))
156+
.respond_with(ResponseTemplate::new(500))
157+
.expect(3)
158+
.mount_as_scoped(&server)
159+
.await;
160+
152161
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
153162

154163
// Let the send queue handle the event.
155164
yield_now().await;
156165

157-
// First, local echo is added
166+
// First, local echo is added.
158167
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
159168
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
160169
});
161170

162-
// Sending fails, the mock server has no matching route yet
171+
// Sending fails, because the error is a transient one that's recoverable,
172+
// indicating something's wrong on the client side.
163173
assert_let!(Some(VectorDiff::Set { index: 0, value: item }) = timeline_stream.next().await);
164-
assert_matches!(item.send_state(), Some(EventSendState::SendingFailed { .. }));
174+
assert_matches!(
175+
item.send_state(),
176+
Some(EventSendState::SendingFailed { is_recoverable: true, .. })
177+
);
165178

179+
// This doesn't disable the send queue at the global level…
180+
assert!(client.send_queue().is_enabled());
181+
// …but does so at the local level.
182+
assert!(!room.send_queue().is_enabled());
183+
184+
// Have the endpoint return a success result, and re-enable the queue.
185+
drop(scoped_faulty_send);
166186
Mock::given(method("PUT"))
167187
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
168188
.and(header("authorization", "Bearer 1234"))
@@ -173,11 +193,6 @@ async fn test_retry_failed() {
173193
.mount(&server)
174194
.await;
175195

176-
// This doesn't disable the send queue at the global level…
177-
assert!(client.send_queue().is_enabled());
178-
// …but does so at the local level.
179-
assert!(!room.send_queue().is_enabled());
180-
181196
room.send_queue().set_enabled(true);
182197

183198
// Let the send queue handle the event.

crates/matrix-sdk-ui/tests/integration/timeline/queue.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use serde_json::json;
2828
use stream_assert::{assert_next_matches, assert_pending};
2929
use tokio::{task::yield_now, time::sleep};
3030
use wiremock::{
31-
matchers::{body_string_contains, method, path_regex},
31+
matchers::{body_string_contains, header, method, path_regex},
3232
Mock, ResponseTemplate,
3333
};
3434

@@ -126,17 +126,26 @@ async fn test_retry_order() {
126126

127127
mock_sync(&server, sync_response_builder.build_json_sync_response(), None).await;
128128
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
129-
server.reset().await;
130-
131-
mock_encryption_state(&server, false).await;
132129

133130
let room = client.get_room(room_id).unwrap();
134131
let timeline = Arc::new(room.timeline().await.unwrap());
135132
let (_, mut timeline_stream) =
136133
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
137134

135+
// When trying to send an event, return with a 500 error, which is interpreted
136+
// as a transient error.
137+
server.reset().await;
138+
mock_encryption_state(&server, false).await;
139+
let scoped_faulty_send = Mock::given(method("PUT"))
140+
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
141+
.and(header("authorization", "Bearer 1234"))
142+
.respond_with(ResponseTemplate::new(500))
143+
.expect(3)
144+
.mount_as_scoped(&server)
145+
.await;
146+
138147
// Send two messages without mocking the server response.
139-
// It will respond with a 404, resulting in a failed-to-send state.
148+
// It will respond with a 500, resulting in a failed-to-send state.
140149
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap();
141150
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap();
142151

@@ -157,6 +166,7 @@ async fn test_retry_order() {
157166
assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { .. });
158167

159168
// Response for first message takes 100ms to respond
169+
drop(scoped_faulty_send);
160170
Mock::given(method("PUT"))
161171
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
162172
.and(body_string_contains("First!"))

crates/matrix-sdk/src/http_client/mod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,53 @@ impl tower::Service<http_old::Request<Bytes>> for HttpClient {
259259
Box::pin(fut)
260260
}
261261
}
262+
263+
pub(crate) enum RetryKind {
264+
Transient {
265+
err: HttpError,
266+
// This is used only for attempts to retry, so on non-wasm32 code (in the `native` module).
267+
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
268+
retry_after: Option<Duration>,
269+
},
270+
Permanent(HttpError),
271+
}
272+
273+
impl RetryKind {
274+
pub fn error(self) -> HttpError {
275+
match self {
276+
RetryKind::Transient { err, .. } | RetryKind::Permanent(err) => err,
277+
}
278+
}
279+
}
280+
281+
/// Returns whether an API error is transient or permanent,
282+
pub(crate) fn characterize_retry_kind(err: HttpError) -> RetryKind {
283+
use ruma::api::client::error::{ErrorBody, ErrorKind, RetryAfter};
284+
285+
use crate::RumaApiError;
286+
287+
if let Some(api_error) = err.as_ruma_api_error() {
288+
let status_code = match api_error {
289+
RumaApiError::ClientApi(e) => match e.body {
290+
ErrorBody::Standard { kind: ErrorKind::LimitExceeded { retry_after }, .. } => {
291+
let retry_after = retry_after.and_then(|retry_after| match retry_after {
292+
RetryAfter::Delay(d) => Some(d),
293+
RetryAfter::DateTime(_) => None,
294+
});
295+
return RetryKind::Transient { err, retry_after };
296+
}
297+
_ => Some(e.status_code),
298+
},
299+
RumaApiError::Uiaa(_) => None,
300+
RumaApiError::Other(e) => Some(e.status_code),
301+
};
302+
303+
if let Some(status_code) = status_code {
304+
if status_code.is_server_error() {
305+
return RetryKind::Transient { err, retry_after: None };
306+
}
307+
}
308+
}
309+
310+
RetryKind::Permanent(err)
311+
}

crates/matrix-sdk/src/http_client/native.rs

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,14 @@ use bytesize::ByteSize;
2525
use eyeball::SharedObservable;
2626
use http::header::CONTENT_LENGTH;
2727
use reqwest::Certificate;
28-
use ruma::api::{
29-
client::error::{ErrorBody as ClientApiErrorBody, ErrorKind as ClientApiErrorKind, RetryAfter},
30-
error::FromHttpResponseError,
31-
IncomingResponse, OutgoingRequest,
32-
};
28+
use ruma::api::{error::FromHttpResponseError, IncomingResponse, OutgoingRequest};
3329
use tracing::{info, warn};
3430

35-
use super::{response_to_http_response, HttpClient, TransmissionProgress, DEFAULT_REQUEST_TIMEOUT};
36-
use crate::{config::RequestConfig, error::HttpError, RumaApiError};
31+
use super::{
32+
characterize_retry_kind, response_to_http_response, HttpClient, RetryKind,
33+
TransmissionProgress, DEFAULT_REQUEST_TIMEOUT,
34+
};
35+
use crate::{config::RequestConfig, error::HttpError};
3736

3837
impl HttpClient {
3938
pub(super) async fn send_request<R>(
@@ -63,35 +62,11 @@ impl HttpClient {
6362
let error_type = if stop {
6463
RetryError::Permanent
6564
} else {
66-
|err: HttpError| {
67-
if let Some(api_error) = err.as_ruma_api_error() {
68-
let status_code = match api_error {
69-
RumaApiError::ClientApi(e) => match e.body {
70-
ClientApiErrorBody::Standard {
71-
kind: ClientApiErrorKind::LimitExceeded { retry_after },
72-
..
73-
} => {
74-
let retry_after =
75-
retry_after.and_then(|retry_after| match retry_after {
76-
RetryAfter::Delay(d) => Some(d),
77-
RetryAfter::DateTime(_) => None,
78-
});
79-
return RetryError::Transient { err, retry_after };
80-
}
81-
_ => Some(e.status_code),
82-
},
83-
RumaApiError::Uiaa(_) => None,
84-
RumaApiError::Other(e) => Some(e.status_code),
85-
};
86-
87-
if let Some(status_code) = status_code {
88-
if status_code.is_server_error() {
89-
return RetryError::Transient { err, retry_after: None };
90-
}
91-
}
65+
|err: HttpError| match characterize_retry_kind(err) {
66+
RetryKind::Transient { err, retry_after } => {
67+
RetryError::Transient { err, retry_after }
9268
}
93-
94-
RetryError::Permanent(err)
69+
RetryKind::Permanent(err) => RetryError::Permanent(err),
9570
}
9671
};
9772

0 commit comments

Comments
 (0)