From 7167d6fe07b59cae1346baab1affdaecbe95f1ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 22 Sep 2025 08:40:31 +0200 Subject: [PATCH 01/28] chore(sqlite): Don't log the room ID twice when saving events --- crates/matrix-sdk-sqlite/src/event_cache_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index ab2654e9e59..4c8bd643907 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -1332,7 +1332,7 @@ impl EventCacheStore for SqliteEventCacheStore { let _timer = timer!("method"); let Some(event_id) = event.event_id() else { - error!(%room_id, "Trying to save an event with no ID"); + error!("Trying to save an event with no ID"); return Ok(()); }; From 68b570b639a97ce84336edecbd159195b008be21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 22 Sep 2025 08:40:31 +0200 Subject: [PATCH 02/28] feat: Add a method to get the event type of a TimelineEventKind --- crates/matrix-sdk-common/src/deserialized_responses.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index fdde96dbf21..c7b29119cd4 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -941,6 +941,14 @@ impl TimelineEventKind { TimelineEventKind::PlainText { .. } => None, } } + + /// Get the event type of this event. + /// + /// Returns `None` if there isn't an event type or if the event failed to be + /// deserialized. + pub fn event_type(&self) -> Option { + self.raw().get_field("type").ok().flatten() + } } #[cfg(not(tarpaulin_include))] From 48cca6c2cde229ace2e52a9a02e79c0abe2c00ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 22 Sep 2025 08:40:31 +0200 Subject: [PATCH 03/28] refactor(sqlite): Save the event type of an event in the SQLite event cache --- .../012_store_event_type.sql | 46 +++++++++++++++++ .../src/event_cache_store.rs | 51 +++++++++++++++---- 2 files changed, 88 insertions(+), 9 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql new file mode 100644 index 00000000000..2a6633a7ae7 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql @@ -0,0 +1,46 @@ +-- After the merge of https://github.com/matrix-org/matrix-rust-sdk/pull/5648, +-- we want all events to get a `TimelineEvent::timestamp` value (extracted from +-- `origin_server_ts`). +-- +-- To accomplish that, we are emptying the event cache. New synced events will +-- be built correctly, with a valid `TimelineEvent::timestamp`, allowing a +-- clear, stable situation. + +DELETE from linked_chunks; +DELETE from event_chunks; -- should be done by cascading +DELETE from gap_chunks; -- should be done by cascading +DELETE from events; + +DROP TABLE events; + +-- Events and their content. +CREATE TABLE "events" ( + -- The room in which the event is located. + "room_id" BLOB NOT NULL, + + -- The `OwnedEventId` of this event. + "event_id" BLOB NOT NULL, + + -- The event type of this event. + "event_type" BLOB NOT NULL, + + -- The ID of the session that was used to encrypt this event, may be null if + -- the event wasn't encrypted. + "session_id" BLOB NULL, + + -- JSON serialized `TimelineEvent` (encrypted value). + "content" BLOB NOT NULL, + + -- If this event is an aggregation (related event), the event id of the event it relates to. + -- Can be null if this event isn't an aggregation. + "relates_to" BLOB, + + -- If this event is an aggregation (related event), the kind of relation it has to the event it + -- relates to. + -- Can be null if this event isn't an aggregation. + "rel_type" BLOB, + + -- Primary key is the event ID. + PRIMARY KEY (event_id) +) +WITHOUT ROWID; diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 4c8bd643907..f0d8efcf243 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -53,6 +53,7 @@ use crate::{ mod keys { // Tables pub const LINKED_CHUNKS: &str = "linked_chunks"; + pub const EVENTS: &str = "events"; } /// The database name. @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 11; +const DATABASE_VERSION: u8 = 12; /// The string used to identify a chunk of type events, in the `type` field in /// the database. @@ -472,6 +473,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 12 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/event_cache_store/012_store_event_type.sql" + ))?; + txn.set_db_version(12) + }) + .await?; + } + Ok(()) } @@ -632,7 +643,7 @@ impl EventCacheStore for SqliteEventCacheStore { // deduplicated and moved to another position; or because it was inserted // outside the context of a linked chunk (e.g. pinned event). let mut content_statement = txn.prepare( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)" )?; let invalid_event = |event: TimelineEvent| { @@ -641,20 +652,27 @@ impl EventCacheStore for SqliteEventCacheStore { return None; }; - Some((event_id.to_string(), event)) + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + return None; + }; + + Some((event_id.to_string(), event_type, event)) }; let room_id = linked_chunk_id.room_id(); let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id); - for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() { + for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() { // Insert the location information into the database. let index = at.index() + i; chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?; + let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s)); + // Now, insert the event content into the database. let encoded_event = this.encode_event(&event)?; - content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; } } @@ -671,6 +689,13 @@ impl EventCacheStore for SqliteEventCacheStore { continue; }; + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + continue; + }; + + let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s)); + // Replace the event's content. Really we'd like to update, but in case the // event id changed, we are a bit lenient here and will allow an insertion // of the new event. @@ -678,8 +703,8 @@ impl EventCacheStore for SqliteEventCacheStore { let room_id = linked_chunk_id.room_id(); let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id); txn.execute( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" - , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; // Replace the event id in the linked chunk, in case it changed. txn.execute( @@ -1336,6 +1361,14 @@ impl EventCacheStore for SqliteEventCacheStore { return Ok(()); }; + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + return Ok(()); + }; + + let event_type = self.encode_key(keys::EVENTS, event_type); + let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s)); + let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); let event_id = event_id.to_string(); let encoded_event = self.encode_event(&event)?; @@ -1344,8 +1377,8 @@ impl EventCacheStore for SqliteEventCacheStore { .await? .with_transaction(move |txn| -> Result<_> { txn.execute( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" - , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; Ok(()) }) From 72fe3040bd7208aea8d8f539a548faea82fe9fa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 22 Sep 2025 08:40:31 +0200 Subject: [PATCH 04/28] feat: Allow events to be fetched by event type From 22fe54db8dbd2d380ce84074a81b1bfb178b8266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 24 Sep 2025 10:11:34 +0200 Subject: [PATCH 05/28] chore: Fix a comment --- crates/matrix-sdk/src/event_cache/room/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 670466ccafa..29555eb025e 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -2266,7 +2266,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2343,7 +2343,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2485,7 +2485,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2633,7 +2633,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); // Let's check whether the generic updates are received for the initialisation. @@ -2757,7 +2757,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); From 40720c9cdf306108480fc5c86d58bcfaea511c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 24 Sep 2025 10:11:34 +0200 Subject: [PATCH 06/28] chore: Make a Note a NOTE --- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index bdb37d38274..810e07fa553 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -935,7 +935,7 @@ impl EventCacheInner { self.multiple_room_updates_lock.lock().await }; - // Note: bnjbvr tried to make this concurrent at some point, but it turned out + // NOTE: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. See also // https://github.com/matrix-org/matrix-rust-sdk/pull/5426. From dfeedfb52728be92aebfeedb83b61ad672cf2120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 07/28] feat(event cache): Add a method to access the linked chunk mutably --- crates/matrix-sdk/src/event_cache/room/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 29555eb025e..80177409503 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1321,6 +1321,11 @@ mod private { &self.room_linked_chunk } + /// Returns a mutable reference to the underlying room linked chunk. + pub(in crate::event_cache) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { + &mut self.room_linked_chunk + } + //// Find a single event in this room, starting from the most recent event. /// /// **Warning**! It looks into the loaded events from the in-memory From e938469b7b3f8e28e3a3c3521f12144336f6eb05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 24 Sep 2025 10:11:34 +0200 Subject: [PATCH 08/28] feat: Create the redecryptor thing --- crates/matrix-sdk/src/event_cache/mod.rs | 1 + .../matrix-sdk/src/event_cache/redecryptor.rs | 116 ++++++++++++++++++ crates/matrix-sdk/src/event_cache/room/mod.rs | 2 +- 3 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 crates/matrix-sdk/src/event_cache/redecryptor.rs diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 810e07fa553..68fae9200e3 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -74,6 +74,7 @@ use crate::{ mod deduplicator; mod pagination; +mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs new file mode 100644 index 00000000000..bd51c269313 --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -0,0 +1,116 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The REDECRYPTOR is a layer that handles redecryption of events in case we +//! couldn't decrypt them imediatelly + +use std::sync::{Arc, Weak}; + +use futures_core::Stream; +use futures_util::{StreamExt, pin_mut}; +use matrix_sdk_base::{ + crypto::{OlmMachine, store::types::RoomKeyInfo}, + deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + event_cache::store::{EventCacheStoreError, EventCacheStoreLock}, +}; +use matrix_sdk_common::executor::JoinHandle; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tracing::warn; + +use crate::event_cache::{EventCacheError, RoomEventCacheState}; + +pub(crate) struct Redecryptor { + redecryption_task: JoinHandle<()>, + inner: Arc, +} + +impl Drop for Redecryptor { + fn drop(&mut self) { + self.redecryption_task.abort(); + } +} + +pub(crate) struct InnerRedecryptor { + store: EventCacheStoreLock, + olm_machine: OlmMachine, +} + +impl InnerRedecryptor { + /// Attempt to redecrypt events after a room key with the given session ID + /// has been received. + pub async fn retry_decryption( + &self, + room_key_info: RoomKeyInfo, + ) -> Result<(), EventCacheError> { + let event_cache: RoomEventCacheState = unimplemented!(); + + // Load the relevant events from the event cache store and attempt to redecrypt + // things. + // TODO: Do we want to have this method on the [`RoomEventCacheState`]? + let store = self.store.lock().await.unwrap(); + + // TODO: We can't load **all** events all the time. + let events = store.get_room_events(&room_key_info.room_id).await?; + + for event in events.into_iter().filter(|e| !e.kind.is_utd()) { + let Some(event_id) = event.event_id() else { + continue; + }; + + let Some((location, mut target_event)) = event_cache.find_event(&event_id).await? + else { + continue; + }; + + let decrypted = self.decrypt_event(event).await?; + target_event.kind = TimelineEventKind::Decrypted(decrypted); + + event_cache.replace_event_at(location, target_event).await? + } + + Ok(()) + } + + pub async fn decrypt_event( + &self, + event: TimelineEvent, + ) -> Result { + todo!(); + } + + pub async fn listen_for_room_keys_task( + weak_redecryptor: Weak, + received_stream: impl Stream, BroadcastStreamRecvError>>, + ) { + pin_mut!(received_stream); + + // TODO: We need to relisten to this stream if it dies. + while let Some(update) = received_stream.next().await { + let Some(decryptor) = weak_redecryptor.upgrade() else { + break; + }; + + if let Ok(room_keys) = update { + for key in room_keys { + let _ = decryptor + .retry_decryption(key) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + } else { + todo!("Redecrypt all visible events?") + } + } + } +} diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 80177409503..2dd86cc4d1f 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1582,7 +1582,7 @@ mod private { /// observers that a single item has been replaced. Otherwise, /// such a notification is not emitted, because observers are /// unlikely to observe the store updates directly. - async fn replace_event_at( + pub(crate) async fn replace_event_at( &mut self, location: EventLocation, event: Event, From f3e04d35d98d292ff658ad1b30acb5798c6126ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 25 Sep 2025 11:36:41 +0200 Subject: [PATCH 09/28] WIP --- .../matrix-sdk/src/event_cache/redecryptor.rs | 72 +++++++++++++------ 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index bd51c269313..2c1a4d53c35 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -17,18 +17,22 @@ use std::sync::{Arc, Weak}; +use as_variant::as_variant; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; use matrix_sdk_base::{ - crypto::{OlmMachine, store::types::RoomKeyInfo}, + crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, - event_cache::store::{EventCacheStoreError, EventCacheStoreLock}, }; use matrix_sdk_common::executor::JoinHandle; +use ruma::{RoomId, serde::Raw}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::warn; -use crate::event_cache::{EventCacheError, RoomEventCacheState}; +use crate::{ + Client, + event_cache::{EventCache, EventCacheError, EventCacheInner}, +}; pub(crate) struct Redecryptor { redecryption_task: JoinHandle<()>, @@ -42,8 +46,7 @@ impl Drop for Redecryptor { } pub(crate) struct InnerRedecryptor { - store: EventCacheStoreLock, - olm_machine: OlmMachine, + cache: Weak, } impl InnerRedecryptor { @@ -53,30 +56,42 @@ impl InnerRedecryptor { &self, room_key_info: RoomKeyInfo, ) -> Result<(), EventCacheError> { - let event_cache: RoomEventCacheState = unimplemented!(); + let event_cache = EventCache { inner: self.cache.upgrade().unwrap() }; + let client: Client = event_cache.inner.client.get().unwrap(); // Load the relevant events from the event cache store and attempt to redecrypt // things. - // TODO: Do we want to have this method on the [`RoomEventCacheState`]? - let store = self.store.lock().await.unwrap(); - - // TODO: We can't load **all** events all the time. - let events = store.get_room_events(&room_key_info.room_id).await?; + let events = { + let store = event_cache.inner.store.lock().await.unwrap(); + let events = store.get_room_events(&room_key_info.room_id).await?; - for event in events.into_iter().filter(|e| !e.kind.is_utd()) { - let Some(event_id) = event.event_id() else { - continue; - }; + events + }; - let Some((location, mut target_event)) = event_cache.find_event(&event_id).await? - else { + // TODO: We can't load **all** events all the time. + let only_utd_events = |event: TimelineEvent| { + // We need the event ID and we only care about events that are still encrypted. + event.event_id().zip( + as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event), + ) + }; + + let (room_cache, _) = event_cache.for_room(&room_key_info.room_id).await.unwrap(); + let mut state = room_cache.inner.state.write().await; + + for (event_id, event) in events.into_iter().filter_map(only_utd_events) { + let Some((location, mut target_event)) = state.find_event(&event_id).await? else { continue; }; - let decrypted = self.decrypt_event(event).await?; - target_event.kind = TimelineEventKind::Decrypted(decrypted); + if let Some(decrypted) = self + .decrypt_event(&client, &room_key_info.room_id, event.cast_ref_unchecked()) + .await + { + target_event.kind = TimelineEventKind::Decrypted(decrypted); - event_cache.replace_event_at(location, target_event).await? + state.replace_event_at(location, target_event).await? + } } Ok(()) @@ -84,9 +99,20 @@ impl InnerRedecryptor { pub async fn decrypt_event( &self, - event: TimelineEvent, - ) -> Result { - todo!(); + client: &Client, + room_id: &RoomId, + event: &Raw, + ) -> Option { + let machine = client.olm_machine().await; + let Some(machine) = &*machine else { + return None; + }; + + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some(decrypted), + // TODO: Inspect the error. + Err(e) => None, + } } pub async fn listen_for_room_keys_task( From 503fa642e345afdaaa64bfbf8fe5f7e6e7faab50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 25 Sep 2025 13:47:01 +0200 Subject: [PATCH 10/28] fixup redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 116 +++++++++++------- 1 file changed, 72 insertions(+), 44 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 2c1a4d53c35..7ef38f59ede 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -15,7 +15,7 @@ //! The REDECRYPTOR is a layer that handles redecryption of events in case we //! couldn't decrypt them imediatelly -use std::sync::{Arc, Weak}; +use std::sync::Weak; use as_variant::as_variant; use futures_core::Stream; @@ -24,10 +24,11 @@ use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, }; -use matrix_sdk_common::executor::JoinHandle; +use matrix_sdk_common::executor::spawn; use ruma::{RoomId, serde::Raw}; +use tokio::task::JoinHandle; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; -use tracing::warn; +use tracing::{instrument, trace, warn}; use crate::{ Client, @@ -35,58 +36,81 @@ use crate::{ }; pub(crate) struct Redecryptor { - redecryption_task: JoinHandle<()>, - inner: Arc, + cache: Weak, } -impl Drop for Redecryptor { - fn drop(&mut self) { - self.redecryption_task.abort(); - } -} +impl Redecryptor { + pub fn new(client: Client, cache: Weak) -> JoinHandle<()> { + let redecryptor = Self { cache }; -pub(crate) struct InnerRedecryptor { - cache: Weak, -} + let task = spawn(async { + let stream = { + let machine = client.olm_machine().await; + machine.as_ref().unwrap().store().room_keys_received_stream() + }; + + drop(client); -impl InnerRedecryptor { + redecryptor.listen_for_room_keys_task(stream).await; + }); + + task + } /// Attempt to redecrypt events after a room key with the given session ID /// has been received. - pub async fn retry_decryption( + #[instrument(skip_all, fields(room_key_info))] + async fn retry_decryption( &self, + cache: &EventCache, room_key_info: RoomKeyInfo, ) -> Result<(), EventCacheError> { - let event_cache = EventCache { inner: self.cache.upgrade().unwrap() }; - let client: Client = event_cache.inner.client.get().unwrap(); + trace!("Retrying to decrypt"); // Load the relevant events from the event cache store and attempt to redecrypt // things. let events = { - let store = event_cache.inner.store.lock().await.unwrap(); + // TODO: We can't load **all** events all the time. + let store = cache.inner.store.lock().await?; let events = store.get_room_events(&room_key_info.room_id).await?; events }; - // TODO: We can't load **all** events all the time. let only_utd_events = |event: TimelineEvent| { - // We need the event ID and we only care about events that are still encrypted. - event.event_id().zip( - as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event), - ) + // We need the event ID to be able to replace the event. + let event_id = event.event_id(); + // We only care about events fort his particular room key, identified by the + // session ID. + let session_id = event.kind.session_id(); + + if session_id == Some(&room_key_info.session_id) { + // Only pick out events that are UTDs. + let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + event.zip(event_id) + } else { + None + } }; - let (room_cache, _) = event_cache.for_room(&room_key_info.room_id).await.unwrap(); + // Get the cache for this particular room and lock the state for the duration of + // the decryption. + // TODO: Do we want to first decrypt and then do the locking and replacement, + // would be an additional loop but the lock would be free for the + // duration of the "slow" decryption attempts. + let (room_cache, _) = cache.for_room(&room_key_info.room_id).await?; let mut state = room_cache.inner.state.write().await; - for (event_id, event) in events.into_iter().filter_map(only_utd_events) { + for (event, event_id) in events.into_iter().filter_map(only_utd_events) { + // The event isn't in the cache, nothing to replace. Realistically this can't + // happen since we retrieved the list of events from the cache itself. let Some((location, mut target_event)) = state.find_event(&event_id).await? else { continue; }; - if let Some(decrypted) = self - .decrypt_event(&client, &room_key_info.room_id, event.cast_ref_unchecked()) - .await + // If we managed to decrypt the event, and we should have to since we received + // the room key for this specific event, then replace the event. + if let Some(decrypted) = + self.decrypt_event(&cache, &room_key_info.room_id, event.cast_ref_unchecked()).await { target_event.kind = TimelineEventKind::Decrypted(decrypted); @@ -97,40 +121,44 @@ impl InnerRedecryptor { Ok(()) } - pub async fn decrypt_event( + async fn decrypt_event( &self, - client: &Client, + cache: &EventCache, room_id: &RoomId, event: &Raw, ) -> Option { + let client = cache.inner.client().ok()?; let machine = client.olm_machine().await; - let Some(machine) = &*machine else { - return None; - }; + let machine = machine.as_ref()?; match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { Ok(decrypted) => Some(decrypted), - // TODO: Inspect the error. - Err(e) => None, + Err(e) => { + warn!("Failed to redecrypt an event {e:?}"); + None + } } } - pub async fn listen_for_room_keys_task( - weak_redecryptor: Weak, + async fn listen_for_room_keys_task( + self, received_stream: impl Stream, BroadcastStreamRecvError>>, ) { pin_mut!(received_stream); - // TODO: We need to relisten to this stream if it dies. + // TODO: We need to relisten to this stream if it dies due to the cross-process + // lock reloading the Olm machine. while let Some(update) = received_stream.next().await { - let Some(decryptor) = weak_redecryptor.upgrade() else { - break; - }; - if let Ok(room_keys) = update { + let Some(event_cache) = self.cache.upgrade() else { + break; + }; + + let cache = EventCache { inner: event_cache }; + for key in room_keys { - let _ = decryptor - .retry_decryption(key) + let _ = self + .retry_decryption(&cache, key) .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } From f6770a3d4c78a24e78e0a2c89f800f9bb8f30aba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 25 Sep 2025 15:50:17 +0200 Subject: [PATCH 11/28] feat(event_cache): Add the RedecryptorCtx concept --- .../matrix-sdk/src/event_cache/redecryptor.rs | 126 +++++++++++------- 1 file changed, 81 insertions(+), 45 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 7ef38f59ede..ccc3dd94b26 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -25,16 +25,87 @@ use matrix_sdk_base::{ deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, }; use matrix_sdk_common::executor::spawn; -use ruma::{RoomId, serde::Raw}; +use ruma::{OwnedEventId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; use tokio::task::JoinHandle; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; -use tracing::{instrument, trace, warn}; +use tracing::{info, instrument, trace, warn}; use crate::{ Client, event_cache::{EventCache, EventCacheError, EventCacheInner}, }; +pub(crate) trait RedecryptorCtx { + type Error; + + async fn get_utds( + &self, + room_key_info: &RoomKeyInfo, + ) -> Result)>, Self::Error>; + + async fn on_resolved_utds( + &self, + room_id: &RoomId, + events: Vec<(OwnedEventId, DecryptedRoomEvent)>, + ) -> Result<(), Self::Error>; +} + +impl RedecryptorCtx for EventCache { + type Error = EventCacheError; + + async fn get_utds( + &self, + room_key_info: &RoomKeyInfo, + ) -> Result)>, Self::Error> { + let only_utd_events = |event: TimelineEvent| { + // We need the event ID to be able to replace the event. + let event_id = event.event_id(); + // We only care about events fort his particular room key, identified by the + // session ID. + let session_id = event.kind.session_id(); + + if session_id == Some(&room_key_info.session_id) { + // Only pick out events that are UTDs. + let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + event_id.zip(event) + } else { + None + } + }; + + // Load the relevant events from the event cache store and attempt to redecrypt + // things. + // + // TODO: We can't load **all** events all the time. + let store = self.inner.store.lock().await?; + let events = store.get_room_events(&room_key_info.room_id).await?; + + Ok(events.into_iter().filter_map(only_utd_events).collect()) + } + + async fn on_resolved_utds( + &self, + room_id: &RoomId, + events: Vec<(OwnedEventId, DecryptedRoomEvent)>, + ) -> Result<(), Self::Error> { + // Get the cache for this particular room and lock the state for the duration of + // the decryption. + let (room_cache, _) = self.for_room(room_id).await?; + let mut state = room_cache.inner.state.write().await; + + for (event_id, decrypted) in events { + // The event isn't in the cache, nothing to replace. Realistically this can't + // happen since we retrieved the list of events from the cache itself. + if let Some((location, mut target_event)) = state.find_event(&event_id).await? { + target_event.kind = TimelineEventKind::Decrypted(decrypted); + state.replace_event_at(location, target_event).await? + } + } + + Ok(()) + } +} + pub(crate) struct Redecryptor { cache: Weak, } @@ -66,58 +137,21 @@ impl Redecryptor { ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); - // Load the relevant events from the event cache store and attempt to redecrypt - // things. - let events = { - // TODO: We can't load **all** events all the time. - let store = cache.inner.store.lock().await?; - let events = store.get_room_events(&room_key_info.room_id).await?; - - events - }; - - let only_utd_events = |event: TimelineEvent| { - // We need the event ID to be able to replace the event. - let event_id = event.event_id(); - // We only care about events fort his particular room key, identified by the - // session ID. - let session_id = event.kind.session_id(); - - if session_id == Some(&room_key_info.session_id) { - // Only pick out events that are UTDs. - let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); - event.zip(event_id) - } else { - None - } - }; - - // Get the cache for this particular room and lock the state for the duration of - // the decryption. - // TODO: Do we want to first decrypt and then do the locking and replacement, - // would be an additional loop but the lock would be free for the - // duration of the "slow" decryption attempts. - let (room_cache, _) = cache.for_room(&room_key_info.room_id).await?; - let mut state = room_cache.inner.state.write().await; - - for (event, event_id) in events.into_iter().filter_map(only_utd_events) { - // The event isn't in the cache, nothing to replace. Realistically this can't - // happen since we retrieved the list of events from the cache itself. - let Some((location, mut target_event)) = state.find_event(&event_id).await? else { - continue; - }; + let events = cache.get_utds(&room_key_info).await?; + let mut decrypted_events = Vec::with_capacity(events.len()); + for (event_id, event) in events { // If we managed to decrypt the event, and we should have to since we received // the room key for this specific event, then replace the event. if let Some(decrypted) = self.decrypt_event(&cache, &room_key_info.room_id, event.cast_ref_unchecked()).await { - target_event.kind = TimelineEventKind::Decrypted(decrypted); - - state.replace_event_at(location, target_event).await? + decrypted_events.push((event_id, decrypted)); } } + cache.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; + Ok(()) } @@ -166,5 +200,7 @@ impl Redecryptor { todo!("Redecrypt all visible events?") } } + + info!("Shutting down the event cache redecryptor"); } } From 912a499753f1e84dadc608249d07ee027e2aae19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 12/28] fix: Emit event cache updates from the redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index ccc3dd94b26..cb87137dc0c 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -32,7 +32,9 @@ use tracing::{info, instrument, trace, warn}; use crate::{ Client, - event_cache::{EventCache, EventCacheError, EventCacheInner}, + event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, + }, }; pub(crate) trait RedecryptorCtx { @@ -57,16 +59,18 @@ impl RedecryptorCtx for EventCache { &self, room_key_info: &RoomKeyInfo, ) -> Result)>, Self::Error> { - let only_utd_events = |event: TimelineEvent| { - // We need the event ID to be able to replace the event. + let filter_non_utds = |event: TimelineEvent| { let event_id = event.event_id(); // We only care about events fort his particular room key, identified by the // session ID. let session_id = event.kind.session_id(); if session_id == Some(&room_key_info.session_id) { - // Only pick out events that are UTDs. + // Only pick out events that are UTDs, get just the Raw event as this is what + // the OlmMachine needs. let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. event_id.zip(event) } else { None @@ -80,9 +84,10 @@ impl RedecryptorCtx for EventCache { let store = self.inner.store.lock().await?; let events = store.get_room_events(&room_key_info.room_id).await?; - Ok(events.into_iter().filter_map(only_utd_events).collect()) + Ok(events.into_iter().filter_map(filter_non_utds).collect()) } + #[instrument(skip_all, fields(room_id))] async fn on_resolved_utds( &self, room_id: &RoomId, @@ -93,15 +98,29 @@ impl RedecryptorCtx for EventCache { let (room_cache, _) = self.for_room(room_id).await?; let mut state = room_cache.inner.state.write().await; + let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect(); + + trace!(?event_ids, "Replacing successfully re-decrypted events"); + for (event_id, decrypted) in events { // The event isn't in the cache, nothing to replace. Realistically this can't - // happen since we retrieved the list of events from the cache itself. + // happen since we retrieved the list of events from the cache itself and + // `find_event()` will look into the store as well. if let Some((location, mut target_event)) = state.find_event(&event_id).await? { target_event.kind = TimelineEventKind::Decrypted(decrypted); state.replace_event_at(location, target_event).await? } } + // We replaced a bunch of events, reactive updates for those replacements have + // been queued up. We need to send them out to our subscribers now. + let diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); + + let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs, + origin: EventsOrigin::Cache, + }); + Ok(()) } } @@ -168,7 +187,7 @@ impl Redecryptor { match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { Ok(decrypted) => Some(decrypted), Err(e) => { - warn!("Failed to redecrypt an event {e:?}"); + warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); None } } From 59d3724863fea735fe1c922d625d850470f3d60e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 29 Sep 2025 14:29:03 +0200 Subject: [PATCH 13/28] chore: Get rid of the RedecryptorCtx Doesn't really give us much as we'd like to mock everything for proper testing anyways. --- .../matrix-sdk/src/event_cache/redecryptor.rs | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index cb87137dc0c..edadb5bcbac 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -37,28 +37,11 @@ use crate::{ }, }; -pub(crate) trait RedecryptorCtx { - type Error; - +impl EventCache { async fn get_utds( &self, room_key_info: &RoomKeyInfo, - ) -> Result)>, Self::Error>; - - async fn on_resolved_utds( - &self, - room_id: &RoomId, - events: Vec<(OwnedEventId, DecryptedRoomEvent)>, - ) -> Result<(), Self::Error>; -} - -impl RedecryptorCtx for EventCache { - type Error = EventCacheError; - - async fn get_utds( - &self, - room_key_info: &RoomKeyInfo, - ) -> Result)>, Self::Error> { + ) -> Result)>, EventCacheError> { let filter_non_utds = |event: TimelineEvent| { let event_id = event.event_id(); // We only care about events fort his particular room key, identified by the @@ -92,7 +75,7 @@ impl RedecryptorCtx for EventCache { &self, room_id: &RoomId, events: Vec<(OwnedEventId, DecryptedRoomEvent)>, - ) -> Result<(), Self::Error> { + ) -> Result<(), EventCacheError> { // Get the cache for this particular room and lock the state for the duration of // the decryption. let (room_cache, _) = self.for_room(room_id).await?; From e56db0587ab6c7af815823a3aa1a05bf35ad66b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 25 Sep 2025 13:47:01 +0200 Subject: [PATCH 14/28] enable the redecryptor in the event cache --- crates/matrix-sdk/src/event_cache/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 68fae9200e3..de9a4bdba74 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -69,6 +69,7 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, use crate::{ Client, client::WeakClient, + event_cache::redecryptor::Redecryptor, send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate}, }; @@ -149,6 +150,9 @@ pub struct EventCacheDropHandles { /// The task used to automatically shrink the linked chunks. auto_shrink_linked_chunk_task: JoinHandle<()>, + + /// The task used to automatically redecrypt UTDs. + redecryption_task: JoinHandle<()>, } impl fmt::Debug for EventCacheDropHandles { @@ -162,6 +166,7 @@ impl Drop for EventCacheDropHandles { self.listen_updates_task.abort(); self.ignore_user_list_update_task.abort(); self.auto_shrink_linked_chunk_task.abort(); + self.redecryption_task.abort(); } } @@ -258,10 +263,13 @@ impl EventCache { auto_shrink_receiver, )); + let redecryption_task = Redecryptor::new(client, Arc::downgrade(&self.inner)); + Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, + redecryption_task, }) }); From e5848d94cf3d4939e1b6067f66b1f8d76bd630b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 15/28] test: Allow to specify the timeout for assert_recv_with_timeout --- crates/matrix-sdk/src/test_utils/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/matrix-sdk/src/test_utils/mod.rs b/crates/matrix-sdk/src/test_utils/mod.rs index 9572d556909..7eaa6e58174 100644 --- a/crates/matrix-sdk/src/test_utils/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mod.rs @@ -138,6 +138,10 @@ macro_rules! assert_next_with_timeout { /// milliseconds. #[macro_export] macro_rules! assert_recv_with_timeout { + ($receiver:expr) => { + $crate::assert_recv_with_timeout!($receiver, 1000) + }; + ($receiver:expr, $timeout_ms:expr) => {{ tokio::time::timeout(std::time::Duration::from_millis($timeout_ms), $receiver.recv()) .await From 9f1668592d8f8917c74e10230b7cbd798692188e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 16/28] test(event cache): Add a test to show that the redecryptor works --- .../matrix-sdk/src/event_cache/redecryptor.rs | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index edadb5bcbac..007f582626a 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -206,3 +206,177 @@ impl Redecryptor { info!("Shutting down the event cache redecryptor"); } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use assert_matches2::assert_matches; + use eyeball_im::VectorDiff; + use matrix_sdk_base::deserialized_responses::TimelineEventKind; + use matrix_sdk_test::{ + JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory, + }; + use ruma::{device_id, event_id, room_id, user_id}; + use serde_json::json; + + use crate::{ + assert_let_timeout, encryption::EncryptionSettings, event_cache::RoomEventCacheUpdate, + test_utils::mocks::MatrixMockServer, + }; + + #[async_test] + async fn test_redecryptor() { + let room_id = room_id!("!test:localhost"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + // Create some clients for Alice and Bob. + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + bob.event_cache().subscribe().expect("Bob should be able to enable the event cache"); + + // Ensure that Alice and Bob are aware of their devices and identities. + matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + let event_factory = EventFactory::new().room(room_id); + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + // Let us now create a room for them. + let room_builder = JoinedRoomBuilder::new(room_id) + .add_state_event(StateTestEvent::Create) + .add_state_event(StateTestEvent::Encryption); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room(room_builder.clone()); + }) + .await; + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(room_builder); + }) + .await; + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. + + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index, value }); + assert_eq!(*index, 0); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + } +} From 4ad7693dafea0b2b65ec074dd62541ad74962fcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 1 Oct 2025 15:58:23 +0200 Subject: [PATCH 17/28] fixup: Move common methods out of the redecryptor into the cache --- .../matrix-sdk/src/event_cache/redecryptor.rs | 86 +++++++++---------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 007f582626a..b52315d9497 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -106,74 +106,68 @@ impl EventCache { Ok(()) } -} - -pub(crate) struct Redecryptor { - cache: Weak, -} - -impl Redecryptor { - pub fn new(client: Client, cache: Weak) -> JoinHandle<()> { - let redecryptor = Self { cache }; - let task = spawn(async { - let stream = { - let machine = client.olm_machine().await; - machine.as_ref().unwrap().store().room_keys_received_stream() - }; - - drop(client); - - redecryptor.listen_for_room_keys_task(stream).await; - }); + async fn decrypt_event( + &self, + room_id: &RoomId, + event: &Raw, + ) -> Option { + let client = self.inner.client().ok()?; + let machine = client.olm_machine().await; + let machine = machine.as_ref()?; - task + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some(decrypted), + Err(e) => { + warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); + None + } + } } /// Attempt to redecrypt events after a room key with the given session ID /// has been received. #[instrument(skip_all, fields(room_key_info))] - async fn retry_decryption( - &self, - cache: &EventCache, - room_key_info: RoomKeyInfo, - ) -> Result<(), EventCacheError> { + async fn retry_decryption(&self, room_key_info: RoomKeyInfo) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); - let events = cache.get_utds(&room_key_info).await?; + let events = self.get_utds(&room_key_info).await?; let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { // If we managed to decrypt the event, and we should have to since we received // the room key for this specific event, then replace the event. if let Some(decrypted) = - self.decrypt_event(&cache, &room_key_info.room_id, event.cast_ref_unchecked()).await + self.decrypt_event(&room_key_info.room_id, event.cast_ref_unchecked()).await { decrypted_events.push((event_id, decrypted)); } } - cache.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; + self.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; Ok(()) } - async fn decrypt_event( - &self, - cache: &EventCache, - room_id: &RoomId, - event: &Raw, - ) -> Option { - let client = cache.inner.client().ok()?; - let machine = client.olm_machine().await; - let machine = machine.as_ref()?; +pub(crate) struct Redecryptor { + cache: Weak, +} - match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { - Ok(decrypted) => Some(decrypted), - Err(e) => { - warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); - None - } - } +impl Redecryptor { + pub fn new(client: Client, cache: Weak) -> JoinHandle<()> { + let redecryptor = Self { cache }; + + let task = spawn(async { + let stream = { + let machine = client.olm_machine().await; + machine.as_ref().unwrap().store().room_keys_received_stream() + }; + + drop(client); + + redecryptor.listen_for_room_keys_task(stream).await; + }); + + task } async fn listen_for_room_keys_task( @@ -193,8 +187,8 @@ impl Redecryptor { let cache = EventCache { inner: event_cache }; for key in room_keys { - let _ = self - .retry_decryption(&cache, key) + let _ = cache + .retry_decryption(key) .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } From 056606122feb1cd5e550845a91301cb93b46329e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 1 Oct 2025 17:21:30 +0200 Subject: [PATCH 18/28] fixup Make the redecryptor return Self --- .../matrix-sdk/src/event_cache/redecryptor.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index b52315d9497..7f1ccb8c998 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -124,6 +124,7 @@ impl EventCache { } } } + /// Attempt to redecrypt events after a room key with the given session ID /// has been received. #[instrument(skip_all, fields(room_key_info))] @@ -147,15 +148,20 @@ impl EventCache { Ok(()) } +} pub(crate) struct Redecryptor { - cache: Weak, + task: JoinHandle<()>, } -impl Redecryptor { - pub fn new(client: Client, cache: Weak) -> JoinHandle<()> { - let redecryptor = Self { cache }; +impl Drop for Redecryptor { + fn drop(&mut self) { + self.task.abort(); + } +} +impl Redecryptor { + pub fn new(client: Client, cache: Weak) -> Self { let task = spawn(async { let stream = { let machine = client.olm_machine().await; @@ -164,14 +170,14 @@ impl Redecryptor { drop(client); - redecryptor.listen_for_room_keys_task(stream).await; + Self::listen_for_room_keys_task(cache, stream).await; }); - task + Self { task } } async fn listen_for_room_keys_task( - self, + cache: Weak, received_stream: impl Stream, BroadcastStreamRecvError>>, ) { pin_mut!(received_stream); @@ -180,7 +186,7 @@ impl Redecryptor { // lock reloading the Olm machine. while let Some(update) = received_stream.next().await { if let Ok(room_keys) = update { - let Some(event_cache) = self.cache.upgrade() else { + let Some(event_cache) = cache.upgrade() else { break; }; From 8d8d588eaea546af1759c4810b6c56b8c9f469f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 1 Oct 2025 17:41:28 +0200 Subject: [PATCH 19/28] fixup create a redecryptor in event cache --- crates/matrix-sdk/src/event_cache/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index de9a4bdba74..e372ba914a8 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -152,7 +152,7 @@ pub struct EventCacheDropHandles { auto_shrink_linked_chunk_task: JoinHandle<()>, /// The task used to automatically redecrypt UTDs. - redecryption_task: JoinHandle<()>, + redecryptor: Redecryptor, } impl fmt::Debug for EventCacheDropHandles { @@ -166,7 +166,6 @@ impl Drop for EventCacheDropHandles { self.listen_updates_task.abort(); self.ignore_user_list_update_task.abort(); self.auto_shrink_linked_chunk_task.abort(); - self.redecryption_task.abort(); } } @@ -263,13 +262,13 @@ impl EventCache { auto_shrink_receiver, )); - let redecryption_task = Redecryptor::new(client, Arc::downgrade(&self.inner)); + let redecryptor = Redecryptor::new(client, Arc::downgrade(&self.inner)); Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, - redecryption_task, + redecryptor, }) }); From e2ab634f57c7e95199d539455490b058034eb511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 1 Oct 2025 17:42:02 +0200 Subject: [PATCH 20/28] fixup redecryptor --- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index e372ba914a8..24631ca9e87 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -262,7 +262,7 @@ impl EventCache { auto_shrink_receiver, )); - let redecryptor = Redecryptor::new(client, Arc::downgrade(&self.inner)); + let redecryptor = Redecryptor::new(Arc::downgrade(&self.inner)); Arc::new(EventCacheDropHandles { listen_updates_task, From a62fea4c16a3cff07d5ec6d59afff1dc1b7b640d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 13:53:37 +0200 Subject: [PATCH 21/28] Rejigger things so we can relisten to the room key stream --- .../matrix-sdk/src/event_cache/redecryptor.rs | 159 ++++++++++++------ 1 file changed, 107 insertions(+), 52 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 7f1ccb8c998..06ae311670c 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -15,7 +15,7 @@ //! The REDECRYPTOR is a layer that handles redecryption of events in case we //! couldn't decrypt them imediatelly -use std::sync::Weak; +use std::{collections::BTreeSet, pin::Pin, sync::Weak}; use as_variant::as_variant; use futures_core::Stream; @@ -25,30 +25,36 @@ use matrix_sdk_base::{ deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, }; use matrix_sdk_common::executor::spawn; -use ruma::{OwnedEventId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::errors::BroadcastStreamRecvError; -use tracing::{info, instrument, trace, warn}; - -use crate::{ - Client, - event_cache::{ - EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, - }, +use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; +use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; +use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; +use tracing::{instrument, trace, warn}; + +use crate::event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, }; +/// The information sent across the channel to the long-running task requesting +/// that the supplied set of sessions be retried. +pub struct DecryptionRetryRequest { + room_id: OwnedRoomId, + session_ids: BTreeSet, +} + +type SessionId<'a> = &'a str; + impl EventCache { async fn get_utds( &self, - room_key_info: &RoomKeyInfo, + room_id: &RoomId, + session_id: SessionId<'_>, ) -> Result)>, EventCacheError> { let filter_non_utds = |event: TimelineEvent| { let event_id = event.event_id(); + // We only care about events fort his particular room key, identified by the // session ID. - let session_id = event.kind.session_id(); - - if session_id == Some(&room_key_info.session_id) { + if event.kind.session_id() == Some(session_id) { // Only pick out events that are UTDs, get just the Raw event as this is what // the OlmMachine needs. let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); @@ -65,7 +71,7 @@ impl EventCache { // // TODO: We can't load **all** events all the time. let store = self.inner.store.lock().await?; - let events = store.get_room_events(&room_key_info.room_id).await?; + let events = store.get_room_events(&room_id).await?; Ok(events.into_iter().filter_map(filter_non_utds).collect()) } @@ -128,29 +134,32 @@ impl EventCache { /// Attempt to redecrypt events after a room key with the given session ID /// has been received. #[instrument(skip_all, fields(room_key_info))] - async fn retry_decryption(&self, room_key_info: RoomKeyInfo) -> Result<(), EventCacheError> { + async fn retry_decryption( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); - let events = self.get_utds(&room_key_info).await?; + let events = self.get_utds(room_id, session_id).await?; let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { // If we managed to decrypt the event, and we should have to since we received // the room key for this specific event, then replace the event. - if let Some(decrypted) = - self.decrypt_event(&room_key_info.room_id, event.cast_ref_unchecked()).await - { + if let Some(decrypted) = self.decrypt_event(room_id, event.cast_ref_unchecked()).await { decrypted_events.push((event_id, decrypted)); } } - self.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; + self.on_resolved_utds(room_id, decrypted_events).await?; Ok(()) } } pub(crate) struct Redecryptor { + request_decryption_sender: UnboundedSender, task: JoinHandle<()>, } @@ -161,49 +170,95 @@ impl Drop for Redecryptor { } impl Redecryptor { - pub fn new(client: Client, cache: Weak) -> Self { + pub(super) fn new(cache: Weak) -> Self { + let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let task = spawn(async { - let stream = { - let machine = client.olm_machine().await; - machine.as_ref().unwrap().store().room_keys_received_stream() - }; + let request_redecryption_stream = UnboundedReceiverStream::new(receiver); - drop(client); + Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; + }); - Self::listen_for_room_keys_task(cache, stream).await; + Self { task, request_decryption_sender } + } + + #[allow(dead_code)] + pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = self.request_decryption_sender.send(request).inspect_err(|_| { + warn!("Requesting a decryption while the redecryption task has been shut down") }); + } - Self { task } + async fn subscribe_to_room_key_stream( + cache: &Weak, + ) -> Option, BroadcastStreamRecvError>>> { + let event_cache = cache.upgrade()?; + let client = event_cache.client().ok()?; + let machine = client.olm_machine().await; + + machine.as_ref().map(|m| m.store().room_keys_received_stream()) + } + + async fn listen_loop( + cache: &Weak, + decryption_request_stream: &mut Pin<&mut impl Stream>, + ) -> bool { + let Some(room_key_stream) = Self::subscribe_to_room_key_stream(cache).await else { + return false; + }; + + pin_mut!(room_key_stream); + + // TODO: Listen to notifications that the Olm machine got recreated, this means + // that our room key stream is effectively dead, we need to exit this + // function with a `true` return value. + loop { + tokio::select! { + Some(request) = decryption_request_stream.next() => { + let Some(event_cache) = cache.upgrade() else { + break false; + }; + + let cache = EventCache { inner: event_cache }; + + for session_id in request.session_ids { + let _ = cache + .retry_decryption(&request.room_id, &session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + } + Some(room_keys) = room_key_stream.next() => { + if let Ok(room_keys) = room_keys { + let Some(event_cache) = cache.upgrade() else { + break false; + }; + + let cache = EventCache { inner: event_cache }; + + for key in room_keys { + let _ = cache + .retry_decryption(&key.room_id, &key.session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + } else { + todo!("Decrypt all events?"); + } + } + else => break false, + } + } } async fn listen_for_room_keys_task( cache: Weak, - received_stream: impl Stream, BroadcastStreamRecvError>>, + decryption_request_stream: UnboundedReceiverStream, ) { - pin_mut!(received_stream); + pin_mut!(decryption_request_stream); // TODO: We need to relisten to this stream if it dies due to the cross-process // lock reloading the Olm machine. - while let Some(update) = received_stream.next().await { - if let Ok(room_keys) = update { - let Some(event_cache) = cache.upgrade() else { - break; - }; - - let cache = EventCache { inner: event_cache }; - - for key in room_keys { - let _ = cache - .retry_decryption(key) - .await - .inspect_err(|e| warn!("Error redecrypting {e:?}")); - } - } else { - todo!("Redecrypt all visible events?") - } - } - - info!("Shutting down the event cache redecryptor"); + while Self::listen_loop(&cache, &mut decryption_request_stream).await {} } } From e4bf6009cc375155dccd3d248638f804cca6e369 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 13:54:21 +0200 Subject: [PATCH 22/28] refactor: Remove the decryption logic from the timeline --- .../controller/decryption_retry_task.rs | 73 ++++++++++--------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index bbf9c8c3d02..37cc8807d2c 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -300,42 +300,43 @@ async fn decryption_task( ) { debug!("Decryption task starting."); - while let Some(request) = receiver.recv().await { - let should_retry = |session_id: &str| { - if let Some(session_ids) = &request.session_ids { - session_ids.contains(session_id) - } else { - true - } - }; - - // Find the indices of events that are in the supplied sessions, distinguishing - // between UTDs which we need to decrypt, and already-decrypted events where we - // only need to re-fetch encryption info. - let mut state = state.write().await; - let (retry_decryption_indices, retry_info_indices) = - compute_event_indices_to_retry_decryption(&state.items, should_retry); - - // Retry fetching encryption info for events that are already decrypted - if !retry_info_indices.is_empty() { - debug!("Retrying fetching encryption info"); - retry_fetch_encryption_info(&mut state, retry_info_indices, &room_data_provider).await; - } - - // Retry decrypting any unable-to-decrypt messages - if !retry_decryption_indices.is_empty() { - debug!("Retrying decryption"); - decrypt_by_index( - &mut state, - &request.settings, - &room_data_provider, - request.decryptor, - should_retry, - retry_decryption_indices, - ) - .await - } - } + // while let Some(request) = receiver.recv().await { + // let should_retry = |session_id: &str| { + // if let Some(session_ids) = &request.session_ids { + // session_ids.contains(session_id) + // } else { + // true + // } + // }; + // + // // Find the indices of events that are in the supplied sessions, + // distinguishing // between UTDs which we need to decrypt, and + // already-decrypted events where we // only need to re-fetch encryption + // info. let mut state = state.write().await; + // let (retry_decryption_indices, retry_info_indices) = + // compute_event_indices_to_retry_decryption(&state.items, + // should_retry); + // + // // Retry fetching encryption info for events that are already decrypted + // if !retry_info_indices.is_empty() { + // debug!("Retrying fetching encryption info"); + // retry_fetch_encryption_info(&mut state, retry_info_indices, + // &room_data_provider).await; } + // + // // Retry decrypting any unable-to-decrypt messages + // if !retry_decryption_indices.is_empty() { + // debug!("Retrying decryption"); + // decrypt_by_index( + // &mut state, + // &request.settings, + // &room_data_provider, + // request.decryptor, + // should_retry, + // retry_decryption_indices, + // ) + // .await + // } + // } debug!("Decryption task stopping."); } From f70dbc56e286df54157ddd8117bab382bc11b9b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 14:10:48 +0200 Subject: [PATCH 23/28] fixup redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 66 +++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 06ae311670c..5254bfd15ba 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -28,7 +28,7 @@ use matrix_sdk_common::executor::spawn; use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; -use tracing::{instrument, trace, warn}; +use tracing::{info, instrument, trace, warn}; use crate::event_cache::{ EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, @@ -172,6 +172,7 @@ impl Drop for Redecryptor { impl Redecryptor { pub(super) fn new(cache: Weak) -> Self { let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let task = spawn(async { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); @@ -198,7 +199,11 @@ impl Redecryptor { machine.as_ref().map(|m| m.store().room_keys_received_stream()) } - async fn listen_loop( + fn upgrade_event_cache(cache: &Weak) -> Option { + cache.upgrade().map(|inner| EventCache { inner }) + } + + async fn redecryption_loop( cache: &Weak, decryption_request_stream: &mut Pin<&mut impl Stream>, ) -> bool { @@ -208,18 +213,13 @@ impl Redecryptor { pin_mut!(room_key_stream); - // TODO: Listen to notifications that the Olm machine got recreated, this means - // that our room key stream is effectively dead, we need to exit this - // function with a `true` return value. loop { tokio::select! { Some(request) = decryption_request_stream.next() => { - let Some(event_cache) = cache.upgrade() else { + let Some(cache) = Self::upgrade_event_cache(cache) else { break false; }; - let cache = EventCache { inner: event_cache }; - for session_id in request.session_ids { let _ = cache .retry_decryption(&request.room_id, &session_id) @@ -227,22 +227,26 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting {e:?}")); } } - Some(room_keys) = room_key_stream.next() => { - if let Ok(room_keys) = room_keys { - let Some(event_cache) = cache.upgrade() else { - break false; - }; - - let cache = EventCache { inner: event_cache }; - - for key in room_keys { - let _ = cache - .retry_decryption(&key.room_id, &key.session_id) - .await - .inspect_err(|e| warn!("Error redecrypting {e:?}")); + room_keys = room_key_stream.next() => { + match room_keys { + Some(Ok(room_keys)) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for key in room_keys { + let _ = cache + .retry_decryption(&key.room_id, &key.session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + }, + Some(Err(_)) => { + todo!("Handle lagging here, how?") + }, + None => { + break true } - } else { - todo!("Decrypt all events?"); } } else => break false, @@ -256,9 +260,11 @@ impl Redecryptor { ) { pin_mut!(decryption_request_stream); - // TODO: We need to relisten to this stream if it dies due to the cross-process - // lock reloading the Olm machine. - while Self::listen_loop(&cache, &mut decryption_request_stream).await {} + while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { + info!("Regenerating the re-decryption streams") + } + + info!("Shutting down the event cache redecryptor"); } } @@ -390,6 +396,14 @@ mod tests { let event = event_receiver.await.expect("Alice should have sent the event by now"); let room_key = room_key.await; + // We regenerate the Olm machine to check if the room key stream is recreated to + // correctly. + bob.inner + .base_client + .regenerate_olm(None) + .await + .expect("We should be able to regenerate the Olm machine"); + // Let us forward the event to Bob. matrix_mock_server .mock_sync() From 2cfd85f50fda6ddd2ab8c9b98d045f2d6f4f5097 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 14:10:48 +0200 Subject: [PATCH 24/28] doc(event cache): Document the redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 97 ++++++++++++++++++- 1 file changed, 95 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 5254bfd15ba..276f066060b 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -12,14 +12,62 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The REDECRYPTOR is a layer that handles redecryption of events in case we -//! couldn't decrypt them imediatelly +//! The Redecryptor (Rd) is a layer and long-running background task which +//! handles redecryption of events in case we couldn't decrypt them imediatelly. +//! +//! Rd listens to the OlmMachine for received room keys. If a new room key has +//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd +//! decrypts any UTDs from the event cache it will replace the events in the +//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners. +//! +//! There's an additional gotcha, the [`OlmMachine`] might get recreated by +//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive +//! a `None` on the room keys stream and we need to re-listen to it. +//! +//! Another gotcha is that room keys might be received on another process if the +//! Client is operating on a iOS device. A separate process is used in this case +//! to receive push notifications. In this case the room key will be received +//! and Rd won't get notified about it. To work around this decryption requests +//! can be explicitly sent to Rd. +//! +//! ┌─────────────┐ +//! │ │ +//! ┌───────────┤ Timeline │ +//! │ │ │ +//! │ └─────▲───────┘ +//! │ │ +//! │ │ +//! │ │ +//! Decryption │ +//! request │ +//! │ RoomEventCacheUpdates +//! │ │ +//! │ │ +//! │ ┌──────────┴────────────┐ +//! │ │ │ +//! └──────► Redecryptor │ +//! │ │ +//! └───────────▲───────────┘ +//! │ +//! │ +//! │ +//! Received room keys stream +//! │ +//! │ +//! │ +//! ┌───────┴──────┐ +//! │ │ +//! │ OlmMachine │ +//! │ │ +//! └──────────────┘ use std::{collections::BTreeSet, pin::Pin, sync::Weak}; use as_variant::as_variant; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; +#[cfg(doc)] +use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, @@ -44,6 +92,13 @@ pub struct DecryptionRetryRequest { type SessionId<'a> = &'a str; impl EventCache { + /// Retrieve a set of events that we weren't able to decrypt. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `session_id` - The unique ID of the room key that was used to encrypt + /// the event. async fn get_utds( &self, room_id: &RoomId, @@ -76,6 +131,17 @@ impl EventCache { Ok(events.into_iter().filter_map(filter_non_utds).collect()) } + /// Handle a chunk of events that we were previously unable to decrypt but + /// have now successfully decrypted. + /// + /// This function will replace the existing UTD events in memory and the + /// store and send out a [`RoomEventCacheUpdate`] for the newly + /// decrypted events. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `events` - A chunk of events that were successfully decrypted. #[instrument(skip_all, fields(room_id))] async fn on_resolved_utds( &self, @@ -113,12 +179,17 @@ impl EventCache { Ok(()) } + /// Attempt to decrypt a single event. async fn decrypt_event( &self, room_id: &RoomId, event: &Raw, ) -> Option { let client = self.inner.client().ok()?; + // TODO: Do we need to use the `Room` object to decrypt these events so we can + // calculate if the event should count as a notification, i.e. get the push + // actions. I thing we do, what happens if the room can't be found? We fallback + // to this? let machine = client.olm_machine().await; let machine = machine.as_ref()?; @@ -141,7 +212,10 @@ impl EventCache { ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); + // Get all the relevant UTDs. let events = self.get_utds(room_id, session_id).await?; + + // Let's attempt to decrypt them them. let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { @@ -152,6 +226,8 @@ impl EventCache { } } + // Replace the events and notify listeners that UTDs have been replaced with + // decrypted events. self.on_resolved_utds(room_id, decrypted_events).await?; Ok(()) @@ -170,6 +246,10 @@ impl Drop for Redecryptor { } impl Redecryptor { + /// Create a new [`Redecryptor`]. + /// + /// This creates a task that listens to various streams and attempts to + /// redecrypt UTDs that can be found inside the [`EventCache`]. pub(super) fn new(cache: Weak) -> Self { let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -189,6 +269,10 @@ impl Redecryptor { }); } + /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. + /// + /// This needs to happen any time this stream returns a `None` meaning that + /// the sending part of the stream has been dropped. async fn subscribe_to_room_key_stream( cache: &Weak, ) -> Option, BroadcastStreamRecvError>>> { @@ -215,6 +299,8 @@ impl Redecryptor { loop { tokio::select! { + // An explicit request, presumably from the timeline, has been received to decrypt + // events that were encrypted with a certain room key. Some(request) = decryption_request_stream.next() => { let Some(cache) = Self::upgrade_event_cache(cache) else { break false; @@ -227,6 +313,8 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting {e:?}")); } } + // The room key stream from the OlmMachine. Needs to be recreated every time we + // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { @@ -244,6 +332,8 @@ impl Redecryptor { Some(Err(_)) => { todo!("Handle lagging here, how?") }, + // The stream got closed, this could mean that our OlmMachine got + // regenerated, let's return true and try to recreate the stream. None => { break true } @@ -258,6 +348,9 @@ impl Redecryptor { cache: Weak, decryption_request_stream: UnboundedReceiverStream, ) { + // We pin the decryption request stream here since that one doesn't need to be + // recreated and we don't want to miss messages coming from the stream + // while recreating it unnecessarily. pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { From 82c78659fd83c77b8e61ff2feb2ca81b3c6df795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 Oct 2025 16:08:16 +0200 Subject: [PATCH 25/28] doc: Document that the redecryptor sends reports out --- .../matrix-sdk/src/event_cache/redecryptor.rs | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 276f066060b..0c3a22128d9 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -30,22 +30,23 @@ //! and Rd won't get notified about it. To work around this decryption requests //! can be explicitly sent to Rd. //! +//! //! ┌─────────────┐ //! │ │ -//! ┌───────────┤ Timeline │ -//! │ │ │ -//! │ └─────▲───────┘ -//! │ │ -//! │ │ -//! │ │ -//! Decryption │ -//! request │ -//! │ RoomEventCacheUpdates -//! │ │ -//! │ │ -//! │ ┌──────────┴────────────┐ -//! │ │ │ -//! └──────► Redecryptor │ +//! ┌───────────┤ Timeline │◄────────────┐ +//! │ │ │ │ +//! │ └─────▲───────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌──────────┴────────────┐ │ +//! │ │ │ │ +//! └──────► Redecryptor │────────┘ //! │ │ //! └───────────▲───────────┘ //! │ From e8c882f916f6bf9c356a48acc1573d5cf966deea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 Oct 2025 16:08:16 +0200 Subject: [PATCH 26/28] feat: Redecryptor start to send out redecryptor reports --- crates/matrix-sdk/src/event_cache/mod.rs | 31 +++- .../matrix-sdk/src/event_cache/redecryptor.rs | 139 +++++++++++++++--- 2 files changed, 145 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 24631ca9e87..de8991303a0 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -69,16 +69,18 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, use crate::{ Client, client::WeakClient, - event_cache::redecryptor::Redecryptor, send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate}, }; mod deduplicator; mod pagination; +#[cfg(feature = "e2e-encryption")] mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; +#[cfg(feature = "e2e-encryption")] +pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport}; pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate}; /// An error observed in the [`EventCache`]. @@ -152,7 +154,8 @@ pub struct EventCacheDropHandles { auto_shrink_linked_chunk_task: JoinHandle<()>, /// The task used to automatically redecrypt UTDs. - redecryptor: Redecryptor, + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor::Redecryptor, } impl fmt::Debug for EventCacheDropHandles { @@ -205,6 +208,9 @@ impl EventCache { linked_chunk_update_sender.clone(), ))); + #[cfg(feature = "e2e-encryption")] + let redecryption_channels = redecryptor::RedecryptorChannels::new(); + Self { inner: Arc::new(EventCacheInner { client, @@ -218,6 +224,8 @@ impl EventCache { _thread_subscriber_task: thread_subscriber_task, #[cfg(feature = "experimental-search")] _search_indexing_task: search_indexing_task, + #[cfg(feature = "e2e-encryption")] + redecryption_channels, thread_subscriber_receiver, }), } @@ -262,13 +270,25 @@ impl EventCache { auto_shrink_receiver, )); - let redecryptor = Redecryptor::new(Arc::downgrade(&self.inner)); + #[cfg(feature = "e2e-encryption")] + let redecryptor = { + let receiver = self + .inner + .redecryption_channels + .decryption_request_receiver + .lock() + .take() + .expect("We should have initialized the channel an subscribing should happen only once"); + + redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver) + }; Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, - redecryptor, + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor, }) }); @@ -848,6 +868,9 @@ struct EventCacheInner { /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. thread_subscriber_receiver: Receiver<()>, + + #[cfg(feature = "e2e-encryption")] + redecryption_channels: redecryptor::RedecryptorChannels, } type AutoShrinkChannelPayload = OwnedRoomId; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 0c3a22128d9..b9f62916755 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -72,25 +72,76 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + locks::Mutex, }; use matrix_sdk_common::executor::spawn; use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; -use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; -use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; +use tokio::{ + sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + }, + task::JoinHandle, +}; +use tokio_stream::wrappers::{ + BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError, +}; use tracing::{info, instrument, trace, warn}; use crate::event_cache::{ EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, }; +type SessionId<'a> = &'a str; +type OwnedSessionId = String; + /// The information sent across the channel to the long-running task requesting /// that the supplied set of sessions be retried. +#[derive(Debug, Clone)] pub struct DecryptionRetryRequest { - room_id: OwnedRoomId, - session_ids: BTreeSet, + /// The room ID of the room the events belong to. + pub room_id: OwnedRoomId, + /// Events that are not decrypted. + pub utd_session_ids: BTreeSet, + /// Events that are decrypted but might need to have their + /// [`EncryptionInfo`] refreshed. + pub refresh_info_session_ids: BTreeSet, } -type SessionId<'a> = &'a str; +/// A report coming from the redecryptor. +#[derive(Debug, Clone)] +pub enum RedecryptorReport { + /// Events which we were able to decrypt. + ResolvedUtds { + /// The room ID of the room the events belong to. + room_id: OwnedRoomId, + /// The list of event IDs of the decrypted events. + events: BTreeSet, + }, + /// The redecryptor might have missed some room keys so it might not have + /// re-decrypted events that are now decryptable. + Lagging, +} + +pub(super) struct RedecryptorChannels { + utd_reporter: broadcast::Sender, + pub(super) decryption_request_sender: UnboundedSender, + pub(super) decryption_request_receiver: + Mutex>>, +} + +impl RedecryptorChannels { + pub(super) fn new() -> Self { + let (utd_reporter, _) = broadcast::channel(100); + let (decryption_request_sender, decryption_request_receiver) = unbounded_channel(); + + Self { + utd_reporter, + decryption_request_sender, + decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)), + } + } +} impl EventCache { /// Retrieve a set of events that we weren't able to decrypt. @@ -154,7 +205,7 @@ impl EventCache { let (room_cache, _) = self.for_room(room_id).await?; let mut state = room_cache.inner.state.write().await; - let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect(); + let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect(); trace!(?event_ids, "Replacing successfully re-decrypted events"); @@ -177,6 +228,14 @@ impl EventCache { origin: EventsOrigin::Cache, }); + // We report that we resolved some UTDs, this is mainly for listeners that don't + // care about the actual events, just about the fact that UTDs got + // resolved. Not sure if we'll have more such listeners but the UTD hook + // is one such thing. + let report = + RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; + let _ = self.inner.redecryption_channels.utd_reporter.send(report.into()); + Ok(()) } @@ -233,10 +292,28 @@ impl EventCache { Ok(()) } + + /// Explicitly request the redecryption of a set of events. + /// + /// TODO: Explain when and why this might be useful. + pub fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = + self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( + |_| warn!("Requesting a decryption while the redecryption task has been shut down"), + ); + } + + /// Subscribe to reports that the redecryptor generates. + /// + /// TODO: Explain when the redecryptor might send such reports. + pub fn subscrube_to_decryption_reports( + &self, + ) -> impl Stream> { + BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) + } } pub(crate) struct Redecryptor { - request_decryption_sender: UnboundedSender, task: JoinHandle<()>, } @@ -251,23 +328,17 @@ impl Redecryptor { /// /// This creates a task that listens to various streams and attempts to /// redecrypt UTDs that can be found inside the [`EventCache`]. - pub(super) fn new(cache: Weak) -> Self { - let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - + pub(super) fn new( + cache: Weak, + receiver: UnboundedReceiver, + ) -> Self { let task = spawn(async { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; }); - Self { task, request_decryption_sender } - } - - #[allow(dead_code)] - pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) { - let _ = self.request_decryption_sender.send(request).inspect_err(|_| { - warn!("Requesting a decryption while the redecryption task has been shut down") - }); + Self { task } } /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. @@ -307,18 +378,23 @@ impl Redecryptor { break false; }; - for session_id in request.session_ids { + for session_id in request.utd_session_ids { let _ = cache .retry_decryption(&request.room_id, &session_id) .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } + + // TODO: Deal with encryption info updating as well. } // The room key stream from the OlmMachine. Needs to be recreated every time we // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { + // Alright, some room keys were received and persisted in our store, + // let's attempt to redecrypt events that were encrypted using these + // room keys. let Some(cache) = Self::upgrade_event_cache(cache) else { break false; }; @@ -329,9 +405,21 @@ impl Redecryptor { .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } + + // TODO: Deal with encryption info updating as well. }, Some(Err(_)) => { - todo!("Handle lagging here, how?") + // We missed some room keys, we need to report this in case a listener + // has and idea which UTDs we should attempt to redecrypt. + // + // This would most likely be the timeline. The timeline might attempt + // to redecrypt all UTDs it is showing to the user. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); }, // The stream got closed, this could mean that our OlmMachine got // regenerated, let's return true and try to recreate the stream. @@ -355,7 +443,16 @@ impl Redecryptor { pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { - info!("Regenerating the re-decryption streams") + info!("Regenerating the re-decryption streams"); + + let Some(cache) = Self::upgrade_event_cache(&cache) else { + break; + }; + + // Report that the stream got recreated so listeners can attempt to redecrypt + // any UTDs they might be seeing. + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); } info!("Shutting down the event cache redecryptor"); From bab9d9b86a41234b275f9c47ea8abcaaff4039e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 Oct 2025 16:08:16 +0200 Subject: [PATCH 27/28] feat(ui): Listen to redecryptor reports in the timeline --- .../controller/decryption_retry_task.rs | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index 37cc8807d2c..1dc154bf7ea 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -26,12 +26,13 @@ use matrix_sdk::{ crypto::store::types::RoomKeyInfo, deserialized_responses::TimelineEventKind as SdkTimelineEventKind, encryption::backups::BackupState, + event_cache::{self, RedecryptorReport}, event_handler::EventHandlerHandle, executor::{JoinHandle, spawn}, }; use tokio::sync::{ RwLock, - mpsc::{self, Receiver, Sender}, + mpsc::{self, Receiver, Sender, UnboundedSender}, }; use tokio_stream::{StreamExt as _, wrappers::errors::BroadcastStreamRecvError}; use tracing::{Instrument as _, debug, error, field, info, info_span, warn}; @@ -69,6 +70,58 @@ impl Drop for CryptoDropHandles { } } +async fn redecryption_report_task( + stream: impl Stream>, + timeline_controller: TimelineController, + sender: UnboundedSender, +) { + pin_mut!(stream); + + while let Some(report) = stream.next().await { + match report { + Ok(RedecryptorReport::ResolvedUtds { events, .. }) => { + let state = timeline_controller.state.read().await; + + if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook { + for event_id in events { + utd_hook.on_late_decrypt(&event_id).await; + } + } + } + Ok(RedecryptorReport::Lagging) | Err(_) => { + // The room key stream lagged or the OlmMachine got regenerated. Let's tell the + // redecryptor which events we have. + let state = timeline_controller.state.read().await; + + let (utds, decrypted): (BTreeSet<_>, BTreeSet<_>) = state + .items + .iter() + .filter_map(|event| { + event.as_event().and_then(|e| { + let session_id = e.encryption_info().and_then(|info| info.session_id()); + session_id.map(|id| id.to_owned()).zip(Some(e)) + }) + }) + .partition_map(|(session_id, event)| { + if event.content.is_unable_to_decrypt() { + Either::Left(session_id) + } else { + Either::Right(session_id) + } + }); + + let message = event_cache::DecryptionRetryRequest { + room_id: timeline_controller.room().room_id().to_owned(), + utd_session_ids: utds, + refresh_info_session_ids: decrypted, + }; + + let _ = sender.send(message); + } + } + } +} + /// The task that handles the room keys from backups. async fn room_keys_from_backups_task(stream: S, timeline_controller: TimelineController) where From 54f3b5f604ce0330ec0bc9daf7cb74d652e17aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 10 Oct 2025 12:19:46 +0200 Subject: [PATCH 28/28] feat(cache): Let the redecryptor listen to room key withheld updates --- .../matrix-sdk/src/event_cache/redecryptor.rs | 170 ++++++++++++++---- 1 file changed, 136 insertions(+), 34 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index b9f62916755..1607c111ac6 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -15,10 +15,16 @@ //! The Redecryptor (Rd) is a layer and long-running background task which //! handles redecryption of events in case we couldn't decrypt them imediatelly. //! -//! Rd listens to the OlmMachine for received room keys. If a new room key has -//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd -//! decrypts any UTDs from the event cache it will replace the events in the -//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners. +//! Rd listens to the OlmMachine for received room keys and new +//! m.room_key.withheld events. +//! +//! If a new room key has been received it attempts to find any UTDs in the +//! [`EventCache`]. If Rd decrypts any UTDs from the event cache it will replace +//! the events in the cache and send out new [`RoomEventCacheUpdates`] to any of +//! its listeners. +//! +//! If a new withheld info has been received it attempts to find any relevant +//! events and updates the [`EncryptionInfo`] of an event. //! //! There's an additional gotcha, the [`OlmMachine`] might get recreated by //! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive @@ -70,12 +76,15 @@ use futures_util::{StreamExt, pin_mut}; #[cfg(doc)] use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ - crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, + crypto::{ + store::types::{RoomKeyInfo, RoomKeyWithheldInfo}, + types::events::room::encrypted::EncryptedEvent, + }, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, locks::Mutex, }; use matrix_sdk_common::executor::spawn; -use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; +use ruma::{OwnedEventId, OwnedRoomId, RoomId, serde::Raw}; use tokio::{ sync::{ broadcast, @@ -151,36 +160,21 @@ impl EventCache { /// * `room_id` - The ID of the room where the events were sent to. /// * `session_id` - The unique ID of the room key that was used to encrypt /// the event. - async fn get_utds( + async fn get_utds( &self, room_id: &RoomId, - session_id: SessionId<'_>, - ) -> Result)>, EventCacheError> { - let filter_non_utds = |event: TimelineEvent| { - let event_id = event.event_id(); - - // We only care about events fort his particular room key, identified by the - // session ID. - if event.kind.session_id() == Some(session_id) { - // Only pick out events that are UTDs, get just the Raw event as this is what - // the OlmMachine needs. - let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); - // Zip the event ID and event together so we don't have to pick out the event ID - // again. We need the event ID to replace the event in the cache. - event_id.zip(event) - } else { - None - } - }; - + _session_id: SessionId<'_>, + filter: impl Fn(TimelineEvent) -> Option, + ) -> Result, EventCacheError> { // Load the relevant events from the event cache store and attempt to redecrypt // things. // // TODO: We can't load **all** events all the time. + // TODO: Use the session ID to filter things. let store = self.inner.store.lock().await?; let events = store.get_room_events(&room_id).await?; - Ok(events.into_iter().filter_map(filter_non_utds).collect()) + Ok(events.into_iter().filter_map(filter).collect()) } /// Handle a chunk of events that we were previously unable to decrypt but @@ -272,8 +266,25 @@ impl EventCache { ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); + let filter_non_utds = |event: TimelineEvent| { + let event_id = event.event_id(); + + // We only care about events fort his particular room key, identified by the + // session ID. + if event.kind.session_id() == Some(session_id) { + // Only pick out events that are UTDs, get just the Raw event as this is what + // the OlmMachine needs. + let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + } else { + None + } + }; + // Get all the relevant UTDs. - let events = self.get_utds(room_id, session_id).await?; + let events = self.get_utds(room_id, session_id, filter_non_utds).await?; // Let's attempt to decrypt them them. let mut decrypted_events = Vec::with_capacity(events.len()); @@ -293,6 +304,54 @@ impl EventCache { Ok(()) } + async fn update_encryption_info( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + let filter_non_utds = |event: TimelineEvent| { + let event_id = event.event_id(); + + // We only care about events fort his particular room key, identified by the + // session ID. + if event.kind.session_id() == Some(session_id) { + let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + } else { + None + } + }; + + let client = self.inner.client().ok().unwrap(); + let room = client.get_room(room_id).unwrap(); + + // Get all the relevant events. + let events = self.get_utds(room_id, session_id, filter_non_utds).await?; + + // Let's attempt to update their encryption info. + let mut updated_events = Vec::with_capacity(events.len()); + + for (event_id, mut event) in events { + let new_encryption_info = + room.get_encryption_info(session_id, &event.encryption_info.sender).await; + + // Only create a replacement if the encryption info actually changed. + if let Some(new_encryption_info) = new_encryption_info { + if event.encryption_info != new_encryption_info { + event.encryption_info = new_encryption_info; + + updated_events.push((event_id, event)); + } + } + } + + self.on_resolved_utds(room_id, updated_events).await?; + + Ok(()) + } + /// Explicitly request the redecryption of a set of events. /// /// TODO: Explain when and why this might be useful. @@ -347,12 +406,17 @@ impl Redecryptor { /// the sending part of the stream has been dropped. async fn subscribe_to_room_key_stream( cache: &Weak, - ) -> Option, BroadcastStreamRecvError>>> { + ) -> Option<( + impl Stream, BroadcastStreamRecvError>>, + impl Stream>, + )> { let event_cache = cache.upgrade()?; let client = event_cache.client().ok()?; let machine = client.olm_machine().await; - machine.as_ref().map(|m| m.store().room_keys_received_stream()) + machine.as_ref().map(|m| { + (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream()) + }) } fn upgrade_event_cache(cache: &Weak) -> Option { @@ -363,11 +427,14 @@ impl Redecryptor { cache: &Weak, decryption_request_stream: &mut Pin<&mut impl Stream>, ) -> bool { - let Some(room_key_stream) = Self::subscribe_to_room_key_stream(cache).await else { + let Some((room_key_stream, withheld_stream)) = + Self::subscribe_to_room_key_stream(cache).await + else { return false; }; pin_mut!(room_key_stream); + pin_mut!(withheld_stream); loop { tokio::select! { @@ -385,7 +452,14 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting {e:?}")); } - // TODO: Deal with encryption info updating as well. + for session_id in request.refresh_info_session_ids { + let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e| + warn!( + room_id = %request.room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } } // The room key stream from the OlmMachine. Needs to be recreated every time we // receive a `None` from the stream. @@ -399,14 +473,21 @@ impl Redecryptor { break false; }; - for key in room_keys { + for key in &room_keys { let _ = cache .retry_decryption(&key.room_id, &key.session_id) .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } - // TODO: Deal with encryption info updating as well. + for key in room_keys { + let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e| + warn!( + room_id = %key.room_id, + session_id = key.session_id, + "Unable to update the encryption info {e:?}", + )); + } }, Some(Err(_)) => { // We missed some room keys, we need to report this in case a listener @@ -428,6 +509,27 @@ impl Redecryptor { } } } + withheld_info = withheld_stream.next() => { + match withheld_info { + Some(infos) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos { + let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e| + warn!( + room_id = %room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The stream got closed, same as for the room key stream, we'll try to + // recreate the streams. + None => break true + } + } else => break false, } }