
Commit 71b23c6

feat(event cache): automatically subscribe to threads according to msc4306 semantics

1 parent 6dcc394

File tree

2 files changed: +173 −6 lines changed

crates/matrix-sdk/src/event_cache/mod.rs
crates/matrix-sdk/src/room/mod.rs

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 142 additions & 0 deletions
```diff
@@ -42,6 +42,7 @@ use matrix_sdk_base::{
         store::{EventCacheStoreError, EventCacheStoreLock},
         Gap,
     },
+    executor::AbortOnDrop,
     linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
     store_locks::LockStoreError,
     sync::RoomUpdates,
@@ -173,6 +174,13 @@ impl EventCache {
         let (generic_update_sender, _) = channel(32);
         let (linked_chunk_update_sender, _) = channel(32);
 
+        let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
+        let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
+            client.clone(),
+            linked_chunk_update_sender.clone(),
+            thread_subscriber_sender,
+        )));
+
         Self {
             inner: Arc::new(EventCacheInner {
                 client,
@@ -183,10 +191,20 @@
                 auto_shrink_sender: Default::default(),
                 generic_update_sender,
                 linked_chunk_update_sender,
+                _thread_subscriber_task: thread_subscriber_task,
+                thread_subscriber_receiver,
             }),
         }
     }
 
+    /// Subscribes to notifications that a thread subscription has been sent.
+    ///
+    /// For testing purposes only.
+    #[doc(hidden)]
+    pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
+        self.inner.thread_subscriber_receiver.resubscribe()
+    }
+
     /// Starts subscribing the [`EventCache`] to sync responses, if not done
     /// before.
     ///
@@ -396,6 +414,115 @@
     pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
         self.inner.generic_update_sender.subscribe()
     }
+
+    #[instrument(skip_all)]
+    async fn thread_subscriber_task(
+        client: WeakClient,
+        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+        thread_subscriber_sender: Sender<()>,
+    ) {
+        if client.get().map_or(false, |client| !client.enabled_thread_subscriptions()) {
+            trace!("Not spawning the thread subscriber task, because the client is shutting down or is not interested in those");
+            return;
+        }
+
+        let mut rx = linked_chunk_update_sender.subscribe();
+
+        loop {
+            match rx.recv().await {
+                Ok(up) => {
+                    let Some(client) = client.get() else {
+                        // Client shutting down.
+                        debug!("Client is shutting down, exiting thread subscriber task");
+                        break;
+                    };
+
+                    let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
+                        trace!("received an update for a non-thread linked chunk, ignoring");
+                        continue;
+                    };
+
+                    let Some(room) = client.get_room(&room_id) else {
+                        warn!(%room_id, "unknown room");
+                        continue;
+                    };
+
+                    let thread_root = thread_root.clone();
+
+                    let new_events = up.events();
+                    if new_events.is_empty() {
+                        // No new events, nothing to do.
+                        continue;
+                    }
+
+                    // This `PushContext` is going to be used to compute whether an in-thread event
+                    // would trigger a mention.
+                    //
+                    // Of course, we're not interested in an in-thread event causing a mention,
+                    // because it's part of a thread we've subscribed to. So the
+                    // `PushContext` must not include the check for thread subscriptions (otherwise
+                    // it would be impossible to subscribe to new threads).
+
+                    let with_thread_subscriptions = false;
+
+                    let Some(push_context) = room
+                        .push_context_internal(with_thread_subscriptions)
+                        .await
+                        .inspect_err(|err| {
+                            warn!("Failed to get push context for threads: {err}");
+                        })
+                        .ok()
+                        .flatten()
+                    else {
+                        warn!("Missing push context for thread subscriptions.");
+                        continue;
+                    };
+
+                    let mut subscribe_up_to = None;
+
+                    // Find if there's an event that would trigger a mention for the current
+                    // user, iterating from the end of the new events towards the oldest.
+                    for ev in new_events.into_iter().rev() {
+                        if push_context
+                            .for_event(ev.raw())
+                            .await
+                            .into_iter()
+                            .any(|action| action.should_notify())
+                        {
+                            let Some(event_id) = ev.event_id() else {
+                                // Shouldn't happen.
+                                continue;
+                            };
+                            subscribe_up_to = Some(event_id.to_owned());
+                            break;
+                        }
+                    }
+
+                    // And if we've found such a mention, subscribe to the thread up to this
+                    // event.
+                    if let Some(event_id) = subscribe_up_to {
+                        trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
+                        if let Err(err) =
+                            room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
+                        {
+                            warn!(%err, "Failed to subscribe to thread");
+                        } else {
+                            let _ = thread_subscriber_sender.send(());
+                        }
+                    }
+                }
+
+                Err(RecvError::Closed) => {
+                    debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
+                    break;
+                }
+
+                Err(RecvError::Lagged(num_skipped)) => {
+                    warn!(num_skipped, "Lagged behind linked chunk updates");
+                }
+            }
+        }
+    }
 }
 
 struct EventCacheInner {
@@ -441,6 +568,21 @@ struct EventCacheInner {
     ///
     /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
     linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+
+    /// A background task listening to room and send queue updates, and
+    /// automatically subscribing the user to threads when needed, based on
+    /// the semantics of MSC4306.
+    ///
+    /// One important constraint is that there is only one such task per
+    /// [`EventCache`], so it listens to *all* rooms at the same time.
+    _thread_subscriber_task: AbortOnDrop<()>,
+
+    /// A test helper receiver that is notified every time the thread
+    /// subscriber task subscribes to a new thread.
+    ///
+    /// This is helpful for tests to observe whether a new thread
+    /// subscription has been sent or not.
+    thread_subscriber_receiver: Receiver<()>,
 }
 
 type AutoShrinkChannelPayload = OwnedRoomId;
```
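
Note on testability: the hidden `subscribe_thread_subscriber_updates` helper makes the background task observable from tests. Below is a minimal sketch of how a test might await the automatic subscription; it assumes a tokio test context and a hypothetical `logged_in_client()` setup helper, neither of which is part of this diff.

```rust
use std::time::Duration;

use tokio::time::timeout;

#[tokio::test]
async fn test_automatically_subscribes_to_mentioned_thread() {
    // Hypothetical helper: builds a logged-in `Client` against a mock server.
    let client = logged_in_client().await;

    // Obtain the receiver *before* triggering any sync: a tokio broadcast
    // receiver only observes notifications sent after it was created.
    let mut thread_subscriptions = client.event_cache().subscribe_thread_subscriber_updates();

    // ... sync an in-thread event that mentions the current user ...

    // The subscription is sent from a background task, so wait with a bound.
    timeout(Duration::from_secs(1), thread_subscriptions.recv())
        .await
        .expect("timed out waiting for the thread subscription")
        .expect("the thread subscriber channel was closed");
}
```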

crates/matrix-sdk/src/room/mod.rs

Lines changed: 31 additions & 6 deletions
```diff
@@ -2994,11 +2994,24 @@ impl Room {
         self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
     }
 
-    /// Get the push context for this room.
+    /// Get the push-condition context for this room.
     ///
     /// Returns `None` if some data couldn't be found. This should only happen
     /// in brand new rooms, while we process its state.
     pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
+        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Get the push-condition context for this room, with a choice to include
+    /// thread subscriptions or not, based on the extra
+    /// `with_threads_subscriptions` parameter.
+    ///
+    /// Returns `None` if some data couldn't be found. This should only happen
+    /// in brand new rooms, while we process its state.
+    pub(crate) async fn push_condition_room_ctx_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushConditionRoomCtx>> {
         let room_id = self.room_id();
         let user_id = self.own_user_id();
         let room_info = self.clone_info();
@@ -3022,7 +3035,6 @@
             }
         };
 
-        let this = self.clone();
         let mut ctx = assign!(PushConditionRoomCtx::new(
             room_id.to_owned(),
             UInt::new(member_count).unwrap_or(UInt::MAX),
@@ -3033,12 +3045,12 @@
             power_levels,
         });
 
-        if self.client.enabled_thread_subscriptions() {
+        if with_threads_subscriptions {
+            let this = self.clone();
             ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
                 let room = this.clone();
                 Box::pin(async move {
-                    if let Ok(maybe_sub) = room.fetch_thread_subscription(event_id.to_owned()).await
-                    {
+                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
                         maybe_sub.is_some()
                     } else {
                         false
@@ -3053,7 +3065,20 @@
     /// Retrieves a [`PushContext`] that can be used to compute the push
     /// actions for events.
     pub async fn push_context(&self) -> Result<Option<PushContext>> {
-        let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
+        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
+    }
+
+    /// Retrieves a [`PushContext`] that can be used to compute the push actions
+    /// for events, with a choice to include thread subscriptions or not,
+    /// based on the extra `with_threads_subscriptions` parameter.
+    #[instrument(skip(self))]
+    pub(crate) async fn push_context_internal(
+        &self,
+        with_threads_subscriptions: bool,
+    ) -> Result<Option<PushContext>> {
+        let Some(push_condition_room_ctx) =
+            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
+        else {
             debug!("Could not aggregate push context");
             return Ok(None);
         };
```
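
For context, the per-event decision in `thread_subscriber_task` reduces to: evaluate the event against the user's push rules and subscribe when any resulting action would notify. The following is a self-contained sketch of that check using the public `push_context()` API; the task itself uses the crate-private `push_context_internal(false)`, and `would_notify` is an illustrative name, not part of this diff.

```rust
use matrix_sdk::{deserialized_responses::TimelineEvent, Result, Room};

/// Would this in-thread event notify the current user, per their push rules?
///
/// Sketch mirroring the thread subscriber task's per-event check.
async fn would_notify(room: &Room, event: &TimelineEvent) -> Result<bool> {
    // `push_context()` returns `None` while the room state is still being
    // processed (e.g. in brand new rooms); treat that as "don't subscribe".
    let Some(push_context) = room.push_context().await? else {
        return Ok(false);
    };

    // Evaluate the raw event against the push rules; any action that would
    // notify (a mention, a keyword, ...) is enough to subscribe.
    let actions = push_context.for_event(event.raw()).await;
    Ok(actions.into_iter().any(|action| action.should_notify()))
}
```

As the task's comment explains, the real check must run *without* the thread-subscription condition: with it enabled, events in not-yet-subscribed threads would never produce a notifying action, so no new thread could ever be subscribed to.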
