
Commit c25be8b

feat(event cache): automatically subscribe to threads according to MSC4306 semantics
1 parent 1554c9d

File tree: 2 files changed (+173, -6 lines)

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 142 additions & 0 deletions
@@ -42,6 +42,7 @@ use matrix_sdk_base::{
         store::{EventCacheStoreError, EventCacheStoreLock},
         Gap,
     },
+    executor::AbortOnDrop,
     linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
     store_locks::LockStoreError,
     sync::RoomUpdates,
@@ -180,6 +181,13 @@ impl EventCache {
         let (generic_update_sender, _) = channel(32);
         let (linked_chunk_update_sender, _) = channel(32);

+        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
+        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
+            client.clone(),
+            linked_chunk_update_sender.clone(),
+            thread_subscriber_sender,
+        )));
+
         Self {
             inner: Arc::new(EventCacheInner {
                 client,
@@ -190,10 +198,20 @@ impl EventCache {
                 auto_shrink_sender: Default::default(),
                 generic_update_sender,
                 linked_chunk_update_sender,
+                _thread_subscriber_task: thread_subscriber_task,
+                thread_subscriber_receiver,
             }),
         }
     }

+    /// Subscribes to notifications that a thread subscription has been sent.
+    ///
+    /// For testing purposes only.
+    #[doc(hidden)]
+    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
+        self.inner.thread_subscriber_receiver.resubscribe()
+    }
+
     /// Starts subscribing the [`EventCache`] to sync responses, if not done
     /// before.
     ///
@@ -403,6 +421,115 @@ impl EventCache {
     pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
         self.inner.generic_update_sender.subscribe()
     }
+
+    #[instrument(skip_all)]
+    async fn thread_subscriber_task(
+        client: WeakClient,
+        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+        thread_subscriber_sender: Sender<()>,
+    ) {
+        if client.get().map_or(false, |client| !client.enabled_thread_subscriptions()) {
+            trace!("Not spawning the thread subscriber task: the client is shutting down or is not interested in thread subscriptions");
+            return;
+        }
+
+        let mut rx = linked_chunk_update_sender.subscribe();
+
+        loop {
+            match rx.recv().await {
+                Ok(up) => {
+                    let Some(client) = client.get() else {
+                        // Client is shutting down.
+                        debug!("Client is shutting down, exiting thread subscriber task");
+                        break;
+                    };
+
+                    let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
+                        trace!("Received an update for a non-thread linked chunk, ignoring");
+                        continue;
+                    };
+
+                    let Some(room) = client.get_room(&room_id) else {
+                        warn!(%room_id, "unknown room");
+                        continue;
+                    };
+
+                    let thread_root = thread_root.clone();
+
+                    let new_events = up.events();
+                    if new_events.is_empty() {
+                        // No new events, nothing to do.
+                        continue;
+                    }
+
+                    // This `PushContext` is going to be used to compute whether an in-thread event
+                    // would trigger a mention.
+                    //
+                    // Of course, an in-thread event must not count as a mention merely because
+                    // it's part of a thread we've subscribed to. So the `PushContext` must not
+                    // include the check for thread subscriptions (otherwise it would be
+                    // impossible to subscribe to new threads).
+                    let with_thread_subscriptions = false;
+
+                    let Some(push_context) = room
+                        .push_context_internal(with_thread_subscriptions)
+                        .await
+                        .inspect_err(|err| {
+                            warn!("Failed to get push context for threads: {err}");
+                        })
+                        .ok()
+                        .flatten()
+                    else {
+                        warn!("Missing push context for thread subscriptions.");
+                        continue;
+                    };
+
+                    let mut subscribe_up_to = None;
+
+                    // Find an event that would trigger a mention for the current user,
+                    // iterating from the newest of the new events towards the oldest.
+                    for ev in new_events.into_iter().rev() {
+                        if push_context
+                            .for_event(ev.raw())
+                            .await
+                            .into_iter()
+                            .any(|action| action.should_notify())
+                        {
+                            let Some(event_id) = ev.event_id() else {
+                                // Shouldn't happen.
+                                continue;
+                            };
+                            subscribe_up_to = Some(event_id.to_owned());
+                            break;
+                        }
+                    }
+
+                    // If we've found such a mention, subscribe to the thread up to this
+                    // event.
+                    if let Some(event_id) = subscribe_up_to {
+                        trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
+                        if let Err(err) =
+                            room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
+                        {
+                            warn!(%err, "Failed to subscribe to thread");
+                        } else {
+                            let _ = thread_subscriber_sender.send(());
+                        }
+                    }
+                }
+
+                Err(RecvError::Closed) => {
+                    debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
+                    break;
+                }
+
+                Err(RecvError::Lagged(num_skipped)) => {
+                    warn!(num_skipped, "Lagged behind linked chunk updates");
+                }
+            }
+        }
+    }
 }
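The receive loop above tolerates slow consumption by design: with a bounded broadcast channel (the `channel(32)` calls suggest a tokio-style one), a lagging receiver gets `RecvError::Lagged(n)` instead of stalling the sender, so the task can log and carry on. A minimal, self-contained sketch of that behavior, assuming tokio's broadcast API (none of the names below come from the commit):

    use tokio::sync::broadcast::{channel, error::RecvError};

    #[tokio::main]
    async fn main() {
        // Capacity 2, standing in for the SDK's `channel(32)`.
        let (tx, mut rx) = channel::<u32>(2);
        for i in 0..5 {
            let _ = tx.send(i);
        }
        drop(tx); // Close the channel so the loop below terminates.

        loop {
            match rx.recv().await {
                // Only the most recent values are still buffered.
                Ok(v) => println!("got {v}"),
                // Older values were overwritten; we only learn how many were skipped.
                Err(RecvError::Lagged(n)) => println!("lagged, skipped {n} updates"),
                Err(RecvError::Closed) => break,
            }
        }
    }

For this task, a skipped update is benign: at worst it delays a subscription until the next event arrives in that thread.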
@@ -448,6 +575,21 @@ struct EventCacheInner {
     ///
     /// See the doc comment of [`RoomEventCacheLinkedChunkUpdate`].
     linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+
+    /// A background task listening to room and send queue updates, and
+    /// automatically subscribing the user to threads when needed, based on
+    /// the semantics of MSC4306.
+    ///
+    /// One important constraint is that there is only one such task per
+    /// [`EventCache`], so it listens to *all* rooms at the same time.
+    _thread_subscriber_task: AbortOnDrop<()>,
+
+    /// A test helper receiver that is notified every time the thread
+    /// subscriber task subscribes to a new thread.
+    ///
+    /// This is helpful for tests to observe whether a new thread
+    /// subscription has been sent.
+    thread_subscriber_receiver: Receiver<()>,
 }

 type AutoShrinkChannelPayload = OwnedRoomId;
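With the `#[doc(hidden)]` helper above, a test can wait deterministically for the background task to report a subscription instead of sleeping. A hypothetical test-side sketch (only `subscribe_thread_subscriber_updates` comes from the commit; the helper name and timeout are assumptions):

    use std::time::Duration;

    use matrix_sdk::event_cache::EventCache;

    // Hypothetical test helper: waits until the thread subscriber task reports
    // one successful subscription, or fails after a second.
    async fn wait_for_one_thread_subscription(event_cache: &EventCache) {
        let mut updates = event_cache.subscribe_thread_subscriber_updates();
        tokio::time::timeout(Duration::from_secs(1), updates.recv())
            .await
            .expect("timed out waiting for a thread subscription")
            .expect("thread subscriber channel closed early");
    }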

crates/matrix-sdk/src/room/mod.rs

Lines changed: 31 additions & 6 deletions
@@ -3000,11 +3000,24 @@ impl Room {
         self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
     }

-    /// Get the push context for this room.
+    /// Get the push-condition context for this room.
     ///
     /// Returns `None` if some data couldn't be found. This should only happen
     /// in brand-new rooms, while we process their state.
     pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
+        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Get the push-condition context for this room, choosing whether to
+    /// include thread subscriptions via the extra `with_threads_subscriptions`
+    /// parameter.
+    ///
+    /// Returns `None` if some data couldn't be found. This should only happen
+    /// in brand-new rooms, while we process their state.
+    pub(crate) async fn push_condition_room_ctx_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushConditionRoomCtx>> {
         let room_id = self.room_id();
         let user_id = self.own_user_id();
         let room_info = self.clone_info();
@@ -3028,7 +3041,6 @@
             }
         };

-        let this = self.clone();
         let mut ctx = assign!(PushConditionRoomCtx::new(
             room_id.to_owned(),
             UInt::new(member_count).unwrap_or(UInt::MAX),
@@ -3039,12 +3051,12 @@
             power_levels,
         });

-        if self.client.enabled_thread_subscriptions() {
+        if with_threads_subscriptions {
+            let this = self.clone();
             ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
                 let room = this.clone();
                 Box::pin(async move {
-                    if let Ok(maybe_sub) = room.fetch_thread_subscription(event_id.to_owned()).await
-                    {
+                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
                         maybe_sub.is_some()
                     } else {
                         false
@@ -3059,7 +3071,20 @@
     /// Retrieves a [`PushContext`] that can be used to compute the push
     /// actions for events.
     pub async fn push_context(&self) -> Result<Option<PushContext>> {
-        let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
+        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Retrieves a [`PushContext`] that can be used to compute the push actions
+    /// for events, choosing whether to include thread subscriptions via the
+    /// extra `with_threads_subscriptions` parameter.
+    #[instrument(skip(self))]
+    pub(crate) async fn push_context_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushContext>> {
+        let Some(push_condition_room_ctx) =
+            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
+        else {
             debug!("Could not aggregate push context");
             return Ok(None);
         };
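The public/internal split gives call sites an explicit knob: `push_context()` keeps honoring the client-level thread subscriptions setting, while the event cache's thread subscriber task passes `false` explicitly. A crate-internal sketch of that calling pattern (the helper is hypothetical and only mirrors the task's logic; it assumes `TimelineEvent` is the cached event type, while `Room`, `push_context_internal`, `for_event`, and `should_notify` appear in the diff):

    // Hypothetical crate-internal helper: would this event notify the user if
    // existing thread subscriptions were ignored?
    async fn would_notify_without_thread_subscriptions(
        room: &Room,
        event: &TimelineEvent,
    ) -> Result<bool> {
        let Some(push_context) = room.push_context_internal(false).await? else {
            // No push context yet (e.g. a brand-new room): assume no notification.
            return Ok(false);
        };
        Ok(push_context
            .for_event(event.raw())
            .await
            .into_iter()
            .any(|action| action.should_notify()))
    }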
