Commit 78c4a71

feat(event cache): automatically subscribe to threads according to MSC4306 semantics
1 parent: 92c582a

File tree

crates/matrix-sdk/src/event_cache/mod.rs
crates/matrix-sdk/src/room/mod.rs

2 files changed: +173 −6 lines

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 142 additions & 0 deletions
@@ -42,6 +42,7 @@ use matrix_sdk_base::{
         store::{EventCacheStoreError, EventCacheStoreLock},
         Gap,
     },
+    executor::AbortOnDrop,
     linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
     store_locks::LockStoreError,
     sync::RoomUpdates,
@@ -173,6 +174,13 @@ impl EventCache {
         let (room_event_cache_generic_update_sender, _) = channel(32);
         let (linked_chunk_update_sender, _) = channel(32);
 
+        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
+        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
+            client.clone(),
+            linked_chunk_update_sender.clone(),
+            thread_subscriber_sender.clone(),
+        )));
+
         Self {
             inner: Arc::new(EventCacheInner {
                 client,
@@ -183,10 +191,20 @@
                 auto_shrink_sender: Default::default(),
                 room_event_cache_generic_update_sender,
                 linked_chunk_update_sender,
+                _thread_subscriber_task: thread_subscriber_task,
+                thread_subscriber_receiver,
             }),
         }
     }
 
+    /// Subscribes to updates indicating that a thread subscription has been sent.
+    ///
+    /// For testing purposes only.
+    #[doc(hidden)]
+    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
+        self.inner.thread_subscriber_receiver.resubscribe()
+    }
+
     /// Starts subscribing the [`EventCache`] to sync responses, if not done
     /// before.
     ///
@@ -398,6 +416,115 @@
     pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
         self.inner.room_event_cache_generic_update_sender.subscribe()
     }
+
+    #[instrument(skip_all)]
+    async fn thread_subscriber_task(
+        client: WeakClient,
+        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+        thread_subscriber_sender: Sender<()>,
+    ) {
+        if client.get().map_or(false, |client| !client.enabled_thread_subscriptions()) {
+            trace!("Exiting the thread subscriber task: thread subscriptions are disabled for this client");
+            return;
+        }
+
+        let mut rx = linked_chunk_update_sender.subscribe();
+
+        loop {
+            match rx.recv().await {
+                Ok(up) => {
+                    let Some(client) = client.get() else {
+                        // Client shutting down.
+                        debug!("Client is shutting down, exiting thread subscriber task");
+                        break;
+                    };
+
+                    let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
+                        trace!("received an update for a non-thread linked chunk, ignoring");
+                        continue;
+                    };
+
+                    let Some(room) = client.get_room(&room_id) else {
+                        warn!(%room_id, "unknown room");
+                        continue;
+                    };
+
+                    let thread_root = thread_root.clone();
+
+                    let new_events = up.events();
+                    if new_events.is_empty() {
+                        // No new events, nothing to do.
+                        continue;
+                    }
+
+                    // This `PushContext` is going to be used to compute whether an in-thread event
+                    // would trigger a mention.
+                    //
+                    // Of course, we're not interested in an in-thread event causing a mention,
+                    // because it's part of a thread we've subscribed to. So the
+                    // `PushContext` must not include the check for thread subscriptions (otherwise
+                    // it would be impossible to subscribe to new threads).
+
+                    let with_thread_subscriptions = false;
+
+                    let Some(push_context) = room
+                        .push_context_internal(with_thread_subscriptions)
+                        .await
+                        .inspect_err(|err| {
+                            warn!("Failed to get push context for threads: {err}");
+                        })
+                        .ok()
+                        .flatten()
+                    else {
+                        warn!("Missing push context for thread subscriptions.");
+                        continue;
+                    };
+
+                    let mut subscribe_up_to = None;
+
+                    // Find if there's an event that would trigger a mention for the current
+                    // user, iterating from the end of the new events towards the oldest.
+                    for ev in new_events.into_iter().rev() {
+                        if push_context
+                            .for_event(ev.raw())
+                            .await
+                            .into_iter()
+                            .any(|action| action.should_notify())
+                        {
+                            let Some(event_id) = ev.event_id() else {
+                                // Shouldn't happen.
+                                continue;
+                            };
+                            subscribe_up_to = Some(event_id.to_owned());
+                            break;
+                        }
+                    }
+
+                    // And if we've found such a mention, subscribe to the thread up to this
+                    // event.
+                    if let Some(event_id) = subscribe_up_to {
+                        trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
+                        if let Err(err) =
+                            room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
+                        {
+                            warn!(%err, "Failed to subscribe to thread");
+                        } else {
+                            let _ = thread_subscriber_sender.send(());
+                        }
+                    }
+                }
+
+                Err(RecvError::Closed) => {
+                    debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
+                    break;
+                }
+
+                Err(RecvError::Lagged(num_skipped)) => {
+                    warn!(num_skipped, "Lagged behind linked chunk updates");
+                }
+            }
+        }
+    }
 }
 
 struct EventCacheInner {
@@ -443,6 +570,21 @@ struct EventCacheInner {
     ///
     /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
     linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+
+    /// A background task listening to room and send queue updates, and
+    /// automatically subscribing the user to threads when needed, based on
+    /// the semantics of MSC4306.
+    ///
+    /// One important constraint is that there is only one such task per
+    /// [`EventCache`], so it listens to *all* rooms at the same time.
+    _thread_subscriber_task: AbortOnDrop<()>,
+
+    /// A test helper receiver that is notified every time the thread
+    /// subscriber task subscribes to a new thread.
+    ///
+    /// This lets tests observe whether a new thread subscription has been
+    /// sent or not.
+    thread_subscriber_receiver: Receiver<()>,
 }
 
 type AutoShrinkChannelPayload = OwnedRoomId;
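
For illustration, here is a minimal sketch of how a test could use the hidden subscribe_thread_subscriber_updates helper above to wait for the background task's side effect. The event_cache handle, the placeholder comment, and the one-second timeout are assumptions for this example, not part of the commit:

    // Test-only sketch: wait until the thread subscriber task reports that it
    // has sent a thread subscription.
    use std::time::Duration;

    use matrix_sdk::event_cache::EventCache;
    use tokio::time::timeout;

    async fn wait_for_thread_subscription(event_cache: &EventCache) {
        // Subscribe before triggering the update that should make the
        // background task subscribe to a thread.
        let mut rx = event_cache.subscribe_thread_subscriber_updates();

        // ...deliver a thread event that mentions the current user here...

        // The task sends `()` after each successful call to
        // `subscribe_thread_if_needed`.
        timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timed out waiting for a thread subscription")
            .expect("thread subscriber channel was closed");
    }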

crates/matrix-sdk/src/room/mod.rs

Lines changed: 31 additions & 6 deletions
@@ -2994,11 +2994,24 @@ impl Room {
         self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
     }
 
-    /// Get the push context for this room.
+    /// Get the push-condition context for this room.
     ///
     /// Returns `None` if some data couldn't be found. This should only happen
     /// in brand new rooms, while we process its state.
     pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
+        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Get the push-condition context for this room, with a choice to include
+    /// thread subscriptions or not, based on the extra
+    /// `with_threads_subscriptions` parameter.
+    ///
+    /// Returns `None` if some data couldn't be found. This should only happen
+    /// in brand new rooms, while we process its state.
+    pub(crate) async fn push_condition_room_ctx_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushConditionRoomCtx>> {
         let room_id = self.room_id();
         let user_id = self.own_user_id();
         let room_info = self.clone_info();
@@ -3022,7 +3035,6 @@
             }
         };
 
-        let this = self.clone();
         let mut ctx = assign!(PushConditionRoomCtx::new(
             room_id.to_owned(),
             UInt::new(member_count).unwrap_or(UInt::MAX),
@@ -3033,12 +3045,12 @@
             power_levels,
         });
 
-        if self.client.enabled_thread_subscriptions() {
+        if with_threads_subscriptions {
+            let this = self.clone();
             ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
                 let room = this.clone();
                 Box::pin(async move {
-                    if let Ok(maybe_sub) = room.fetch_thread_subscription(event_id.to_owned()).await
-                    {
+                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
                         maybe_sub.is_some()
                     } else {
                         false
@@ -3053,7 +3065,20 @@
     /// Retrieves a [`PushContext`] that can be used to compute the push
     /// actions for events.
     pub async fn push_context(&self) -> Result<Option<PushContext>> {
-        let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
+        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Retrieves a [`PushContext`] that can be used to compute the push actions
+    /// for events, with a choice to include thread subscriptions or not,
+    /// based on the extra `with_threads_subscriptions` parameter.
+    #[instrument(skip(self))]
+    pub(crate) async fn push_context_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushContext>> {
+        let Some(push_condition_room_ctx) =
+            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
+        else {
            debug!("Could not aggregate push context");
            return Ok(None);
        };
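
As a usage sketch, the crate-internal helper mirrors what the event cache task above does: build a PushContext that ignores thread subscriptions, then check whether any resulting push action notifies. The function name, the Raw<AnySyncTimelineEvent> parameter type, and the elided imports are assumptions for this example; the for_event and should_notify calls follow the task code:

    // Crate-internal sketch: would this raw event notify the user, ignoring
    // any existing thread subscriptions?
    async fn event_would_notify(
        room: &Room,
        raw_event: &Raw<AnySyncTimelineEvent>,
    ) -> Result<bool> {
        // Passing `false` skips the thread-subscription check, so events in
        // threads the user hasn't subscribed to yet can still notify.
        let Some(push_context) = room.push_context_internal(false).await? else {
            return Ok(false);
        };

        // `for_event` computes the push actions for this event.
        let actions = push_context.for_event(raw_event).await;
        Ok(actions.into_iter().any(|action| action.should_notify()))
    }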
