
Commit 6781af7

feat(event cache): automatically subscribe to threads according to msc4306 semantics
1 parent 9d90a92 · commit 6781af7


6 files changed: +293 additions, −53 deletions

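For context: MSC4306 lets a client subscribe to a thread automatically when a newly received event in that thread would have notified the current user (for example a mention matched by push rules), and this commit models the result as a map from thread root to the event "up to" which the subscription applies. The sketch below illustrates that per-thread decision rule; the types and the `auto_subscribe_target` helper are illustrative stand-ins, not the SDK's actual `should_subscribe_thread` API.

```rust
/// Illustrative stand-in for a thread event; the real code works on the SDK's
/// `Event` type plus a push-rule context.
struct ThreadEvent {
    event_id: String,
    /// Stand-in for "this event would trigger a notification (e.g. a mention)
    /// for the current user", which the SDK derives from push rules.
    notifies_user: bool,
}

/// The decision rule sketched from this commit: return the id of the most
/// recent new event that would have notified the user; the thread is then
/// subscribed to "up to" that event.
fn auto_subscribe_target(new_events: &[ThreadEvent]) -> Option<&str> {
    // Events arrive in topological (oldest-to-newest) order, so scan in
    // reverse and stop at the first hit, i.e. the most recent notifying event.
    new_events.iter().rev().find(|ev| ev.notifies_user).map(|ev| ev.event_id.as_str())
}

fn main() {
    let new_events = vec![
        ThreadEvent { event_id: "$earlier".into(), notifies_user: false },
        ThreadEvent { event_id: "$mention".into(), notifies_user: true },
        ThreadEvent { event_id: "$latest".into(), notifies_user: false },
    ];
    // This thread gets an automatic subscription, recorded up to "$mention".
    assert_eq!(auto_subscribe_target(&new_events), Some("$mention"));
}
```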

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 7 additions & 1 deletion
@@ -60,7 +60,9 @@ mod pagination;
 mod room;
 
 pub use pagination::{RoomPagination, RoomPaginationStatus};
-pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
+pub use room::{
+    should_subscribe_thread, RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate,
+};
 
 /// An error observed in the [`EventCache`].
 #[derive(thiserror::Error, Debug)]
@@ -102,6 +104,10 @@ pub enum EventCacheError {
     #[error("The owning client of the event cache has been dropped.")]
     ClientDropped,
 
+    /// The current client isn't logged in.
+    #[error("The current client isn't logged in")]
+    UnknownUser,
+
     /// An error happening when interacting with the [`LinkedChunk`]'s lazy
     /// loader.
     ///
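The new `UnknownUser` variant reflects that mention matching only makes sense for a known user: building the push context used for thread subscriptions starts from the current user's identity, and a client without a session has none. A minimal sketch of that kind of guard, using the same `thiserror` derive as the enum above and a hypothetical `Session` type standing in for the SDK's client internals:

```rust
/// Trimmed copy of the error enum touched above; only the new variant is kept.
#[derive(Debug, thiserror::Error)]
enum EventCacheError {
    #[error("The current client isn't logged in")]
    UnknownUser,
}

/// Hypothetical session handle; the real code asks the client for its user id.
struct Session {
    user_id: Option<String>,
}

/// Building a push context starts from the user's own id: without it there is
/// nobody whose mentions could be matched, so surface `UnknownUser`.
fn own_user_id(session: &Session) -> Result<&str, EventCacheError> {
    session.user_id.as_deref().ok_or(EventCacheError::UnknownUser)
}

fn main() {
    let logged_out = Session { user_id: None };
    assert!(matches!(own_user_id(&logged_out), Err(EventCacheError::UnknownUser)));
}
```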

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 12 additions & 5 deletions
@@ -26,7 +26,10 @@ use super::{
     BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
 };
 use crate::{
-    event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
+    event_cache::{
+        room::threads::{push_context_for_threads_subscriptions, subscribe_to_new_threads},
+        EventCacheError, RoomEventCacheGenericUpdate,
+    },
     room::MessagesOptions,
 };
 
@@ -263,7 +266,7 @@ impl RoomPagination {
         batch_size: u16,
         prev_token: Option<String>,
     ) -> Result<Option<BackPaginationOutcome>> {
-        let (events, new_token) = {
+        let (room, events, new_token) = {
             let Some(room) = self.inner.weak_room.get() else {
                 // The client is shutting down, return an empty default response.
                 return Ok(Some(BackPaginationOutcome {
@@ -280,15 +283,17 @@
                 .await
                 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
 
-            (response.chunk, response.end)
+            (room, response.chunk, response.end)
         };
 
-        if let Some((outcome, timeline_event_diffs)) = self
+        let push_context = push_context_for_threads_subscriptions(&room).await;
+
+        if let Some((outcome, timeline_event_diffs, new_thread_subs)) = self
             .inner
             .state
             .write()
             .await
-            .handle_backpagination(events, new_token, prev_token)
+            .handle_backpagination(push_context, events, new_token, prev_token)
             .await?
         {
             if !timeline_event_diffs.is_empty() {
@@ -298,6 +303,8 @@
                 });
             }
 
+            subscribe_to_new_threads(&room, new_thread_subs).await;
+
             Ok(Some(outcome))
         } else {
             // The previous token has gone missing, so the timeline has been reset in the
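One detail of the hunk above worth calling out: the push context is resolved before the event-cache state's write lock is taken, and the subscription requests are only sent after the lock-guarded `handle_backpagination` call has returned its `new_thread_subs`, presumably so no network I/O happens while the state lock is held. A compressed sketch of that shape, with placeholder types and a Tokio runtime assumed (none of these names are the SDK's):

```rust
use std::sync::Arc;

use tokio::sync::RwLock;

/// Placeholder for the per-room cache state guarded by the lock.
#[derive(Default)]
struct State {
    seen: Vec<String>,
}

impl State {
    /// Pure in-memory bookkeeping: record the events and report which thread
    /// roots now need an automatic subscription (placeholder logic).
    fn handle(&mut self, _push_context: &str, events: Vec<String>) -> Vec<String> {
        self.seen.extend(events.iter().cloned());
        events
    }
}

/// Stand-in for `push_context_for_threads_subscriptions`.
async fn fetch_push_context() -> String {
    "push-context".to_owned()
}

/// Stand-in for the network request issued per new subscription.
async fn subscribe(thread_root: &str) {
    println!("subscribing to {thread_root}");
}

async fn handle_batch(state: Arc<RwLock<State>>, events: Vec<String>) {
    // 1. Anything that needs the network is resolved before locking.
    let push_context = fetch_push_context().await;

    // 2. The write lock only wraps in-memory work; the guard is a temporary,
    //    so it is released at the end of this statement.
    let to_subscribe = state.write().await.handle(&push_context, events);

    // 3. Network side effects run after the lock has been released.
    for thread_root in to_subscribe {
        subscribe(&thread_root).await;
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RwLock::new(State::default()));
    handle_batch(state, vec!["$thread-root".to_owned()]).await;
}
```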

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 93 additions & 29 deletions
@@ -51,14 +51,19 @@ use super::{
 };
 use crate::{
     client::WeakClient,
-    event_cache::EventCacheError,
+    event_cache::{
+        room::threads::{
+            push_context_for_threads_subscriptions, subscribe_to_new_threads, ThreadPushContext,
+        },
+        EventCacheError,
+    },
     room::{IncludeRelations, RelationsOptions, WeakRoom},
 };
 
 pub(super) mod events;
-mod threads;
+pub(super) mod threads;
 
-pub use threads::ThreadEventCacheUpdate;
+pub use threads::{should_subscribe_thread, ThreadEventCacheUpdate};
 
 /// A subset of an event cache, for a room.
 ///
@@ -276,14 +281,22 @@ impl RoomEventCache {
             result.chunk.push(root_event);
         }
 
+        let push_context = push_context_for_threads_subscriptions(&room).await;
+
         let mut state = self.inner.state.write().await;
 
-        if let Some(outcome) = state.finish_thread_network_pagination(
-            thread_root.clone(),
-            prev_token,
-            result.next_batch_token,
-            result.chunk,
-        ) {
+        if let Some((outcome, thread_subscriptions)) = state
+            .finish_thread_network_pagination(
+                push_context,
+                thread_root.clone(),
+                prev_token,
+                result.next_batch_token,
+                result.chunk,
+            )
+            .await
+        {
+            subscribe_to_new_threads(&room, thread_subscriptions).await;
+
             return Ok(outcome.reached_start);
         }
 
@@ -539,11 +552,22 @@ impl RoomEventCacheInner {
             return Ok(());
         }
 
+        let room = self.weak_room.get();
+        let push_context = if let Some(room) = &room {
+            push_context_for_threads_subscriptions(&room).await
+        } else {
+            ThreadPushContext::default()
+        };
+
         // Add all the events to the backend.
         trace!("adding new events");
 
-        let (stored_prev_batch_token, timeline_event_diffs) =
-            self.state.write().await.handle_sync(timeline).await?;
+        let (stored_prev_batch_token, timeline_event_diffs, new_thread_subs) =
+            self.state.write().await.handle_sync(push_context, timeline).await?;
+
+        if let Some(room) = room {
+            subscribe_to_new_threads(&room, new_thread_subs).await;
+        }
 
         // Now that all events have been added, we can trigger the
         // `pagination_token_notifier`.
@@ -648,7 +672,11 @@ mod private {
         sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
     };
     use crate::event_cache::{
-        deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
+        deduplicator::filter_duplicate_events,
+        room::threads::{
+            should_subscribe_thread, AutomaticThreadSubscriptions, ThreadEventCache,
+            ThreadPushContext,
+        },
         BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
     };
 
@@ -1438,11 +1466,14 @@
         /// linked chunk.
         ///
         /// Flushes updates to disk first.
+        ///
+        /// Returns new thread subscriptions, if any.
         async fn post_process_new_events(
             &mut self,
+            push_context: ThreadPushContext,
             events: Vec<Event>,
             is_sync: bool,
-        ) -> Result<(), EventCacheError> {
+        ) -> Result<AutomaticThreadSubscriptions, EventCacheError> {
             // Update the store before doing the post-processing.
             self.propagate_changes().await?;
 
@@ -1454,7 +1485,7 @@
                 if let Some(thread_root) = extract_thread_root(event.raw()) {
                     new_events_by_thread.entry(thread_root).or_default().push(event.clone());
                 } else if let Some(event_id) = event.event_id() {
-                    // If we spot the root of a thread, add it to its linked chunk, in sync mode.
+                    // If we spot the root of a thread, add it to its linked chunk.
                     if self.threads.contains_key(&event_id) {
                         new_events_by_thread.entry(event_id).or_default().push(event.clone());
                     }
@@ -1466,9 +1497,10 @@
                 }
             }
 
-            self.update_threads(new_events_by_thread, is_sync).await?;
+            let new_thread_subs =
+                self.update_threads(push_context, new_events_by_thread, is_sync).await?;
 
-            Ok(())
+            Ok(new_thread_subs)
         }
 
         fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
@@ -1479,15 +1511,34 @@
                 .or_insert_with(|| ThreadEventCache::new(root_event_id))
         }
 
+        /// Updates the threads' states according to new events:
+        ///
+        /// - updates the in-memory thread cache with new events,
+        /// - updates the thread summary in the thread root (in the main room's
+        ///   linked chunk),
+        /// - returns the threads to automatically subscribe to
         #[instrument(skip_all)]
         async fn update_threads(
             &mut self,
+            push_context: ThreadPushContext,
             new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
             is_sync: bool,
-        ) -> Result<(), EventCacheError> {
+        ) -> Result<AutomaticThreadSubscriptions, EventCacheError> {
+            let mut thread_subscriptions = AutomaticThreadSubscriptions::default();
+
             for (thread_root, new_events) in new_events_by_thread {
                 let thread_cache = self.get_or_reload_thread(thread_root.clone());
 
+                // Compute the needed thread subscriptions.
+                // We want to subscribe up to the most recent event that mentions us. Events are
+                // in topological order, so start in reverse, and break at the
+                // first event that mentions us.
+                if let Some(subscribe_to_event_id) =
+                    should_subscribe_thread(&push_context, new_events.iter()).await
+                {
+                    thread_subscriptions.0.insert(thread_root.clone(), subscribe_to_event_id);
+                }
+
                 // If we're not in sync mode, we're receiving events from a room pagination: as
                 // we don't know where they should be put in a thread linked
                 // chunk, we don't try to be smart and include them. That's for
@@ -1547,7 +1598,7 @@
                 self.replace_event_at(location, target_event).await?;
             }
 
-            Ok(())
+            Ok(thread_subscriptions)
         }
 
         /// Replaces a single event, be it saved in memory or in the store.
@@ -1687,8 +1738,10 @@
         #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
         pub async fn handle_sync(
             &mut self,
+            push_context: ThreadPushContext,
             mut timeline: Timeline,
-        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
+        ) -> Result<(bool, Vec<VectorDiff<Event>>, AutomaticThreadSubscriptions), EventCacheError>
+        {
             let mut prev_batch = timeline.prev_batch.take();
 
             let DeduplicationOutcome {
@@ -1759,7 +1812,7 @@
             if all_duplicates {
                 // No new events and no gap (per the previous check), thus no need to change the
                 // room state. We're done!
-                return Ok((false, Vec::new()));
+                return Ok((false, Vec::new(), Default::default()));
             }
 
             let has_new_gap = prev_batch.is_some();
@@ -1780,7 +1833,7 @@
             self.room_linked_chunk
                 .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
 
-            self.post_process_new_events(events, true).await?;
+            let new_thread_subs = self.post_process_new_events(push_context, events, true).await?;
 
             if timeline.limited && has_new_gap {
                 // If there was a previous batch token for a limited timeline, unload the chunks
@@ -1794,7 +1847,7 @@
 
             let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
 
-            Ok((has_new_gap, timeline_event_diffs))
+            Ok((has_new_gap, timeline_event_diffs, new_thread_subs))
         }
 
         /// Handle the result of a single back-pagination request.
@@ -1807,11 +1860,14 @@
         #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
         pub async fn handle_backpagination(
             &mut self,
+            push_context: ThreadPushContext,
             events: Vec<Event>,
             mut new_token: Option<String>,
             prev_token: Option<String>,
-        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
-        {
+        ) -> Result<
+            Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>, AutomaticThreadSubscriptions)>,
+            EventCacheError,
+        > {
             // Check that the previous token still exists; otherwise it's a sign that the
             // room's timeline has been cleared.
             let prev_gap_id = if let Some(token) = prev_token {
@@ -1884,11 +1940,16 @@
             );
 
             // Note: this flushes updates to the store.
-            self.post_process_new_events(topo_ordered_events, false).await?;
+            let new_thread_subs =
+                self.post_process_new_events(push_context, topo_ordered_events, false).await?;
 
             let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
 
-            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
+            Ok(Some((
+                BackPaginationOutcome { events, reached_start },
+                event_diffs,
+                new_thread_subs,
+            )))
         }
 
         /// Subscribe to thread for a given root event, and get a (maybe empty)
@@ -1903,14 +1964,17 @@
         /// Back paginate in the given thread.
         ///
        /// Will always start from the end, unless we previously paginated.
-        pub fn finish_thread_network_pagination(
+        pub async fn finish_thread_network_pagination(
             &mut self,
+            push_context: ThreadPushContext,
             root: OwnedEventId,
             prev_token: Option<String>,
             new_token: Option<String>,
             events: Vec<Event>,
-        ) -> Option<BackPaginationOutcome> {
-            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
+        ) -> Option<(BackPaginationOutcome, AutomaticThreadSubscriptions)> {
+            self.get_or_reload_thread(root)
+                .finish_network_pagination(push_context, prev_token, new_token, events)
+                .await
         }
 
         pub fn load_more_thread_events_backwards(