diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index fdde96dbf21..c7b29119cd4 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -941,6 +941,14 @@ impl TimelineEventKind { TimelineEventKind::PlainText { .. } => None, } } + + /// Get the event type of this event. + /// + /// Returns `None` if there isn't an event type or if the event failed to be + /// deserialized. + pub fn event_type(&self) -> Option { + self.raw().get_field("type").ok().flatten() + } } #[cfg(not(tarpaulin_include))] diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql new file mode 100644 index 00000000000..2a6633a7ae7 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_store_event_type.sql @@ -0,0 +1,46 @@ +-- This migration adds the `event_type` and `session_id` columns to the +-- `events` table, which is dropped and recreated below with the new +-- schema. +-- +-- Rows already stored have no value for the mandatory `event_type` column, +-- so we are emptying the event cache. New synced events will be stored with +-- both columns populated. + +DELETE from linked_chunks; +DELETE from event_chunks; -- should be done by cascading +DELETE from gap_chunks; -- should be done by cascading +DELETE from events; + +DROP TABLE events; + +-- Events and their content. +CREATE TABLE "events" ( + -- The room in which the event is located. + "room_id" BLOB NOT NULL, + + -- The `OwnedEventId` of this event. + "event_id" BLOB NOT NULL, + + -- The event type of this event. + "event_type" BLOB NOT NULL, + + -- The ID of the session that was used to encrypt this event, may be null if + -- the event wasn't encrypted.
+ "session_id" BLOB NULL, + + -- JSON serialized `TimelineEvent` (encrypted value). + "content" BLOB NOT NULL, + + -- If this event is an aggregation (related event), the event id of the event it relates to. + -- Can be null if this event isn't an aggregation. + "relates_to" BLOB, + + -- If this event is an aggregation (related event), the kind of relation it has to the event it + -- relates to. + -- Can be null if this event isn't an aggregation. + "rel_type" BLOB, + + -- Primary key is the event ID. + PRIMARY KEY (event_id) +) +WITHOUT ROWID; diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index ab2654e9e59..f0d8efcf243 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -53,6 +53,7 @@ use crate::{ mod keys { // Tables pub const LINKED_CHUNKS: &str = "linked_chunks"; + pub const EVENTS: &str = "events"; } /// The database name. @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 11; +const DATABASE_VERSION: u8 = 12; /// The string used to identify a chunk of type events, in the `type` field in /// the database. @@ -472,6 +473,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 12 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/event_cache_store/012_store_event_type.sql" + ))?; + txn.set_db_version(12) + }) + .await?; + } + Ok(()) } @@ -632,7 +643,7 @@ impl EventCacheStore for SqliteEventCacheStore { // deduplicated and moved to another position; or because it was inserted // outside the context of a linked chunk (e.g. pinned event). 
let mut content_statement = txn.prepare( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)" )?; let invalid_event = |event: TimelineEvent| { @@ -641,20 +652,30 @@ impl EventCacheStore for SqliteEventCacheStore { return None; }; - Some((event_id.to_string(), event)) + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + return None; + }; + + Some((event_id.to_string(), event_type, event)) }; let room_id = linked_chunk_id.room_id(); let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id); - for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() { + for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() { // Insert the location information into the database. let index = at.index() + i; chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?; + // Encode the event type and the session ID the same way `save_event()` does, so + // that every row of the `events` table stores these columns in the same form. + let event_type = this.encode_key(keys::EVENTS, event_type); + let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s)); + // Now, insert the event content into the database. let encoded_event = this.encode_event(&event)?; - content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; } } @@ -671,6 +692,15 @@ impl EventCacheStore for SqliteEventCacheStore { continue; }; + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + continue; + }; + + // Encoded the same way as in `save_event()`, to keep the column consistent. + let event_type = this.encode_key(keys::EVENTS, event_type); + let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s)); + // Replace the event's content.
Really we'd like to update, but in case the // event id changed, we are a bit lenient here and will allow an insertion // of the new event. @@ -678,8 +703,8 @@ impl EventCacheStore for SqliteEventCacheStore { let room_id = linked_chunk_id.room_id(); let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id); txn.execute( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" - , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; // Replace the event id in the linked chunk, in case it changed. txn.execute( @@ -1332,10 +1357,18 @@ impl EventCacheStore for SqliteEventCacheStore { let _timer = timer!("method"); let Some(event_id) = event.event_id() else { - error!(%room_id, "Trying to save an event with no ID"); + error!("Trying to save an event with no ID"); return Ok(()); }; + let Some(event_type) = event.kind.event_type() else { + error!(%event_id, "Trying to save an event with no event type"); + return Ok(()); + }; + + let event_type = self.encode_key(keys::EVENTS, event_type); + let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s)); + let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id); let event_id = event_id.to_string(); let encoded_event = self.encode_event(&event)?; @@ -1344,8 +1377,8 @@ impl EventCacheStore for SqliteEventCacheStore { .await? 
.with_transaction(move |txn| -> Result<_> { txn.execute( - "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)" - , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; + "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)", + (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?; Ok(()) }) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index bbf9c8c3d02..1dc154bf7ea 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -26,12 +26,13 @@ use matrix_sdk::{ crypto::store::types::RoomKeyInfo, deserialized_responses::TimelineEventKind as SdkTimelineEventKind, encryption::backups::BackupState, + event_cache::{self, RedecryptorReport}, event_handler::EventHandlerHandle, executor::{JoinHandle, spawn}, }; use tokio::sync::{ RwLock, - mpsc::{self, Receiver, Sender}, + mpsc::{self, Receiver, Sender, UnboundedSender}, }; use tokio_stream::{StreamExt as _, wrappers::errors::BroadcastStreamRecvError}; use tracing::{Instrument as _, debug, error, field, info, info_span, warn}; @@ -69,6 +70,58 @@ impl Drop for CryptoDropHandles { } } +async fn redecryption_report_task( + stream: impl Stream>, + timeline_controller: TimelineController, + sender: UnboundedSender, +) { + pin_mut!(stream); + + while let Some(report) = stream.next().await { + match report { + Ok(RedecryptorReport::ResolvedUtds { events, .. 
}) => { + let state = timeline_controller.state.read().await; + + if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook { + for event_id in events { + utd_hook.on_late_decrypt(&event_id).await; + } + } + } + Ok(RedecryptorReport::Lagging) | Err(_) => { + // The room key stream lagged or the OlmMachine got regenerated. Let's tell the + // redecryptor which events we have. + let state = timeline_controller.state.read().await; + + let (utds, decrypted): (BTreeSet<_>, BTreeSet<_>) = state + .items + .iter() + .filter_map(|event| { + event.as_event().and_then(|e| { + let session_id = e.encryption_info().and_then(|info| info.session_id()); + session_id.map(|id| id.to_owned()).zip(Some(e)) + }) + }) + .partition_map(|(session_id, event)| { + if event.content.is_unable_to_decrypt() { + Either::Left(session_id) + } else { + Either::Right(session_id) + } + }); + + let message = event_cache::DecryptionRetryRequest { + room_id: timeline_controller.room().room_id().to_owned(), + utd_session_ids: utds, + refresh_info_session_ids: decrypted, + }; + + let _ = sender.send(message); + } + } + } +} + /// The task that handles the room keys from backups. async fn room_keys_from_backups_task(stream: S, timeline_controller: TimelineController) where @@ -300,42 +353,43 @@ async fn decryption_task( ) { debug!("Decryption task starting."); - while let Some(request) = receiver.recv().await { - let should_retry = |session_id: &str| { - if let Some(session_ids) = &request.session_ids { - session_ids.contains(session_id) - } else { - true - } - }; - - // Find the indices of events that are in the supplied sessions, distinguishing - // between UTDs which we need to decrypt, and already-decrypted events where we - // only need to re-fetch encryption info. 
- let mut state = state.write().await; - let (retry_decryption_indices, retry_info_indices) = - compute_event_indices_to_retry_decryption(&state.items, should_retry); - - // Retry fetching encryption info for events that are already decrypted - if !retry_info_indices.is_empty() { - debug!("Retrying fetching encryption info"); - retry_fetch_encryption_info(&mut state, retry_info_indices, &room_data_provider).await; - } - - // Retry decrypting any unable-to-decrypt messages - if !retry_decryption_indices.is_empty() { - debug!("Retrying decryption"); - decrypt_by_index( - &mut state, - &request.settings, - &room_data_provider, - request.decryptor, - should_retry, - retry_decryption_indices, - ) - .await - } - } + // while let Some(request) = receiver.recv().await { + // let should_retry = |session_id: &str| { + // if let Some(session_ids) = &request.session_ids { + // session_ids.contains(session_id) + // } else { + // true + // } + // }; + // + // // Find the indices of events that are in the supplied sessions, + // distinguishing // between UTDs which we need to decrypt, and + // already-decrypted events where we // only need to re-fetch encryption + // info. 
let mut state = state.write().await; + // let (retry_decryption_indices, retry_info_indices) = + // compute_event_indices_to_retry_decryption(&state.items, + // should_retry); + // + // // Retry fetching encryption info for events that are already decrypted + // if !retry_info_indices.is_empty() { + // debug!("Retrying fetching encryption info"); + // retry_fetch_encryption_info(&mut state, retry_info_indices, + // &room_data_provider).await; } + // + // // Retry decrypting any unable-to-decrypt messages + // if !retry_decryption_indices.is_empty() { + // debug!("Retrying decryption"); + // decrypt_by_index( + // &mut state, + // &request.settings, + // &room_data_provider, + // request.decryptor, + // should_retry, + // retry_decryption_indices, + // ) + // .await + // } + // } debug!("Decryption task stopping."); } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index bdb37d38274..de8991303a0 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -74,9 +74,13 @@ use crate::{ mod deduplicator; mod pagination; +#[cfg(feature = "e2e-encryption")] +mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; +#[cfg(feature = "e2e-encryption")] +pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport}; pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate}; /// An error observed in the [`EventCache`]. @@ -148,6 +152,10 @@ pub struct EventCacheDropHandles { /// The task used to automatically shrink the linked chunks. auto_shrink_linked_chunk_task: JoinHandle<()>, + + /// The task used to automatically redecrypt UTDs. 
+ #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor::Redecryptor, } impl fmt::Debug for EventCacheDropHandles { @@ -200,6 +208,9 @@ impl EventCache { linked_chunk_update_sender.clone(), ))); + #[cfg(feature = "e2e-encryption")] + let redecryption_channels = redecryptor::RedecryptorChannels::new(); + Self { inner: Arc::new(EventCacheInner { client, @@ -213,6 +224,8 @@ impl EventCache { _thread_subscriber_task: thread_subscriber_task, #[cfg(feature = "experimental-search")] _search_indexing_task: search_indexing_task, + #[cfg(feature = "e2e-encryption")] + redecryption_channels, thread_subscriber_receiver, }), } @@ -257,10 +270,25 @@ impl EventCache { auto_shrink_receiver, )); + #[cfg(feature = "e2e-encryption")] + let redecryptor = { + let receiver = self + .inner + .redecryption_channels + .decryption_request_receiver + .lock() + .take() + .expect("We should have initialized the channel an subscribing should happen only once"); + + redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver) + }; + Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor, }) }); @@ -840,6 +868,9 @@ struct EventCacheInner { /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. thread_subscriber_receiver: Receiver<()>, + + #[cfg(feature = "e2e-encryption")] + redecryption_channels: redecryptor::RedecryptorChannels, } type AutoShrinkChannelPayload = OwnedRoomId; @@ -935,7 +966,7 @@ impl EventCacheInner { self.multiple_room_updates_lock.lock().await }; - // Note: bnjbvr tried to make this concurrent at some point, but it turned out + // NOTE: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. 
See also // https://github.com/matrix-org/matrix-rust-sdk/pull/5426. diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs new file mode 100644 index 00000000000..1607c111ac6 --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -0,0 +1,744 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The Redecryptor (Rd) is a layer and long-running background task which +//! handles redecryption of events in case we couldn't decrypt them immediately. +//! +//! Rd listens to the OlmMachine for received room keys and new +//! m.room_key.withheld events. +//! +//! If a new room key has been received it attempts to find any UTDs in the +//! [`EventCache`]. If Rd decrypts any UTDs from the event cache it will replace +//! the events in the cache and send out new [`RoomEventCacheUpdate`]s to any of +//! its listeners. +//! +//! If a new withheld info has been received it attempts to find any relevant +//! events and updates the [`EncryptionInfo`] of an event. +//! +//! There's an additional gotcha, the [`OlmMachine`] might get recreated by +//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive +//! a `None` on the room keys stream and we need to re-listen to it. +//! +//! Another gotcha is that room keys might be received on another process if the +//! Client is operating on an iOS device.
A separate process is used in this case +//! to receive push notifications. In this case the room key will be received +//! and Rd won't get notified about it. To work around this decryption requests +//! can be explicitly sent to Rd. +//! +//! +//! ┌─────────────┐ +//! │ │ +//! ┌───────────┤ Timeline │◄────────────┐ +//! │ │ │ │ +//! │ └─────▲───────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌──────────┴────────────┐ │ +//! │ │ │ │ +//! └──────► Redecryptor │────────┘ +//! │ │ +//! └───────────▲───────────┘ +//! │ +//! │ +//! │ +//! Received room keys stream +//! │ +//! │ +//! │ +//! ┌───────┴──────┐ +//! │ │ +//! │ OlmMachine │ +//! │ │ +//! └──────────────┘ + +use std::{collections::BTreeSet, pin::Pin, sync::Weak}; + +use as_variant::as_variant; +use futures_core::Stream; +use futures_util::{StreamExt, pin_mut}; +#[cfg(doc)] +use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; +use matrix_sdk_base::{ + crypto::{ + store::types::{RoomKeyInfo, RoomKeyWithheldInfo}, + types::events::room::encrypted::EncryptedEvent, + }, + deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + locks::Mutex, +}; +use matrix_sdk_common::executor::spawn; +use ruma::{OwnedEventId, OwnedRoomId, RoomId, serde::Raw}; +use tokio::{ + sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + }, + task::JoinHandle, +}; +use tokio_stream::wrappers::{ + BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError, +}; +use tracing::{info, instrument, trace, warn}; + +use crate::event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, +}; + +type SessionId<'a> = &'a str; +type OwnedSessionId = String; + +/// The information sent across the channel to the long-running task requesting +/// that the supplied set of sessions be retried. 
+#[derive(Debug, Clone)] +pub struct DecryptionRetryRequest { + /// The room ID of the room the events belong to. + pub room_id: OwnedRoomId, + /// Session IDs of events that are not decrypted. + pub utd_session_ids: BTreeSet, + /// Session IDs of events that are decrypted but might need to have + /// their [`EncryptionInfo`] refreshed. + pub refresh_info_session_ids: BTreeSet, +} + +/// A report coming from the redecryptor. +#[derive(Debug, Clone)] +pub enum RedecryptorReport { + /// Events which we were able to decrypt. + ResolvedUtds { + /// The room ID of the room the events belong to. + room_id: OwnedRoomId, + /// The set of event IDs of the decrypted events. + events: BTreeSet, + }, + /// The redecryptor might have missed some room keys so it might not have + /// re-decrypted events that are now decryptable. + Lagging, +} + +pub(super) struct RedecryptorChannels { + utd_reporter: broadcast::Sender, + pub(super) decryption_request_sender: UnboundedSender, + pub(super) decryption_request_receiver: + Mutex>>, +} + +impl RedecryptorChannels { + pub(super) fn new() -> Self { + let (utd_reporter, _) = broadcast::channel(100); + let (decryption_request_sender, decryption_request_receiver) = unbounded_channel(); + + Self { + utd_reporter, + decryption_request_sender, + decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)), + } + } +} + +impl EventCache { + /// Retrieve the events of a room that are selected by the given `filter`. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `_session_id` - The unique ID of the room key that was used to + /// encrypt the event; not used for filtering yet. + /// * `filter` - Selects, and converts, the events that should be returned. + async fn get_utds( + &self, + room_id: &RoomId, + _session_id: SessionId<'_>, + filter: impl Fn(TimelineEvent) -> Option, + ) -> Result, EventCacheError> { + // Load the relevant events from the event cache store and attempt to redecrypt + // things. + // + // TODO: We can't load **all** events all the time. + // TODO: Use the session ID to filter things.
+ let store = self.inner.store.lock().await?; + let events = store.get_room_events(&room_id).await?; + + Ok(events.into_iter().filter_map(filter).collect()) + } + + /// Handle a chunk of events that we were previously unable to decrypt but + /// have now successfully decrypted. + /// + /// This function will replace the existing UTD events in memory and the + /// store and send out a [`RoomEventCacheUpdate`] for the newly + /// decrypted events. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `events` - A chunk of events that were successfully decrypted. + #[instrument(skip_all, fields(room_id))] + async fn on_resolved_utds( + &self, + room_id: &RoomId, + events: Vec<(OwnedEventId, DecryptedRoomEvent)>, + ) -> Result<(), EventCacheError> { + // Get the cache for this particular room and lock the state for the duration of + // the decryption. + let (room_cache, _) = self.for_room(room_id).await?; + let mut state = room_cache.inner.state.write().await; + + let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect(); + + trace!(?event_ids, "Replacing successfully re-decrypted events"); + + for (event_id, decrypted) in events { + // The event isn't in the cache, nothing to replace. Realistically this can't + // happen since we retrieved the list of events from the cache itself and + // `find_event()` will look into the store as well. + if let Some((location, mut target_event)) = state.find_event(&event_id).await? { + target_event.kind = TimelineEventKind::Decrypted(decrypted); + state.replace_event_at(location, target_event).await? + } + } + + // We replaced a bunch of events, reactive updates for those replacements have + // been queued up. We need to send them out to our subscribers now. 
+ let diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); + + let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs, + origin: EventsOrigin::Cache, + }); + + // We report that we resolved some UTDs, this is mainly for listeners that don't + // care about the actual events, just about the fact that UTDs got + // resolved. Not sure if we'll have more such listeners but the UTD hook + // is one such thing. + let report = + RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; + let _ = self.inner.redecryption_channels.utd_reporter.send(report.into()); + + Ok(()) + } + + /// Attempt to decrypt a single event. + async fn decrypt_event( + &self, + room_id: &RoomId, + event: &Raw, + ) -> Option { + let client = self.inner.client().ok()?; + // TODO: Do we need to use the `Room` object to decrypt these events so we can + // calculate if the event should count as a notification, i.e. get the push + // actions. I think we do, what happens if the room can't be found? We fallback + // to this? + let machine = client.olm_machine().await; + let machine = machine.as_ref()?; + + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some(decrypted), + Err(e) => { + warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); + None + } + } + } + + /// Attempt to redecrypt events after a room key with the given session ID + /// has been received. + #[instrument(skip_all, fields(room_key_info))] + async fn retry_decryption( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + trace!("Retrying to decrypt"); + + let filter_non_utds = |event: TimelineEvent| { + let event_id = event.event_id(); + + // We only care about events for this particular room key, identified by the + // session ID.
+ if event.kind.session_id() == Some(session_id) { + // Only pick out events that are UTDs, get just the Raw event as this is what + // the OlmMachine needs. + let event = as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + } else { + None + } + }; + + // Get all the relevant UTDs. + let events = self.get_utds(room_id, session_id, filter_non_utds).await?; + + // Let's attempt to decrypt them. + let mut decrypted_events = Vec::with_capacity(events.len()); + + for (event_id, event) in events { + // If we managed to decrypt the event, and we should have to since we received + // the room key for this specific event, then replace the event. + if let Some(decrypted) = self.decrypt_event(room_id, event.cast_ref_unchecked()).await { + decrypted_events.push((event_id, decrypted)); + } + } + + // Replace the events and notify listeners that UTDs have been replaced with + // decrypted events. + self.on_resolved_utds(room_id, decrypted_events).await?; + + Ok(()) + } + + /// Refresh the [`EncryptionInfo`] of already decrypted events that were + /// encrypted with the room key identified by the given session ID. + /// + /// Does nothing if the client or the room can't be retrieved. + async fn update_encryption_info( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + let filter_non_utds = |event: TimelineEvent| { + let event_id = event.event_id(); + + // We only care about events for this particular room key, identified by the + // session ID. + if event.kind.session_id() == Some(session_id) { + let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + } else { + None + } + }; + + // Fetching the client or the room can fail; that is not a reason to panic inside + // the redecryption task, so we bail out without updating anything. + let Ok(client) = self.inner.client() else { + return Ok(()); + }; + let Some(room) = client.get_room(room_id) else { + warn!(%room_id, "Unable to update the encryption info, the room is unknown"); + return Ok(()); + }; + + // Get all the relevant events.
+ let events = self.get_utds(room_id, session_id, filter_non_utds).await?; + + // Let's attempt to update their encryption info. + let mut updated_events = Vec::with_capacity(events.len()); + + for (event_id, mut event) in events { + let new_encryption_info = + room.get_encryption_info(session_id, &event.encryption_info.sender).await; + + // Only create a replacement if the encryption info actually changed. + if let Some(new_encryption_info) = new_encryption_info { + if event.encryption_info != new_encryption_info { + event.encryption_info = new_encryption_info; + + updated_events.push((event_id, event)); + } + } + } + + self.on_resolved_utds(room_id, updated_events).await?; + + Ok(()) + } + + /// Explicitly request the redecryption of a set of events. + /// + /// TODO: Explain when and why this might be useful. + pub fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = + self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( + |_| warn!("Requesting a decryption while the redecryption task has been shut down"), + ); + } + + /// Subscribe to reports that the redecryptor generates. + /// + /// TODO: Explain when the redecryptor might send such reports. + pub fn subscrube_to_decryption_reports( + &self, + ) -> impl Stream> { + BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) + } +} + +pub(crate) struct Redecryptor { + task: JoinHandle<()>, +} + +impl Drop for Redecryptor { + fn drop(&mut self) { + self.task.abort(); + } +} + +impl Redecryptor { + /// Create a new [`Redecryptor`]. + /// + /// This creates a task that listens to various streams and attempts to + /// redecrypt UTDs that can be found inside the [`EventCache`]. 
+ pub(super) fn new( + cache: Weak, + receiver: UnboundedReceiver, + ) -> Self { + let task = spawn(async { + let request_redecryption_stream = UnboundedReceiverStream::new(receiver); + + Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; + }); + + Self { task } + } + + /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. + /// + /// This needs to happen any time this stream returns a `None` meaning that + /// the sending part of the stream has been dropped. + async fn subscribe_to_room_key_stream( + cache: &Weak, + ) -> Option<( + impl Stream, BroadcastStreamRecvError>>, + impl Stream>, + )> { + let event_cache = cache.upgrade()?; + let client = event_cache.client().ok()?; + let machine = client.olm_machine().await; + + machine.as_ref().map(|m| { + (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream()) + }) + } + + fn upgrade_event_cache(cache: &Weak) -> Option { + cache.upgrade().map(|inner| EventCache { inner }) + } + + async fn redecryption_loop( + cache: &Weak, + decryption_request_stream: &mut Pin<&mut impl Stream>, + ) -> bool { + let Some((room_key_stream, withheld_stream)) = + Self::subscribe_to_room_key_stream(cache).await + else { + return false; + }; + + pin_mut!(room_key_stream); + pin_mut!(withheld_stream); + + loop { + tokio::select! { + // An explicit request, presumably from the timeline, has been received to decrypt + // events that were encrypted with a certain room key. 
+ Some(request) = decryption_request_stream.next() => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for session_id in request.utd_session_ids { + let _ = cache + .retry_decryption(&request.room_id, &session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + + for session_id in request.refresh_info_session_ids { + let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e| + warn!( + room_id = %request.room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The room key stream from the OlmMachine. Needs to be recreated every time we + // receive a `None` from the stream. + room_keys = room_key_stream.next() => { + match room_keys { + Some(Ok(room_keys)) => { + // Alright, some room keys were received and persisted in our store, + // let's attempt to redecrypt events that were encrypted using these + // room keys. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for key in &room_keys { + let _ = cache + .retry_decryption(&key.room_id, &key.session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + + for key in room_keys { + let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e| + warn!( + room_id = %key.room_id, + session_id = key.session_id, + "Unable to update the encryption info {e:?}", + )); + } + }, + Some(Err(_)) => { + // We missed some room keys, we need to report this in case a listener + // has an idea which UTDs we should attempt to redecrypt. + // + // This would most likely be the timeline. The timeline might attempt + // to redecrypt all UTDs it is showing to the user.
+ let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); + }, + // The stream got closed, this could mean that our OlmMachine got + // regenerated, let's return true and try to recreate the stream. + None => { + break true + } + } + } + withheld_info = withheld_stream.next() => { + match withheld_info { + Some(infos) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos { + let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e| + warn!( + room_id = %room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The stream got closed, same as for the room key stream, we'll try to + // recreate the streams. + None => break true + } + } + else => break false, + } + } + } + + async fn listen_for_room_keys_task( + cache: Weak, + decryption_request_stream: UnboundedReceiverStream, + ) { + // We pin the decryption request stream here since that one doesn't need to be + // recreated and we don't want to miss messages coming from the stream + // while recreating it unnecessarily. + pin_mut!(decryption_request_stream); + + while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { + info!("Regenerating the re-decryption streams"); + + let Some(cache) = Self::upgrade_event_cache(&cache) else { + break; + }; + + // Report that the stream got recreated so listeners can attempt to redecrypt + // any UTDs they might be seeing. 
+ let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); + } + + info!("Shutting down the event cache redecryptor"); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use assert_matches2::assert_matches; + use eyeball_im::VectorDiff; + use matrix_sdk_base::deserialized_responses::TimelineEventKind; + use matrix_sdk_test::{ + JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory, + }; + use ruma::{device_id, event_id, room_id, user_id}; + use serde_json::json; + + use crate::{ + assert_let_timeout, encryption::EncryptionSettings, event_cache::RoomEventCacheUpdate, + test_utils::mocks::MatrixMockServer, + }; + + #[async_test] + async fn test_redecryptor() { + let room_id = room_id!("!test:localhost"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + // Create some clients for Alice and Bob. + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + bob.event_cache().subscribe().expect("Bob should be able to enable the event cache"); + + // Ensure that Alice and Bob are aware of their devices and identities. 
+ matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + let event_factory = EventFactory::new().room(room_id); + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + // Let us now create a room for them. + let room_builder = JoinedRoomBuilder::new(room_id) + .add_state_event(StateTestEvent::Create) + .add_state_event(StateTestEvent::Encryption); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room(room_builder.clone()); + }) + .await; + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(room_builder); + }) + .await; + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. 
+ + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // We regenerate the Olm machine to check if the room key stream is recreated + // correctly. + bob.inner + .base_client + .regenerate_olm(None) + .await + .expect("We should be able to regenerate the Olm machine"); + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index, value }); + assert_eq!(*index, 0); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. 
}); + } +} diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 670466ccafa..2dd86cc4d1f 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1321,6 +1321,11 @@ mod private { &self.room_linked_chunk } + /// Returns a mutable reference to the underlying room linked chunk. + pub(in crate::event_cache) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { + &mut self.room_linked_chunk + } + //// Find a single event in this room, starting from the most recent event. /// /// **Warning**! It looks into the loaded events from the in-memory @@ -1577,7 +1582,7 @@ mod private { /// observers that a single item has been replaced. Otherwise, /// such a notification is not emitted, because observers are /// unlikely to observe the store updates directly. - async fn replace_event_at( + pub(crate) async fn replace_event_at( &mut self, location: EventLocation, event: Event, @@ -2266,7 +2271,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2343,7 +2348,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2485,7 +2490,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. 
event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); @@ -2633,7 +2638,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); // Let's check whether the generic updates are received for the initialisation. @@ -2757,7 +2762,7 @@ mod timed_tests { let event_cache = client.event_cache(); - // Don't forget to subscribe and like^W enable storage! + // Don't forget to subscribe and like. event_cache.subscribe().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); diff --git a/crates/matrix-sdk/src/test_utils/mod.rs b/crates/matrix-sdk/src/test_utils/mod.rs index 9572d556909..7eaa6e58174 100644 --- a/crates/matrix-sdk/src/test_utils/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mod.rs @@ -138,6 +138,10 @@ macro_rules! assert_next_with_timeout { /// milliseconds. #[macro_export] macro_rules! assert_recv_with_timeout { + ($receiver:expr) => { + $crate::assert_recv_with_timeout!($receiver, 1000) + }; + ($receiver:expr, $timeout_ms:expr) => {{ tokio::time::timeout(std::time::Duration::from_millis($timeout_ms), $receiver.recv()) .await