15 changes: 14 additions & 1 deletion crates/matrix-sdk-common/src/serde_helpers.rs
@@ -17,7 +17,10 @@

use ruma::{
OwnedEventId,
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, relation::BundledThread},
events::{
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
relation::BundledThread,
},
serde::Raw,
};
use serde::Deserialize;
@@ -45,6 +48,16 @@ struct SimplifiedContent {
relates_to: Option<RelatesTo>,
}

/// Given a message-like event content, try to extract the thread root from it.
pub fn extract_thread_root_from_content(
content: &Raw<AnyMessageLikeEventContent>,
) -> Option<OwnedEventId> {
let relates_to = content.deserialize_as_unchecked::<SimplifiedContent>().ok()?.relates_to?;
match relates_to.rel_type {
RelationsType::Thread => relates_to.event_id,
}
}
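
A quick usage sketch of the new helper (illustrative only: the JSON payload and the `$root` event ID are made up; `Raw::from_json_string` is ruma's constructor for raw JSON):

```rust
use ruma::{events::AnyMessageLikeEventContent, serde::Raw};

// Hypothetical threaded-message content; `$root` stands in for a real event ID.
let json = r#"{
    "msgtype": "m.text",
    "body": "reply in a thread",
    "m.relates_to": { "rel_type": "m.thread", "event_id": "$root" }
}"#;
let content: Raw<AnyMessageLikeEventContent> =
    Raw::from_json_string(json.to_owned()).unwrap();

// The helper returns the `event_id` only when `rel_type` is `m.thread`.
assert_eq!(
    extract_thread_root_from_content(&content).map(|id| id.to_string()),
    Some("$root".to_owned()),
);
```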

/// Try to extract the thread root from a timeline event, if provided.
///
/// The thread root is the field located at `content`.`m.relates_to`.`event_id`,
8 changes: 7 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
@@ -60,7 +60,9 @@ mod pagination;
mod room;

pub use pagination::{RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
pub use room::{
should_subscribe_thread, RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate,
};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
@@ -102,6 +104,10 @@ pub enum EventCacheError {
#[error("The owning client of the event cache has been dropped.")]
ClientDropped,

/// The current client isn't logged in.
#[error("The current client isn't logged in")]
UnknownUser,

/// An error happening when interacting with the [`LinkedChunk`]'s lazy
/// loader.
///
17 changes: 12 additions & 5 deletions crates/matrix-sdk/src/event_cache/pagination.rs
@@ -26,7 +26,10 @@ use super::{
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
};
use crate::{
event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
event_cache::{
room::threads::{push_context_for_threads_subscriptions, subscribe_to_new_threads},
EventCacheError, RoomEventCacheGenericUpdate,
},
room::MessagesOptions,
};

@@ -263,7 +266,7 @@ impl RoomPagination {
batch_size: u16,
prev_token: Option<String>,
) -> Result<Option<BackPaginationOutcome>> {
let (events, new_token) = {
let (room, events, new_token) = {
let Some(room) = self.inner.weak_room.get() else {
// The client is shutting down, return an empty default response.
return Ok(Some(BackPaginationOutcome {
@@ -280,15 +283,17 @@
.await
.map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

(response.chunk, response.end)
(room, response.chunk, response.end)
};

if let Some((outcome, timeline_event_diffs)) = self
let push_context = push_context_for_threads_subscriptions(&room).await;

if let Some((outcome, timeline_event_diffs, new_thread_subs)) = self
.inner
.state
.write()
.await
.handle_backpagination(events, new_token, prev_token)
.handle_backpagination(push_context, events, new_token, prev_token)
.await?
{
if !timeline_event_diffs.is_empty() {
@@ -298,6 +303,8 @@
});
}

subscribe_to_new_threads(&room, new_thread_subs).await;

Ok(Some(outcome))
} else {
// The previous token has gone missing, so the timeline has been reset in the
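
Note that `subscribe_to_new_threads` is fire-and-forget here: its result isn't propagated, so a back-pagination can't fail just because a subscription request did. A minimal sketch of what such a helper might look like (the `subscribe_thread` method and its signature are assumptions for illustration, not the actual API):

```rust
use matrix_sdk::room::Room;
use tracing::warn;

// Sketch only: iterate the collected (thread root -> up-to event) pairs and
// fire the subscription requests, logging failures instead of returning them.
async fn subscribe_to_new_threads(room: &Room, subs: AutomaticThreadSubscriptions) {
    for (thread_root, up_to_event_id) in subs.0 {
        // `subscribe_thread` is a hypothetical stand-in for the room's
        // thread-subscription call.
        if let Err(err) = room.subscribe_thread(&thread_root, Some(&up_to_event_id)).await {
            warn!(%thread_root, "couldn't automatically subscribe to thread: {err}");
        }
    }
}
```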
122 changes: 93 additions & 29 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
@@ -51,14 +51,19 @@ use super::{
};
use crate::{
client::WeakClient,
event_cache::EventCacheError,
event_cache::{
room::threads::{
push_context_for_threads_subscriptions, subscribe_to_new_threads, ThreadPushContext,
},
EventCacheError,
},
room::{IncludeRelations, RelationsOptions, WeakRoom},
};

pub(super) mod events;
mod threads;
pub(super) mod threads;

pub use threads::ThreadEventCacheUpdate;
pub use threads::{should_subscribe_thread, ThreadEventCacheUpdate};

/// A subset of an event cache, for a room.
///
@@ -276,14 +281,22 @@ impl RoomEventCache {
result.chunk.push(root_event);
}

let push_context = push_context_for_threads_subscriptions(&room).await;

let mut state = self.inner.state.write().await;

if let Some(outcome) = state.finish_thread_network_pagination(
thread_root.clone(),
prev_token,
result.next_batch_token,
result.chunk,
) {
if let Some((outcome, thread_subscriptions)) = state
.finish_thread_network_pagination(
push_context,
thread_root.clone(),
prev_token,
result.next_batch_token,
result.chunk,
)
.await
{
subscribe_to_new_threads(&room, thread_subscriptions).await;

return Ok(outcome.reached_start);
}

@@ -539,11 +552,22 @@ impl RoomEventCacheInner {
return Ok(());
}

let room = self.weak_room.get();
let push_context = if let Some(room) = &room {
push_context_for_threads_subscriptions(room).await
} else {
ThreadPushContext::default()
};

// Add all the events to the backend.
trace!("adding new events");

let (stored_prev_batch_token, timeline_event_diffs) =
self.state.write().await.handle_sync(timeline).await?;
let (stored_prev_batch_token, timeline_event_diffs, new_thread_subs) =
self.state.write().await.handle_sync(push_context, timeline).await?;

if let Some(room) = room {
subscribe_to_new_threads(&room, new_thread_subs).await;
}

// Now that all events have been added, we can trigger the
// `pagination_token_notifier`.
@@ -648,7 +672,11 @@ mod private {
sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
};
use crate::event_cache::{
deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
deduplicator::filter_duplicate_events,
room::threads::{
should_subscribe_thread, AutomaticThreadSubscriptions, ThreadEventCache,
ThreadPushContext,
},
BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
};

@@ -1438,11 +1466,14 @@
/// linked chunk.
///
/// Flushes updates to disk first.
///
/// Returns new thread subscriptions, if any.
async fn post_process_new_events(
&mut self,
push_context: ThreadPushContext,
events: Vec<Event>,
is_sync: bool,
) -> Result<(), EventCacheError> {
) -> Result<AutomaticThreadSubscriptions, EventCacheError> {
// Update the store before doing the post-processing.
self.propagate_changes().await?;

@@ -1454,7 +1485,7 @@
if let Some(thread_root) = extract_thread_root(event.raw()) {
new_events_by_thread.entry(thread_root).or_default().push(event.clone());
} else if let Some(event_id) = event.event_id() {
// If we spot the root of a thread, add it to its linked chunk, in sync mode.
// If we spot the root of a thread, add it to its linked chunk.
if self.threads.contains_key(&event_id) {
new_events_by_thread.entry(event_id).or_default().push(event.clone());
}
@@ -1466,9 +1497,10 @@
}
}

self.update_threads(new_events_by_thread, is_sync).await?;
let new_thread_subs =
self.update_threads(push_context, new_events_by_thread, is_sync).await?;

Ok(())
Ok(new_thread_subs)
}

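For context, `AutomaticThreadSubscriptions` (returned by `post_process_new_events` above) is treated throughout this diff as a map from thread root to the event to subscribe up to. A plausible shape, inferred from the `.0.insert(...)` usage in `update_threads` below (the actual definition lives in `threads.rs`, which isn't shown here):

```rust
use std::collections::BTreeMap;
use ruma::OwnedEventId;

/// Sketch (inferred): maps a thread root event ID to the most recent event in
/// that thread that warrants an automatic subscription.
#[derive(Debug, Default)]
pub struct AutomaticThreadSubscriptions(pub BTreeMap<OwnedEventId, OwnedEventId>);
```
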
fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
@@ -1479,15 +1511,34 @@
.or_insert_with(|| ThreadEventCache::new(root_event_id))
}

/// Updates the threads' states according to new events:
///
/// - updates the in-memory thread cache with new events,
/// - updates the thread summary in the thread root (in the main room's
/// linked chunk),
/// - returns the threads to automatically subscribe to.
#[instrument(skip_all)]
async fn update_threads(
&mut self,
push_context: ThreadPushContext,
new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
is_sync: bool,
) -> Result<(), EventCacheError> {
) -> Result<AutomaticThreadSubscriptions, EventCacheError> {
let mut thread_subscriptions = AutomaticThreadSubscriptions::default();

for (thread_root, new_events) in new_events_by_thread {
let thread_cache = self.get_or_reload_thread(thread_root.clone());

// Compute the needed thread subscriptions: we want to subscribe up to the
// most recent event that mentions us. Events are in topological order, so
// scan in reverse and stop at the first mention (see the sketch after this
// function).
if let Some(subscribe_to_event_id) =
should_subscribe_thread(&push_context, new_events.iter()).await
{
thread_subscriptions.0.insert(thread_root.clone(), subscribe_to_event_id);
}

// If we're not in sync mode, we're receiving events from a room pagination:
// as we don't know where they should be put in a thread linked chunk, we
// don't try to be smart and include them. That's for
@@ -1547,7 +1598,7 @@
self.replace_event_at(location, target_event).await?;
}

Ok(())
Ok(thread_subscriptions)
}

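The reverse mention scan described in `update_threads` could look roughly like this (a sketch: `mentions_user` is a hypothetical stand-in for the push-rules check the real `should_subscribe_thread` performs in `threads.rs`, and a slice replaces the iterator the real function takes):

```rust
// Sketch: walk the new events newest-first and return the ID of the most
// recent one that mentions the user; events arrive in topological order, so
// the first hit from the back is the most recent mention.
async fn should_subscribe_thread_sketch(
    push_context: &ThreadPushContext,
    events: &[Event],
) -> Option<OwnedEventId> {
    for event in events.iter().rev() {
        if push_context.mentions_user(event).await {
            return event.event_id();
        }
    }
    None
}
```
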
/// Replaces a single event, be it saved in memory or in the store.
@@ -1687,8 +1738,10 @@
#[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
pub async fn handle_sync(
&mut self,
push_context: ThreadPushContext,
mut timeline: Timeline,
) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
) -> Result<(bool, Vec<VectorDiff<Event>>, AutomaticThreadSubscriptions), EventCacheError>
{
let mut prev_batch = timeline.prev_batch.take();

let DeduplicationOutcome {
@@ -1759,7 +1812,7 @@
if all_duplicates {
// No new events and no gap (per the previous check), thus no need to change the
// room state. We're done!
return Ok((false, Vec::new()));
return Ok((false, Vec::new(), Default::default()));
}

let has_new_gap = prev_batch.is_some();
@@ -1780,7 +1833,7 @@
self.room_linked_chunk
.push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

self.post_process_new_events(events, true).await?;
let new_thread_subs = self.post_process_new_events(push_context, events, true).await?;

if timeline.limited && has_new_gap {
// If there was a previous batch token for a limited timeline, unload the chunks
@@ -1794,7 +1847,7 @@

let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

Ok((has_new_gap, timeline_event_diffs))
Ok((has_new_gap, timeline_event_diffs, new_thread_subs))
}

/// Handle the result of a single back-pagination request.
@@ -1807,11 +1860,14 @@
#[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
pub async fn handle_backpagination(
&mut self,
push_context: ThreadPushContext,
events: Vec<Event>,
mut new_token: Option<String>,
prev_token: Option<String>,
) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
{
) -> Result<
Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>, AutomaticThreadSubscriptions)>,
EventCacheError,
> {
// Check that the previous token still exists; otherwise it's a sign that the
// room's timeline has been cleared.
let prev_gap_id = if let Some(token) = prev_token {
@@ -1884,11 +1940,16 @@
);

// Note: this flushes updates to the store.
self.post_process_new_events(topo_ordered_events, false).await?;
let new_thread_subs =
self.post_process_new_events(push_context, topo_ordered_events, false).await?;

let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();

Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
Ok(Some((
BackPaginationOutcome { events, reached_start },
event_diffs,
new_thread_subs,
)))
}

/// Subscribe to the thread for a given root event, and get a (maybe empty)
Expand All @@ -1903,14 +1964,17 @@ mod private {
/// Back paginate in the given thread.
///
/// Will always start from the end, unless we previously paginated.
pub fn finish_thread_network_pagination(
pub async fn finish_thread_network_pagination(
&mut self,
push_context: ThreadPushContext,
root: OwnedEventId,
prev_token: Option<String>,
new_token: Option<String>,
events: Vec<Event>,
) -> Option<BackPaginationOutcome> {
self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
) -> Option<(BackPaginationOutcome, AutomaticThreadSubscriptions)> {
self.get_or_reload_thread(root)
.finish_network_pagination(push_context, prev_token, new_token, events)
.await
}

pub fn load_more_thread_events_backwards(