Merged
142 changes: 142 additions & 0 deletions crates/matrix-sdk/src/event_cache/mod.rs
@@ -42,6 +42,7 @@ use matrix_sdk_base::{
store::{EventCacheStoreError, EventCacheStoreLock},
Gap,
},
executor::AbortOnDrop,
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
store_locks::LockStoreError,
sync::RoomUpdates,
@@ -173,6 +174,13 @@ impl EventCache {
let (generic_update_sender, _) = channel(32);
let (linked_chunk_update_sender, _) = channel(32);

let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32);
let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task(
client.clone(),
linked_chunk_update_sender.clone(),
thread_subscriber_sender,
)));

Self {
inner: Arc::new(EventCacheInner {
client,
@@ -183,10 +191,20 @@
auto_shrink_sender: Default::default(),
generic_update_sender,
linked_chunk_update_sender,
_thread_subscriber_task: thread_subscriber_task,
thread_subscriber_receiver,
}),
}
}

/// Subscribes to notifications that a thread subscription has been sent.
///
/// For testing purposes only.
#[doc(hidden)]
pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
self.inner.thread_subscriber_receiver.resubscribe()
}
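
// Illustrative sketch only, not part of this change: how a test could wait
// for the thread subscriber task to signal that a subscription was sent.
// Assumes a tokio runtime and that a sync delivering a mentioning in-thread
// event is driven elsewhere (e.g. by a mock server).
//
// let mut rx = event_cache.subscribe_thread_subscriber_updates();
// // ... drive a sync that appends a mentioning event to a thread ...
// rx.recv().await.expect("the thread subscriber task should have notified us");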

/// Starts subscribing the [`EventCache`] to sync responses, if not done
/// before.
///
@@ -396,6 +414,115 @@ impl EventCache {
pub fn subscribe_to_room_generic_updates(&self) -> Receiver<RoomEventCacheGenericUpdate> {
self.inner.generic_update_sender.subscribe()
}

#[instrument(skip_all)]
async fn thread_subscriber_task(
client: WeakClient,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
thread_subscriber_sender: Sender<()>,
) {
if client.get().map_or(false, |client| !client.enabled_thread_subscriptions()) {
trace!("Not spawning the thread subscriber task, because the client is shutting down or is not interested in those");
return;
}

let mut rx = linked_chunk_update_sender.subscribe();

loop {
match rx.recv().await {
Ok(up) => {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
break;
};

let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
trace!("received an update for a non-thread linked chunk, ignoring");
continue;
};

let Some(room) = client.get_room(&room_id) else {
warn!(%room_id, "unknown room");
continue;
};

let thread_root = thread_root.clone();

let new_events = up.events();
if new_events.is_empty() {
// No new events, nothing to do.
continue;
}

// This `PushContext` is going to be used to compute whether an in-thread event
// would trigger a mention.
//
// Of course, we're not interested in an in-thread event that would notify
// only because it's part of a thread we've already subscribed to. So the
// `PushContext` must not include the check for thread subscriptions
// (otherwise it would be impossible to subscribe to new threads).

let with_thread_subscriptions = false;

let Some(push_context) = room
.push_context_internal(with_thread_subscriptions)
.await
.inspect_err(|err| {
warn!("Failed to get push context for threads: {err}");
})
.ok()
.flatten()
else {
warn!("Missing push context for thread subscriptions.");
continue;
};

let mut subscribe_up_to = None;

// Find out whether there's an event that would trigger a mention for the
// current user, iterating from the newest of the new events towards the
// oldest.
for ev in new_events.into_iter().rev() {
if push_context
.for_event(ev.raw())
.await
.into_iter()
.any(|action| action.should_notify())
{
let Some(event_id) = ev.event_id() else {
// Shouldn't happen.
continue;
};
subscribe_up_to = Some(event_id.to_owned());
break;
}
}

// And if we've found such a mention, subscribe to the thread up to this
// event.
if let Some(event_id) = subscribe_up_to {
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
if let Err(err) =
room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
{
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
}
}

Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}

Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
}
}
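
// Aside (illustration only, not part of this change): the `RecvError::Lagged`
// arm above relies on tokio's broadcast-channel semantics. A receiver that
// falls more than the channel capacity behind gets `Lagged(n)` exactly once,
// then resumes from the oldest retained message, so logging and looping again
// is the right recovery. A self-contained sketch:
//
// async fn lag_demo() {
//     use tokio::sync::broadcast::{channel, error::RecvError};
//     let (tx, mut rx) = channel::<u32>(2);
//     for i in 0..5 {
//         tx.send(i).unwrap();
//     }
//     // Capacity is 2, so 0, 1 and 2 were dropped; only 3 and 4 are retained.
//     assert!(matches!(rx.recv().await, Err(RecvError::Lagged(3))));
//     assert_eq!(rx.recv().await.unwrap(), 3);
// }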

struct EventCacheInner {
@@ -441,6 +568,21 @@ struct EventCacheInner {
///
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

/// A background task listening to room and send queue updates, and
/// automatically subscribing the user to threads when needed, based on
/// the semantics of MSC4306.
///
/// One important constraint is that there is only one such task per
/// [`EventCache`], so it listens to *all* rooms at the same time. The
/// wrapped task is aborted when this handle is dropped (see the sketch
/// after this struct).
_thread_subscriber_task: AbortOnDrop<()>,

/// A test helper receiver that is notified every time the thread
/// subscriber task subscribes to a new thread.
///
/// This lets tests observe whether a new thread subscription has been
/// sent.
thread_subscriber_receiver: Receiver<()>,
}
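
// Aside (illustration only, not part of this change): the abort-on-drop
// pattern used for `_thread_subscriber_task` boils down to wrapping a
// `JoinHandle` and aborting it on drop, so the background task cannot
// outlive the `EventCacheInner` that owns it. A minimal sketch, assuming
// `matrix_sdk_base::executor::AbortOnDrop` aborts the wrapped handle when
// dropped:
//
// struct AbortOnDrop<T>(tokio::task::JoinHandle<T>);
//
// impl<T> AbortOnDrop<T> {
//     fn new(handle: tokio::task::JoinHandle<T>) -> Self {
//         Self(handle)
//     }
// }
//
// impl<T> Drop for AbortOnDrop<T> {
//     fn drop(&mut self) {
//         self.0.abort();
//     }
// }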

type AutoShrinkChannelPayload = OwnedRoomId;
37 changes: 31 additions & 6 deletions crates/matrix-sdk/src/room/mod.rs
@@ -2994,11 +2994,24 @@ impl Room {
self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
}

/// Get the push context for this room.
/// Get the push-condition context for this room.
///
/// Returns `None` if some data couldn't be found. This should only happen
/// in brand new rooms, while we are still processing their state.
pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
}

/// Get the push-condition context for this room, choosing whether to
/// include thread subscriptions via the `with_threads_subscriptions`
/// parameter.
///
/// Returns `None` if some data couldn't be found. This should only happen
/// in brand new rooms, while we are still processing their state.
pub(crate) async fn push_condition_room_ctx_internal(
&self,
with_threads_subscriptions: bool,
) -> Result<Option<PushConditionRoomCtx>> {
let room_id = self.room_id();
let user_id = self.own_user_id();
let room_info = self.clone_info();
@@ -3022,7 +3035,6 @@
}
};

let this = self.clone();
let mut ctx = assign!(PushConditionRoomCtx::new(
room_id.to_owned(),
UInt::new(member_count).unwrap_or(UInt::MAX),
@@ -3033,12 +3045,12 @@
power_levels,
});

if self.client.enabled_thread_subscriptions() {
if with_threads_subscriptions {
let this = self.clone();
ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
let room = this.clone();
Box::pin(async move {
if let Ok(maybe_sub) = room.fetch_thread_subscription(event_id.to_owned()).await
{
if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
maybe_sub.is_some()
} else {
false
@@ -3053,7 +3065,20 @@
/// Retrieves a [`PushContext`] that can be used to compute the push
/// actions for events.
pub async fn push_context(&self) -> Result<Option<PushContext>> {
let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
self.push_context_internal(self.client.enabled_thread_subscriptions()).await
}
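
// Illustrative sketch only, not part of this change: computing push actions
// for a raw event with the returned context, mirroring what the event
// cache's thread subscriber task does above. `raw_event` is assumed to be a
// raw timeline event obtained by the caller.
//
// if let Some(ctx) = room.push_context().await? {
//     let notifies = ctx
//         .for_event(raw_event)
//         .await
//         .into_iter()
//         .any(|action| action.should_notify());
//     // `notifies` is whether the event would trigger a notification.
// }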

/// Retrieves a [`PushContext`] that can be used to compute the push
/// actions for events, choosing whether to include thread subscriptions
/// via the `with_threads_subscriptions` parameter.
#[instrument(skip(self))]
pub(crate) async fn push_context_internal(
&self,
with_threads_subscriptions: bool,
) -> Result<Option<PushContext>> {
let Some(push_condition_room_ctx) =
self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
else {
debug!("Could not aggregate push context");
return Ok(None);
};