Skip to content
Merged
9 changes: 8 additions & 1 deletion crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,19 @@ impl SerializableEventContent {
self.event.deserialize_with_type(&self.event_type)
}

/// Returns the raw event content along with its type.
/// Returns the raw event content along with its type, borrowed variant.
///
/// Useful for callers manipulating custom events.
pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
(&self.event, &self.event_type)
}

/// Returns the raw event content along with its type, owned variant.
///
/// Useful for callers manipulating custom events.
pub fn into_raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
(self.event, self.event_type)
}
}

/// The kind of a send queue request.
Expand Down
21 changes: 20 additions & 1 deletion crates/matrix-sdk-common/src/serde_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

use ruma::{
OwnedEventId,
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, relation::BundledThread},
events::{
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
relation::BundledThread,
},
serde::Raw,
};
use serde::Deserialize;
Expand Down Expand Up @@ -45,6 +48,22 @@ struct SimplifiedContent {
relates_to: Option<RelatesTo>,
}

/// Try to extract the thread root from an event's content, if provided.
///
/// The thread root is the field located at `m.relates_to`.`event_id`,
/// if the field at `m.relates_to`.`rel_type` is `m.thread`.
///
/// Returns `None` if we couldn't find a thread root, or if there was an issue
/// during deserialization.
pub fn extract_thread_root_from_content(
content: Raw<AnyMessageLikeEventContent>,
) -> Option<OwnedEventId> {
let relates_to = content.deserialize_as_unchecked::<SimplifiedContent>().ok()?.relates_to?;
match relates_to.rel_type {
RelationsType::Thread => relates_to.event_id,
}
}

/// Try to extract the thread root from a timeline event, if provided.
///
/// The thread root is the field located at `content`.`m.relates_to`.`event_id`,
Expand Down
29 changes: 9 additions & 20 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ use matrix_sdk_base::{
},
executor::AbortOnDrop,
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
store::SerializableEventContent,
serde_helpers::extract_thread_root_from_content,
store_locks::LockStoreError,
sync::RoomUpdates,
timer,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use room::RoomEventCacheState;
use ruma::{
events::{room::encrypted, AnySyncEphemeralRoomEvent},
serde::Raw,
OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId,
RoomId,
};
use tokio::{
select,
Expand Down Expand Up @@ -533,25 +532,13 @@ impl EventCache {
return true;
};

let extract_thread_root = |serialized_event: SerializableEventContent| {
match serialized_event.deserialize() {
Ok(content) => {
if let Some(encrypted::Relation::Thread(thread)) = content.relation() {
return Some(thread.event_id);
}
}
Err(err) => {
warn!("error when deserializing content of a local echo: {err}");
}
}
None
};

let (thread_root, subscribe_up_to) = match up.update {
RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
match local_echo.content {
LocalEchoContent::Event { serialized_event, .. } => {
if let Some(thread_root) = extract_thread_root(serialized_event) {
if let Some(thread_root) =
extract_thread_root_from_content(serialized_event.into_raw().0)
{
events_being_sent.insert(local_echo.transaction_id, thread_root);
}
}
Expand All @@ -569,7 +556,9 @@ impl EventCache {
}

RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
if let Some(thread_root) = extract_thread_root(new_content) {
if let Some(thread_root) =
extract_thread_root_from_content(new_content.into_raw().0)
{
events_being_sent.insert(transaction_id, thread_root);
} else {
// It could be that the event isn't part of a thread anymore; handle that by
Expand Down
13 changes: 9 additions & 4 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use assert_matches2::{assert_let, assert_matches};
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk::attachment::{GalleryConfig, GalleryItemInfo};
use matrix_sdk::{
assert_let_timeout,
attachment::{AttachmentConfig, AttachmentInfo, BaseImageInfo, Thumbnail},
config::StoreConfig,
media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
Expand Down Expand Up @@ -3729,6 +3730,8 @@ async fn test_sending_reply_in_thread_auto_subscribe() {

client.event_cache().subscribe().unwrap();

let mut thread_subscriber_updates = client.event_cache().subscribe_thread_subscriber_updates();

let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;

Expand Down Expand Up @@ -3793,8 +3796,9 @@ async fn test_sending_reply_in_thread_auto_subscribe() {
room.send_queue().send(content.into()).await.unwrap();

// Let the send queue process the event.
assert_let!(RoomSendQueueUpdate::NewLocalEvent(..) = stream.recv().await.unwrap());
assert_let!(RoomSendQueueUpdate::SentEvent { .. } = stream.recv().await.unwrap());
assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv());
assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv());
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());

// Check the endpoints have been correctly called.
server.server().reset().await;
Expand All @@ -3819,8 +3823,9 @@ async fn test_sending_reply_in_thread_auto_subscribe() {
room.send_queue().send(content.into()).await.unwrap();

// Let the send queue process the event.
assert_let!(RoomSendQueueUpdate::NewLocalEvent(..) = stream.recv().await.unwrap());
assert_let!(RoomSendQueueUpdate::SentEvent { .. } = stream.recv().await.unwrap());
assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv());
assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv());
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());

sleep(Duration::from_millis(100)).await;
}
Loading