Skip to content

Commit 12a1ef0

Browse files
committed
feat(sdk): A redacted event in a thread is no longer removed: it's updated.
This patch fixes an invalid behaviour in a thread: when an in-thread event was redacted, it was removed from the thread. This is inconsistent regarding in-room event redaction where a redacted event is updated to its redacted form and the redaction event is added to the timeline. This patch makes the behaviour consistent by updating the redacted event. Basically, it replaces `remove_if_present` (deleted) by `replace_event_if_present` (new).
1 parent a263268 commit 12a1ef0

File tree

4 files changed

+116
-29
lines changed

4 files changed

+116
-29
lines changed

crates/matrix-sdk/src/event_cache/caches/room/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,7 +1296,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
12961296
// - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
12971297
target_event.replace_raw(redacted_event.cast_unchecked());
12981298

1299-
self.replace_event_at(location, target_event).await?;
1299+
self.replace_event_at(location, target_event.clone()).await?;
13001300

13011301
// If the redacted event was part of a thread, remove it in the thread linked
13021302
// chunk too, and make sure to update the thread root's summary
@@ -1308,7 +1308,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
13081308
if let Some(thread_root) = thread_root
13091309
&& let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
13101310
{
1311-
thread_cache.remove_if_present(event_id).await?;
1311+
thread_cache.replace_event_if_present(event_id, target_event).await?;
13121312

13131313
// The number of replies may have changed, so update the thread summary if
13141314
// needs be.

crates/matrix-sdk/src/event_cache/caches/thread/mod.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -159,22 +159,21 @@ impl ThreadEventCache {
159159
Ok(())
160160
}
161161

162-
/// Remove an event from an thread event linked chunk, if it exists.
162+
/// Replaces a single event, be it saved in memory or in the store.
163163
///
164-
/// If the event has been found and removed, then an update will be
165-
/// propagated to observers.
166-
pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> {
164+
/// If it was saved in memory, this will emit a notification to
165+
/// observers that a single item has been replaced. Otherwise,
166+
/// such a notification is not emitted, because observers are
167+
/// unlikely to observe the store updates directly.
168+
pub(super) async fn replace_event_if_present(
169+
&mut self,
170+
event_id: &EventId,
171+
new_event: Event,
172+
) -> Result<()> {
167173
let mut state = self.inner.state.write().await?;
168174

169-
let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| {
170-
(event.event_id().as_deref() == Some(event_id)).then_some(position)
171-
}) else {
172-
// Event not found in the linked chunk, nothing to do.
173-
return Ok(());
174-
};
175-
176-
if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)], vec![]).await {
177-
error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting");
175+
if let Err(err) = state.replace_event_if_present(event_id, new_event).await {
176+
error!(%err, "failed to replace an event");
178177
return Err(err);
179178
}
180179

crates/matrix-sdk/src/event_cache/caches/thread/state.rs

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ use matrix_sdk_base::{
2222
ChunkIdentifierGenerator, LinkedChunkId, OwnedLinkedChunkId, Position, Update, lazy_loader,
2323
},
2424
};
25-
use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
25+
use matrix_sdk_common::executor::spawn;
26+
use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId};
2627
use tokio::sync::broadcast::Sender;
27-
use tracing::{debug, error, instrument};
28+
use tracing::{debug, error, instrument, trace};
2829

2930
use super::{
3031
super::{
@@ -33,7 +34,7 @@ use super::{
3334
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
3435
persistence::{load_linked_chunk_metadata, send_updates_to_store},
3536
},
36-
TimelineVectorDiffs,
37+
EventLocation, TimelineVectorDiffs,
3738
event_linked_chunk::{EventLinkedChunk, sort_positions_descending},
3839
lock,
3940
room::RoomEventCacheLinkedChunkUpdate,
@@ -368,6 +369,76 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
368369
Ok(())
369370
}
370371

372+
async fn find_event(&self, event_id: &EventId) -> Result<Option<(EventLocation, Event)>> {
373+
// There are supposedly fewer events loaded in memory than in the store. Let's
374+
// start by looking up in the `EventLinkedChunk`.
375+
for (position, event) in self.thread_linked_chunk.revents() {
376+
if event.event_id().as_deref() == Some(event_id) {
377+
return Ok(Some((EventLocation::Memory(position), event.clone())));
378+
}
379+
}
380+
381+
Ok(self
382+
.store
383+
.find_event(&self.room_id, event_id)
384+
.await?
385+
.map(|event| (EventLocation::Store, event)))
386+
}
387+
388+
/// Replaces a single event, be it saved in memory or in the store.
389+
///
390+
/// If it was saved in memory, this will emit a notification to
391+
/// observers that a single item has been replaced. Otherwise,
392+
/// such a notification is not emitted, because observers are
393+
/// unlikely to observe the store updates directly.
394+
pub async fn replace_event_if_present(
395+
&mut self,
396+
event_id: &EventId,
397+
new_event: Event,
398+
) -> Result<()> {
399+
let Some((location, _event)) = self.find_event(event_id).await? else {
400+
trace!("redacted event is missing from the thread linked chunk");
401+
return Ok(());
402+
};
403+
404+
match location {
405+
EventLocation::Memory(position) => {
406+
self.state
407+
.thread_linked_chunk
408+
.replace_event_at(position, new_event)
409+
.expect("should have been a valid position of an item");
410+
// We just changed the in-memory representation; synchronize this with
411+
// the store.
412+
self.propagate_changes().await?;
413+
}
414+
EventLocation::Store => {
415+
self.save_events([new_event]).await?;
416+
}
417+
}
418+
419+
Ok(())
420+
}
421+
422+
/// Save events into the database, without notifying observers.
423+
pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
424+
let store = self.store.clone();
425+
let room_id = self.state.room_id.clone();
426+
let events = events.into_iter().collect::<Vec<_>>();
427+
428+
// Spawn a task so the save is uninterrupted by task cancellation.
429+
spawn(async move {
430+
for event in events {
431+
store.save_event(&room_id, event).await?;
432+
}
433+
434+
Result::Ok(())
435+
})
436+
.await
437+
.expect("joining failed")?;
438+
439+
Ok(())
440+
}
441+
371442
/// Remove events by their position, in `EventLinkedChunk`.
372443
///
373444
/// This method is purposely isolated because it must ensure that

crates/matrix-sdk/tests/integration/event_cache/threads.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -708,8 +708,15 @@ async fn test_redact_touches_threads() {
708708
assert_eq!(summary.num_replies, 2);
709709
}
710710

711-
assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1));
712-
assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2));
711+
// Second event.
712+
{
713+
assert_eq!(room_events[1].event_id().as_ref(), Some(&thread_resp1));
714+
}
715+
716+
// Third event.
717+
{
718+
assert_eq!(room_events[2].event_id().as_ref(), Some(&thread_resp2));
719+
}
713720

714721
assert!(thread_stream.is_empty());
715722
assert!(room_stream.is_empty());
@@ -724,17 +731,21 @@ async fn test_redact_touches_threads() {
724731
)
725732
.await;
726733

727-
// The redaction affects the thread cache: it *removes* the redacted event.
734+
// The redaction affects the thread cache: it updates the redacted event.
728735
{
729736
assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv());
730737
assert_eq!(diffs.len(), 1);
731-
assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]);
738+
739+
assert_let!(VectorDiff::Set { index: 1, value: new_event } = &diffs[0]);
740+
741+
let deserialized = new_event.raw().deserialize().unwrap();
742+
assert!(deserialized.is_redacted());
732743

733744
assert!(thread_stream.is_empty());
734745
}
735746

736747
// The redaction affects the room cache too:
737-
// - the redaction event is pushed to the room history,
748+
// - the redaction event is added to the “timeline”,
738749
// - the redaction's target is, well, redacted,
739750
// - the thread summary is updated correctly.
740751
{
@@ -745,9 +756,11 @@ async fn test_redact_touches_threads() {
745756
assert_eq!(diffs.len(), 3);
746757

747758
// The redaction event is appended to the room cache.
748-
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
749-
assert_eq!(new_events.len(), 1);
750-
assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction));
759+
{
760+
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
761+
assert_eq!(new_events.len(), 1);
762+
assert_eq!(new_events[0].event_id().as_deref(), Some(thread_resp1_redaction));
763+
}
751764

752765
// The room event is redacted.
753766
{
@@ -776,17 +789,21 @@ async fn test_redact_touches_threads() {
776789
)
777790
.await;
778791

779-
// The redaction affects the thread cache: it *removes* the redacted event.
792+
// The redaction affects the thread cache: it updates the redacted event.
780793
{
781794
assert_let_timeout!(Ok(TimelineVectorDiffs { diffs, .. }) = thread_stream.recv());
782795
assert_eq!(diffs.len(), 1);
783-
assert_let!(VectorDiff::Remove { index: 1 } = &diffs[0]);
796+
797+
assert_let!(VectorDiff::Set { index: 2, value: new_event } = &diffs[0]);
798+
799+
let deserialized = new_event.raw().deserialize().unwrap();
800+
assert!(deserialized.is_redacted());
784801

785802
assert!(thread_stream.is_empty());
786803
}
787804

788805
// The redaction affects the room cache too:
789-
// - the redaction event is pushed to the room history,
806+
// - the redaction event is added to the “timeline”,
790807
// - the redaction's target is, well, redacted,
791808
// - the thread summary is removed from the thread root.
792809
{

0 commit comments

Comments
 (0)