Skip to content

Commit 2bbf6fc

Browse files
committed
feat(sdk): Add a tasks that listens for historic room keys if they arrive out of order
Historic room key bundles are uploaded as an encrypted file to the media repo and the key to decrypt the file is sent as a to-device message to the recipient device. In the nominal case, the invite and this to-device message should arrive at the same time and accepting the invite would download and import the bundle. If the to-device message arrives after the invite has already been accepted we would never download and import the bundle. To mitigate this problem, this patch introduces a task that listens for bundles that arrive. If the bundle is for a room that we have joined we will consider importing the bundle.
1 parent 1a9e5b9 commit 2bbf6fc

File tree

7 files changed

+101
-17
lines changed

7 files changed

+101
-17
lines changed

crates/matrix-sdk/CHANGELOG.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ All notable changes to this project will be documented in this file.
66

77
## [Unreleased] - ReleaseDate
88

9-
### Breaking changes:
9+
### Features
10+
11+
- Add support to accept historic room key bundles that arrive out of order, i.e.
12+
the bundle arrives after the invite has already been accepted.
13+
([#5322](https://github.com/matrix-org/matrix-rust-sdk/pull/5322))
1014

11-
- `OAuth::login` now allows requesting additional scopes for the authorization code grant.
15+
- [**breaking**] `OAuth::login` now allows requesting additional scopes for the authorization code grant.
1216
([#5395](https://github.com/matrix-org/matrix-rust-sdk/pull/5395))
1317

1418
## [0.13.0] - 2025-07-10

crates/matrix-sdk/src/authentication/matrix/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ impl MatrixAuth {
802802
_ => None,
803803
};
804804

805-
self.client.encryption().spawn_initialization_task(auth_data);
805+
self.client.encryption().spawn_initialization_task(auth_data).await;
806806
}
807807

808808
Ok(())

crates/matrix-sdk/src/authentication/oauth/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ impl OAuth {
817817
}
818818

819819
#[cfg(feature = "e2e-encryption")]
820-
self.client.encryption().spawn_initialization_task(None);
820+
self.client.encryption().spawn_initialization_task(None).await;
821821

822822
Ok(())
823823
}
@@ -1031,7 +1031,7 @@ impl OAuth {
10311031
self.enable_cross_process_lock().await.map_err(OAuthError::from)?;
10321032

10331033
#[cfg(feature = "e2e-encryption")]
1034-
self.client.encryption().spawn_initialization_task(None);
1034+
self.client.encryption().spawn_initialization_task(None).await;
10351035
}
10361036

10371037
Ok(())

crates/matrix-sdk/src/authentication/oauth/qrcode/login.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ impl<'a> IntoFuture for LoginWithQrCode<'a> {
253253
// ourselves see us as verified and the recovery/backup states will
254254
// be known. If we did receive all the secrets in the secrets
255255
// bundle, then backups will be enabled after this step as well.
256-
self.client.encryption().spawn_initialization_task(None);
256+
self.client.encryption().spawn_initialization_task(None).await;
257257
self.client.encryption().wait_for_e2ee_initialization_tasks().await;
258258

259259
trace!("successfully logged in and enabled E2EE.");

crates/matrix-sdk/src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ impl ClientInner {
414414
let client = Arc::new(client);
415415

416416
#[cfg(feature = "e2e-encryption")]
417-
client.e2ee.initialize_room_key_tasks(&client);
417+
client.e2ee.initialize_tasks(&client);
418418

419419
let _ = client
420420
.event_cache

crates/matrix-sdk/src/encryption/mod.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use ruma::{
6262
#[cfg(feature = "experimental-send-custom-to-device")]
6363
use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
6464
use serde::Deserialize;
65+
use tasks::BundleReceiverTask;
6566
use tokio::sync::{Mutex, RwLockReadGuard};
6667
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
6768
use tracing::{debug, error, instrument, trace, warn};
@@ -134,7 +135,7 @@ impl EncryptionData {
134135
}
135136
}
136137

137-
pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
138+
pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
138139
let weak_client = WeakClient::from_inner(client);
139140

140141
let mut tasks = self.tasks.lock();
@@ -1685,10 +1686,20 @@ impl Encryption {
16851686
/// there is a proposal (MSC3967) to remove this requirement, which would
16861687
/// allow for the initial upload of cross-signing keys without
16871688
/// authentication, rendering this parameter obsolete.
1688-
pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1689+
pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1690+
// It's fine to be async here as we're only getting the lock protecting the
1691+
// `OlmMachine`. Since the lock shouldn't be that contested right after logging
1692+
// in we won't delay the login or restoration of the Client.
1693+
let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1694+
Some(BundleReceiverTask::new(&self.client).await)
1695+
} else {
1696+
None
1697+
};
1698+
16891699
let mut tasks = self.client.inner.e2ee.tasks.lock();
16901700

16911701
let this = self.clone();
1702+
16921703
tasks.setup_e2ee = Some(spawn(async move {
16931704
// Update the current state first, so we don't have to wait for the result of
16941705
// network requests
@@ -1707,6 +1718,8 @@ impl Encryption {
17071718
error!("Couldn't setup and resume recovery {e:?}");
17081719
}
17091720
}));
1721+
1722+
tasks.receive_historic_room_key_bundles = bundle_receiver_task;
17101723
}
17111724

17121725
/// Waits for end-to-end encryption initialization tasks to finish, if any

crates/matrix-sdk/src/encryption/tasks.rs

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@
1414

1515
use std::{collections::BTreeMap, sync::Arc, time::Duration};
1616

17+
use futures_core::Stream;
18+
use futures_util::{pin_mut, StreamExt};
19+
use matrix_sdk_base::{crypto::store::types::RoomKeyBundleInfo, RoomState};
1720
use matrix_sdk_common::failures_cache::FailuresCache;
1821
use ruma::{
1922
events::room::encrypted::{EncryptedEventScheme, OriginalSyncRoomEncryptedEvent},
2023
serde::Raw,
2124
OwnedEventId, OwnedRoomId,
2225
};
23-
use tokio::sync::{
24-
mpsc::{self, UnboundedReceiver},
25-
Mutex,
26-
};
27-
use tracing::{debug, trace, warn};
26+
use tokio::sync::{mpsc, Mutex};
27+
use tracing::{debug, info, instrument, trace, warn};
2828

2929
use crate::{
3030
client::WeakClient,
3131
encryption::backups::UploadState,
3232
executor::{spawn, JoinHandle},
33-
Client,
33+
room::shared_room_history,
34+
Client, Room,
3435
};
3536

3637
/// A cache of room keys we already downloaded.
@@ -41,6 +42,7 @@ pub(crate) struct ClientTasks {
4142
pub(crate) upload_room_keys: Option<BackupUploadingTask>,
4243
pub(crate) download_room_keys: Option<BackupDownloadTask>,
4344
pub(crate) update_recovery_state_after_backup: Option<JoinHandle<()>>,
45+
pub(crate) receive_historic_room_key_bundles: Option<BundleReceiverTask>,
4446
pub(crate) setup_e2ee: Option<JoinHandle<()>>,
4547
}
4648

@@ -72,7 +74,7 @@ impl BackupUploadingTask {
7274
let _ = self.sender.send(());
7375
}
7476

75-
pub(crate) async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<()>) {
77+
pub(crate) async fn listen(client: WeakClient, mut receiver: mpsc::UnboundedReceiver<()>) {
7678
while receiver.recv().await.is_some() {
7779
if let Some(client) = client.get() {
7880
let upload_progress = &client.inner.e2ee.backup_state.upload_progress;
@@ -176,7 +178,10 @@ impl BackupDownloadTask {
176178
/// # Arguments
177179
///
178180
/// * `receiver` - The source of incoming [`RoomKeyDownloadRequest`]s.
179-
async fn listen(client: WeakClient, mut receiver: UnboundedReceiver<RoomKeyDownloadRequest>) {
181+
async fn listen(
182+
client: WeakClient,
183+
mut receiver: mpsc::UnboundedReceiver<RoomKeyDownloadRequest>,
184+
) {
180185
let state = Arc::new(Mutex::new(BackupDownloadTaskListenerState::new(client)));
181186

182187
while let Some(room_key_download_request) = receiver.recv().await {
@@ -385,6 +390,68 @@ impl BackupDownloadTaskListenerState {
385390
}
386391
}
387392

393+
pub(crate) struct BundleReceiverTask {
394+
_handle: JoinHandle<()>,
395+
}
396+
397+
impl BundleReceiverTask {
398+
pub async fn new(client: &Client) -> Self {
399+
let stream = client.encryption().historic_room_key_stream().await.expect("E2EE tasks should only be initialized once we have logged in and have access to an OlmMachine");
400+
let weak_client = WeakClient::from_client(client);
401+
let handle = spawn(Self::listen_task(weak_client, stream));
402+
403+
Self { _handle: handle }
404+
}
405+
406+
async fn listen_task(client: WeakClient, stream: impl Stream<Item = RoomKeyBundleInfo>) {
407+
pin_mut!(stream);
408+
409+
// TODO: Listening to this stream is not enough for iOS due to the NSE killing
410+
// our OlmMachine and thus also this stream. We need to add an event handler
411+
// that will listen for the bundle event. To be able to add an event handler,
412+
// we'll have to implement the bundle event in Ruma.
413+
while let Some(bundle_info) = stream.next().await {
414+
let Some(client) = client.get() else {
415+
// The client was dropped while we were waiting on the stream. Let's end the
416+
// loop, since this means that the application has shut down.
417+
break;
418+
};
419+
420+
let Some(room) = client.get_room(&bundle_info.room_id) else {
421+
warn!(room_id = %bundle_info.room_id, "Received a historic room key bundle for an unknown room");
422+
continue;
423+
};
424+
425+
Self::handle_bundle(&room, &bundle_info).await;
426+
}
427+
}
428+
429+
#[instrument(skip(room), fields(room_id = %room.room_id()))]
430+
async fn handle_bundle(room: &Room, bundle_info: &RoomKeyBundleInfo) {
431+
if Self::should_accept_bundle(room, bundle_info) {
432+
info!("Accepting a late key bundle.");
433+
434+
if let Err(e) =
435+
shared_room_history::maybe_accept_key_bundle(room, &bundle_info.sender).await
436+
{
437+
warn!("Couldn't accept a late room key bundle {e:?}");
438+
}
439+
} else {
440+
info!("Refusing to accept a historic room key bundle.");
441+
}
442+
}
443+
444+
fn should_accept_bundle(room: &Room, _bundle_info: &RoomKeyBundleInfo) -> bool {
445+
// TODO: Check that the person that invited us to this room is the same as the
446+
// sender, of the bundle. Otherwise don't ignore the bundle.
447+
// TODO: Check that we joined the room "recently". (How do you do this if you
448+
// accept the invite on another client? I guess we remember when the transition
449+
// from Invited to Joined happened, but can't the server force us into a joined
450+
// state if we do this?
451+
room.state() == RoomState::Joined
452+
}
453+
}
454+
388455
#[cfg(all(test, not(target_family = "wasm")))]
389456
mod test {
390457
use matrix_sdk_test::async_test;

0 commit comments

Comments
 (0)