Skip to content

Commit d180d49

Browse files
committed
refactor(threads): do not store the unsubscribed state in the DB
1 parent bcee5ba commit d180d49

File tree

10 files changed

+125
-55
lines changed

10 files changed

+125
-55
lines changed

bindings/matrix-sdk-ffi/src/room/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,8 +1093,8 @@ impl Room {
10931093
Ok(Arc::new(RoomPreview::new(AsyncRuntimeDropped::new(client), room_preview)))
10941094
}
10951095

1096-
/// Toggle a MSC4306 subscription to a thread in this room, based on the
1097-
/// thread root event id.
1096+
/// Set a MSC4306 subscription to a thread in this room, based on the thread
1097+
/// root event id.
10981098
///
10991099
/// If `subscribed` is `true`, it will subscribe to the thread, with a
11001100
/// precision that the subscription was manually requested by the user
@@ -1135,7 +1135,6 @@ impl Room {
11351135
matrix_sdk::room::ThreadStatus::Subscribed { automatic } => {
11361136
ThreadStatus::Subscribed { automatic }
11371137
}
1138-
matrix_sdk::room::ThreadStatus::Unsubscribed => ThreadStatus::Unsubscribed,
11391138
}))
11401139
}
11411140
}
@@ -1149,9 +1148,6 @@ pub enum ThreadStatus {
11491148
/// mention) or if it was manually requested by the user.
11501149
automatic: bool,
11511150
},
1152-
1153-
/// The thread is not subscribed to.
1154-
Unsubscribed,
11551151
}
11561152

11571153
/// A listener for receiving new live location shares in a room.

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,16 +1810,33 @@ impl StateStoreIntegrationTests for DynStateStore {
18101810
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
18111811

18121812
// We can override the thread subscription status.
1813-
self.upsert_thread_subscription(room_id(), first_thread, ThreadStatus::Unsubscribed)
1814-
.await
1815-
.unwrap();
1813+
self.upsert_thread_subscription(
1814+
room_id(),
1815+
first_thread,
1816+
ThreadStatus::Subscribed { automatic: false },
1817+
)
1818+
.await
1819+
.unwrap();
18161820

18171821
// And it's correctly reflected.
18181822
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
1819-
assert_eq!(maybe_status, Some(ThreadStatus::Unsubscribed));
1823+
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
18201824
// And the second thread is still subscribed.
18211825
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
18221826
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
1827+
1828+
// We can remove a thread subscription.
1829+
self.remove_thread_subscription(room_id(), second_thread).await.unwrap();
1830+
1831+
// And it's correctly reflected.
1832+
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
1833+
assert_eq!(maybe_status, None);
1834+
// And the first thread is still subscribed.
1835+
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
1836+
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
1837+
1838+
// Removing a thread subscription for an unknown thread is a no-op.
1839+
self.remove_thread_subscription(room_id(), second_thread).await.unwrap();
18231840
}
18241841
}
18251842

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,27 @@ impl StateStore for MemoryStore {
982982
.and_then(|subscriptions| subscriptions.get(thread_id))
983983
.copied())
984984
}
985+
986+
async fn remove_thread_subscription(
987+
&self,
988+
room: &RoomId,
989+
thread_id: &EventId,
990+
) -> Result<(), Self::Error> {
991+
let mut inner = self.inner.write().unwrap();
992+
993+
let Some(room_subs) = inner.thread_subscriptions.get_mut(room) else {
994+
return Ok(());
995+
};
996+
997+
room_subs.remove(thread_id);
998+
999+
if room_subs.is_empty() {
1000+
// If there are no more subscriptions for this room, remove the room entry.
1001+
inner.thread_subscriptions.remove(room);
1002+
}
1003+
1004+
Ok(())
1005+
}
9851006
}
9861007

9871008
#[cfg(test)]

crates/matrix-sdk-base/src/store/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -462,9 +462,6 @@ pub enum ThreadStatus {
462462
/// manual user choice.
463463
automatic: bool,
464464
},
465-
/// The thread is unsubscribed to (it won't cause any notifications or
466-
/// automatic subscription anymore).
467-
Unsubscribed,
468465
}
469466

470467
impl ThreadStatus {
@@ -478,7 +475,6 @@ impl ThreadStatus {
478475
"manual"
479476
}
480477
}
481-
ThreadStatus::Unsubscribed => "unsubscribed",
482478
}
483479
}
484480

@@ -488,7 +484,6 @@ impl ThreadStatus {
488484
match s {
489485
"automatic" => Some(ThreadStatus::Subscribed { automatic: true }),
490486
"manual" => Some(ThreadStatus::Subscribed { automatic: false }),
491-
"unsubscribed" => Some(ThreadStatus::Unsubscribed),
492487
_ => None,
493488
}
494489
}

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,17 +481,22 @@ pub trait StateStore: AsyncTraitDeps {
481481
) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
482482

483483
/// Insert or update a thread subscription for a given room and thread.
484-
///
485-
/// Note: there's no way to remove a thread subscription, because it's
486-
/// either subscribed to, or unsubscribed to, after it's been saved for
487-
/// the first time.
488484
async fn upsert_thread_subscription(
489485
&self,
490486
room: &RoomId,
491487
thread_id: &EventId,
492488
status: ThreadStatus,
493489
) -> Result<(), Self::Error>;
494490

491+
/// Remove a previous thread subscription for a given room and thread.
492+
///
493+
/// Note: removing an unknown thread subscription is a no-op.
494+
async fn remove_thread_subscription(
495+
&self,
496+
room: &RoomId,
497+
thread_id: &EventId,
498+
) -> Result<(), Self::Error>;
499+
495500
/// Loads the current thread subscription for a given room and thread.
496501
///
497502
/// Returns `None` if there was no entry for the given room/thread pair.
@@ -811,6 +816,14 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
811816
) -> Result<Option<ThreadStatus>, Self::Error> {
812817
self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
813818
}
819+
820+
async fn remove_thread_subscription(
821+
&self,
822+
room: &RoomId,
823+
thread_id: &EventId,
824+
) -> Result<(), Self::Error> {
825+
self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
826+
}
814827
}
815828

816829
/// Convenience functionality for state stores.

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,6 +1837,21 @@ impl_state_store!({
18371837

18381838
Ok(Some(status))
18391839
}
1840+
1841+
async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
1842+
let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1843+
1844+
self.inner
1845+
.transaction_on_one_with_mode(
1846+
keys::THREAD_SUBSCRIPTIONS,
1847+
IdbTransactionMode::Readwrite,
1848+
)?
1849+
.object_store(keys::THREAD_SUBSCRIPTIONS)?
1850+
.delete(&encoded_key)?
1851+
.await?;
1852+
1853+
Ok(())
1854+
}
18401855
});
18411856

18421857
/// A room member.

crates/matrix-sdk-sqlite/src/state_store.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2142,6 +2142,25 @@ impl StateStore for SqliteStateStore {
21422142
})
21432143
.transpose()?)
21442144
}
2145+
2146+
async fn remove_thread_subscription(
2147+
&self,
2148+
room_id: &RoomId,
2149+
thread_id: &EventId,
2150+
) -> Result<(), Self::Error> {
2151+
let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2152+
let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2153+
2154+
self.acquire()
2155+
.await?
2156+
.execute(
2157+
"DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2158+
(room_id, thread_id),
2159+
)
2160+
.await?;
2161+
2162+
Ok(())
2163+
}
21452164
}
21462165

21472166
#[derive(Debug, Clone, Serialize, Deserialize)]

crates/matrix-sdk/src/room/mod.rs

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3675,6 +3675,7 @@ impl Room {
36753675
/// - An `Ok` result if the subscription was successful, or if the server
36763676
/// skipped an automatic subscription (as the user unsubscribed from the
36773677
/// thread after the event causing the automatic subscription).
3678+
#[instrument(skip(self), fields(room_id = %self.room_id()))]
36783679
pub async fn subscribe_thread(
36793680
&self,
36803681
thread_root: OwnedEventId,
@@ -3733,6 +3734,7 @@ impl Room {
37333734
/// - An `Ok` result if the unsubscription was successful, or the thread was
37343735
/// already unsubscribed.
37353736
/// - A 404 error if the event isn't known, or isn't a thread root.
3737+
#[instrument(skip(self), fields(room_id = %self.room_id()))]
37363738
pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
37373739
self.client
37383740
.send(unsubscribe_thread::unstable::Request::new(
@@ -3741,11 +3743,10 @@ impl Room {
37413743
))
37423744
.await?;
37433745

3746+
trace!("Server acknowledged the thread subscription removal; removed it from db too");
3747+
37443748
// Immediately save the result into the database.
3745-
self.client
3746-
.state_store()
3747-
.upsert_thread_subscription(self.room_id(), &thread_root, ThreadStatus::Unsubscribed)
3748-
.await?;
3749+
self.client.state_store().remove_thread_subscription(self.room_id(), &thread_root).await?;
37493750

37503751
Ok(())
37513752
}
@@ -3766,6 +3767,7 @@ impl Room {
37663767
/// event couldn't be found, or the event isn't a thread.
37673768
/// - An error if the request fails for any other reason, such as a network
37683769
/// error.
3770+
#[instrument(skip(self), fields(room_id = %self.room_id()))]
37693771
pub async fn fetch_thread_subscription(
37703772
&self,
37713773
thread_root: OwnedEventId,
@@ -3778,36 +3780,29 @@ impl Room {
37783780
))
37793781
.await;
37803782

3781-
match result {
3782-
Ok(response) => Ok(Some(ThreadStatus::Subscribed { automatic: response.automatic })),
3783+
let sub = match result {
3784+
Ok(response) => Some(ThreadStatus::Subscribed { automatic: response.automatic }),
37833785
Err(http_error) => match http_error.as_client_api_error() {
3784-
Some(error) if error.status_code == StatusCode::NOT_FOUND => {
3785-
// At this point the server returned no subscriptions, which can mean that the
3786-
// endpoint doesn't exist (not enabled/implemented yet on the server), or that
3787-
// the thread doesn't exist, or that the user has unsubscribed from it
3788-
// previously.
3789-
//
3790-
// If we had any information about prior unsubscription, we can use it here to
3791-
// return something slightly more precise than what the server returned.
3792-
let stored_status = self
3793-
.client
3794-
.state_store()
3795-
.load_thread_subscription(self.room_id(), &thread_root)
3796-
.await?;
3797-
3798-
if let Some(ThreadStatus::Unsubscribed) = stored_status {
3799-
// The thread was unsubscribed from before, so maintain this information.
3800-
Ok(Some(ThreadStatus::Unsubscribed))
3801-
} else {
3802-
// We either have stale information (the thread was marked as subscribed
3803-
// to, but the server said it wasn't), or we didn't have any information.
3804-
// Return unknown.
3805-
Ok(None)
3806-
}
3807-
}
3808-
_ => Err(http_error.into()),
3786+
Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
3787+
_ => return Err(http_error.into()),
38093788
},
3789+
};
3790+
3791+
// Keep the database in sync.
3792+
if let Some(sub) = &sub {
3793+
self.client
3794+
.state_store()
3795+
.upsert_thread_subscription(self.room_id(), &thread_root, *sub)
3796+
.await?;
3797+
} else {
3798+
// If the subscription was not found, remove it from the database.
3799+
self.client
3800+
.state_store()
3801+
.remove_thread_subscription(self.room_id(), &thread_root)
3802+
.await?;
38103803
}
3804+
3805+
Ok(sub)
38113806
}
38123807
}
38133808

crates/matrix-sdk/tests/integration/room/thread.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ async fn test_subscribe_thread() {
5656

5757
room.unsubscribe_thread(root_id.clone()).await.unwrap();
5858

59-
// Now, if I retry to get the subscription status for this thread, it's
60-
// unsubscribed.
59+
// Now, if I retry to get the subscription status for this thread, it doesn't
60+
// exist anymore.
6161
let subscription = room.fetch_thread_subscription(root_id.clone()).await.unwrap();
62-
assert_matches!(subscription, Some(ThreadStatus::Unsubscribed));
62+
assert_matches!(subscription, None);
6363

6464
// Subscribing automatically to the thread may also return a `M_SKIPPED` error
6565
// that should be non-fatal.
@@ -76,5 +76,5 @@ async fn test_subscribe_thread() {
7676

7777
// And in this case, the thread is still unsubscribed.
7878
let subscription = room.fetch_thread_subscription(root_id).await.unwrap();
79-
assert_matches!(subscription, Some(ThreadStatus::Unsubscribed));
79+
assert_matches!(subscription, None);
8080
}

labs/multiverse/src/widgets/room_view/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,6 @@ impl RoomView {
534534
"subscribed (manual)"
535535
}
536536
}
537-
ThreadStatus::Unsubscribed => "unsubscribed",
538537
}
539538
));
540539
}

0 commit comments

Comments
 (0)