diff --git a/crates/matrix-sdk/src/latest_events/latest_event.rs b/crates/matrix-sdk/src/latest_events/latest_event.rs index b2e5c9e4301..68361b6dd72 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event.rs @@ -12,26 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::iter::once; + use eyeball::{AsyncLock, SharedObservable, Subscriber}; use matrix_sdk_base::event_cache::Event; use ruma::{ events::{ - call::{invite::SyncCallInviteEvent, notify::SyncCallNotifyEvent}, - poll::unstable_start::SyncUnstablePollStartEvent, + call::{invite::CallInviteEventContent, notify::CallNotifyEventContent}, + poll::unstable_start::UnstablePollStartEventContent, relation::RelationType, room::{ - member::{MembershipState, SyncRoomMemberEvent}, - message::{MessageType, SyncRoomMessageEvent}, + member::{MembershipState, RoomMemberEventContent}, + message::{MessageType, RoomMessageEventContent}, power_levels::RoomPowerLevels, }, - sticker::SyncStickerEvent, - AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, + sticker::StickerEventContent, + AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent, + AnySyncTimelineEvent, SyncStateEvent, }, - EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, + EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, TransactionId, UserId, }; use tracing::warn; -use crate::{event_cache::RoomEventCache, room::WeakRoom}; +use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate}; /// The latest event of a room or a thread. /// @@ -39,11 +42,18 @@ use crate::{event_cache::RoomEventCache, room::WeakRoom}; #[derive(Debug)] pub(super) struct LatestEvent { /// The room owning this latest event. - room_id: OwnedRoomId, + _room_id: OwnedRoomId, + /// The thread (if any) owning this latest event. - thread_id: Option, + _thread_id: Option, + + /// A buffer of the current [`LatestEventValue`] computed for local events + /// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn + /// more. + buffer_of_values_for_local_events: LatestEventValuesForLocalEvents, + /// The latest event value. - value: SharedObservable, + current_value: SharedObservable, } impl LatestEvent { @@ -54,34 +64,60 @@ impl LatestEvent { weak_room: &WeakRoom, ) -> Self { Self { - room_id: room_id.to_owned(), - thread_id: thread_id.map(ToOwned::to_owned), - value: SharedObservable::new_async( - LatestEventValue::new(room_id, thread_id, room_event_cache, weak_room).await, + _room_id: room_id.to_owned(), + _thread_id: thread_id.map(ToOwned::to_owned), + buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(), + current_value: SharedObservable::new_async( + LatestEventValue::new_remote(room_event_cache, weak_room).await, ), } } /// Return a [`Subscriber`] to new values. pub async fn subscribe(&self) -> Subscriber { - self.value.subscribe().await + self.current_value.subscribe().await + } + + /// Update the inner latest event value, based on the event cache + /// (specifically with a [`RoomEventCache`]). + pub async fn update_with_event_cache( + &mut self, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) { + let new_value = + LatestEventValue::new_remote_with_power_levels(room_event_cache, power_levels).await; + + self.update(new_value).await; } - /// Update the inner latest event value. - pub async fn update( + /// Update the inner latest event value, based on the send queue + /// (specifically with a [`RoomSendQueueUpdate`]). + pub async fn update_with_send_queue( &mut self, + send_queue_update: &RoomSendQueueUpdate, room_event_cache: &RoomEventCache, power_levels: &Option<(&UserId, RoomPowerLevels)>, ) { - let new_value = LatestEventValue::new_with_power_levels( - &self.room_id, - self.thread_id.as_deref(), + let new_value = LatestEventValue::new_local( + send_queue_update, + &mut self.buffer_of_values_for_local_events, room_event_cache, power_levels, ) .await; - self.value.set(new_value).await; + self.update(new_value).await; + } + + /// Update [`Self::current_value`] if and only if the `new_value` is not + /// [`LatestEventValue::None`]. + async fn update(&mut self, new_value: LatestEventValue) { + if let LatestEventValue::None = new_value { + // Do not update to a `None` value. + } else { + self.current_value.set(new_value).await; + } } } @@ -92,33 +128,20 @@ pub enum LatestEventValue { #[default] None, - /// A `m.room.message` event. - RoomMessage(SyncRoomMessageEvent), - - /// A `m.sticker` event. - Sticker(SyncStickerEvent), - - /// An `org.matrix.msc3381.poll.start` event. - Poll(SyncUnstablePollStartEvent), - - /// A `m.call.invite` event. - CallInvite(SyncCallInviteEvent), + /// The latest event represents a remote event. + Remote(LatestEventKind), - /// A `m.call.notify` event. - CallNotify(SyncCallNotifyEvent), + /// The latest event represents a local event that is sending. + LocalIsSending(LatestEventKind), - /// A `m.room.member` event, more precisely a knock membership change that - /// can be handled by the current user. - KnockedStateEvent(SyncRoomMemberEvent), + /// The latest event represents a local event that is wedged, either because + /// a previous local event, or this local event cannot be sent. + LocalIsWedged(LatestEventKind), } impl LatestEventValue { - async fn new( - room_id: &RoomId, - thread_id: Option<&EventId>, - room_event_cache: &RoomEventCache, - weak_room: &WeakRoom, - ) -> Self { + /// Create a new [`LatestEventValue::Remote`]. + async fn new_remote(room_event_cache: &RoomEventCache, weak_room: &WeakRoom) -> Self { // Get the power levels of the user for the current room if the `WeakRoom` is // still valid. let room = weak_room.get(); @@ -132,85 +155,402 @@ impl LatestEventValue { None => None, }; - Self::new_with_power_levels(room_id, thread_id, room_event_cache, &power_levels).await + Self::new_remote_with_power_levels(room_event_cache, &power_levels).await } - async fn new_with_power_levels( - _room_id: &RoomId, - _thread_id: Option<&EventId>, + /// Create a new [`LatestEventValue::Remote`] based on existing power + /// levels. + async fn new_remote_with_power_levels( room_event_cache: &RoomEventCache, power_levels: &Option<(&UserId, RoomPowerLevels)>, ) -> Self { room_event_cache - .rfind_map_event_in_memory_by(|event| find_and_map(event, power_levels)) + .rfind_map_event_in_memory_by(|event| find_and_map_timeline_event(event, power_levels)) .await + .map(Self::Remote) .unwrap_or_default() } -} -fn find_and_map( - event: &Event, - power_levels: &Option<(&UserId, RoomPowerLevels)>, -) -> Option { - // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we - // ignore the event. - let Some(event) = event.raw().deserialize().ok() else { - warn!(?event, "Failed to deserialize the event when looking for a suitable latest event"); + /// Create a new [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + async fn new_local( + send_queue_update: &RoomSendQueueUpdate, + buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) -> Self { + use crate::send_queue::{LocalEcho, LocalEchoContent}; + + match send_queue_update { + // A new local event is being sent. + // + // Let's create the `LatestEventValue` and push it in the buffer of values. + RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id, + content: local_echo_content, + }) => match local_echo_content { + LocalEchoContent::Event { serialized_event: content, .. } => { + if let Ok(content) = content.deserialize() { + if let Some(kind) = find_and_map_any_message_like_event_content(content) { + let value = Self::LocalIsSending(kind); + + buffer_of_values_for_local_events + .push(transaction_id.to_owned(), value.clone()); + + value + } else { + Self::None + } + } else { + Self::None + } + } - return None; - }; + LocalEchoContent::React { .. } => Self::None, + }, - match event { - AnySyncTimelineEvent::MessageLike(message_like_event) => match message_like_event { - AnySyncMessageLikeEvent::RoomMessage(message) => { - if let Some(original_message) = message.as_original() { - // Don't show incoming verification requests. - if let MessageType::VerificationRequest(_) = original_message.content.msgtype { - return None; - } + // A local event has been cancelled before being sent. + // + // Remove the calculated `LatestEventValue` from the buffer of values, and return the + // last `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => { + if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) { + buffer_of_values_for_local_events.remove(position); + } - // Check if this is a replacement for another message. If it is, ignore it. - let is_replacement = - original_message.content.relates_to.as_ref().is_some_and(|relates_to| { - if let Some(relation_type) = relates_to.rel_type() { - relation_type == RelationType::Replacement - } else { - false - } - }); + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A local event has successfully been sent! + // + // Unwedge all wedged values after the one matching `transaction_id`. Indeed, if + // an event has been sent, it means the send queue is working, so if any value has been + // marked as wedged, it must be marked as unwedged. Then, remove the calculated + // `LatestEventValue` from the buffer of values. Finally, return the last + // `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::SentEvent { transaction_id, .. } => { + let position = buffer_of_values_for_local_events.unwedged_after(transaction_id); + + if let Some(position) = position { + buffer_of_values_for_local_events.remove(position); + } + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } - if is_replacement { - None + // A local event has been replaced by another one. + // + // Replace the latest event value matching `transaction_id` in the buffer if it exists + // (note: it should!), and return the last `LatestEventValue` or calculate a new one. + RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content: content } => { + if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) { + if let Ok(content) = content.deserialize() { + if let Some(kind) = find_and_map_any_message_like_event_content(content) { + buffer_of_values_for_local_events.replace_kind(position, kind); + } } else { - Some(LatestEventValue::RoomMessage(message)) + return Self::None; } - } else { - Some(LatestEventValue::RoomMessage(message)) + } + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // An error has occurred. + // + // Mark the latest event value matching `transaction_id`, and all its following values, + // as wedged. + RoomSendQueueUpdate::SendError { transaction_id, .. } => { + buffer_of_values_for_local_events.wedged_from(transaction_id); + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A local event has been unwedged and sending is being retried. + // + // Mark the latest event value matching `transaction_id`, and all its following values, + // as unwedged. + RoomSendQueueUpdate::RetryEvent { transaction_id } => { + buffer_of_values_for_local_events.unwedged_from(transaction_id); + + Self::new_local_or_remote( + buffer_of_values_for_local_events, + room_event_cache, + power_levels, + ) + .await + } + + // A media upload has made progress. + // + // Nothing to do here. + RoomSendQueueUpdate::MediaUpload { .. } => Self::None, + } + } + + /// Get the last [`LatestEventValue`] from the local latest event values if + /// any, or create a new [`LatestEventValue`] from the remote events. + /// + /// If the buffer of latest event values is not empty, let's return the last + /// one. Otherwise, it means we no longer have any local event: let's + /// fallback on remote event! + async fn new_local_or_remote( + buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, + room_event_cache: &RoomEventCache, + power_levels: &Option<(&UserId, RoomPowerLevels)>, + ) -> Self { + if let Some(value) = buffer_of_values_for_local_events.last() { + value.clone() + } else { + Self::new_remote_with_power_levels(room_event_cache, power_levels).await + } + } +} + +/// A buffer of the current [`LatestEventValue`] computed for local events +/// seen by the send queue. It is used by +/// [`LatestEvent::buffer_of_values_for_local_events`]. +/// +/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to +/// iterate over local events in the send queue when a local event is changed +/// (cancelled, or updated for example). That's why we keep our own buffer here. +/// Imagine the system receives 4 [`RoomSendQueueUpdate`]: +/// +/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local +/// event, +/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local +/// event. +/// +/// `NewLocalEvent`s will trigger the computation of new +/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold +/// any information to compute a new `LatestEventValue`, so we need to +/// remember the previous values, until the local events are sent and +/// removed from this buffer. +/// +/// Another reason why we need a buffer is to handle wedged local event. Imagine +/// the system receives 3 [`RoomSendQueueUpdate`]: +/// +/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event, +/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to +/// be sent. +/// +/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the +/// send queue is stopped. However, the `LatestEventValue` targets the second +/// `NewLocalEvent`. The system must consider that when a local event is wedged, +/// all the following local events must also be marked as wedged. And vice +/// versa, when the send queue is able to send an event again, all the following +/// local events must be marked as unwedged. +/// +/// This type isolates a couple of methods designed to manage these specific +/// behaviours. +#[derive(Debug)] +struct LatestEventValuesForLocalEvents { + buffer: Vec<(OwnedTransactionId, LatestEventValue)>, +} + +impl LatestEventValuesForLocalEvents { + /// Create a new [`LatestEventValuesForLocalEvents`]. + fn new() -> Self { + Self { buffer: Vec::with_capacity(2) } + } + + /// Get the last [`LatestEventValue`]. + fn last(&self) -> Option<&LatestEventValue> { + self.buffer.last().map(|(_, value)| value) + } + + /// Find the position of the [`LatestEventValue`] matching `transaction_id`. + fn position(&self, transaction_id: &TransactionId) -> Option { + self.buffer + .iter() + .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + } + + /// Push a new [`LatestEventValue`]. + /// + /// # Panics + /// + /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) { + assert!( + matches!( + value, + LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalIsWedged(_) + ), + "`value` must be either `LocalIsSending` or `LocalIsWedged`" + ); + + self.buffer.push((transaction_id, value)); + } + + /// Replace the [`LatestEventKind`] of the [`LatestEventValue`] at position + /// `position`. + /// + /// # Panics + /// + /// Panics if: + /// - `position` is strictly greater than buffer's length, + /// - the [`LatestEventValue`] is not of kind + /// [`LatestEventValue::LocalIsSending`] or + /// [`LatestEventValue::LocalIsWedged`]. + fn replace_kind(&mut self, position: usize, new_kind: LatestEventKind) { + let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid"); + + match value { + LatestEventValue::LocalIsSending(kind) => *kind = new_kind, + LatestEventValue::LocalIsWedged(kind) => *kind = new_kind, + _ => panic!("`value` must be either `LocalIsSending` or `LocalIsWedged`"), + } + } + + /// Remove the [`LatestEventValue`] at position `position`. + /// + /// # Panics + /// + /// Panics if `position` is strictly greater than buffer's length. + fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) { + self.buffer.remove(position) + } + + /// Mark the `LatestEventValue` matching `transaction_id`, and all the + /// following values, as wedged. + fn wedged_from(&mut self, transaction_id: &TransactionId) { + let mut values = self.buffer.iter_mut(); + + if let Some(first_value_to_wedge) = values + .by_ref() + .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over the found value and the following ones. + for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) { + if let LatestEventValue::LocalIsSending(kind) = value_to_wedge { + *value_to_wedge = LatestEventValue::LocalIsWedged(kind.clone()); } } + } + } - AnySyncMessageLikeEvent::UnstablePollStart(poll) => Some(LatestEventValue::Poll(poll)), + /// Mark the `LatestEventValue` matching `transaction_id`, and all the + /// following values, as unwedged. + fn unwedged_from(&mut self, transaction_id: &TransactionId) { + let mut values = self.buffer.iter_mut(); - AnySyncMessageLikeEvent::CallInvite(invite) => { - Some(LatestEventValue::CallInvite(invite)) + if let Some(first_value_to_unwedge) = values + .by_ref() + .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over the found value and the following ones. + for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) { + if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge { + *value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone()); + } } + } + } - AnySyncMessageLikeEvent::CallNotify(notify) => { - Some(LatestEventValue::CallNotify(notify)) + /// Mark all the following values after the `LatestEventValue` matching + /// `transaction_id` as unwedged. + /// + /// Note that contrary to [`Self::unwedged_from`], the `LatestEventValue` is + /// untouched. However, its position is returned (if any). + fn unwedged_after(&mut self, transaction_id: &TransactionId) -> Option { + let mut values = self.buffer.iter_mut(); + + if let Some(position) = values + .by_ref() + .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate) + { + // Iterate over all values after the found one. + for (_, value_to_unwedge) in values { + if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge { + *value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone()); + } } - AnySyncMessageLikeEvent::Sticker(sticker) => Some(LatestEventValue::Sticker(sticker)), + Some(position) + } else { + None + } + } +} - // Encrypted events are not suitable. - AnySyncMessageLikeEvent::RoomEncrypted(_) => None, +/// A latest event value! +#[derive(Debug, Clone)] +pub enum LatestEventKind { + /// A `m.room.message` event. + RoomMessage(RoomMessageEventContent), - // Everything else is considered not suitable. - _ => None, - }, + /// A `m.sticker` event. + Sticker(StickerEventContent), - // We don't currently support most state events + /// An `org.matrix.msc3381.poll.start` event. + Poll(UnstablePollStartEventContent), + + /// A `m.call.invite` event. + CallInvite(CallInviteEventContent), + + /// A `m.call.notify` event. + CallNotify(CallNotifyEventContent), + + /// A `m.room.member` event, more precisely a knock membership change that + /// can be handled by the current user. + KnockedStateEvent(RoomMemberEventContent), + + /// A redacted event. + Redacted(AnySyncMessageLikeEvent), +} + +fn find_and_map_timeline_event( + event: &Event, + power_levels: &Option<(&UserId, RoomPowerLevels)>, +) -> Option { + // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we + // ignore the event. + let Some(event) = event.raw().deserialize().ok() else { + warn!(?event, "Failed to deserialize the event when looking for a suitable latest event"); + + return None; + }; + + match event { + AnySyncTimelineEvent::MessageLike(message_like_event) => { + match message_like_event.original_content() { + Some(any_message_like_event_content) => { + find_and_map_any_message_like_event_content(any_message_like_event_content) + } + + // The event has been redacted. + None => Some(LatestEventKind::Redacted(message_like_event)), + } + } + + // We don't currently support most state events… AnySyncTimelineEvent::State(state) => { - // But we make an exception for knocked state events *if* the current user + // … but we make an exception for knocked state events _if_ the current user // can either accept or decline them. if let AnySyncStateEvent::RoomMember(member) = state { if matches!(member.membership(), MembershipState::Knock) { @@ -225,7 +565,14 @@ fn find_and_map( // The current user can act on the knock changes, so they should be // displayed if can_accept_or_decline_knocks { - return Some(LatestEventValue::KnockedStateEvent(member)); + return Some(LatestEventKind::KnockedStateEvent(match member { + SyncStateEvent::Original(member) => member.content, + SyncStateEvent::Redacted(_) => { + // Cannot decide if the user can accept or decline knocks because + // the event has been redacted. + return None; + } + })); } } } @@ -235,18 +582,58 @@ fn find_and_map( } } +fn find_and_map_any_message_like_event_content( + event: AnyMessageLikeEventContent, +) -> Option { + match event { + AnyMessageLikeEventContent::RoomMessage(message) => { + // Don't show incoming verification requests. + if let MessageType::VerificationRequest(_) = message.msgtype { + return None; + } + + // Check if this is a replacement for another message. If it is, ignore + // it. + let is_replacement = message.relates_to.as_ref().is_some_and(|relates_to| { + if let Some(relation_type) = relates_to.rel_type() { + relation_type == RelationType::Replacement + } else { + false + } + }); + + if is_replacement { + None + } else { + Some(LatestEventKind::RoomMessage(message)) + } + } + + AnyMessageLikeEventContent::UnstablePollStart(poll) => Some(LatestEventKind::Poll(poll)), + + AnyMessageLikeEventContent::CallInvite(invite) => Some(LatestEventKind::CallInvite(invite)), + + AnyMessageLikeEventContent::CallNotify(notify) => Some(LatestEventKind::CallNotify(notify)), + + AnyMessageLikeEventContent::Sticker(sticker) => Some(LatestEventKind::Sticker(sticker)), + + // Encrypted events are not suitable. + AnyMessageLikeEventContent::RoomEncrypted(_) => None, + + // Everything else is considered not suitable. + _ => None, + } +} + #[cfg(test)] -mod tests { +mod tests_latest_event_kind { use assert_matches::assert_matches; use matrix_sdk_test::event_factory::EventFactory; - use ruma::{ - event_id, events::room::power_levels::RoomPowerLevelsSource, - room_version_rules::AuthorizationRules, user_id, - }; + use ruma::{event_id, user_id}; - use super::{find_and_map, LatestEventValue}; + use super::{find_and_map_timeline_event, LatestEventKind, RoomMessageEventContent}; - macro_rules! assert_latest_event_value { + macro_rules! assert_latest_event_kind { ( with | $event_factory:ident | $event_builder:block it produces $match:pat ) => { let user_id = user_id!("@mnt_io:matrix.org"); @@ -256,23 +643,23 @@ mod tests { $event_builder }; - assert_matches!(find_and_map(&event, &None), $match); + assert_matches!(find_and_map_timeline_event(&event, &None), $match); }; } #[test] - fn test_latest_event_value_room_message() { - assert_latest_event_value!( + fn test_room_message() { + assert_latest_event_kind!( with |event_factory| { event_factory.text_msg("hello").into_event() } - it produces Some(LatestEventValue::RoomMessage(_)) + it produces Some(LatestEventKind::RoomMessage(_)) ); } #[test] - fn test_latest_event_value_room_message_redacted() { - assert_latest_event_value!( + fn test_redacted() { + assert_latest_event_kind!( with |event_factory| { event_factory .redacted( @@ -281,19 +668,19 @@ mod tests { ) .into_event() } - it produces Some(LatestEventValue::RoomMessage(_)) + it produces Some(LatestEventKind::Redacted(_)) ); } #[test] - fn test_latest_event_value_room_message_replacement() { - assert_latest_event_value!( + fn test_room_message_replacement() { + assert_latest_event_kind!( with |event_factory| { event_factory .text_msg("bonjour") .edit( event_id!("$ev0"), - ruma::events::room::message::RoomMessageEventContent::text_plain("hello").into() + RoomMessageEventContent::text_plain("hello").into() ) .into_event() } @@ -302,8 +689,8 @@ mod tests { } #[test] - fn test_latest_event_value_poll() { - assert_latest_event_value!( + fn test_poll() { + assert_latest_event_kind!( with |event_factory| { event_factory .poll_start( @@ -313,13 +700,13 @@ mod tests { ) .into_event() } - it produces Some(LatestEventValue::Poll(_)) + it produces Some(LatestEventKind::Poll(_)) ); } #[test] - fn test_latest_event_value_call_invite() { - assert_latest_event_value!( + fn test_call_invite() { + assert_latest_event_kind!( with |event_factory| { event_factory .call_invite( @@ -330,13 +717,13 @@ mod tests { ) .into_event() } - it produces Some(LatestEventValue::CallInvite(_)) + it produces Some(LatestEventKind::CallInvite(_)) ); } #[test] - fn test_latest_event_value_call_notify() { - assert_latest_event_value!( + fn test_call_notify() { + assert_latest_event_kind!( with |event_factory| { event_factory .call_notify( @@ -347,13 +734,13 @@ mod tests { ) .into_event() } - it produces Some(LatestEventValue::CallNotify(_)) + it produces Some(LatestEventKind::CallNotify(_)) ); } #[test] - fn test_latest_event_value_sticker() { - assert_latest_event_value!( + fn test_sticker() { + assert_latest_event_kind!( with |event_factory| { event_factory .sticker( @@ -363,13 +750,13 @@ mod tests { ) .into_event() } - it produces Some(LatestEventValue::Sticker(_)) + it produces Some(LatestEventKind::Sticker(_)) ); } #[test] - fn test_latest_event_value_encrypted_room_message() { - assert_latest_event_value!( + fn test_encrypted_room_message() { + assert_latest_event_kind!( with |event_factory| { event_factory .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new( @@ -391,9 +778,9 @@ mod tests { } #[test] - fn test_latest_event_value_reaction() { + fn test_reaction() { // Take a random message-like event. - assert_latest_event_value!( + assert_latest_event_kind!( with |event_factory| { event_factory .reaction(event_id!("$ev0"), "+1") @@ -404,8 +791,8 @@ mod tests { } #[test] - fn test_latest_event_state_event() { - assert_latest_event_value!( + fn test_state_event() { + assert_latest_event_kind!( with |event_factory| { event_factory .room_topic("new room topic") @@ -416,8 +803,8 @@ mod tests { } #[test] - fn test_latest_event_knocked_state_event_without_power_levels() { - assert_latest_event_value!( + fn test_knocked_state_event_without_power_levels() { + assert_latest_event_kind!( with |event_factory| { event_factory .member(user_id!("@other_mnt_io:server.name")) @@ -429,16 +816,20 @@ mod tests { } #[test] - fn test_latest_event_knocked_state_event_with_power_levels() { - use ruma::events::room::power_levels::RoomPowerLevels; + fn test_knocked_state_event_with_power_levels() { + use ruma::{ + events::room::{ + member::MembershipState, + power_levels::{RoomPowerLevels, RoomPowerLevelsSource}, + }, + room_version_rules::AuthorizationRules, + }; let user_id = user_id!("@mnt_io:matrix.org"); let other_user_id = user_id!("@other_mnt_io:server.name"); let event_factory = EventFactory::new().sender(user_id); - let event = event_factory - .member(other_user_id) - .membership(ruma::events::room::member::MembershipState::Knock) - .into_event(); + let event = + event_factory.member(other_user_id).membership(MembershipState::Knock).into_event(); let mut room_power_levels = RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []); @@ -450,7 +841,7 @@ mod tests { room_power_levels.invite = 10.into(); room_power_levels.kick = 10.into(); assert_matches!( - find_and_map(&event, &Some((user_id, room_power_levels))), + find_and_map_timeline_event(&event, &Some((user_id, room_power_levels))), None, "cannot accept, cannot decline", ); @@ -462,8 +853,8 @@ mod tests { room_power_levels.invite = 0.into(); room_power_levels.kick = 10.into(); assert_matches!( - find_and_map(&event, &Some((user_id, room_power_levels))), - Some(LatestEventValue::KnockedStateEvent(_)), + find_and_map_timeline_event(&event, &Some((user_id, room_power_levels))), + Some(LatestEventKind::KnockedStateEvent(_)), "can accept, cannot decline", ); } @@ -474,8 +865,8 @@ mod tests { room_power_levels.invite = 10.into(); room_power_levels.kick = 0.into(); assert_matches!( - find_and_map(&event, &Some((user_id, room_power_levels))), - Some(LatestEventValue::KnockedStateEvent(_)), + find_and_map_timeline_event(&event, &Some((user_id, room_power_levels))), + Some(LatestEventKind::KnockedStateEvent(_)), "cannot accept, can decline", ); } @@ -485,25 +876,27 @@ mod tests { room_power_levels.invite = 0.into(); room_power_levels.kick = 0.into(); assert_matches!( - find_and_map(&event, &Some((user_id, room_power_levels))), - Some(LatestEventValue::KnockedStateEvent(_)), + find_and_map_timeline_event(&event, &Some((user_id, room_power_levels))), + Some(LatestEventKind::KnockedStateEvent(_)), "can accept, can decline", ); } } #[test] - fn test_latest_event_value_room_message_verification_request() { - assert_latest_event_value!( + fn test_room_message_verification_request() { + use ruma::{events::room::message, OwnedDeviceId}; + + assert_latest_event_kind!( with |event_factory| { event_factory .event( - ruma::events::room::message::RoomMessageEventContent::new( - ruma::events::room::message::MessageType::VerificationRequest( - ruma::events::room::message::KeyVerificationRequestEventContent::new( + RoomMessageEventContent::new( + message::MessageType::VerificationRequest( + message::KeyVerificationRequestEventContent::new( "body".to_owned(), vec![], - ruma::OwnedDeviceId::from("device_id"), + OwnedDeviceId::from("device_id"), user_id!("@user:server.name").to_owned(), ) ) @@ -516,86 +909,1015 @@ mod tests { } } -#[cfg(all(not(target_family = "wasm"), test))] -mod tests_non_wasm { +#[cfg(test)] +mod tests_latest_event_values_for_local_events { use assert_matches::assert_matches; - use matrix_sdk_test::{async_test, event_factory::EventFactory}; - use ruma::{event_id, room_id, user_id}; + use ruma::OwnedTransactionId; - use super::LatestEventValue; - use crate::test_utils::mocks::MatrixMockServer; + use super::{ + LatestEventKind, LatestEventValue, LatestEventValuesForLocalEvents, RoomMessageEventContent, + }; - #[async_test] - async fn test_latest_event_value_is_scanning_event_backwards_from_event_cache() { - use matrix_sdk_base::{ - linked_chunk::{ChunkIdentifier, Position, Update}, - RoomState, - }; + fn room_message(body: &str) -> LatestEventKind { + LatestEventKind::RoomMessage(RoomMessageEventContent::text_plain(body)) + } - use crate::{client::WeakClient, room::WeakRoom}; + #[test] + fn test_last() { + let mut buffer = LatestEventValuesForLocalEvents::new(); - let room_id = room_id!("!r0"); - let user_id = user_id!("@mnt_io:matrix.org"); - let event_factory = EventFactory::new().sender(user_id).room(room_id); - let event_id_0 = event_id!("$ev0"); - let event_id_1 = event_id!("$ev1"); - let event_id_2 = event_id!("$ev2"); + assert!(buffer.last().is_none()); - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; + buffer.push( + OwnedTransactionId::from("txnid"), + LatestEventValue::LocalIsSending(room_message("tome")), + ); - // Prelude. - { - // Create the room. - client.base_client().get_or_create_room(room_id, RoomState::Joined); + assert_matches!( + buffer.last(), + Some(LatestEventValue::LocalIsSending(LatestEventKind::RoomMessage(_))) + ); + } - // Initialise the event cache store. - client - .event_cache_store() - .lock() - .await - .unwrap() - .handle_linked_chunk_updates( - matrix_sdk_base::linked_chunk::LinkedChunkId::Room(room_id), - vec![ - Update::NewItemsChunk { - previous: None, - new: ChunkIdentifier::new(0), - next: None, - }, - Update::PushItems { - at: Position::new(ChunkIdentifier::new(0), 0), - items: vec![ - // a latest event candidate - event_factory.text_msg("hello").event_id(event_id_0).into(), - // a latest event candidate - event_factory.text_msg("world").event_id(event_id_1).into(), - // not a latest event candidate - event_factory - .room_topic("new room topic") - .event_id(event_id_2) - .into(), - ], - }, - ], - ) - .await - .unwrap(); - } + #[test] + fn test_position() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id = OwnedTransactionId::from("txnid"); - let event_cache = client.event_cache(); - event_cache.subscribe().unwrap(); + assert!(buffer.position(&transaction_id).is_none()); - let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); - let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned()); + buffer.push( + transaction_id.clone(), + LatestEventValue::LocalIsSending(room_message("raclette")), + ); + buffer.push( + OwnedTransactionId::from("othertxnid"), + LatestEventValue::LocalIsSending(room_message("tome")), + ); - assert_matches!( - LatestEventValue::new(room_id, None, &room_event_cache, &weak_room).await, - LatestEventValue::RoomMessage(given_event) => { - // We get `event_id_1` because `event_id_2` isn't a candidate, - // and `event_id_0` hasn't been read yet (because events are - // read backwards). - assert_eq!(given_event.event_id(), event_id_1); + assert_eq!(buffer.position(&transaction_id), Some(0)); + } + + #[test] + #[should_panic] + fn test_push_none() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + + buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None); + } + + #[test] + #[should_panic] + fn test_push_remote() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + + buffer.push( + OwnedTransactionId::from("txnid"), + LatestEventValue::Remote(room_message("tome")), + ); + } + + #[test] + fn test_push_local() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + + buffer.push( + OwnedTransactionId::from("txnid0"), + LatestEventValue::LocalIsSending(room_message("tome")), + ); + buffer.push( + OwnedTransactionId::from("txnid1"), + LatestEventValue::LocalIsWedged(room_message("raclette")), + ); + + // no panic. + } + + #[test] + fn test_replace_kind() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + + buffer.push( + OwnedTransactionId::from("txnid0"), + LatestEventValue::LocalIsSending(room_message("gruyère")), + ); + + buffer.replace_kind(0, room_message("comté")); + + assert_matches!( + buffer.last(), + Some(LatestEventValue::LocalIsSending(LatestEventKind::RoomMessage(content))) => { + assert_eq!(content.body(), "comté"); + } + ); + } + + #[test] + fn test_remove() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + + buffer.push( + OwnedTransactionId::from("txnid"), + LatestEventValue::LocalIsSending(room_message("gryuère")), + ); + + assert!(buffer.last().is_some()); + + buffer.remove(0); + + assert!(buffer.last().is_none()); + } + + #[test] + fn test_wedged_from() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + let transaction_id_2 = OwnedTransactionId::from("txnid2"); + + buffer.push(transaction_id_0, LatestEventValue::LocalIsSending(room_message("gruyère"))); + buffer.push( + transaction_id_1.clone(), + LatestEventValue::LocalIsSending(room_message("brigand")), + ); + buffer.push(transaction_id_2, LatestEventValue::LocalIsSending(room_message("raclette"))); + + buffer.wedged_from(&transaction_id_1); + + assert_eq!(buffer.buffer.len(), 3); + assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_)); + assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsWedged(_)); + assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsWedged(_)); + } + + #[test] + fn test_unwedged_from() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + let transaction_id_2 = OwnedTransactionId::from("txnid2"); + + buffer.push(transaction_id_0, LatestEventValue::LocalIsWedged(room_message("gruyère"))); + buffer.push( + transaction_id_1.clone(), + LatestEventValue::LocalIsWedged(room_message("brigand")), + ); + buffer.push(transaction_id_2, LatestEventValue::LocalIsWedged(room_message("raclette"))); + + buffer.unwedged_from(&transaction_id_1); + + assert_eq!(buffer.buffer.len(), 3); + assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsWedged(_)); + assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_)); + assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_)); + } + + #[test] + fn test_unwedged_after() { + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + let transaction_id_2 = OwnedTransactionId::from("txnid2"); + + buffer.push(transaction_id_0, LatestEventValue::LocalIsWedged(room_message("gruyère"))); + buffer.push( + transaction_id_1.clone(), + LatestEventValue::LocalIsWedged(room_message("brigand")), + ); + buffer.push(transaction_id_2, LatestEventValue::LocalIsWedged(room_message("raclette"))); + + buffer.unwedged_after(&transaction_id_1); + + assert_eq!(buffer.buffer.len(), 3); + assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsWedged(_)); + assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsWedged(_)); + assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_)); + } +} + +#[cfg(all(not(target_family = "wasm"), test))] +mod tests_latest_event_value_non_wasm { + use std::sync::Arc; + + use assert_matches::assert_matches; + use matrix_sdk_base::{ + linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update}, + store::SerializableEventContent, + RoomState, + }; + use matrix_sdk_test::{async_test, event_factory::EventFactory}; + use ruma::{ + event_id, + events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent}, + room_id, user_id, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, + }; + + use super::{ + LatestEvent, LatestEventKind, LatestEventValue, LatestEventValuesForLocalEvents, + RoomEventCache, RoomSendQueueUpdate, + }; + use crate::{ + client::WeakClient, + room::WeakRoom, + send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle}, + test_utils::mocks::MatrixMockServer, + Client, Error, + }; + + #[async_test] + async fn test_update_ignores_none_value() { + let room_id = room_id!("!r0"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let weak_client = WeakClient::from_client(&client); + + // Create the room. + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let weak_room = WeakRoom::new(weak_client, room_id.to_owned()); + + // Get a `RoomEventCache`. + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); + + let mut latest_event = LatestEvent::new(room_id, None, &room_event_cache, &weak_room).await; + + // First off, check the default value is `None`! + assert_matches!(latest_event.current_value.get().await, LatestEventValue::None); + + // Second, set a new value. + latest_event + .update(LatestEventValue::LocalIsSending(LatestEventKind::RoomMessage( + RoomMessageEventContent::text_plain("foo"), + ))) + .await; + + assert_matches!( + latest_event.current_value.get().await, + LatestEventValue::LocalIsSending(_) + ); + + // Finally, set a new `None` value. It must be ignored. + latest_event.update(LatestEventValue::None).await; + + assert_matches!( + latest_event.current_value.get().await, + LatestEventValue::LocalIsSending(_) + ); + } + + #[async_test] + async fn test_remote_is_scanning_event_backwards_from_event_cache() { + let room_id = room_id!("!r0"); + let user_id = user_id!("@mnt_io:matrix.org"); + let event_factory = EventFactory::new().sender(user_id).room(room_id); + let event_id_0 = event_id!("$ev0"); + let event_id_1 = event_id!("$ev1"); + let event_id_2 = event_id!("$ev2"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + // Prelude. + { + // Create the room. + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + // Initialise the event cache store. + client + .event_cache_store() + .lock() + .await + .unwrap() + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![ + // a latest event candidate + event_factory.text_msg("hello").event_id(event_id_0).into(), + // a latest event candidate + event_factory.text_msg("world").event_id(event_id_1).into(), + // not a latest event candidate + event_factory + .room_topic("new room topic") + .event_id(event_id_2) + .into(), + ], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); + let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned()); + + assert_matches!( + LatestEventValue::new_remote(&room_event_cache, &weak_room).await, + LatestEventValue::Remote(LatestEventKind::RoomMessage(message_content)) => { + // We get `event_id_1` because `event_id_2` isn't a candidate, + // and `event_id_0` hasn't been read yet (because events are + // read backwards). + assert_eq!(message_content.body(), "world"); + } + ); + } + + async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) { + let room_id = room_id!("!r0").to_owned(); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + client.base_client().get_or_create_room(&room_id, RoomState::Joined); + let room = client.get_room(&room_id).unwrap(); + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap(); + + let send_queue = client.send_queue(); + let room_send_queue = send_queue.for_room(room); + + (client, room_id, room_send_queue, room_event_cache) + } + + fn new_local_echo_content( + room_send_queue: &RoomSendQueue, + transaction_id: &OwnedTransactionId, + body: &str, + ) -> LocalEchoContent { + LocalEchoContent::Event { + serialized_event: SerializableEventContent::new( + &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)), + ) + .unwrap(), + send_handle: SendHandle::new( + room_send_queue.clone(), + transaction_id.clone(), + MilliSecondsSinceUnixEpoch::now(), + ), + send_error: None, + } + } + + #[async_test] + async fn test_local_new_local_event() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + + // Receiving one `NewLocalEvent`. + { + let transaction_id = OwnedTransactionId::from("txnid0"); + let content = new_local_echo_content(&room_send_queue, &transaction_id, "A"); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending(LatestEventKind::RoomMessage(message_content)) => { + assert_eq!(message_content.body(), "A"); + } + ); + } + + // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer. + { + let transaction_id = OwnedTransactionId::from("txnid1"); + let content = new_local_echo_content(&room_send_queue, &transaction_id, "B"); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + + assert_eq!(buffer.buffer.len(), 2); + assert_matches!( + &buffer.buffer[0].1, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + assert_matches!( + &buffer.buffer[1].1, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + + #[async_test] + async fn test_local_cancelled_local_event() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + let transaction_id_2 = OwnedTransactionId::from("txnid2"); + + // Receiving three `NewLocalEvent`s. + { + for (transaction_id, body) in + [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")] + { + let content = new_local_echo_content(&room_send_queue, transaction_id, body); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), body); + } + ); + } + + assert_eq!(buffer.buffer.len(), 3); + } + + // Receiving a `CancelledLocalEvent` targeting the second event. The + // `LatestEventValue` must not change. + { + let update = RoomSendQueueUpdate::CancelledLocalEvent { + transaction_id: transaction_id_1.clone(), + }; + + // The `LatestEventValue` hasn't changed, it still matches the latest local + // event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "C"); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `CancelledLocalEvent` targeting the second (so the last) event. + // The `LatestEventValue` must point to the first local event. + { + let update = RoomSendQueueUpdate::CancelledLocalEvent { + transaction_id: transaction_id_2.clone(), + }; + + // The `LatestEventValue` has changed, it matches the previous (so the first) + // local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + + assert_eq!(buffer.buffer.len(), 1); + } + + // Receiving a `CancelledLocalEvent` targeting the first (so the last) event. + // The `LatestEventValue` cannot be computed from the send queue and will + // fallback to the event cache. The event cache is empty in this case, so we get + // nothing. + { + let update = + RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 }; + + // The `LatestEventValue` has changed, it's empty! + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::None + ); + + assert!(buffer.buffer.is_empty()); + } + } + + #[async_test] + async fn test_local_sent_event() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + + // Receiving two `NewLocalEvent`s. + { + for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] { + let content = new_local_echo_content(&room_send_queue, transaction_id, body); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), body); + } + ); + } + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `SentEvent` targeting the first event. The `LatestEventValue` + // must not change. + { + let update = RoomSendQueueUpdate::SentEvent { + transaction_id: transaction_id_0.clone(), + event_id: event_id!("$ev0").to_owned(), + }; + + // The `LatestEventValue` hasn't changed, it still matches the latest local + // event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 1); + } + + // Receiving a `SentEvent` targeting the first event. The `LaetstEvent` cannot + // be computed from the send queue and will fallback to the event cache. + // The event cache is empty in this case, so we get nothing. + { + let update = RoomSendQueueUpdate::SentEvent { + transaction_id: transaction_id_1, + event_id: event_id!("$ev1").to_owned(), + }; + + // The `LatestEventValue` has changed, it's empty! + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::None + ); + + assert!(buffer.buffer.is_empty()); + } + } + + #[async_test] + async fn test_local_replaced_local_event() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + + // Receiving two `NewLocalEvent`s. + { + for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] { + let content = new_local_echo_content(&room_send_queue, transaction_id, body); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), body); + } + ); + } + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `ReplacedLocalEvent` targeting the first event. The + // `LatestEventValue` must not change. + { + let transaction_id = &transaction_id_0; + let LocalEchoContent::Event { serialized_event: new_content, .. } = + new_local_echo_content(&room_send_queue, transaction_id, "A.") + else { + panic!("oopsy"); + }; + + let update = RoomSendQueueUpdate::ReplacedLocalEvent { + transaction_id: transaction_id.clone(), + new_content, + }; + + // The `LatestEventValue` hasn't changed, it still matches the latest local + // event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `ReplacedLocalEvent` targeting the second (so the last) event. + // The `LatestEventValue` is changing. + { + let transaction_id = &transaction_id_1; + let LocalEchoContent::Event { serialized_event: new_content, .. } = + new_local_echo_content(&room_send_queue, transaction_id, "B.") + else { + panic!("oopsy"); + }; + + let update = RoomSendQueueUpdate::ReplacedLocalEvent { + transaction_id: transaction_id.clone(), + new_content, + }; + + // The `LatestEventValue` has changed, it still matches the latest local + // event but with its new content. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B."); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + } + } + + #[async_test] + async fn test_local_send_error() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + + // Receiving two `NewLocalEvent`s. + { + for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] { + let content = new_local_echo_content(&room_send_queue, transaction_id, body); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), body); + } + ); + } + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `SendError` targeting the first event. The + // `LatestEventValue` must change to indicate it's wedged. + { + let update = RoomSendQueueUpdate::SendError { + transaction_id: transaction_id_0.clone(), + error: Arc::new(Error::UnknownError("oopsy".to_owned().into())), + is_recoverable: true, + }; + + // The `LatestEventValue` has changed, it still matches the latest local + // event but it's marked as wedged. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + assert_matches!( + &buffer.buffer[0].1, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + assert_matches!( + &buffer.buffer[1].1, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + + // Receiving a `SentEvent` targeting the first event. The `LatestEventValue` + // must change: since an event has been sent, the following events are now + // unwedged. + { + let update = RoomSendQueueUpdate::SentEvent { + transaction_id: transaction_id_0.clone(), + event_id: event_id!("$ev0").to_owned(), + }; + + // The `LatestEventValue` has changed, it still matches the latest local + // event but it's unwedged. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 1); + assert_matches!( + &buffer.buffer[0].1, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + } + + #[async_test] + async fn test_local_retry_event() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id_0 = OwnedTransactionId::from("txnid0"); + let transaction_id_1 = OwnedTransactionId::from("txnid1"); + + // Receiving two `NewLocalEvent`s. + { + for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] { + let content = new_local_echo_content(&room_send_queue, transaction_id, body); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), body); + } + ); + } + + assert_eq!(buffer.buffer.len(), 2); + } + + // Receiving a `SendError` targeting the first event. The + // `LatestEventValue` must change to indicate it's wedged. + { + let update = RoomSendQueueUpdate::SendError { + transaction_id: transaction_id_0.clone(), + error: Arc::new(Error::UnknownError("oopsy".to_owned().into())), + is_recoverable: true, + }; + + // The `LatestEventValue` has changed, it still matches the latest local + // event but it's marked as wedged. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + assert_matches!( + &buffer.buffer[0].1, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + assert_matches!( + &buffer.buffer[1].1, + LatestEventValue::LocalIsWedged( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + + // Receiving a `RetryEvent` targeting the first event. The `LatestEventValue` + // must change: this local event and its following must be unwedged. + { + let update = + RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() }; + + // The `LatestEventValue` has changed, it still matches the latest local + // event but it's unwedged. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + + assert_eq!(buffer.buffer.len(), 2); + assert_matches!( + &buffer.buffer[0].1, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + assert_matches!( + &buffer.buffer[1].1, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "B"); + } + ); + } + } + + #[async_test] + async fn test_local_media_upload() { + let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + + let mut buffer = LatestEventValuesForLocalEvents::new(); + let transaction_id = OwnedTransactionId::from("txnid"); + + // Receiving a `NewLocalEvent`. + { + let content = new_local_echo_content(&room_send_queue, &transaction_id, "A"); + + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // The `LatestEventValue` matches the new local event. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::LocalIsSending( + LatestEventKind::RoomMessage(message_content) + ) => { + assert_eq!(message_content.body(), "A"); + } + ); + + assert_eq!(buffer.buffer.len(), 1); + } + + // Receiving a `MediaUpload` targeting the first event. The + // `LatestEventValue` must not change as `MediaUpload` are ignored. + { + let update = RoomSendQueueUpdate::MediaUpload { + related_to: transaction_id, + file: None, + index: 0, + progress: AbstractProgress { current: 0, total: 0 }, + }; + + // The `LatestEventValue` has changed somehow, it tells no new + // `LatestEventValue` is computed. + assert_matches!( + LatestEventValue::new_local(&update, &mut buffer, &room_event_cache, &None).await, + LatestEventValue::None + ); + + assert_eq!(buffer.buffer.len(), 1); + } + } + + #[async_test] + async fn test_local_fallbacks_to_remote_when_empty() { + let room_id = room_id!("!r0"); + let user_id = user_id!("@mnt_io:matrix.org"); + let event_factory = EventFactory::new().sender(user_id).room(room_id); + let event_id_0 = event_id!("$ev0"); + let event_id_1 = event_id!("$ev1"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + // Prelude. + { + // Create the room. + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + // Initialise the event cache store. + client + .event_cache_store() + .lock() + .await + .unwrap() + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![event_factory + .text_msg("hello") + .event_id(event_id_0) + .into()], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); + + let mut buffer = LatestEventValuesForLocalEvents::new(); + + assert_matches!( + LatestEventValue::new_local( + // An update that won't be create a new `LatestEventValue`. + &RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid"), + event_id: event_id_1.to_owned(), + }, + &mut buffer, + &room_event_cache, + &None, + ) + .await, + // We get a `Remote` because there is no `Local*` values! + LatestEventValue::Remote(LatestEventKind::RoomMessage(message_content)) => { + assert_eq!(message_content.body(), "hello"); } ); } diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index 92b17a6c7c0..971aa0b3bbc 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -57,9 +57,9 @@ use std::{ pub use error::LatestEventsError; use eyeball::{AsyncLock, Subscriber}; -use futures_util::{select, FutureExt}; +use futures_util::{select_biased, FutureExt}; use latest_event::LatestEvent; -pub use latest_event::LatestEventValue; +pub use latest_event::{LatestEventKind, LatestEventValue}; use matrix_sdk_common::executor::{spawn, AbortOnDrop, JoinHandleExt as _}; use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -69,7 +69,7 @@ use crate::{ client::WeakClient, event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate}, room::WeakRoom, - send_queue::SendQueue, + send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate}, }; /// The entry point to fetch the [`LatestEventValue`] for rooms or threads. @@ -408,6 +408,25 @@ enum RoomRegistration { Remove(OwnedRoomId), } +/// Represents the kind of updates the [`compute_latest_events_task`] will have +/// to deal with. +enum LatestEventQueueUpdate { + /// An update from the [`EventCache`] happened. + EventCache { + /// The ID of the room that has triggered the update. + room_id: OwnedRoomId, + }, + + /// An update from the [`SendQueue`] happened. + SendQueue { + /// The ID of the room that has triggered the update. + room_id: OwnedRoomId, + + /// The update itself. + update: RoomSendQueueUpdate, + }, +} + /// Type holding the [`LatestEvent`] for a room and for all its threads. #[derive(Debug)] struct RoomLatestEvents { @@ -488,8 +507,9 @@ impl RoomLatestEvents { self.per_thread.get(thread_id) } - /// Update the latest events for the room and its threads. - async fn update(&mut self) { + /// Update the latest events for the room and its threads, based on the + /// event cache data. + async fn update_with_event_cache(&mut self) { // Get the power levels of the user for the current room if the `WeakRoom` is // still valid. // @@ -506,10 +526,40 @@ impl RoomLatestEvents { None => None, }; - self.for_the_room.update(&self.room_event_cache, &power_levels).await; + self.for_the_room.update_with_event_cache(&self.room_event_cache, &power_levels).await; for latest_event in self.per_thread.values_mut() { - latest_event.update(&self.room_event_cache, &power_levels).await; + latest_event.update_with_event_cache(&self.room_event_cache, &power_levels).await; + } + } + + /// Update the latest events for the room and its threads, based on the + /// send queue update. + async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) { + // Get the power levels of the user for the current room if the `WeakRoom` is + // still valid. + // + // Get it once for all the updates of all the latest events for this room (be + // the room and its threads). + let room = self.weak_room.get(); + let power_levels = match &room { + Some(room) => { + let power_levels = room.power_levels().await.ok(); + + Some(room.own_user_id()).zip(power_levels) + } + + None => None, + }; + + self.for_the_room + .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels) + .await; + + for latest_event in self.per_thread.values_mut() { + latest_event + .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels) + .await; } } } @@ -529,27 +579,26 @@ async fn listen_to_event_cache_and_send_queue_updates_task( registered_rooms: Arc, mut room_registration_receiver: mpsc::Receiver, event_cache: EventCache, - _send_queue: SendQueue, - latest_event_queue_sender: mpsc::UnboundedSender, + send_queue: SendQueue, + latest_event_queue_sender: mpsc::UnboundedSender, ) { let mut event_cache_generic_updates_subscriber = event_cache.subscribe_to_room_generic_updates(); + let mut send_queue_generic_updates_subscriber = send_queue.subscribe(); // Initialise the list of rooms that are listened. // - // Technically, we can use `rooms` to get this information, but it would involve - // a read-lock. In order to reduce the pressure on this lock, we use - // this intermediate structure. - let mut listened_rooms = { - let rooms = registered_rooms.rooms.read().await; - - HashSet::from_iter(rooms.keys().cloned()) - }; + // Technically, we can use `registered_rooms.rooms` every time to get this + // information, but it would involve a read-lock. In order to reduce the + // pressure on this lock, we use this intermediate structure. + let mut listened_rooms = + HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned()); loop { if listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut event_cache_generic_updates_subscriber, + &mut send_queue_generic_updates_subscriber, &mut listened_rooms, &latest_event_queue_sender, ) @@ -570,10 +619,13 @@ async fn listen_to_event_cache_and_send_queue_updates_task( async fn listen_to_event_cache_and_send_queue_updates( room_registration_receiver: &mut mpsc::Receiver, event_cache_generic_updates_subscriber: &mut broadcast::Receiver, + send_queue_generic_updates_subscriber: &mut broadcast::Receiver, listened_rooms: &mut HashSet, - latest_event_queue_sender: &mpsc::UnboundedSender, + latest_event_queue_sender: &mpsc::UnboundedSender, ) -> ControlFlow<()> { - select! { + // We need a biased select here: `room_registration_receiver` must have the + // priority over other futures. + select_biased! { update = room_registration_receiver.recv().fuse() => { match update { Some(RoomRegistration::Add(room_id)) => { @@ -595,7 +647,9 @@ async fn listen_to_event_cache_and_send_queue_updates( let room_id = room_event_cache_generic_update.room_id; if listened_rooms.contains(&room_id) { - let _ = latest_event_queue_sender.send(room_id); + let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache { + room_id + }); } } else { error!("`event_cache_generic_updates` channel has been closed"); @@ -603,6 +657,21 @@ async fn listen_to_event_cache_and_send_queue_updates( return ControlFlow::Break(()); } } + + send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => { + if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update { + if listened_rooms.contains(&room_id) { + let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue { + room_id, + update + }); + } + } else { + error!("`send_queue_generic_updates` channel has been closed"); + + return ControlFlow::Break(()); + } + } } ControlFlow::Continue(()) @@ -615,7 +684,7 @@ async fn listen_to_event_cache_and_send_queue_updates( /// [`listen_to_event_cache_and_send_queue_updates_task`]. async fn compute_latest_events_task( registered_rooms: Arc, - mut latest_event_queue_receiver: mpsc::UnboundedReceiver, + mut latest_event_queue_receiver: mpsc::UnboundedReceiver, ) { const BUFFER_SIZE: usize = 16; @@ -629,16 +698,35 @@ async fn compute_latest_events_task( error!("`compute_latest_events_task` has stopped"); } -async fn compute_latest_events(registered_rooms: &RegisteredRooms, for_rooms: &[OwnedRoomId]) { - for room_id in for_rooms { - let mut rooms = registered_rooms.rooms.write().await; +async fn compute_latest_events( + registered_rooms: &RegisteredRooms, + latest_event_queue_updates: &[LatestEventQueueUpdate], +) { + for latest_event_queue_update in latest_event_queue_updates { + match latest_event_queue_update { + LatestEventQueueUpdate::EventCache { room_id } => { + let mut rooms = registered_rooms.rooms.write().await; + + if let Some(room_latest_events) = rooms.get_mut(room_id) { + room_latest_events.update_with_event_cache().await; + } else { + error!(?room_id, "Failed to find the room"); + + continue; + } + } + + LatestEventQueueUpdate::SendQueue { room_id, update } => { + let mut rooms = registered_rooms.rooms.write().await; - if let Some(room_latest_events) = rooms.get_mut(room_id) { - room_latest_events.update().await; - } else { - error!(?room_id, "Failed to find the room"); + if let Some(room_latest_events) = rooms.get_mut(room_id) { + room_latest_events.update_with_send_queue(update).await; + } else { + error!(?room_id, "Failed to find the room"); - continue; + continue; + } + } } } } @@ -653,12 +741,13 @@ mod tests { RoomState, }; use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder}; - use ruma::{event_id, owned_room_id, room_id, user_id}; + use ruma::{event_id, owned_room_id, room_id, user_id, OwnedTransactionId}; use stream_assert::assert_pending; use super::{ - broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventValue, - RoomEventCacheGenericUpdate, RoomRegistration, + broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventKind, + LatestEventValue, RoomEventCacheGenericUpdate, RoomRegistration, RoomSendQueueUpdate, + SendQueueUpdate, }; use crate::test_utils::mocks::MatrixMockServer; @@ -822,6 +911,8 @@ mod tests { let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -834,6 +925,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -853,6 +945,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -876,6 +969,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -895,6 +989,8 @@ mod tests { let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -905,6 +1001,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -923,6 +1020,8 @@ mod tests { let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -936,6 +1035,7 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -961,6 +1061,82 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + .is_continue()); + } + + assert_eq!(listened_rooms.len(), 1); + assert!(listened_rooms.contains(&room_id)); + + // A latest event computation has been triggered! + assert!(latest_event_queue_receiver.is_empty().not()); + } + } + + #[async_test] + async fn test_inputs_task_can_listen_to_send_queue() { + let room_id = owned_room_id!("!r0"); + + let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); + let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = + broadcast::channel(1); + let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); + let mut listened_rooms = HashSet::new(); + let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); + + // New send queue update, but the `LatestEvents` isn't listening to it. + { + send_queue_generic_update_sender + .send(SendQueueUpdate { + room_id: room_id.clone(), + update: RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid0"), + event_id: event_id!("$ev0").to_owned(), + }, + }) + .unwrap(); + + // Run the task. + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + .is_continue()); + + assert!(listened_rooms.is_empty()); + + // No latest event computation has been triggered. + assert!(latest_event_queue_receiver.is_empty()); + } + + // New send queue update, but this time, the `LatestEvents` is listening to it. + { + room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap(); + send_queue_generic_update_sender + .send(SendQueueUpdate { + room_id: room_id.clone(), + update: RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid1"), + event_id: event_id!("$ev1").to_owned(), + }, + }) + .unwrap(); + + // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`. + for _ in 0..2 { + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -981,6 +1157,8 @@ mod tests { let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); + let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); @@ -991,6 +1169,36 @@ mod tests { assert!(listen_to_event_cache_and_send_queue_updates( &mut room_registration_receiver, &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &mut listened_rooms, + &latest_event_queue_sender, + ) + .await + // It breaks! + .is_break()); + + assert_eq!(listened_rooms.len(), 0); + assert!(latest_event_queue_receiver.is_empty()); + } + + #[async_test] + async fn test_inputs_task_stops_when_send_queue_channel_is_closed() { + let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1); + let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = + broadcast::channel(1); + let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = + broadcast::channel(1); + let mut listened_rooms = HashSet::new(); + let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); + + // Drop the sender to close the channel. + drop(send_queue_generic_update_sender); + + // Run the task. + assert!(listen_to_event_cache_and_send_queue_updates( + &mut room_registration_receiver, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, &mut listened_rooms, &latest_event_queue_sender, ) @@ -1059,8 +1267,8 @@ mod tests { // latest event! assert_matches!( latest_event_stream.get().await, - LatestEventValue::RoomMessage(event) => { - assert_eq!(event.event_id(), event_id_1); + LatestEventValue::Remote(LatestEventKind::RoomMessage(message_content)) => { + assert_eq!(message_content.body(), "world"); } ); @@ -1072,10 +1280,7 @@ mod tests { .sync_room( &client, JoinedRoomBuilder::new(&room_id).add_timeline_event( - event_factory - .text_msg("venez découvrir cette nouvelle raclette !") - .event_id(event_id_2) - .into_raw(), + event_factory.text_msg("raclette !").event_id(event_id_2).into_raw(), ), ) .await; @@ -1085,8 +1290,8 @@ mod tests { // `compute_latest_events` which has updated the latest event value. assert_matches!( latest_event_stream.next().await, - Some(LatestEventValue::RoomMessage(event)) => { - assert_eq!(event.event_id(), event_id_2); + Some(LatestEventValue::Remote(LatestEventKind::RoomMessage(message_content))) => { + assert_eq!(message_content.body(), "raclette !"); } ); } diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 98cc8937949..d2973331295 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -236,7 +236,20 @@ impl SendQueue { /// Get or create a new send queue for a given room, and insert it into our /// memoized rooms mapping. + #[cfg(not(test))] fn for_room(&self, room: Room) -> RoomSendQueue { + self.for_room_impl(room) + } + + /// Get or create a new send queue for a given room, and insert it into our + /// memoized rooms mapping. + #[cfg(test)] + pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue { + self.for_room_impl(room) + } + + #[inline(always)] + fn for_room_impl(&self, room: Room) -> RoomSendQueue { let data = self.data(); let mut map = data.rooms.write().unwrap(); @@ -2406,6 +2419,16 @@ pub struct SendHandle { } impl SendHandle { + /// Creates a new [`SendHandle`]. + #[cfg(test)] + pub(crate) fn new( + room: RoomSendQueue, + transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, + ) -> Self { + Self { room, transaction_id, media_handles: vec![], created_at } + } + fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> { if !self.media_handles.is_empty() { Err(RoomSendQueueStorageError::OperationNotImplementedYet)