Merged
54 changes: 52 additions & 2 deletions crates/matrix-sdk/src/event_cache/mod.rs
@@ -38,8 +38,11 @@ use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, TimelineEvent},
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
linked_chunk::lazy_loader::LazyLoaderError,
event_cache::{
store::{EventCacheStoreError, EventCacheStoreLock},
Gap,
},
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
store_locks::LockStoreError,
sync::RoomUpdates,
timer,
@@ -168,6 +171,7 @@ impl EventCache {
/// Create a new [`EventCache`] for the given client.
pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
let (generic_update_sender, _) = channel(32);
let (linked_chunk_update_sender, _) = channel(32);

Self {
inner: Arc::new(EventCacheInner {
@@ -178,6 +182,7 @@
drop_handles: Default::default(),
auto_shrink_sender: Default::default(),
generic_update_sender,
linked_chunk_update_sender,
}),
}
}
@@ -428,6 +433,14 @@ struct EventCacheInner {
/// See doc comment of [`RoomEventCacheGenericUpdate`] and
/// [`EventCache::subscribe_to_room_generic_updates`].
generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

/// A sender for a persisted linked chunk update.
///
/// This is used to notify that updates for some linked chunk have been
/// persisted to a store; observers can use it to look for new events.
///
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
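
For orientation, here is a minimal sketch of how an internal task could drain this channel. The task itself is hypothetical and not part of this patch; `subscribe`, `recv` and the `RecvError` variants are the standard tokio broadcast APIs, and the capacity of 32 comes from the constructor above.

use tokio::sync::broadcast::{error::RecvError, Sender};

// Hypothetical observer task, not part of this patch.
async fn drain_linked_chunk_updates(sender: Sender<RoomEventCacheLinkedChunkUpdate>) {
    let mut receiver = sender.subscribe();
    loop {
        match receiver.recv().await {
            Ok(update) => {
                // New store updates for `update.linked_chunk`; see
                // `RoomEventCacheLinkedChunkUpdate::events()` below to extract
                // the new events they carry.
                let _ = update;
            }
            // Only 32 messages are buffered, so a slow observer may lag and
            // miss some updates.
            Err(RecvError::Lagged(skipped)) => {
                tracing::warn!(skipped, "lagged behind linked chunk updates");
            }
            // All senders were dropped; the event cache is gone.
            Err(RecvError::Closed) => break,
        }
    }
}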

type AutoShrinkChannelPayload = OwnedRoomId;
@@ -591,6 +604,7 @@ impl EventCacheInner {
let room_state = RoomEventCacheState::new(
room_id.to_owned(),
room_version_rules,
self.linked_chunk_update_sender.clone(),
self.store.clone(),
pagination_status.clone(),
)
@@ -657,6 +671,42 @@ pub struct RoomEventCacheGenericUpdate {
pub room_id: OwnedRoomId,
}

/// An update triggered when events change in the persisted event cache
/// for any room.
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
Member: This type isn't tested in this commit, nor in the next one.

Member Author: What do you mean by a type that's not tested? Do you mean testing the methods on the type? Since the method is tested through the integration with the thread subscriber task, and the type itself is mostly POD, I'm not sure I see the value in testing it in isolation…

Member: If it's tested via an integration test, that's good.

/// The linked chunk affected by the update.
linked_chunk: OwnedLinkedChunkId,

/// All the linked chunk updates that happened during this batch.
updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}

impl RoomEventCacheLinkedChunkUpdate {
/// Return all the new events propagated by this update, in topological
/// order.
pub fn events(self) -> Vec<TimelineEvent> {
self.updates
.into_iter()
.flat_map(|update| match update {
linked_chunk::Update::PushItems { items, .. } => items,
linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
linked_chunk::Update::RemoveItem { .. }
| linked_chunk::Update::DetachLastItems { .. }
| linked_chunk::Update::StartReattachItems
| linked_chunk::Update::EndReattachItems
| linked_chunk::Update::NewItemsChunk { .. }
| linked_chunk::Update::NewGapChunk { .. }
| linked_chunk::Update::RemoveChunk(..)
| linked_chunk::Update::Clear => {
// None of these updates contain any new events.
vec![]
}
})
.collect()
}
}
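
As a usage illustration, a sketch of the kind of filtering a thread subscriber task (mentioned in the review discussion) might perform. The helper itself is hypothetical, but the `OwnedLinkedChunkId::Thread` shape and `events()` are the ones introduced by this patch; the surrounding module's type is assumed to be in scope.

use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::OwnedLinkedChunkId};
use ruma::{OwnedEventId, OwnedRoomId};

// Hypothetical helper: keep only the new events that belong to a given thread.
fn new_events_for_thread(
    update: RoomEventCacheLinkedChunkUpdate,
    room_id: &OwnedRoomId,
    thread_root: &OwnedEventId,
) -> Vec<TimelineEvent> {
    let is_this_thread = matches!(
        &update.linked_chunk,
        OwnedLinkedChunkId::Thread(room, root) if room == room_id && root == thread_root
    );

    if is_this_thread {
        // `events()` returns the pushed and replaced items in topological order.
        update.events()
    } else {
        // Updates for the room timeline, or for other threads, are ignored.
        Vec::new()
    }
}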

/// An update related to events that happened in a room.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
35 changes: 27 additions & 8 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
@@ -617,7 +618,8 @@ mod private {
},
linked_chunk::{
lazy_loader::{self},
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update,
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
OwnedLinkedChunkId, Position, Update,
},
serde_helpers::extract_thread_root,
sync::Timeline,
@@ -632,7 +633,7 @@
serde::Raw,
EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::broadcast::Receiver;
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::{debug, error, instrument, trace, warn};

use super::{
@@ -642,7 +643,8 @@
};
use crate::event_cache::{
deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
ThreadEventCacheUpdate,
};

/// State for a single room's event cache.
Expand Down Expand Up @@ -676,6 +678,10 @@ mod private {

pagination_status: SharedObservable<RoomPaginationStatus>,

/// See doc comment of
/// [`super::super::EventCacheInner::linked_chunk_update_sender`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

/// An atomic count of the current number of subscriber of the
/// [`super::RoomEventCache`].
pub(super) subscriber_count: Arc<AtomicUsize>,
@@ -694,6 +700,7 @@
pub async fn new(
room_id: OwnedRoomId,
room_version_rules: RoomVersionRules,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
store: EventCacheStoreLock,
pagination_status: SharedObservable<RoomPaginationStatus>,
) -> Result<Self, EventCacheError> {
@@ -766,6 +773,7 @@
waited_for_initial_prev_token: false,
subscriber_count: Default::default(),
pagination_status,
linked_chunk_update_sender,
})
}

@@ -1235,20 +1243,27 @@

let store = self.store.clone();
let room_id = self.room.clone();
let cloned_updates = updates.clone();

spawn(async move {
let store = store.lock().await?;

trace!(?updates, "sending linked chunk updates to the store");
trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
let linked_chunk_id = LinkedChunkId::Room(&room_id);
store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
trace!("linked chunk updates applied");

super::Result::Ok(())
})
.await
.expect("joining failed")?;

// Notify observers that the store has been updated.
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()),
updates,
});

Ok(())
}

@@ -1467,9 +1482,13 @@
fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
// TODO: when there's persistent storage, try to lazily reload from disk, if
// missing from memory.
self.threads
.entry(root_event_id.clone())
.or_insert_with(|| ThreadEventCache::new(root_event_id))
self.threads.entry(root_event_id.clone()).or_insert_with(|| {
ThreadEventCache::new(
self.room.clone(),
root_event_id,
self.linked_chunk_update_sender.clone(),
)
})
}

#[instrument(skip_all)]
45 changes: 40 additions & 5 deletions crates/matrix-sdk/src/event_cache/room/threads.rs
@@ -19,16 +19,16 @@ use std::collections::BTreeSet;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{Event, Gap},
linked_chunk::{ChunkContent, Position},
linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
};
use ruma::OwnedEventId;
use ruma::{OwnedEventId, OwnedRoomId};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::trace;

use crate::event_cache::{
deduplicator::DeduplicationOutcome,
room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
BackPaginationOutcome, EventsOrigin,
BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
};

/// An update coming from a thread event cache.
@@ -42,6 +42,9 @@ pub struct ThreadEventCacheUpdate {

/// All the information related to a single thread.
pub(crate) struct ThreadEventCache {
/// The room owning this thread.
room_id: OwnedRoomId,

/// The ID of the thread root event, which is the first event in the thread
/// (and eventually the first in the linked chunk).
thread_root: OwnedEventId,
@@ -51,12 +54,24 @@

/// A sender for live events updates in this thread.
sender: Sender<ThreadEventCacheUpdate>,

linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}

impl ThreadEventCache {
/// Create a new empty thread event cache.
pub fn new(thread_root: OwnedEventId) -> Self {
Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root }
pub fn new(
room_id: OwnedRoomId,
thread_root: OwnedEventId,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Self {
Self {
chunk: EventLinkedChunk::new(),
sender: Sender::new(32),
room_id,
thread_root,
linked_chunk_update_sender,
}
}

/// Subscribe to live events from this thread.
@@ -78,6 +93,22 @@
}
}

// TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the
// duplication here too.

Member: Is this TODO addressed later? (Just adding a comment to see if that's the case.)

Member Author: Nope, it is not. I'd expect all these refactorings to happen around the same time, when we unify backpagination in threads and in the main room -> when persisting threads happens.

Member: No problem.
fn propagate_changes(&mut self) {
// This is a lie, at the moment! We're not persisting threads yet, so we're just
// forwarding all updates to the linked chunk update sender.
let updates = self.chunk.store_updates().take();

let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
updates,
linked_chunk: OwnedLinkedChunkId::Thread(
self.room_id.clone(),
self.thread_root.clone(),
),
});
}
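
To make the caveat above concrete, a small consumer-side sketch (hypothetical, not part of this patch): for `Room` ids the store write has already completed by the time the update is broadcast, while for `Thread` ids nothing has been persisted yet, so a receiver has to rely on the updates carried by the message itself.

// Hypothetical receiver-side routing of a linked chunk update.
fn handle_linked_chunk_update(update: &RoomEventCacheLinkedChunkUpdate) {
    match &update.linked_chunk {
        OwnedLinkedChunkId::Room(room_id) => {
            // The room's linked chunk has already been written to the store;
            // it is safe to reload it from there.
            tracing::trace!(?room_id, "room linked chunk persisted");
        }
        OwnedLinkedChunkId::Thread(room_id, thread_root) => {
            // Threads are not persisted yet: use the carried updates (or
            // `events()`) directly instead of reading the store.
            tracing::trace!(?room_id, ?thread_root, "thread linked chunk updated in memory");
        }
    }
}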

/// Push some live events to this thread, and propagate the updates to
/// the listeners.
pub fn add_live_events(&mut self, events: Vec<Event>) {
@@ -104,6 +135,8 @@

self.chunk.push_live_events(None, &events);

self.propagate_changes();

let diffs = self.chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
@@ -249,6 +282,8 @@
// Add the paginated events to the thread chunk.
let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);

self.propagate_changes();

// Notify observers about the updates.
let updates = self.chunk.updates_as_vector_diffs();
if !updates.is_empty() {