Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7167d6f
chore(sqlite): Don't log the room ID twice when saving events
poljar Sep 22, 2025
68b570b
feat: Add a method to get the event type of a TimelineEventKind
poljar Sep 22, 2025
48cca6c
refactor(sqlite): Save the event type of an event in the SQLite event…
poljar Sep 22, 2025
72fe304
feat: Allow events to be fetched by event type
poljar Sep 22, 2025
22fe54d
chore: Fix a comment
poljar Sep 24, 2025
40720c9
chore: Make a Note a NOTE
poljar Sep 24, 2025
dfeedfb
feat(event cache): Add a method to access the linked chunk mutably
poljar Sep 26, 2025
e938469
feat: Create the redecryptor thing
poljar Sep 24, 2025
f3e04d3
WIP
poljar Sep 25, 2025
503fa64
fixup redecryptor
poljar Sep 25, 2025
f6770a3
feat(event_cache): Add the RedecryptorCtx concept
poljar Sep 25, 2025
912a499
fix: Emit event cache updates from the redecryptor
poljar Sep 26, 2025
59d3724
chore: Get rid of the RedecryptorCtx
poljar Sep 29, 2025
e56db05
enable the redecryptor in the event cache
poljar Sep 25, 2025
e5848d9
test: Allow to specify the timeout for assert_recv_with_timeout
poljar Sep 26, 2025
9f16685
test(event cache): Add a test to show that the redecryptor works
poljar Sep 26, 2025
4ad7693
fixup: Move common methods out of the redecryptor into the cache
poljar Oct 1, 2025
0566061
fixup Make the redecryptor return Self
poljar Oct 1, 2025
8d8d588
fixup create a redecryptor in event cache
poljar Oct 1, 2025
e2ab634
fixup redecryptor
poljar Oct 1, 2025
a62fea4
Rejigger things so we can relisten to the room key stream
poljar Oct 2, 2025
e4bf600
refactor: Remove the decryption logic from the timeline
poljar Oct 2, 2025
f70dbc5
fixup redecryptor
poljar Oct 2, 2025
2cfd85f
doc(event cache): Document the redecryptor
poljar Oct 2, 2025
82c7865
doc: Document that the redecryptor sends reports out
poljar Oct 7, 2025
e8c882f
feat: Redecryptor start to send out redecryptor reports
poljar Oct 7, 2025
bab9d9b
feat(ui): Listen to redecryptor reports in the timeline
poljar Oct 7, 2025
54f3b5f
feat(cache): Let the redecryptor listen to room key withheld updates
poljar Oct 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/matrix-sdk-common/src/deserialized_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,14 @@ impl TimelineEventKind {
TimelineEventKind::PlainText { .. } => None,
}
}

/// Get the event type of this event.
///
/// Returns `None` if there isn't an event type or if the event failed to be
/// deserialized.
pub fn event_type(&self) -> Option<String> {
self.raw().get_field("type").ok().flatten()
}
}

#[cfg(not(tarpaulin_include))]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- After the merge of https://github.com/matrix-org/matrix-rust-sdk/pull/5648,
-- we want all events to get a `TimelineEvent::timestamp` value (extracted from
-- `origin_server_ts`).
--
-- To accomplish that, we are emptying the event cache. New synced events will
-- be built correctly, with a valid `TimelineEvent::timestamp`, allowing a
-- clear, stable situation.

DELETE from linked_chunks;
DELETE from event_chunks; -- should be done by cascading
DELETE from gap_chunks; -- should be done by cascading
DELETE from events;

DROP TABLE events;

-- Events and their content.
CREATE TABLE "events" (
-- The room in which the event is located.
"room_id" BLOB NOT NULL,

-- The `OwnedEventId` of this event.
"event_id" BLOB NOT NULL,

-- The event type of this event.
"event_type" BLOB NOT NULL,

-- The ID of the session that was used to encrypt this event, may be null if
-- the event wasn't encrypted.
"session_id" BLOB NULL,

-- JSON serialized `TimelineEvent` (encrypted value).
"content" BLOB NOT NULL,

-- If this event is an aggregation (related event), the event id of the event it relates to.
-- Can be null if this event isn't an aggregation.
"relates_to" BLOB,

-- If this event is an aggregation (related event), the kind of relation it has to the event it
-- relates to.
-- Can be null if this event isn't an aggregation.
"rel_type" BLOB,

-- Primary key is the event ID.
PRIMARY KEY (event_id)
)
WITHOUT ROWID;
53 changes: 43 additions & 10 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{
mod keys {
// Tables
pub const LINKED_CHUNKS: &str = "linked_chunks";
pub const EVENTS: &str = "events";
}

/// The database name.
Expand All @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 11;
const DATABASE_VERSION: u8 = 12;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
Expand Down Expand Up @@ -472,6 +473,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}

if version < 12 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!(
"../migrations/event_cache_store/012_store_event_type.sql"
))?;
txn.set_db_version(12)
})
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -632,7 +643,7 @@ impl EventCacheStore for SqliteEventCacheStore {
// deduplicated and moved to another position; or because it was inserted
// outside the context of a linked chunk (e.g. pinned event).
let mut content_statement = txn.prepare(
"INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
"INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
)?;

let invalid_event = |event: TimelineEvent| {
Expand All @@ -641,20 +652,27 @@ impl EventCacheStore for SqliteEventCacheStore {
return None;
};

Some((event_id.to_string(), event))
let Some(event_type) = event.kind.event_type() else {
error!(%event_id, "Trying to save an event with no event type");
return None;
};

Some((event_id.to_string(), event_type, event))
};

let room_id = linked_chunk_id.room_id();
let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
// Insert the location information into the database.
let index = at.index() + i;
chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));

// Now, insert the event content into the database.
let encoded_event = this.encode_event(&event)?;
content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
}
}

Expand All @@ -671,15 +689,22 @@ impl EventCacheStore for SqliteEventCacheStore {
continue;
};

let Some(event_type) = event.kind.event_type() else {
error!(%event_id, "Trying to save an event with no event type");
continue;
};

let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));

// Replace the event's content. Really we'd like to update, but in case the
// event id changed, we are a bit lenient here and will allow an insertion
// of the new event.
let encoded_event = this.encode_event(&event)?;
let room_id = linked_chunk_id.room_id();
let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
txn.execute(
"INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
, (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
"INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
(&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

// Replace the event id in the linked chunk, in case it changed.
txn.execute(
Expand Down Expand Up @@ -1332,10 +1357,18 @@ impl EventCacheStore for SqliteEventCacheStore {
let _timer = timer!("method");

let Some(event_id) = event.event_id() else {
error!(%room_id, "Trying to save an event with no ID");
error!("Trying to save an event with no ID");
return Ok(());
};

let Some(event_type) = event.kind.event_type() else {
error!(%event_id, "Trying to save an event with no event type");
return Ok(());
};

let event_type = self.encode_key(keys::EVENTS, event_type);
let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
let event_id = event_id.to_string();
let encoded_event = self.encode_event(&event)?;
Expand All @@ -1344,8 +1377,8 @@ impl EventCacheStore for SqliteEventCacheStore {
.await?
.with_transaction(move |txn| -> Result<_> {
txn.execute(
"INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
, (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
"INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
(&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

Ok(())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ use matrix_sdk::{
crypto::store::types::RoomKeyInfo,
deserialized_responses::TimelineEventKind as SdkTimelineEventKind,
encryption::backups::BackupState,
event_cache::{self, RedecryptorReport},
event_handler::EventHandlerHandle,
executor::{JoinHandle, spawn},
};
use tokio::sync::{
RwLock,
mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver, Sender, UnboundedSender},
};
use tokio_stream::{StreamExt as _, wrappers::errors::BroadcastStreamRecvError};
use tracing::{Instrument as _, debug, error, field, info, info_span, warn};
Expand Down Expand Up @@ -69,6 +70,58 @@ impl Drop for CryptoDropHandles {
}
}

async fn redecryption_report_task(
stream: impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>>,
timeline_controller: TimelineController,
sender: UnboundedSender<event_cache::DecryptionRetryRequest>,
) {
pin_mut!(stream);

while let Some(report) = stream.next().await {
match report {
Ok(RedecryptorReport::ResolvedUtds { events, .. }) => {
let state = timeline_controller.state.read().await;

if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook {
for event_id in events {
utd_hook.on_late_decrypt(&event_id).await;
}
}
}
Ok(RedecryptorReport::Lagging) | Err(_) => {
// The room key stream lagged or the OlmMachine got regenerated. Let's tell the
// redecryptor which events we have.
let state = timeline_controller.state.read().await;

let (utds, decrypted): (BTreeSet<_>, BTreeSet<_>) = state
.items
.iter()
.filter_map(|event| {
event.as_event().and_then(|e| {
let session_id = e.encryption_info().and_then(|info| info.session_id());
session_id.map(|id| id.to_owned()).zip(Some(e))
})
})
.partition_map(|(session_id, event)| {
if event.content.is_unable_to_decrypt() {
Either::Left(session_id)
} else {
Either::Right(session_id)
}
});

let message = event_cache::DecryptionRetryRequest {
room_id: timeline_controller.room().room_id().to_owned(),
utd_session_ids: utds,
refresh_info_session_ids: decrypted,
};

let _ = sender.send(message);
}
}
}
}

/// The task that handles the room keys from backups.
async fn room_keys_from_backups_task<S>(stream: S, timeline_controller: TimelineController)
where
Expand Down Expand Up @@ -300,42 +353,43 @@ async fn decryption_task<P: RoomDataProvider, D: Decryptor>(
) {
debug!("Decryption task starting.");

while let Some(request) = receiver.recv().await {
let should_retry = |session_id: &str| {
if let Some(session_ids) = &request.session_ids {
session_ids.contains(session_id)
} else {
true
}
};

// Find the indices of events that are in the supplied sessions, distinguishing
// between UTDs which we need to decrypt, and already-decrypted events where we
// only need to re-fetch encryption info.
let mut state = state.write().await;
let (retry_decryption_indices, retry_info_indices) =
compute_event_indices_to_retry_decryption(&state.items, should_retry);

// Retry fetching encryption info for events that are already decrypted
if !retry_info_indices.is_empty() {
debug!("Retrying fetching encryption info");
retry_fetch_encryption_info(&mut state, retry_info_indices, &room_data_provider).await;
}

// Retry decrypting any unable-to-decrypt messages
if !retry_decryption_indices.is_empty() {
debug!("Retrying decryption");
decrypt_by_index(
&mut state,
&request.settings,
&room_data_provider,
request.decryptor,
should_retry,
retry_decryption_indices,
)
.await
}
}
// while let Some(request) = receiver.recv().await {
// let should_retry = |session_id: &str| {
// if let Some(session_ids) = &request.session_ids {
// session_ids.contains(session_id)
// } else {
// true
// }
// };
//
// // Find the indices of events that are in the supplied sessions,
// distinguishing // between UTDs which we need to decrypt, and
// already-decrypted events where we // only need to re-fetch encryption
// info. let mut state = state.write().await;
// let (retry_decryption_indices, retry_info_indices) =
// compute_event_indices_to_retry_decryption(&state.items,
// should_retry);
//
// // Retry fetching encryption info for events that are already decrypted
// if !retry_info_indices.is_empty() {
// debug!("Retrying fetching encryption info");
// retry_fetch_encryption_info(&mut state, retry_info_indices,
// &room_data_provider).await; }
//
// // Retry decrypting any unable-to-decrypt messages
// if !retry_decryption_indices.is_empty() {
// debug!("Retrying decryption");
// decrypt_by_index(
// &mut state,
// &request.settings,
// &room_data_provider,
// request.decryptor,
// should_retry,
// retry_decryption_indices,
// )
// .await
// }
// }

debug!("Decryption task stopping.");
}
Expand Down
Loading
Loading