From c5398da09526f265d765b22e8bfd5fc34df4f62c Mon Sep 17 00:00:00 2001 From: Andy Balaam Date: Wed, 10 Sep 2025 15:01:06 +0100 Subject: [PATCH 1/2] Test for keys arriving in the middle of adding UTD to timeline --- .../src/timeline/tests/encryption.rs | 545 +++++++++++++++++- 1 file changed, 529 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs index 24b4eef21bd..06e4a24f250 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs @@ -15,6 +15,7 @@ #![cfg(not(target_family = "wasm"))] use std::{ + collections::{BTreeMap, BTreeSet, HashMap}, io::Cursor, iter, sync::{Arc, Mutex}, @@ -26,20 +27,37 @@ use assert_matches::assert_matches; use assert_matches2::assert_let; use eyeball_im::VectorDiff; use matrix_sdk::{ - assert_next_matches_with_timeout, + MemoryStore, QueueWedgeError, RoomInfo, RoomMemberships, StateChanges, StateStore, StoreError, + assert_next_matches_with_timeout, async_trait, crypto::{OlmMachine, decrypt_room_key_export, types::events::UtdCause}, deserialized_responses::{ - AlgorithmInfo, DecryptedRoomEvent, EncryptionInfo, VerificationLevel, VerificationState, + AlgorithmInfo, DecryptedRoomEvent, DisplayName, EncryptionInfo, RawAnySyncOrStrippedState, + VerificationLevel, VerificationState, + }, + room::ThreadSubscription, + store::{ + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequest, + QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoreConfig, }, test_utils::test_client_builder, }; -use matrix_sdk_base::deserialized_responses::{TimelineEvent, UnableToDecryptReason}; +use matrix_sdk_base::{ + MinimalRoomMemberEvent, StateStoreDataKey, StateStoreDataValue, + deserialized_responses::{TimelineEvent, UnableToDecryptReason}, +}; use matrix_sdk_test::{ALICE, BOB, async_test}; use ruma::{ - assign, event_id, - events::room::encrypted::{ - EncryptedEventScheme, MegolmV1AesSha2ContentInit, Relation, Replacement, - RoomEncryptedEventContent, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, + OwnedUserId, RoomId, TransactionId, UserId, assign, event_id, + events::{ + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, GlobalAccountDataEventType, + RoomAccountDataEventType, StateEventType, + presence::PresenceEvent, + receipt::{Receipt, ReceiptThread, ReceiptType}, + room::encrypted::{ + EncryptedEventScheme, MegolmV1AesSha2ContentInit, Relation, Replacement, + RoomEncryptedEventContent, + }, }, owned_device_id, room_id, serde::Raw, @@ -47,7 +65,7 @@ use ruma::{ }; use serde_json::{json, value::to_raw_value}; use stream_assert::{assert_next_matches, assert_pending}; -use tokio::time::sleep; +use tokio::{spawn, time::sleep}; use super::TestTimeline; use crate::{ @@ -727,8 +745,8 @@ fn make_encryption_info( #[async_test] async fn test_utd_cause_for_nonmember_event_is_found() { - // Given a timline - let timeline = TestTimeline::new(); + // Given a timeline + let timeline = timeline_with_decryptor().await; let mut stream = timeline.subscribe().await; // When we add an event with "membership: leave" @@ -748,8 +766,8 @@ async fn test_utd_cause_for_nonmember_event_is_found() { #[async_test] async fn test_utd_cause_for_nonmember_event_is_found_unstable_prefix() { - // Given a timline - let timeline = TestTimeline::new(); + // Given a timeline + let timeline = timeline_with_decryptor().await; let mut stream = timeline.subscribe().await; // When we add an event with "io.element.msc4115.membership: leave" @@ -773,8 +791,8 @@ async fn test_utd_cause_for_nonmember_event_is_found_unstable_prefix() { #[async_test] async fn test_utd_cause_for_member_event_is_unknown() { - // Given a timline - let timeline = TestTimeline::new(); + // Given a timeline + let timeline = timeline_with_decryptor().await; let mut stream = timeline.subscribe().await; // When we add an event with "membership: join" @@ -794,8 +812,8 @@ async fn test_utd_cause_for_member_event_is_unknown() { #[async_test] async fn test_utd_cause_for_missing_membership_is_unknown() { - // Given a timline - let timeline = TestTimeline::new(); + // Given a timeline + let timeline = timeline_with_decryptor().await; let mut stream = timeline.subscribe().await; // When we add an event with no membership in unsigned @@ -943,6 +961,488 @@ async fn test_retry_decryption_updates_response() { } } +/// See https://github.com/matrix-org/matrix-rust-sdk/issues/5474 - quoting poljar: +/// Since in our sliding sync world we have two syncs running, one for the +/// events and one for the to-device messages, I think we have a race: +/// +/// 1. The event is received, we try to decrypt it but there's no key yet. +/// 2. The room key is received, we notify that a key has been received. +/// 3. The event is pushed to the timeline. +/// +/// This test checks that the event is decrypted even in this case. +#[async_test] +async fn test_event_is_redecrypted_even_if_key_arrives_between_receiving_and_adding_to_timeline() { + /// A state store that refuses to complete calls to `set_kv_data` until + /// someone calls `stop_delaying`. Other than that, it works like + /// MemoryStore. + #[derive(Debug)] + struct DelayingStore { + memory_store: MemoryStore, + delaying: Mutex, + } + + impl DelayingStore { + fn new() -> Self { + Self { memory_store: MemoryStore::new(), delaying: Mutex::new(true) } + } + + fn stop_delaying(&self) { + *self.delaying.lock().unwrap() = false + } + } + + #[async_trait] + impl StateStore for DelayingStore { + type Error = StoreError; + + async fn get_kv_data( + &self, + key: StateStoreDataKey<'_>, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_kv_data(key).await + } + + async fn set_kv_data( + &self, + key: StateStoreDataKey<'_>, + value: StateStoreDataValue, + ) -> matrix_sdk_base::store::Result<()> { + // This is the key behaviour of this store - we wait to set this value until + // someone calls `stop_delaying`. + // + // We use `sleep` here for simplicity. The cool way would be to use a custom + // waker or something like that. + while *self.delaying.lock().unwrap() { + sleep(Duration::from_millis(10)).await; + } + + self.memory_store.set_kv_data(key, value).await + } + + async fn remove_kv_data( + &self, + key: StateStoreDataKey<'_>, + ) -> matrix_sdk_base::store::Result<()> { + self.memory_store.remove_kv_data(key).await + } + + async fn save_changes(&self, changes: &StateChanges) -> matrix_sdk_base::store::Result<()> { + self.memory_store.save_changes(changes).await + } + + async fn get_presence_event( + &self, + user_id: &UserId, + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.get_presence_event(user_id).await + } + + async fn get_presence_events( + &self, + user_ids: &[OwnedUserId], + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.get_presence_events(user_ids).await + } + + async fn get_state_event( + &self, + room_id: &RoomId, + event_type: StateEventType, + state_key: &str, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_state_event(room_id, event_type, state_key).await + } + + async fn get_state_events( + &self, + room_id: &RoomId, + event_type: StateEventType, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_state_events(room_id, event_type).await + } + + async fn get_state_events_for_keys( + &self, + room_id: &RoomId, + event_type: StateEventType, + state_keys: &[&str], + ) -> matrix_sdk_base::store::Result, Self::Error> { + self.memory_store.get_state_events_for_keys(room_id, event_type, state_keys).await + } + + async fn get_profile( + &self, + room_id: &RoomId, + user_id: &UserId, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_profile(room_id, user_id).await + } + + async fn get_profiles<'a>( + &self, + room_id: &RoomId, + user_ids: &'a [OwnedUserId], + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_profiles(room_id, user_ids).await + } + + async fn get_user_ids( + &self, + room_id: &RoomId, + memberships: RoomMemberships, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_user_ids(room_id, memberships).await + } + + async fn get_room_infos( + &self, + room_load_settings: &RoomLoadSettings, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_room_infos(room_load_settings).await + } + + async fn get_users_with_display_name( + &self, + room_id: &RoomId, + display_name: &DisplayName, + ) -> matrix_sdk_base::store::Result> { + self.memory_store.get_users_with_display_name(room_id, display_name).await + } + + async fn get_users_with_display_names<'a>( + &self, + room_id: &RoomId, + display_names: &'a [DisplayName], + ) -> matrix_sdk_base::store::Result>> + { + self.memory_store.get_users_with_display_names(room_id, display_names).await + } + + async fn get_account_data_event( + &self, + event_type: GlobalAccountDataEventType, + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.get_account_data_event(event_type).await + } + + async fn get_room_account_data_event( + &self, + room_id: &RoomId, + event_type: RoomAccountDataEventType, + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.get_room_account_data_event(room_id, event_type).await + } + + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + thread: ReceiptThread, + user_id: &UserId, + ) -> matrix_sdk_base::store::Result> { + self.memory_store + .get_user_room_receipt_event(room_id, receipt_type, thread, user_id) + .await + } + + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + thread: ReceiptThread, + event_id: &EventId, + ) -> matrix_sdk_base::store::Result> { + self.memory_store + .get_event_room_receipt_events(room_id, receipt_type, thread, event_id) + .await + } + + async fn get_custom_value( + &self, + key: &[u8], + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.get_custom_value(key).await + } + + async fn set_custom_value( + &self, + key: &[u8], + value: Vec, + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.set_custom_value(key, value).await + } + + async fn remove_custom_value( + &self, + key: &[u8], + ) -> matrix_sdk_base::store::Result>> { + self.memory_store.remove_custom_value(key).await + } + + async fn remove_room(&self, room_id: &RoomId) -> matrix_sdk_base::store::Result<()> { + self.memory_store.remove_room(room_id).await + } + + async fn save_send_queue_request( + &self, + room_id: &RoomId, + transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, + kind: QueuedRequestKind, + priority: usize, + ) -> matrix_sdk_base::store::Result<(), Self::Error> { + self.memory_store + .save_send_queue_request(room_id, transaction_id, created_at, kind, priority) + .await + } + + async fn update_send_queue_request( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + kind: QueuedRequestKind, + ) -> matrix_sdk_base::store::Result { + self.memory_store.update_send_queue_request(room_id, transaction_id, kind).await + } + + async fn remove_send_queue_request( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + ) -> matrix_sdk_base::store::Result { + self.memory_store.remove_send_queue_request(room_id, transaction_id).await + } + + async fn load_send_queue_requests( + &self, + room_id: &RoomId, + ) -> matrix_sdk_base::store::Result, Self::Error> { + self.memory_store.load_send_queue_requests(room_id).await + } + + async fn update_send_queue_request_status( + &self, + room_id: &RoomId, + transaction_id: &TransactionId, + error: Option, + ) -> matrix_sdk_base::store::Result<(), Self::Error> { + self.memory_store.update_send_queue_request_status(room_id, transaction_id, error).await + } + + async fn load_rooms_with_unsent_requests( + &self, + ) -> matrix_sdk_base::store::Result, Self::Error> { + self.memory_store.load_rooms_with_unsent_requests().await + } + + async fn save_dependent_queued_request( + &self, + room_id: &RoomId, + parent_txn_id: &TransactionId, + own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, + content: DependentQueuedRequestKind, + ) -> matrix_sdk_base::store::Result<(), Self::Error> { + self.memory_store + .save_dependent_queued_request( + room_id, + parent_txn_id, + own_txn_id, + created_at, + content, + ) + .await + } + + async fn mark_dependent_queued_requests_as_ready( + &self, + room_id: &RoomId, + parent_txn_id: &TransactionId, + sent_parent_key: SentRequestKey, + ) -> matrix_sdk_base::store::Result { + self.memory_store + .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key) + .await + } + + async fn update_dependent_queued_request( + &self, + room_id: &RoomId, + own_transaction_id: &ChildTransactionId, + new_content: DependentQueuedRequestKind, + ) -> matrix_sdk_base::store::Result { + self.memory_store + .update_dependent_queued_request(room_id, own_transaction_id, new_content) + .await + } + + async fn remove_dependent_queued_request( + &self, + room: &RoomId, + own_txn_id: &ChildTransactionId, + ) -> matrix_sdk_base::store::Result { + self.memory_store.remove_dependent_queued_request(room, own_txn_id).await + } + + async fn load_dependent_queued_requests( + &self, + room: &RoomId, + ) -> matrix_sdk_base::store::Result, Self::Error> { + self.memory_store.load_dependent_queued_requests(room).await + } + + async fn upsert_thread_subscription( + &self, + room: &RoomId, + thread_id: &EventId, + subscription: ThreadSubscription, + ) -> matrix_sdk_base::store::Result<(), Self::Error> { + self.memory_store.upsert_thread_subscription(room, thread_id, subscription).await + } + + async fn load_thread_subscription( + &self, + room: &RoomId, + thread_id: &EventId, + ) -> matrix_sdk_base::store::Result, Self::Error> { + self.memory_store.load_thread_subscription(room, thread_id).await + } + + async fn remove_thread_subscription( + &self, + room: &RoomId, + thread_id: &EventId, + ) -> matrix_sdk_base::store::Result<(), Self::Error> { + self.memory_store.remove_thread_subscription(room, thread_id).await + } + } + + //// 1. The event is received, we try to decrypt it but there's no key yet. + //// Because we're using the DelayingStore, it gets stuck after being + //// received but before we add it to the timeline. It actually gets stuck + //// inside `UtdHookManager::report_utd` but that is not important to this + //// test, so long as it is stuck somewhere. + let utd_hook = Arc::new(DummyUtdHook::default()); + let state_store = Arc::new(DelayingStore::new()); + let store_config = StoreConfig::new( + "test_event_is_redecrypted_even_if_key_arrives_between_receiving_and_adding_to_timeline" + .to_owned(), + ) + .state_store(state_store.clone()); + + let client = test_client_builder(None).store_config(store_config).build().await.unwrap(); + + //// 2. The room key is received, we notify that a key has been received. + //// 3. The event is pushed to the timeline. + + const SESSION_ID: &str = "gM8i47Xhu0q52xLfgUXzanCMpLinoyVyH7R58cBuVBU"; + const SESSION_KEY: &[u8] = b"\ + -----BEGIN MEGOLM SESSION DATA-----\n\ + ASKcWoiAVUM97482UAi83Avce62hSLce7i5JhsqoF6xeAAAACqt2Cg3nyJPRWTTMXxXH7TXnkfdlmBXbQtq5\ + bpHo3LRijcq2Gc6TXilESCmJN14pIsfKRJrWjZ0squ/XsoTFytuVLWwkNaW3QF6obeg2IoVtJXLMPdw3b2vO\ + vgwGY3OMP0XafH13j1vcb6YLzvgLkZQLnYvd47hv3yK/9GmKS9tokuaQ7dCVYckYcIOS09EDTs70YdxUd5WG\ + rQynATCLFP1p/NAGv70r9MK7Cy/mNpjD0r4qC7UEDIoi1kOWzHgnLo19wtvwsb8Fg8ATxcs3Wmtj8hIUYpDx\ + ia4sM10zbytUuaPUAfCDf42IyxdmOnGe1CueXhgI71y+RW0s0argNqUt7jB70JT0o9CyX6UBGRaqLk2MPY9T\ + hUu5J8X3UgIa6rcbWigzohzWm9rdbEHFrSWqjpfQYMaAKQQgETrjSy4XTrp2RhC2oNqG/hylI4ab+F4X6fpH\ + DYP1NqNMP5g36xNu7LhDnrUB5qsPjYOmWORxGLfudpF3oLYCSlr3DgHqEIB6HjQblLZ3KQuPBse3zxyROTnS\ + AhdPH4a/z1wioFtKNVph3hecsiKEdqnz4Y2coSIdhz58mJ9JWNQoFAENE5CSsoEZAGvafYZVpW4C75YY2zq1\ + wIeiFi1dT43/jLAUGkslsi1VvnyfUu8qO404RxYO3XHoGLMFoFLOO+lZ+VGci2Vz10AhxJhEBHxRKxw4k2uB\ + HztoSJUr/2Y\n\ + -----END MEGOLM SESSION DATA-----"; + + let own_user_id = user_id!("@example:morheus.localhost"); + let olm_machine = OlmMachine::new(own_user_id, "SomeDeviceId".into()).await; + + let utd_hook_manager = Arc::new(UtdHookManager::new(utd_hook, client)); + + let timeline = Arc::new( + TestTimelineBuilder::new() + .unable_to_decrypt_hook(utd_hook_manager) + .provider(TestRoomDataProvider::default().with_decryptor(TestDecryptor::new( + room_id!("!DovneieKSTkdHKpIXy:morpheus.localhost"), + &olm_machine, + ))) + .build(), + ); + + let mut stream = timeline.subscribe_events().await; + + let original_event_id = event_id!("$original"); + + let timeline_clone = timeline.clone(); + let handle_event_handle = spawn(async move { + let f = &timeline_clone.factory; + timeline_clone + .handle_live_event( + f.event(RoomEncryptedEventContent::new( + EncryptedEventScheme::MegolmV1AesSha2( + MegolmV1AesSha2ContentInit { + ciphertext: "\ + AwgAEtABPRMavuZMDJrPo6pGQP4qVmpcuapuXtzKXJyi3YpEsjSWdzuRKIgJzD4P\ + cSqJM1A8kzxecTQNJsC5q22+KSFEPxPnI4ltpm7GFowSoPSW9+bFdnlfUzEP1jPq\ + YevHAsMJp2fRKkzQQbPordrUk1gNqEpGl4BYFeRqKl9GPdKFwy45huvQCLNNueql\ + CFZVoYMuhxrfyMiJJAVNTofkr2um2mKjDTlajHtr39pTG8k0eOjSXkLOSdZvNOMz\ + hGhSaFNeERSA2G2YbeknOvU7MvjiO0AKuxaAe1CaVhAI14FCgzrJ8g0y5nly+n7x\ + QzL2G2Dn8EoXM5Iqj8W99iokQoVsSrUEnaQ1WnSIfewvDDt4LCaD/w7PGETMCQ" + .to_owned(), + sender_key: "DeHIg4gwhClxzFYcmNntPNF9YtsdZbmMy8+3kzCMXHA".to_owned(), + device_id: "NLAZCWIOCO".into(), + session_id: SESSION_ID.into(), + } + .into(), + ), + None, + )) + .event_id(original_event_id) + .sender(&BOB) + .into_utd_sync_timeline_event(), + ) + .await + }); + + // The timeline has received the event and it is stuck in the DelayingStore, so + // the event won't actually appear in the stream yet. + assert_pending!(stream); + + // Import a room key backup. + let exported_keys = decrypt_room_key_export(Cursor::new(SESSION_KEY), "1234").unwrap(); + olm_machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap(); + + // Now we have the key we need to decrypt the event. + // Unblock the timeline from receiving the event. + state_store.stop_delaying(); + handle_event_handle.await.expect("Failed to wait for the handle_event handle"); + + // The event appears, and it is UTD as expected. + { + let event = + assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value } => value); + assert_let!( + TimelineItemContent::MsgLike(MsgLikeContent { + kind: MsgLikeKind::UnableToDecrypt(EncryptedMessage::MegolmV1AesSha2 { + session_id, + .. + }), + .. + }) = event.content() + ); + assert_eq!(session_id, SESSION_ID); + } + + // And this is what we are testing: we triggered a redecryption, so it later + // appears as decrypted, even though the arrival of the key raced with + // adding the event to the timeline. + { + let event = + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value } => value); + + //assert_matches!(event.encryption_info(), Some(_)); + assert_let!(Some(message) = event.content().as_message()); + assert_eq!(message.body(), "It's a secret to everybody"); + assert!(!event.is_highlighted()); + } +} + fn utd_event_with_unsigned(unsigned: serde_json::Value) -> TimelineEvent { let raw = Raw::from_json( to_raw_value(&json!({ @@ -971,3 +1471,16 @@ fn utd_event_with_unsigned(unsigned: serde_json::Value) -> TimelineEvent { }, ) } +/// Creates a `TestTimeline` that doesn't crash when we attempt +/// redecryption, but doesn't actually care about the user ID or room ID. +async fn timeline_with_decryptor() -> TestTimeline { + let own_user_id = user_id!("@example:morheus.localhost"); + let olm_machine = OlmMachine::new(own_user_id, "SomeDeviceId".into()).await; + + TestTimelineBuilder::new() + .provider(TestRoomDataProvider::default().with_decryptor(TestDecryptor::new( + room_id!("!DovneieKSTkdHKpIXy:morpheus.localhost"), + &olm_machine, + ))) + .build() +} From 917a6f6ed51056685798742b7a9b159d079ba019 Mon Sep 17 00:00:00 2001 From: Andy Balaam Date: Wed, 10 Sep 2025 15:01:24 +0100 Subject: [PATCH 2/2] DRAFT code to make the test pass --- .../timeline/controller/state_transaction.rs | 135 +++++++++++++++++- .../src/timeline/tests/shields.rs | 15 +- .../tests/integration/timeline/decryption.rs | 11 +- 3 files changed, 153 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs index f016be36c7e..5670b6d94e4 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs @@ -23,7 +23,7 @@ use ruma::{ EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId, events::AnySyncTimelineEvent, push::Action, serde::Raw, }; -use tracing::{debug, instrument, trace, warn}; +use tracing::{debug, error, instrument, trace, warn}; use super::{ super::{ @@ -589,6 +589,8 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { settings: &TimelineSettings, date_divider_adjuster: &mut DateDividerAdjuster, ) -> RemovedItem { + let event_clone = event.clone(); + let is_highlighted = event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight)); @@ -651,7 +653,7 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { .map(|utd_info| (utd_info, self.meta.unable_to_decrypt_hook.as_ref())), in_reply_to, thread_root, - thread_summary, + thread_summary.clone(), ) .await, should_add, @@ -687,7 +689,7 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { let sender_profile = room_data_provider.profile_from_user_id(&sender).await; let ctx = TimelineEventContext { - sender, + sender: sender.clone(), sender_profile, timestamp, read_receipts: if settings.track_read_receipts && should_add { @@ -703,8 +705,8 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { flow: Flow::Remote { event_id: event_id.clone(), raw_event: raw, - encryption_info, - txn_id, + encryption_info: encryption_info.clone(), + txn_id: txn_id.clone(), position, }, should_add_new_items: should_add, @@ -730,11 +732,134 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { self.items.remove(timeline_item_index); item_removed = true; } + } else { + // Just in case the message key arrived while we were adding this event to the + // timeline, attempt to redecrypt it immediately. + let redecrypted = self.reattempt_decrypt_if_utd(event_clone, room_data_provider).await; + if let Some(redecrypted) = redecrypted { + self.add_or_update_remote_event( + EventMeta::new(event_id.clone(), should_add), + Some(&sender), + Some(timestamp), + position, + room_data_provider, + settings, + ) + .await; + + let bundled_edit_encryption_info = + redecrypted.kind.unsigned_encryption_map().and_then(|map| { + map.get(&UnsignedEventLocation::RelationsReplace)? + .encryption_info() + .cloned() + }); + + let (raw, utd_info) = match redecrypted.kind.clone() { + TimelineEventKind::UnableToDecrypt { utd_info, event: redecrypted } => { + (redecrypted.clone(), Some(utd_info)) + } + _ => (redecrypted.kind.clone().into_raw(), None), + }; + + let event = redecrypted.raw().deserialize().unwrap(); + + let sender_profile = room_data_provider.profile_from_user_id(&sender).await; + let (in_reply_to, thread_root) = self.meta.process_event_relations( + &event, + &raw, + bundled_edit_encryption_info, + &self.items, + matches!(self.focus, TimelineFocusKind::Thread { .. }), + ); + + let should_add = self.should_add_event_item( + room_data_provider, + settings, + &event, + thread_root.as_deref(), + position, + ); + let timeline_action = TimelineAction::from_event( + event, + &raw, + room_data_provider, + utd_info, + &self.meta, + in_reply_to, + thread_root, + thread_summary, + ) + .await; + + let timeline_action = timeline_action.unwrap(); + + let ctx = TimelineEventContext { + sender: sender.clone(), + sender_profile, + timestamp, + read_receipts: if settings.track_read_receipts && should_add { + self.meta.read_receipts.compute_event_receipts( + &event_id, + &mut self.items, + matches!(position, TimelineItemPosition::End { .. }), + ) + } else { + Default::default() + }, + is_highlighted, + flow: Flow::Remote { + event_id: event_id.clone(), + raw_event: raw, + encryption_info, + txn_id, + position, + }, + should_add_new_items: should_add, + }; + + TimelineEventHandler::new(self, ctx) + .handle_event(date_divider_adjuster, timeline_action) + .await; + } } item_removed } + /// If the supplied [`TimelineEvent`] is a UTD (unable to decrypt), attempt + /// to redecrypt it, and return the decrypted event if we succeed. + /// Otherwise, return None. + async fn reattempt_decrypt_if_utd( + &self, + event: TimelineEvent, + room_data_provider: &P, + ) -> Option { + if let TimelineEventKind::UnableToDecrypt { event: utd, .. } = &event.kind { + // This is a UTD - attempt to decrypt it. + let push_ctx = room_data_provider.push_context().await; + let push_ctx = push_ctx.as_ref(); + match room_data_provider.decrypt_event_impl(utd, push_ctx).await { + Ok(decrypted_event) => { + // The decryption process did not error, but may have returned us a UTD if we + // failed to decrypt. Only return `Some` if we actually decrypted the event. + match &decrypted_event.kind { + TimelineEventKind::Decrypted(_) => Some(decrypted_event), + TimelineEventKind::UnableToDecrypt { .. } => None, + TimelineEventKind::PlainText { .. } => None, + } + } + Err(e) => { + // If, for some reason, we hit an error while trying to decrypt, just log it + error!("Error while decrypting event: {e}"); + None + } + } + } else { + // This event is not a UTD + None + } + } + /// Remove one timeline item by its `event_index`. fn remove_timeline_item( &mut self, diff --git a/crates/matrix-sdk-ui/src/timeline/tests/shields.rs b/crates/matrix-sdk-ui/src/timeline/tests/shields.rs index ee25822fc46..1c25ef3b9c2 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/shields.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/shields.rs @@ -1,5 +1,6 @@ use assert_matches::assert_matches; use eyeball_im::VectorDiff; +use matrix_sdk::crypto::OlmMachine; use matrix_sdk_base::deserialized_responses::{ShieldState, ShieldStateCode}; use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory}; use ruma::{ @@ -13,12 +14,13 @@ use ruma::{ message::RoomMessageEventContent, }, }, + room_id, user_id, }; use stream_assert::{assert_next_matches, assert_pending}; use crate::timeline::{ EventSendState, - tests::{TestTimeline, TestTimelineBuilder}, + tests::{TestDecryptor, TestRoomDataProvider, TestTimeline, TestTimelineBuilder}, }; #[async_test] @@ -134,7 +136,16 @@ async fn test_local_sent_in_clear_shield() { /// sent in clear` red warning. async fn test_utd_shield() { // Given we are in an encrypted room - let timeline = TestTimelineBuilder::new().room_encrypted(true).build(); + let own_user_id = user_id!("@example:morheus.localhost"); + let olm_machine = OlmMachine::new(own_user_id, "SomeDeviceId".into()).await; + + let timeline = TestTimelineBuilder::new() + .provider( + TestRoomDataProvider::default() + .with_decryptor(TestDecryptor::new(room_id!("!r:s.co"), &olm_machine)), + ) + .room_encrypted(true) + .build(); let mut stream = timeline.subscribe().await; let f = &timeline.factory; diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs index 27794fc5cd3..835605f9b6a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs @@ -327,7 +327,7 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() { // Now, let's look at the updates. We must observe an update reflecting the UTD // has entered the `Timeline`. assert_next_matches_with_timeout!(updates_stream, 250, updates => { - assert_eq!(updates.len(), 2, "Expecting 2 updates from the `Timeline`"); + assert_eq!(updates.len(), 3, "Expecting 2 updates from the `Timeline`"); // UTD! UTD! assert_matches!(&updates[0], VectorDiff::Insert { index: 2, value: event } => { @@ -338,6 +338,15 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() { }); // UTD is decrypted now! + assert_matches!(&updates[2], VectorDiff::Set { index: 2, value: event } => { + assert_matches!(event.as_event(), Some(event) => { + assert_eq!(event.event_id().unwrap().as_str(), "$ev0"); + assert_matches!(event.content().as_message(), Some(message) => { + assert_eq!(message.body(), "It's a secret to everybody"); + }); + }); + }); + assert_matches!(&updates[1], VectorDiff::Set { index: 2, value: event } => { assert_matches!(event.as_event(), Some(event) => { assert_eq!(event.event_id().unwrap().as_str(), "$ev0");