Skip to content

Commit 1a5cb2b

Browse files
committed
feat(stores): allow saving thread subscriptions
1 parent b645c11 commit 1a5cb2b

File tree

8 files changed

+325
-11
lines changed

8 files changed

+325
-11
lines changed

crates/matrix-sdk-base/src/store/integration_tests.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ use super::{
4343
use crate::{
4444
RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
4545
deserialized_responses::MemberEvent,
46-
store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
46+
store::{
47+
ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt,
48+
ThreadStatus,
49+
},
4750
};
4851

4952
/// `StateStore` integration tests.
@@ -98,6 +101,8 @@ pub trait StateStoreIntegrationTests {
98101
async fn test_server_info_saving(&self);
99102
/// Test fetching room infos based on [`RoomLoadSettings`].
100103
async fn test_get_room_infos(&self);
104+
/// Test loading thread subscriptions.
105+
async fn test_thread_subscriptions(&self);
101106
}
102107

103108
impl StateStoreIntegrationTests for DynStateStore {
@@ -1767,6 +1772,53 @@ impl StateStoreIntegrationTests for DynStateStore {
17671772
assert_eq!(all_rooms.len(), 0);
17681773
}
17691774
}
1775+
1776+
async fn test_thread_subscriptions(&self) {
1777+
let first_thread = event_id!("$t1");
1778+
let second_thread = event_id!("$t2");
1779+
1780+
// At first, there is no thread subscription.
1781+
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
1782+
assert!(maybe_status.is_none());
1783+
1784+
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
1785+
assert!(maybe_status.is_none());
1786+
1787+
// Setting the thread subscription works.
1788+
self.upsert_thread_subscription(
1789+
room_id(),
1790+
first_thread,
1791+
ThreadStatus::Subscribed { automatic: true },
1792+
)
1793+
.await
1794+
.unwrap();
1795+
1796+
self.upsert_thread_subscription(
1797+
room_id(),
1798+
second_thread,
1799+
ThreadStatus::Subscribed { automatic: false },
1800+
)
1801+
.await
1802+
.unwrap();
1803+
1804+
// Now, reading the thread subscription returns the expected status.
1805+
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
1806+
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: true }));
1807+
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
1808+
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
1809+
1810+
// We can override the thread subscription status.
1811+
self.upsert_thread_subscription(room_id(), first_thread, ThreadStatus::Unsubscribed)
1812+
.await
1813+
.unwrap();
1814+
1815+
// And it's correctly reflected.
1816+
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
1817+
assert_eq!(maybe_status, Some(ThreadStatus::Unsubscribed));
1818+
// And the second thread is still subscribed.
1819+
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
1820+
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
1821+
}
17701822
}
17711823

17721824
/// Macro building to allow your StateStore implementation to run the entire
@@ -1937,6 +1989,12 @@ macro_rules! statestore_integration_tests {
19371989
let store = get_store().await.expect("creating store failed").into_state_store();
19381990
store.test_get_room_infos().await;
19391991
}
1992+
1993+
#[async_test]
1994+
async fn test_thread_subscriptions() {
1995+
let store = get_store().await.expect("creating store failed").into_state_store();
1996+
store.test_thread_subscriptions().await;
1997+
}
19401998
}
19411999
};
19422000
}

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use super::{
4545
use crate::{
4646
MinimalRoomMemberEvent, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
4747
deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
48-
store::QueueWedgeError,
48+
store::{QueueWedgeError, ThreadStatus},
4949
};
5050

5151
#[derive(Debug, Default)]
@@ -75,7 +75,6 @@ struct MemoryStoreInner {
7575
OwnedRoomId,
7676
HashMap<(String, Option<String>), HashMap<OwnedUserId, (OwnedEventId, Receipt)>>,
7777
>,
78-
7978
room_event_receipts: HashMap<
8079
OwnedRoomId,
8180
HashMap<(String, Option<String>), HashMap<OwnedEventId, HashMap<OwnedUserId, Receipt>>>,
@@ -84,6 +83,7 @@ struct MemoryStoreInner {
8483
send_queue_events: BTreeMap<OwnedRoomId, Vec<QueuedRequest>>,
8584
dependent_send_queue_events: BTreeMap<OwnedRoomId, Vec<DependentQueuedRequest>>,
8685
seen_knock_requests: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, OwnedUserId>>,
86+
thread_subscriptions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadStatus>>,
8787
}
8888

8989
/// In-memory, non-persistent implementation of the `StateStore`.
@@ -938,10 +938,6 @@ impl StateStore for MemoryStore {
938938
}
939939
}
940940

941-
/// List all the dependent send queue events.
942-
///
943-
/// This returns absolutely all the dependent send queue events, whether
944-
/// they have an event id or not.
945941
async fn load_dependent_queued_requests(
946942
&self,
947943
room: &RoomId,
@@ -955,6 +951,35 @@ impl StateStore for MemoryStore {
955951
.cloned()
956952
.unwrap_or_default())
957953
}
954+
955+
async fn upsert_thread_subscription(
956+
&self,
957+
room: &RoomId,
958+
thread_id: &EventId,
959+
status: ThreadStatus,
960+
) -> Result<(), Self::Error> {
961+
self.inner
962+
.write()
963+
.unwrap()
964+
.thread_subscriptions
965+
.entry(room.to_owned())
966+
.or_default()
967+
.insert(thread_id.to_owned(), status);
968+
Ok(())
969+
}
970+
971+
async fn load_thread_subscription(
972+
&self,
973+
room: &RoomId,
974+
thread_id: &EventId,
975+
) -> Result<Option<ThreadStatus>, Self::Error> {
976+
let inner = self.inner.read().unwrap();
977+
Ok(inner
978+
.thread_subscriptions
979+
.get(room)
980+
.and_then(|subscriptions| subscriptions.get(thread_id))
981+
.copied())
982+
}
958983
}
959984

960985
#[cfg(test)]

crates/matrix-sdk-base/src/store/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,20 +102,25 @@ pub enum StoreError {
102102
/// An error happened in the underlying database backend.
103103
#[error(transparent)]
104104
Backend(Box<dyn std::error::Error + Send + Sync>),
105+
105106
/// An error happened while serializing or deserializing some data.
106107
#[error(transparent)]
107108
Json(#[from] serde_json::Error),
109+
108110
/// An error happened while deserializing a Matrix identifier, e.g. an user
109111
/// id.
110112
#[error(transparent)]
111113
Identifier(#[from] ruma::IdParseError),
114+
112115
/// The store is locked with a passphrase and an incorrect passphrase was
113116
/// given.
114117
#[error("The store failed to be unlocked")]
115118
StoreLocked,
119+
116120
/// An unencrypted store was tried to be unlocked with a passphrase.
117121
#[error("The store is not encrypted but was tried to be opened with a passphrase")]
118122
UnencryptedStore,
123+
119124
/// The store failed to encrypt or decrypt some data.
120125
#[error("Error encrypting or decrypting data from the store: {0}")]
121126
Encryption(#[from] StoreEncryptionError),
@@ -130,11 +135,19 @@ pub enum StoreError {
130135
version: {0}, latest version: {1}"
131136
)]
132137
UnsupportedDatabaseVersion(usize, usize),
138+
133139
/// Redacting an event in the store has failed.
134140
///
135141
/// This should never happen.
136142
#[error("Redaction failed: {0}")]
137143
Redaction(#[source] ruma::canonical_json::RedactionError),
144+
145+
/// The store contains invalid data.
146+
#[error("The store contains invalid data: {details}")]
147+
InvalidData {
148+
/// Details about which data is invalid, and how.
149+
details: String,
150+
},
138151
}
139152

140153
impl StoreError {
@@ -439,6 +452,47 @@ pub enum RoomLoadSettings {
439452
One(OwnedRoomId),
440453
}
441454

455+
/// Status of a thread subscription, as saved in the state store.
456+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
457+
pub enum ThreadStatus {
458+
/// The thread is subscribed to.
459+
Subscribed {
460+
/// Whether the subscription was made automatically by a client, not by
461+
/// manual user choice.
462+
automatic: bool,
463+
},
464+
/// The thread is unsubscribed to (it won't cause any notifications or
465+
/// automatic subscription anymore).
466+
Unsubscribed,
467+
}
468+
469+
impl ThreadStatus {
470+
/// Convert the current [`ThreadStatus`] into a string representation.
471+
pub fn as_str(&self) -> &'static str {
472+
match self {
473+
ThreadStatus::Subscribed { automatic } => {
474+
if *automatic {
475+
"automatic"
476+
} else {
477+
"manual"
478+
}
479+
}
480+
ThreadStatus::Unsubscribed => "unsubscribed",
481+
}
482+
}
483+
484+
/// Convert a string representation into a [`ThreadStatus`], if it is a
485+
/// valid one, or `None` otherwise.
486+
pub fn from_value(s: &str) -> Option<Self> {
487+
match s {
488+
"automatic" => Some(ThreadStatus::Subscribed { automatic: true }),
489+
"manual" => Some(ThreadStatus::Subscribed { automatic: false }),
490+
"unsubscribed" => Some(ThreadStatus::Unsubscribed),
491+
_ => None,
492+
}
493+
}
494+
}
495+
442496
/// Store state changes and pass them to the StateStore.
443497
#[derive(Clone, Debug, Default)]
444498
pub struct StateChanges {

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use crate::{
5555
deserialized_responses::{
5656
DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
5757
},
58+
store::ThreadStatus,
5859
};
5960

6061
/// An abstract state store trait that can be used to implement different stores
@@ -478,6 +479,27 @@ pub trait StateStore: AsyncTraitDeps {
478479
&self,
479480
room: &RoomId,
480481
) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
482+
483+
/// Insert or update a thread subscription for a given room and thread.
484+
///
485+
/// Note: there's no way to remove a thread subscription, because it's
486+
/// either subscribed to, or unsubscribed to, after it's been saved for
487+
/// the first time.
488+
async fn upsert_thread_subscription(
489+
&self,
490+
room: &RoomId,
491+
thread_id: &EventId,
492+
status: ThreadStatus,
493+
) -> Result<(), Self::Error>;
494+
495+
/// Loads the current thread subscription for a given room and thread.
496+
///
497+
/// Returns `None` if there was no entry for the given room/thread pair.
498+
async fn load_thread_subscription(
499+
&self,
500+
room: &RoomId,
501+
thread_id: &EventId,
502+
) -> Result<Option<ThreadStatus>, Self::Error>;
481503
}
482504

483505
#[repr(transparent)]
@@ -772,6 +794,23 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
772794
.await
773795
.map_err(Into::into)
774796
}
797+
798+
async fn upsert_thread_subscription(
799+
&self,
800+
room: &RoomId,
801+
thread_id: &EventId,
802+
status: ThreadStatus,
803+
) -> Result<(), Self::Error> {
804+
self.0.upsert_thread_subscription(room, thread_id, status).await.map_err(Into::into)
805+
}
806+
807+
async fn load_thread_subscription(
808+
&self,
809+
room: &RoomId,
810+
thread_id: &EventId,
811+
) -> Result<Option<ThreadStatus>, Self::Error> {
812+
self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
813+
}
775814
}
776815

777816
/// Convenience functionality for state stores.

crates/matrix-sdk-indexeddb/src/state_store/migrations.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use super::{
4646
};
4747
use crate::IndexeddbStateStoreError;
4848

49-
const CURRENT_DB_VERSION: u32 = 12;
49+
const CURRENT_DB_VERSION: u32 = 13;
5050
const CURRENT_META_DB_VERSION: u32 = 2;
5151

5252
/// Sometimes Migrations can't proceed without having to drop existing
@@ -237,6 +237,9 @@ pub async fn upgrade_inner_db(
237237
if old_version < 12 {
238238
db = migrate_to_v12(db).await?;
239239
}
240+
if old_version < 13 {
241+
db = migrate_to_v13(db).await?;
242+
}
240243
}
241244

242245
db.close();
@@ -793,6 +796,16 @@ async fn migrate_to_v12(db: IdbDatabase) -> Result<IdbDatabase> {
793796
Ok(IdbDatabase::open_u32(&name, 12)?.await?)
794797
}
795798

799+
/// Add the thread subscriptions table.
800+
async fn migrate_to_v13(db: IdbDatabase) -> Result<IdbDatabase> {
801+
let migration = OngoingMigration {
802+
drop_stores: [].into(),
803+
create_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(),
804+
data: Default::default(),
805+
};
806+
apply_migration(db, 13, migration).await
807+
}
808+
796809
#[cfg(all(test, target_family = "wasm"))]
797810
mod tests {
798811
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

0 commit comments

Comments
 (0)