Skip to content

Commit a682a46

Browse files
committed
refactor(event cache): avoid deserializing the full event content to be sent, for extracting its thread root
1 parent 0ac8699 commit a682a46

File tree

4 files changed

+46
-26
lines changed

4 files changed

+46
-26
lines changed

crates/matrix-sdk-base/src/store/send_queue.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,19 @@ impl SerializableEventContent {
6666
self.event.deserialize_with_type(&self.event_type)
6767
}
6868

69-
/// Returns the raw event content along with its type.
69+
/// Returns the raw event content along with its type, borrowed variant.
7070
///
7171
/// Useful for callers manipulating custom events.
7272
pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
7373
(&self.event, &self.event_type)
7474
}
75+
76+
/// Returns the raw event content along with its type, owned variant.
77+
///
78+
/// Useful for callers manipulating custom events.
79+
pub fn into_raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
80+
(self.event, self.event_type)
81+
}
7582
}
7683

7784
/// The kind of a send queue request.

crates/matrix-sdk-common/src/serde_helpers.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
1818
use ruma::{
1919
OwnedEventId,
20-
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, relation::BundledThread},
20+
events::{
21+
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
22+
relation::BundledThread,
23+
},
2124
serde::Raw,
2225
};
2326
use serde::Deserialize;
@@ -45,6 +48,22 @@ struct SimplifiedContent {
4548
relates_to: Option<RelatesTo>,
4649
}
4750

51+
/// Try to extract the thread root from an event's content, if provided.
52+
///
53+
/// The thread root is the field located at `m.relates_to`.`event_id`,
54+
/// if the field at `m.relates_to`.`rel_type` is `m.thread`.
55+
///
56+
/// Returns `None` if we couldn't find a thread root, or if there was an issue
57+
/// during deserialization.
58+
pub fn extract_thread_root_from_content(
59+
content: Raw<AnyMessageLikeEventContent>,
60+
) -> Option<OwnedEventId> {
61+
let relates_to = content.deserialize_as_unchecked::<SimplifiedContent>().ok()?.relates_to?;
62+
match relates_to.rel_type {
63+
RelationsType::Thread => relates_to.event_id,
64+
}
65+
}
66+
4867
/// Try to extract the thread root from a timeline event, if provided.
4968
///
5069
/// The thread root is the field located at `content`.`m.relates_to`.`event_id`,

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,16 @@ use matrix_sdk_base::{
4444
},
4545
executor::AbortOnDrop,
4646
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
47-
store::SerializableEventContent,
47+
serde_helpers::extract_thread_root_from_content,
4848
store_locks::LockStoreError,
4949
sync::RoomUpdates,
5050
timer,
5151
};
5252
use matrix_sdk_common::executor::{spawn, JoinHandle};
5353
use room::RoomEventCacheState;
5454
use ruma::{
55-
events::{room::encrypted, AnySyncEphemeralRoomEvent},
56-
serde::Raw,
57-
OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
55+
events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId,
56+
RoomId,
5857
};
5958
use tokio::{
6059
select,
@@ -533,25 +532,13 @@ impl EventCache {
533532
return true;
534533
};
535534

536-
let extract_thread_root = |serialized_event: SerializableEventContent| {
537-
match serialized_event.deserialize() {
538-
Ok(content) => {
539-
if let Some(encrypted::Relation::Thread(thread)) = content.relation() {
540-
return Some(thread.event_id);
541-
}
542-
}
543-
Err(err) => {
544-
warn!("error when deserializing content of a local echo: {err}");
545-
}
546-
}
547-
None
548-
};
549-
550535
let (thread_root, subscribe_up_to) = match up.update {
551536
RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
552537
match local_echo.content {
553538
LocalEchoContent::Event { serialized_event, .. } => {
554-
if let Some(thread_root) = extract_thread_root(serialized_event) {
539+
if let Some(thread_root) =
540+
extract_thread_root_from_content(serialized_event.into_raw().0)
541+
{
555542
events_being_sent.insert(local_echo.transaction_id, thread_root);
556543
}
557544
}
@@ -569,7 +556,9 @@ impl EventCache {
569556
}
570557

571558
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
572-
if let Some(thread_root) = extract_thread_root(new_content) {
559+
if let Some(thread_root) =
560+
extract_thread_root_from_content(new_content.into_raw().0)
561+
{
573562
events_being_sent.insert(transaction_id, thread_root);
574563
} else {
575564
// It could be that the event isn't part of a thread anymore; handle that by

crates/matrix-sdk/tests/integration/send_queue.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use assert_matches2::{assert_let, assert_matches};
55
#[cfg(feature = "unstable-msc4274")]
66
use matrix_sdk::attachment::{GalleryConfig, GalleryItemInfo};
77
use matrix_sdk::{
8+
assert_let_timeout,
89
attachment::{AttachmentConfig, AttachmentInfo, BaseImageInfo, Thumbnail},
910
config::StoreConfig,
1011
media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
@@ -3729,6 +3730,8 @@ async fn test_sending_reply_in_thread_auto_subscribe() {
37293730

37303731
client.event_cache().subscribe().unwrap();
37313732

3733+
let mut thread_subscriber_updates = client.event_cache().subscribe_thread_subscriber_updates();
3734+
37323735
let room_id = room_id!("!a:b.c");
37333736
let room = server.sync_joined_room(&client, room_id).await;
37343737

@@ -3793,8 +3796,9 @@ async fn test_sending_reply_in_thread_auto_subscribe() {
37933796
room.send_queue().send(content.into()).await.unwrap();
37943797

37953798
// Let the send queue process the event.
3796-
assert_let!(RoomSendQueueUpdate::NewLocalEvent(..) = stream.recv().await.unwrap());
3797-
assert_let!(RoomSendQueueUpdate::SentEvent { .. } = stream.recv().await.unwrap());
3799+
assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv());
3800+
assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv());
3801+
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
37983802

37993803
// Check the endpoints have been correctly called.
38003804
server.server().reset().await;
@@ -3819,8 +3823,9 @@ async fn test_sending_reply_in_thread_auto_subscribe() {
38193823
room.send_queue().send(content.into()).await.unwrap();
38203824

38213825
// Let the send queue process the event.
3822-
assert_let!(RoomSendQueueUpdate::NewLocalEvent(..) = stream.recv().await.unwrap());
3823-
assert_let!(RoomSendQueueUpdate::SentEvent { .. } = stream.recv().await.unwrap());
3826+
assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv());
3827+
assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv());
3828+
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
38243829

38253830
sleep(Duration::from_millis(100)).await;
38263831
}

0 commit comments

Comments
 (0)