diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs
index a67602f482c..0c1aa006f18 100644
--- a/crates/matrix-sdk-base/src/store/send_queue.rs
+++ b/crates/matrix-sdk-base/src/store/send_queue.rs
@@ -66,12 +66,19 @@ impl SerializableEventContent {
         self.event.deserialize_with_type(&self.event_type)
     }
 
-    /// Returns the raw event content along with its type.
+    /// Returns the raw event content along with its type, borrowed variant.
     ///
     /// Useful for callers manipulating custom events.
     pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
         (&self.event, &self.event_type)
     }
+
+    /// Returns the raw event content along with its type, owned variant.
+    ///
+    /// Useful for callers manipulating custom events.
+    pub fn into_raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
+        (self.event, self.event_type)
+    }
 }
 
 /// The kind of a send queue request.
diff --git a/crates/matrix-sdk-common/src/serde_helpers.rs b/crates/matrix-sdk-common/src/serde_helpers.rs
index 41cd06c8d4c..9d83d3b4c3e 100644
--- a/crates/matrix-sdk-common/src/serde_helpers.rs
+++ b/crates/matrix-sdk-common/src/serde_helpers.rs
@@ -17,7 +17,10 @@
 use ruma::{
     OwnedEventId,
-    events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, relation::BundledThread},
+    events::{
+        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
+        relation::BundledThread,
+    },
     serde::Raw,
 };
 use serde::Deserialize;
@@ -45,6 +48,22 @@ struct SimplifiedContent {
     relates_to: Option,
 }
 
+/// Try to extract the thread root from an event's content, if provided.
+///
+/// The thread root is the field located at `m.relates_to`.`event_id`,
+/// if the field at `m.relates_to`.`rel_type` is `m.thread`.
+///
+/// Returns `None` if we couldn't find a thread root, or if there was an issue
+/// during deserialization.
+pub fn extract_thread_root_from_content(
+    content: Raw<AnyMessageLikeEventContent>,
+) -> Option<OwnedEventId> {
+    let relates_to = content.deserialize_as_unchecked::<SimplifiedContent>().ok()?.relates_to?;
+    match relates_to.rel_type {
+        RelationsType::Thread => relates_to.event_id,
+    }
+}
+
 /// Try to extract the thread root from a timeline event, if provided.
/// /// The thread root is the field located at `content`.`m.relates_to`.`event_id`, diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 3d80268d6f8..866b1726547 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -28,7 +28,7 @@ #![forbid(missing_docs)] use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, fmt, sync::{Arc, OnceLock}, }; @@ -38,22 +38,37 @@ use eyeball_im::VectorDiff; use futures_util::future::{join_all, try_join_all}; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, TimelineEvent}, - event_cache::store::{EventCacheStoreError, EventCacheStoreLock}, - linked_chunk::lazy_loader::LazyLoaderError, + event_cache::{ + store::{EventCacheStoreError, EventCacheStoreLock}, + Gap, + }, + executor::AbortOnDrop, + linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId}, + serde_helpers::extract_thread_root_from_content, store_locks::LockStoreError, sync::RoomUpdates, timer, }; use matrix_sdk_common::executor::{spawn, JoinHandle}; use room::RoomEventCacheState; -use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId}; -use tokio::sync::{ - broadcast::{channel, error::RecvError, Receiver, Sender}, - mpsc, Mutex, RwLock, +use ruma::{ + events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId, + RoomId, +}; +use tokio::{ + select, + sync::{ + broadcast::{channel, error::RecvError, Receiver, Sender}, + mpsc, Mutex, RwLock, + }, }; use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span}; -use crate::{client::WeakClient, Client}; +use crate::{ + client::WeakClient, + send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate}, + Client, +}; mod deduplicator; mod pagination; @@ -168,6 +183,14 @@ impl EventCache { /// Create a new [`EventCache`] for the given client. pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self { let (generic_update_sender, _) = channel(32); + let (linked_chunk_update_sender, _) = channel(32); + + let (thread_subscriber_sender, thread_subscriber_receiver) = channel(32); + let thread_subscriber_task = AbortOnDrop::new(spawn(Self::thread_subscriber_task( + client.clone(), + linked_chunk_update_sender.clone(), + thread_subscriber_sender, + ))); Self { inner: Arc::new(EventCacheInner { @@ -178,10 +201,21 @@ impl EventCache { drop_handles: Default::default(), auto_shrink_sender: Default::default(), generic_update_sender, + linked_chunk_update_sender, + _thread_subscriber_task: thread_subscriber_task, + thread_subscriber_receiver, }), } } + /// Subscribes to updates that a thread subscription has been sent. + /// + /// For testing purposes only. + #[doc(hidden)] + pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> { + self.inner.thread_subscriber_receiver.resubscribe() + } + /// Starts subscribing the [`EventCache`] to sync responses, if not done /// before. /// @@ -391,6 +425,243 @@ impl EventCache { pub fn subscribe_to_room_generic_updates(&self) -> Receiver { self.inner.generic_update_sender.subscribe() } + + #[instrument(skip(client, thread_subscriber_sender))] + async fn handle_thread_subscriber_linked_chunk_update( + client: &WeakClient, + thread_subscriber_sender: &Sender<()>, + up: RoomEventCacheLinkedChunkUpdate, + ) -> bool { + let Some(client) = client.get() else { + // Client shutting down. 
+ debug!("Client is shutting down, exiting thread subscriber task"); + return false; + }; + + let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else { + trace!("received an update for a non-thread linked chunk, ignoring"); + return true; + }; + + let Some(room) = client.get_room(room_id) else { + warn!(%room_id, "unknown room"); + return true; + }; + + let thread_root = thread_root.clone(); + + let new_events = up.events(); + if new_events.is_empty() { + // No new events, nothing to do. + return true; + } + + // This `PushContext` is going to be used to compute whether an in-thread event + // would trigger a mention. + // + // Of course, we're not interested in an in-thread event causing a mention, + // because it's part of a thread we've subscribed to. So the + // `PushContext` must not include the check for thread subscriptions (otherwise + // it would be impossible to subscribe to new threads). + + let with_thread_subscriptions = false; + + let Some(push_context) = room + .push_context_internal(with_thread_subscriptions) + .await + .inspect_err(|err| { + warn!("Failed to get push context for threads: {err}"); + }) + .ok() + .flatten() + else { + warn!("Missing push context for thread subscriptions."); + return true; + }; + + let mut subscribe_up_to = None; + + // Find if there's an event that would trigger a mention for the current + // user, iterating from the end of the new events towards the oldest, + for ev in new_events.into_iter().rev() { + if push_context + .for_event(ev.raw()) + .await + .into_iter() + .any(|action| action.should_notify()) + { + let Some(event_id) = ev.event_id() else { + // Shouldn't happen. + continue; + }; + subscribe_up_to = Some(event_id); + break; + } + } + + // And if we've found such a mention, subscribe to the thread up to this + // event. + if let Some(event_id) = subscribe_up_to { + trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to"); + if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await { + warn!(%err, "Failed to subscribe to thread"); + } else { + let _ = thread_subscriber_sender.send(()); + } + } + + true + } + + #[instrument(skip(client, thread_subscriber_sender))] + async fn handle_thread_subscriber_send_queue_update( + client: &WeakClient, + thread_subscriber_sender: &Sender<()>, + events_being_sent: &mut HashMap, + up: SendQueueUpdate, + ) -> bool { + let Some(client) = client.get() else { + // Client shutting down. + debug!("Client is shutting down, exiting thread subscriber task"); + return false; + }; + + let room_id = up.room_id; + let Some(room) = client.get_room(&room_id) else { + warn!(%room_id, "unknown room"); + return true; + }; + + let (thread_root, subscribe_up_to) = match up.update { + RoomSendQueueUpdate::NewLocalEvent(local_echo) => { + match local_echo.content { + LocalEchoContent::Event { serialized_event, .. } => { + if let Some(thread_root) = + extract_thread_root_from_content(serialized_event.into_raw().0) + { + events_being_sent.insert(local_echo.transaction_id, thread_root); + } + } + LocalEchoContent::React { .. } => { + // Nothing to do, reactions don't count as a thread + // subscription. 
+ } + } + return true; + } + + RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => { + events_being_sent.remove(&transaction_id); + return true; + } + + RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => { + if let Some(thread_root) = + extract_thread_root_from_content(new_content.into_raw().0) + { + events_being_sent.insert(transaction_id, thread_root); + } else { + // It could be that the event isn't part of a thread anymore; handle that by + // removing the pending transaction id. + events_being_sent.remove(&transaction_id); + } + return true; + } + + RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { + if let Some(thread_root) = events_being_sent.remove(&transaction_id) { + (thread_root, event_id) + } else { + // We don't know about the event that has been sent, so ignore it. + trace!(%transaction_id, "received a sent event that we didn't know about, ignoring"); + return true; + } + } + + RoomSendQueueUpdate::SendError { .. } + | RoomSendQueueUpdate::RetryEvent { .. } + | RoomSendQueueUpdate::MediaUpload { .. } => { + // Nothing to do for these bad boys. + return true; + } + }; + + // And if we've found such a mention, subscribe to the thread up to this event. + trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to"); + if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await + { + warn!(%err, "Failed to subscribe to thread"); + } else { + let _ = thread_subscriber_sender.send(()); + } + + true + } + + #[instrument(skip_all)] + async fn thread_subscriber_task( + client: WeakClient, + linked_chunk_update_sender: Sender, + thread_subscriber_sender: Sender<()>, + ) { + let mut send_q_rx = if let Some(client) = client.get() { + if !client.enabled_thread_subscriptions() { + trace!("Thread subscriptions are not enabled, not spawning thread subscriber task"); + return; + } + + client.send_queue().subscribe() + } else { + trace!("Client is shutting down, not spawning thread subscriber task"); + return; + }; + + let mut linked_chunk_rx = linked_chunk_update_sender.subscribe(); + + // A mapping of local echoes (events being sent), to their thread root, if + // they're in an in-thread reply. + // + // Entirely managed by `handle_thread_subscriber_send_queue_update`. + let mut events_being_sent = HashMap::new(); + + loop { + select! { + res = send_q_rx.recv() => { + match res { + Ok(up) => { + if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await { + break; + } + } + Err(RecvError::Closed) => { + debug!("Linked chunk update channel has been closed, exiting thread subscriber task"); + break; + } + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind linked chunk updates"); + } + } + } + + res = linked_chunk_rx.recv() => { + match res { + Ok(up) => { + if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await { + break; + } + } + Err(RecvError::Closed) => { + debug!("Linked chunk update channel has been closed, exiting thread subscriber task"); + break; + } + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind linked chunk updates"); + } + } + } + } + } + } } struct EventCacheInner { @@ -428,6 +699,29 @@ struct EventCacheInner { /// See doc comment of [`RoomEventCacheGenericUpdate`] and /// [`EventCache::subscribe_to_room_generic_updates`]. 
     generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
+
+    /// A sender for a persisted linked chunk update.
+    ///
+    /// This is used to notify that some linked chunk has persisted some updates
+    /// to a store, and can be used by observers to look for new events.
+    ///
+    /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
+    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+
+    /// A background task listening to room and send queue updates, and
+    /// automatically subscribing the user to threads when needed, based on
+    /// the semantics of MSC4306.
+    ///
+    /// One important constraint is that there is only one such task per
+    /// [`EventCache`], so it listens to *all* rooms at the same time.
+    _thread_subscriber_task: AbortOnDrop<()>,
+
+    /// A test helper receiver that is notified every time the thread
+    /// subscriber task subscribes to a new thread.
+    ///
+    /// This is helpful for tests to check whether a new thread subscription
+    /// has been sent or not.
+    thread_subscriber_receiver: Receiver<()>,
 }
 
 type AutoShrinkChannelPayload = OwnedRoomId;
@@ -591,6 +885,7 @@ impl EventCacheInner {
         let room_state = RoomEventCacheState::new(
             room_id.to_owned(),
             room_version_rules,
+            self.linked_chunk_update_sender.clone(),
             self.store.clone(),
             pagination_status.clone(),
         )
@@ -657,6 +952,42 @@ pub struct RoomEventCacheGenericUpdate {
     pub room_id: OwnedRoomId,
 }
 
+/// An update triggered when events change in the persisted event cache
+/// for any room.
+#[derive(Clone, Debug)]
+struct RoomEventCacheLinkedChunkUpdate {
+    /// The linked chunk affected by the update.
+    linked_chunk: OwnedLinkedChunkId,
+
+    /// A vector of all the updates that happened during this update.
+    updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
+}
+
+impl RoomEventCacheLinkedChunkUpdate {
+    /// Returns all the new events propagated by this update, in topological
+    /// order.
+    pub fn events(self) -> Vec<TimelineEvent> {
+        self.updates
+            .into_iter()
+            .flat_map(|update| match update {
+                linked_chunk::Update::PushItems { items, .. } => items,
+                linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
+                linked_chunk::Update::RemoveItem { .. }
+                | linked_chunk::Update::DetachLastItems { .. }
+                | linked_chunk::Update::StartReattachItems
+                | linked_chunk::Update::EndReattachItems
+                | linked_chunk::Update::NewItemsChunk { .. }
+                | linked_chunk::Update::NewGapChunk { .. }
+                | linked_chunk::Update::RemoveChunk(..)
+                | linked_chunk::Update::Clear => {
+                    // None of these updates contain any new events.
+                    vec![]
+                }
+            })
+            .collect()
+    }
+}
+
 /// An update related to events happened in a room.
#[derive(Debug, Clone)] pub enum RoomEventCacheUpdate { diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 113dc5b6de2..175731f01ad 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -617,7 +617,8 @@ mod private { }, linked_chunk::{ lazy_loader::{self}, - ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update, + ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, + OwnedLinkedChunkId, Position, Update, }, serde_helpers::extract_thread_root, sync::Timeline, @@ -632,7 +633,7 @@ mod private { serde::Raw, EventId, OwnedEventId, OwnedRoomId, }; - use tokio::sync::broadcast::Receiver; + use tokio::sync::broadcast::{Receiver, Sender}; use tracing::{debug, error, instrument, trace, warn}; use super::{ @@ -642,7 +643,8 @@ mod private { }; use crate::event_cache::{ deduplicator::filter_duplicate_events, room::threads::ThreadEventCache, - BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate, + BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus, + ThreadEventCacheUpdate, }; /// State for a single room's event cache. @@ -676,6 +678,10 @@ mod private { pagination_status: SharedObservable, + /// See doc comment of + /// [`super::super::EventCacheInner::linked_chunk_update_sender`]. + linked_chunk_update_sender: Sender, + /// An atomic count of the current number of subscriber of the /// [`super::RoomEventCache`]. pub(super) subscriber_count: Arc, @@ -694,6 +700,7 @@ mod private { pub async fn new( room_id: OwnedRoomId, room_version_rules: RoomVersionRules, + linked_chunk_update_sender: Sender, store: EventCacheStoreLock, pagination_status: SharedObservable, ) -> Result { @@ -766,6 +773,7 @@ mod private { waited_for_initial_prev_token: false, subscriber_count: Default::default(), pagination_status, + linked_chunk_update_sender, }) } @@ -1235,13 +1243,14 @@ mod private { let store = self.store.clone(); let room_id = self.room.clone(); + let cloned_updates = updates.clone(); spawn(async move { let store = store.lock().await?; - trace!(?updates, "sending linked chunk updates to the store"); + trace!(updates = ?cloned_updates, "sending linked chunk updates to the store"); let linked_chunk_id = LinkedChunkId::Room(&room_id); - store.handle_linked_chunk_updates(linked_chunk_id, updates).await?; + store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?; trace!("linked chunk updates applied"); super::Result::Ok(()) @@ -1249,6 +1258,12 @@ mod private { .await .expect("joining failed")?; + // Forward that the store got updated to observers. + let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate { + linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()), + updates, + }); + Ok(()) } @@ -1467,9 +1482,13 @@ mod private { fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache { // TODO: when there's persistent storage, try to lazily reload from disk, if // missing from memory. 
- self.threads - .entry(root_event_id.clone()) - .or_insert_with(|| ThreadEventCache::new(root_event_id)) + self.threads.entry(root_event_id.clone()).or_insert_with(|| { + ThreadEventCache::new( + self.room.clone(), + root_event_id, + self.linked_chunk_update_sender.clone(), + ) + }) } #[instrument(skip_all)] diff --git a/crates/matrix-sdk/src/event_cache/room/threads.rs b/crates/matrix-sdk/src/event_cache/room/threads.rs index 18485110187..1939b4df63f 100644 --- a/crates/matrix-sdk/src/event_cache/room/threads.rs +++ b/crates/matrix-sdk/src/event_cache/room/threads.rs @@ -19,16 +19,16 @@ use std::collections::BTreeSet; use eyeball_im::VectorDiff; use matrix_sdk_base::{ event_cache::{Event, Gap}, - linked_chunk::{ChunkContent, Position}, + linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position}, }; -use ruma::OwnedEventId; +use ruma::{OwnedEventId, OwnedRoomId}; use tokio::sync::broadcast::{Receiver, Sender}; use tracing::trace; use crate::event_cache::{ deduplicator::DeduplicationOutcome, room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome}, - BackPaginationOutcome, EventsOrigin, + BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate, }; /// An update coming from a thread event cache. @@ -42,6 +42,9 @@ pub struct ThreadEventCacheUpdate { /// All the information related to a single thread. pub(crate) struct ThreadEventCache { + /// The room owning this thread. + room_id: OwnedRoomId, + /// The ID of the thread root event, which is the first event in the thread /// (and eventually the first in the linked chunk). thread_root: OwnedEventId, @@ -51,12 +54,24 @@ pub(crate) struct ThreadEventCache { /// A sender for live events updates in this thread. sender: Sender, + + linked_chunk_update_sender: Sender, } impl ThreadEventCache { /// Create a new empty thread event cache. - pub fn new(thread_root: OwnedEventId) -> Self { - Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root } + pub fn new( + room_id: OwnedRoomId, + thread_root: OwnedEventId, + linked_chunk_update_sender: Sender, + ) -> Self { + Self { + chunk: EventLinkedChunk::new(), + sender: Sender::new(32), + room_id, + thread_root, + linked_chunk_update_sender, + } } /// Subscribe to live events from this thread. @@ -78,6 +93,22 @@ impl ThreadEventCache { } } + // TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the + // duplication here too. + fn propagate_changes(&mut self) { + // This is a lie, at the moment! We're not persisting threads yet, so we're just + // forwarding all updates to the linked chunk update sender. + let updates = self.chunk.store_updates().take(); + + let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate { + updates, + linked_chunk: OwnedLinkedChunkId::Thread( + self.room_id.clone(), + self.thread_root.clone(), + ), + }); + } + /// Push some live events to this thread, and propagate the updates to /// the listeners. pub fn add_live_events(&mut self, events: Vec) { @@ -104,6 +135,8 @@ impl ThreadEventCache { self.chunk.push_live_events(None, &events); + self.propagate_changes(); + let diffs = self.chunk.updates_as_vector_diffs(); if !diffs.is_empty() { let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync }); @@ -249,6 +282,8 @@ impl ThreadEventCache { // Add the paginated events to the thread chunk. let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events); + self.propagate_changes(); + // Notify observers about the updates. 
let updates = self.chunk.updates_as_vector_diffs(); if !updates.is_empty() { diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index d508ef26883..0534bb6420c 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -2994,11 +2994,24 @@ impl Room { self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into) } - /// Get the push context for this room. + /// Get the push-condition context for this room. /// /// Returns `None` if some data couldn't be found. This should only happen /// in brand new rooms, while we process its state. pub async fn push_condition_room_ctx(&self) -> Result> { + self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await + } + + /// Get the push-condition context for this room, with a choice to include + /// thread subscriptions or not, based on the extra + /// `with_threads_subscriptions` parameter. + /// + /// Returns `None` if some data couldn't be found. This should only happen + /// in brand new rooms, while we process its state. + pub(crate) async fn push_condition_room_ctx_internal( + &self, + with_threads_subscriptions: bool, + ) -> Result> { let room_id = self.room_id(); let user_id = self.own_user_id(); let room_info = self.clone_info(); @@ -3022,7 +3035,6 @@ impl Room { } }; - let this = self.clone(); let mut ctx = assign!(PushConditionRoomCtx::new( room_id.to_owned(), UInt::new(member_count).unwrap_or(UInt::MAX), @@ -3033,12 +3045,12 @@ impl Room { power_levels, }); - if self.client.enabled_thread_subscriptions() { + if with_threads_subscriptions { + let this = self.clone(); ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| { let room = this.clone(); Box::pin(async move { - if let Ok(maybe_sub) = room.fetch_thread_subscription(event_id.to_owned()).await - { + if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await { maybe_sub.is_some() } else { false @@ -3053,7 +3065,20 @@ impl Room { /// Retrieves a [`PushContext`] that can be used to compute the push /// actions for events. pub async fn push_context(&self) -> Result> { - let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else { + self.push_context_internal(self.client.enabled_thread_subscriptions()).await + } + + /// Retrieves a [`PushContext`] that can be used to compute the push actions + /// for events, with a choice to include thread subscriptions or not, + /// based on the extra `with_threads_subscriptions` parameter. + #[instrument(skip(self))] + pub(crate) async fn push_context_internal( + &self, + with_threads_subscriptions: bool, + ) -> Result> { + let Some(push_condition_room_ctx) = + self.push_condition_room_ctx_internal(with_threads_subscriptions).await? + else { debug!("Could not aggregate push context"); return Ok(None); }; @@ -3740,6 +3765,26 @@ impl Room { } } + /// Subscribe to a thread if needed, based on a current subscription to it. + /// + /// This is like [`Self::subscribe_thread`], but it first checks if the user + /// has already subscribed to a thread, so as to minimize sending + /// unnecessary subscriptions which would be ignored by the server. + pub async fn subscribe_thread_if_needed( + &self, + thread_root: &EventId, + automatic: Option, + ) -> Result<()> { + if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? { + // If we have a previous subscription, we should only send the new one if it's + // manual and the previous one was automatic. 
+ if !prev_sub.automatic || automatic.is_some() { + return Ok(()); + } + } + self.subscribe_thread(thread_root.to_owned(), automatic).await + } + /// Unsubscribe from a given thread in this room. /// /// # Arguments @@ -3821,6 +3866,20 @@ impl Room { Ok(subscription) } + + /// Return the current thread subscription for the given thread root in this + /// room, by getting it from storage if possible, or fetching it from + /// network otherwise. + /// + /// See also [`Self::fetch_thread_subscription`] for the exact semantics of + /// this method. + pub async fn load_or_fetch_thread_subscription( + &self, + thread_root: &EventId, + ) -> Result> { + // A bit of a lie at the moment, since thread subscriptions are not sync'd yet. + self.fetch_thread_subscription(thread_root.to_owned()).await + } } #[cfg(feature = "e2e-encryption")] diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index 1a8d9a9f7b2..f7465fdfc87 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -3950,6 +3950,13 @@ impl<'a> MockEndpoint<'a, PutThreadSubscriptionEndpoint> { self.endpoint.matchers = self.endpoint.matchers.match_thread_id(thread_root); self } + /// Match the request body's `automatic` field against a specific event id. + pub fn match_automatic_event_id(mut self, up_to_event_id: &EventId) -> Self { + self.mock = self.mock.and(body_json(json!({ + "automatic": up_to_event_id + }))); + self + } } /// A prebuilt mock for `DELETE diff --git a/crates/matrix-sdk/tests/integration/event_cache/threads.rs b/crates/matrix-sdk/tests/integration/event_cache/threads.rs index fd268baff1f..ccab2f3af80 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/threads.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/threads.rs @@ -1,19 +1,30 @@ +use std::time::Duration; + use assert_matches2::assert_let; use eyeball_im::VectorDiff; use imbl::Vector; use matrix_sdk::{ assert_let_timeout, deserialized_responses::{ThreadSummaryStatus, TimelineEvent}, - event_cache::{RoomEventCacheUpdate, ThreadEventCacheUpdate}, + event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCacheUpdate}, + sleep::sleep, test_utils::{ assert_event_matches_msg, mocks::{MatrixMockServer, RoomRelationsResponseTemplate}, }, + Client, ThreadingSupport, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE, }; -use ruma::{event_id, room_id, user_id}; +use ruma::{ + event_id, + events::{AnySyncTimelineEvent, Mentions}, + push::{ConditionalPushRule, Ruleset}, + room_id, + serde::Raw, + user_id, OwnedEventId, OwnedRoomId, +}; use serde_json::json; use tokio::sync::broadcast; @@ -433,3 +444,344 @@ async fn test_deduplication() { // The events were already known, so the stream is still empty. assert!(thread_stream.is_empty()); } + +struct ThreadSubscriptionTestSetup { + server: MatrixMockServer, + client: Client, + factory: EventFactory, + room_id: OwnedRoomId, + subscriber: RoomEventCacheSubscriber, + /// 3 events: 1 non-mention, 1 mention, and another non-mention. + events: Vec>, + mention_event_id: OwnedEventId, + thread_root: OwnedEventId, +} + +/// Create a new setup for a thread subscription test, with enough data so that +/// a push context can be created. +/// +/// The setup uses custom push rules, to trigger notifications only on mentions. 
+/// +/// The setup includes 3 events (1 non-mention, 1 mention, and another +/// non-mention) in the same thread, for easy testing of automated +/// subscriptions. +async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup { + let server = MatrixMockServer::new().await; + + let thread_root = event_id!("$thread_root"); + + // Assuming a client that's interested in thread subscriptions, + let client = server + .client_builder() + .on_builder(|builder| { + builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) + }) + .build() + .await; + + // Immediately subscribe the event cache to sync updates. + client.event_cache().subscribe().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + let room = server.sync_joined_room(&client, room_id).await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + assert!(initial_events.is_empty()); + assert!(subscriber.is_empty()); + + // Provide a dummy sync with the room's member profile of the current user, so + // the push context can be created. + let own_user_id = client.user_id().unwrap(); + let f = EventFactory::new().room(room_id).sender(*ALICE); + let member = f.member(own_user_id).sender(own_user_id); + + // Override push rules so that only an intentional mention causes a + // notification. + let mut push_rules = Ruleset::default(); + push_rules.override_.insert(ConditionalPushRule::is_user_mention(own_user_id)); + + server + .mock_sync() + .ok_and_run(&client, |sync_builder| { + sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(member)); + sync_builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({ + "type": "m.push_rules", + "content": { + "global": push_rules + } + }))); + }) + .await; + + // Wait for the initial sync processing to complete; it will trigger a member + // update, at the very least. + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateMembers { .. }) = subscriber.recv()); + + let first_reply_event_id = event_id!("$first_reply"); + let first_reply = f + .text_msg("hey there") + .in_thread(thread_root, thread_root) + .event_id(first_reply_event_id) + .into_raw(); + + let second_reply_event_id = event_id!("$second_reply"); + let second_reply = f + .text_msg("hoy test user!") + .mentions(Mentions::with_user_ids([own_user_id.to_owned()])) + .in_thread(thread_root, first_reply_event_id) + .event_id(second_reply_event_id) + .into_raw(); + + let third_reply_event_id = event_id!("$third_reply"); + let third_reply = f + .text_msg("ciao!") + .in_thread(thread_root, second_reply_event_id) + .event_id(third_reply_event_id) + .into_raw(); + + ThreadSubscriptionTestSetup { + server, + client, + factory: f, + subscriber, + events: vec![first_reply, second_reply, third_reply], + mention_event_id: second_reply_event_id.to_owned(), + thread_root: thread_root.to_owned(), + room_id: room_id.to_owned(), + } +} + +#[async_test] +async fn test_auto_subscribe_thread_via_sync() { + let mut s = thread_subscription_test_setup().await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) 
+ s.server + .mock_put_thread_subscription() + .match_automatic_event_id(&s.mention_event_id) + .match_thread_id(s.thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + // When I receive 3 events (1 non mention, 1 mention, then 1 non mention again), + // from sync, I'll get subscribed to the thread because of the second event. + s.server + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) + .await; + + // Let the event cache process the update. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() + ); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + + // The actual check is the `mock_once` call above! +} + +#[async_test] +async fn test_dont_auto_subscribe_on_already_subscribed_thread() { + let mut s = thread_subscription_test_setup().await; + + // Given a thread I'm already subscribed to, + s.server + .mock_get_thread_subscription() + .match_thread_id(s.thread_root.to_owned()) + .ok(false) + .mock_once() + .mount() + .await; + + // The PUT endpoint (to subscribe to the thread) shouldn't be called… + s.server.mock_put_thread_subscription().ok().expect(0).mount().await; + + // …when I receive a new in-thread mention for this thread. + s.server + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) + .await; + + // Let the event cache process the update. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() + ); + + // Let a bit of time for the background thread subscriber task to process the + // update. + sleep(Duration::from_millis(200)).await; + + // The actual check is the `expect` call above! +} + +#[async_test] +async fn test_auto_subscribe_on_thread_paginate() { + // In this scenario, we're back-paginating a thread and making sure that the + // back-paginated events do cause a subscription. + + let s = thread_subscription_test_setup().await; + + let event_cache = s.client.event_cache(); + event_cache.subscribe().unwrap(); + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + let thread_root_id = event_id!("$thread_root"); + let thread_resp_id = event_id!("$thread_resp"); + + // Receive an in-thread event. + let room = s + .server + .sync_room( + &s.client, + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( + s.factory + .text_msg("that's a good point") + .in_thread(thread_root_id, thread_root_id) + .event_id(thread_resp_id), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + + // Sanity check: the sync event is added to the thread. + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; + assert_eq!(thread_events.len(), 1); + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); + + assert!(thread_subscriber_updates.is_empty()); + + // It's possible to paginate the thread, and this will push the thread root + // because there's no prev-batch token. 
+ let reversed_events = s.events.into_iter().rev().map(Raw::cast_unchecked).collect(); + s.server + .mock_room_relations() + .match_target_event(thread_root_id.to_owned()) + .ok(RoomRelationsResponseTemplate::default().events(reversed_events)) + .mock_once() + .mount() + .await; + + s.server + .mock_room_event() + .match_event_id() + .ok(s.factory.text_msg("Thread root").event_id(thread_root_id).into()) + .mock_once() + .mount() + .await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) + s.server + .mock_put_thread_subscription() + .match_automatic_event_id(&s.mention_event_id) + .match_thread_id(s.thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let hit_start = + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); + assert!(hit_start); + + // Let the event cache process the update. + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + assert!(thread_subscriber_updates.is_empty()); +} + +#[async_test] +async fn test_auto_subscribe_on_thread_paginate_root_event() { + // In this scenario, the root of a thread is the event that would cause the + // subscription. + + let s = thread_subscription_test_setup().await; + + let event_cache = s.client.event_cache(); + event_cache.subscribe().unwrap(); + + let mut thread_subscriber_updates = + s.client.event_cache().subscribe_thread_subscriber_updates(); + + let thread_root_id = event_id!("$thread_root"); + let thread_resp_id = event_id!("$thread_resp"); + + // Receive an in-thread event. + let room = s + .server + .sync_room( + &s.client, + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( + s.factory + .text_msg("that's a good point") + .in_thread(thread_root_id, thread_root_id) + .event_id(thread_resp_id), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + + // Sanity check: the sync event is added to the thread. + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; + assert_eq!(thread_events.len(), 1); + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); + + assert!(thread_subscriber_updates.is_empty()); + + // It's possible to paginate the thread, and this will push the thread root + // because there's no prev-batch token. + s.server + .mock_room_relations() + .match_target_event(thread_root_id.to_owned()) + .ok(RoomRelationsResponseTemplate::default()) + .mock_once() + .mount() + .await; + + s.server + .mock_room_event() + .match_event_id() + .ok(s + .factory + .text_msg("da r00t") + .event_id(thread_root_id) + .mentions(Mentions::with_user_ids(s.client.user_id().map(ToOwned::to_owned))) + .into()) + .mock_once() + .mount() + .await; + + // (The endpoint will be called for the current thread, and with an automatic + // subscription up to the given event ID.) + s.server + .mock_put_thread_subscription() + .match_automatic_event_id(thread_root_id) + .match_thread_id(thread_root_id.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let hit_start = + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); + assert!(hit_start); + + // Let the event cache process the update. + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. 
}) = thread_stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); +} diff --git a/crates/matrix-sdk/tests/integration/room/thread.rs b/crates/matrix-sdk/tests/integration/room/thread.rs index dffcd2e1478..75dd94892ae 100644 --- a/crates/matrix-sdk/tests/integration/room/thread.rs +++ b/crates/matrix-sdk/tests/integration/room/thread.rs @@ -79,6 +79,97 @@ async fn test_subscribe_thread() { assert_matches!(subscription, None); } +#[async_test] +async fn test_subscribe_thread_if_needed() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let room_id = room_id!("!test:example.org"); + let room = server.sync_joined_room(&client, room_id).await; + + // If there's no prior subscription, the function `subscribe_thread_if_needed` + // will automatically subscribe to the thread, whether the new subscription + // is automatic or not. + for (root_id, automatic) in [ + (owned_event_id!("$root"), None), + (owned_event_id!("$woot"), Some(owned_event_id!("$woot"))), + ] { + server + .mock_put_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(root_id.clone()) + .ok() + .mock_once() + .mount() + .await; + + room.subscribe_thread_if_needed(&root_id, automatic).await.unwrap(); + } + + // If there's a prior automatic subscription, the function + // `subscribe_thread_if_needed` will only subscribe to the thread if the new + // subscription is manual. + { + let root_id = owned_event_id!("$toot"); + + server + .mock_get_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(root_id.clone()) + .ok(true) + .mock_once() + .mount() + .await; + + server + .mock_put_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(root_id.clone()) + .ok() + .mock_once() + .mount() + .await; + + room.subscribe_thread_if_needed(&root_id, None).await.unwrap(); + } + + // Otherwise, it will be a no-op. + { + let root_id = owned_event_id!("$foot"); + + server + .mock_get_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(root_id.clone()) + .ok(true) + .mock_once() + .mount() + .await; + + room.subscribe_thread_if_needed(&root_id, Some(owned_event_id!("$foot"))).await.unwrap(); + } + + // The function `subscribe_thread_if_needed` is a no-op if there's a prior + // manual subscription, whether the new subscription is automatic or not. + for (root_id, automatic) in [ + (owned_event_id!("$root"), None), + (owned_event_id!("$woot"), Some(owned_event_id!("$woot"))), + ] { + server + .mock_get_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(root_id.clone()) + .ok(false) + .mock_once() + .mount() + .await; + + // No-op! (The PUT endpoint hasn't been mocked, so this would result in a 404 if + // it were trying to hit it.) 
+ room.subscribe_thread_if_needed(&root_id, automatic).await.unwrap(); + } +} + #[async_test] async fn test_thread_push_rule_is_triggered_for_subscribed_threads() { // This test checks that the evaluation of push rules for threads will correctly diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index db6578005e8..295ddb77c0b 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -5,6 +5,7 @@ use assert_matches2::{assert_let, assert_matches}; #[cfg(feature = "unstable-msc4274")] use matrix_sdk::attachment::{GalleryConfig, GalleryItemInfo}; use matrix_sdk::{ + assert_let_timeout, attachment::{AttachmentConfig, AttachmentInfo, BaseImageInfo, Thumbnail}, config::StoreConfig, media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings}, @@ -14,7 +15,7 @@ use matrix_sdk::{ RoomSendQueueStorageError, RoomSendQueueUpdate, SendHandle, SendQueueUpdate, }, test_utils::mocks::{MatrixMock, MatrixMockServer}, - Client, MemoryStore, + Client, MemoryStore, ThreadingSupport, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, InvitedRoomBuilder, KnockedRoomBuilder, @@ -29,6 +30,7 @@ use ruma::{ NewUnstablePollStartEventContent, UnstablePollAnswer, UnstablePollAnswers, UnstablePollStartContentBlock, UnstablePollStartEventContent, }, + relation::Thread, room::{ message::{ ImageMessageEventContent, MessageType, Relation, ReplyWithinThread, @@ -3712,3 +3714,118 @@ async fn test_update_caption_while_sending_media_event() { // That's all, folks! assert!(watch.is_empty()); } + +#[async_test] +async fn test_sending_reply_in_thread_auto_subscribe() { + let server = MatrixMockServer::new().await; + + // Assuming a client that's interested in thread subscriptions, + let client = server + .client_builder() + .on_builder(|builder| { + builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) + }) + .build() + .await; + + client.event_cache().subscribe().unwrap(); + + let mut thread_subscriber_updates = client.event_cache().subscribe_thread_subscriber_updates(); + + let room_id = room_id!("!a:b.c"); + let room = server.sync_joined_room(&client, room_id).await; + + server.mock_room_state_encryption().plain().mount().await; + + // When I send a message to a thread, + let thread_root = event_id!("$thread"); + + let mut content = RoomMessageEventContent::text_plain("hello world"); + content.relates_to = + Some(Relation::Thread(Thread::plain(thread_root.to_owned(), thread_root.to_owned()))); + + server.mock_room_send().ok(event_id!("$reply")).mock_once().mount().await; + + server + .mock_put_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + let (_, mut stream) = room.send_queue().subscribe().await.unwrap(); + room.send_queue().send(content.into()).await.unwrap(); + + // Let the send queue process the event. + assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv()); + assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + + // Check the endpoints have been correctly called. + server.server().reset().await; + + // Now, if I send a message in a thread I've already subscribed to, in automatic + // mode, this promotes the subscription to manual. + + // Subscribed, automatically. 
+ server + .mock_get_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(thread_root.to_owned()) + .ok(true) + .mount() + .await; + + // I'll get one subscription. + server + .mock_put_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(thread_root.to_owned()) + .ok() + .mock_once() + .mount() + .await; + + server.mock_room_send().ok(event_id!("$reply")).mock_once().mount().await; + + let mut content = RoomMessageEventContent::text_plain("hello world"); + content.relates_to = + Some(Relation::Thread(Thread::plain(thread_root.to_owned(), thread_root.to_owned()))); + room.send_queue().send(content.into()).await.unwrap(); + + // Let the send queue process the event. + assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv()); + assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + + // Check the endpoints have been correctly called. + server.server().reset().await; + + // Subscribed, but manually. + server + .mock_get_thread_subscription() + .match_room_id(room_id.to_owned()) + .match_thread_id(thread_root.to_owned()) + .ok(false) + .mount() + .await; + + // I'll get zero subscription. + server.mock_put_thread_subscription().ok().expect(0).mount().await; + + server.mock_room_send().ok(event_id!("$reply")).mock_once().mount().await; + + let mut content = RoomMessageEventContent::text_plain("hello world"); + content.relates_to = + Some(Relation::Thread(Thread::plain(thread_root.to_owned(), thread_root.to_owned()))); + room.send_queue().send(content.into()).await.unwrap(); + + // Let the send queue process the event. + assert_let_timeout!(Ok(RoomSendQueueUpdate::NewLocalEvent(..)) = stream.recv()); + assert_let_timeout!(Ok(RoomSendQueueUpdate::SentEvent { .. }) = stream.recv()); + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); + + sleep(Duration::from_millis(100)).await; +} diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index 8e10dc97123..9f9bd58ea6f 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -21,6 +21,7 @@ use imbl::Vector; use layout::Flex; use matrix_sdk::{ AuthSession, Client, SqliteCryptoStore, SqliteEventCacheStore, SqliteStateStore, + ThreadingSupport, authentication::matrix::MatrixSession, config::StoreConfig, encryption::{BackupDownloadStrategy, EncryptionSettings}, @@ -554,7 +555,8 @@ async fn configure_client(cli: Cli) -> Result { backup_download_strategy: BackupDownloadStrategy::AfterDecryptionFailure, auto_enable_backups: true, }) - .with_enable_share_history_on_invite(true); + .with_enable_share_history_on_invite(true) + .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }); if let Some(proxy_url) = proxy { client_builder = client_builder.proxy(proxy_url).disable_ssl_verification();
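// A minimal usage sketch of the thread-subscription APIs introduced above,
// `Room::subscribe_thread_if_needed` and `Room::load_or_fetch_thread_subscription`,
// assuming a `Client` built with `ThreadingSupport::Enabled { with_subscriptions: true }`
// and an already-joined `Room`; the helper name and its parameters are placeholders.
use matrix_sdk::{
    ruma::{EventId, OwnedEventId},
    Result, Room,
};

async fn ensure_thread_subscription(
    room: &Room,
    thread_root: &EventId,
    up_to: OwnedEventId,
) -> Result<()> {
    // `Some(up_to)` requests an *automatic* subscription up to that event; the
    // call skips the PUT request whenever a prior subscription already exists.
    room.subscribe_thread_if_needed(thread_root, Some(up_to)).await?;

    // Read the subscription back; currently this always hits the network, since
    // thread subscriptions aren't synced yet.
    let subscription = room.load_or_fetch_thread_subscription(thread_root).await?;
    assert!(subscription.is_some());

    Ok(())
}
// The event cache's background thread-subscriber task calls the same
// `subscribe_thread_if_needed` method when an in-thread event would notify the
// user, or when the send queue finishes sending an in-thread reply.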