Skip to content

Commit d568c07

Browse files
committed
feat(event cache): add a way to subscribe to any room's linked chunk updates
1 parent 13c30f6 commit d568c07

File tree

3 files changed

+119
-15
lines changed

3 files changed

+119
-15
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ use eyeball_im::VectorDiff;
3838
use futures_util::future::{join_all, try_join_all};
3939
use matrix_sdk_base::{
4040
deserialized_responses::{AmbiguityChange, TimelineEvent},
41-
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42-
linked_chunk::lazy_loader::LazyLoaderError,
41+
event_cache::{
42+
store::{EventCacheStoreError, EventCacheStoreLock},
43+
Gap,
44+
},
45+
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
4346
store_locks::LockStoreError,
4447
sync::RoomUpdates,
4548
timer,
@@ -175,6 +178,7 @@ impl EventCache {
175178
/// Create a new [`EventCache`] for the given client.
176179
pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
177180
let (generic_update_sender, _) = channel(32);
181+
let (linked_chunk_update_sender, _) = channel(32);
178182

179183
Self {
180184
inner: Arc::new(EventCacheInner {
@@ -185,6 +189,7 @@ impl EventCache {
185189
drop_handles: Default::default(),
186190
auto_shrink_sender: Default::default(),
187191
generic_update_sender,
192+
linked_chunk_update_sender,
188193
}),
189194
}
190195
}
@@ -435,6 +440,14 @@ struct EventCacheInner {
435440
/// See doc comment of [`RoomEventCacheGenericUpdate`] and
436441
/// [`EventCache::subscribe_to_room_generic_updates`].
437442
generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
443+
444+
/// A sender for a persisted linked chunk update.
445+
///
446+
/// This is used to notify that some linked chunk has persisted some updates
447+
/// to a store, and can be used by observers to look for new events.
448+
///
449+
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
450+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
438451
}
439452

440453
type AutoShrinkChannelPayload = OwnedRoomId;
@@ -598,6 +611,7 @@ impl EventCacheInner {
598611
let room_state = RoomEventCacheState::new(
599612
room_id.to_owned(),
600613
room_version_rules,
614+
self.linked_chunk_update_sender.clone(),
601615
self.store.clone(),
602616
pagination_status.clone(),
603617
)
@@ -664,6 +678,42 @@ pub struct RoomEventCacheGenericUpdate {
664678
pub room_id: OwnedRoomId,
665679
}
666680

681+
/// An update being triggered when events change in the persisted event cache
682+
/// for any room.
683+
#[derive(Clone, Debug)]
684+
struct RoomEventCacheLinkedChunkUpdate {
685+
/// The linked chunk affected by the update.
686+
linked_chunk: OwnedLinkedChunkId,
687+
688+
/// A vector of all the updates that happened during this update.
689+
updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
690+
}
691+
692+
impl RoomEventCacheLinkedChunkUpdate {
693+
/// Return all the new events propagated by this update, in topological
694+
/// order.
695+
pub fn events(self) -> Vec<TimelineEvent> {
696+
self.updates
697+
.into_iter()
698+
.flat_map(|update| match update {
699+
linked_chunk::Update::PushItems { items, .. } => items,
700+
linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
701+
linked_chunk::Update::RemoveItem { .. }
702+
| linked_chunk::Update::DetachLastItems { .. }
703+
| linked_chunk::Update::StartReattachItems
704+
| linked_chunk::Update::EndReattachItems
705+
| linked_chunk::Update::NewItemsChunk { .. }
706+
| linked_chunk::Update::NewGapChunk { .. }
707+
| linked_chunk::Update::RemoveChunk(..)
708+
| linked_chunk::Update::Clear => {
709+
// All these updates don't contain any new event.
710+
vec![]
711+
}
712+
})
713+
.collect()
714+
}
715+
}
716+
667717
/// An update related to events happened in a room.
668718
#[derive(Debug, Clone)]
669719
pub enum RoomEventCacheUpdate {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ mod private {
633633
},
634634
linked_chunk::{
635635
lazy_loader::{self},
636-
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update,
636+
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
637+
OwnedLinkedChunkId, Position, Update,
637638
},
638639
serde_helpers::extract_thread_root,
639640
sync::Timeline,
@@ -650,7 +651,7 @@ mod private {
650651
serde::Raw,
651652
EventId, OwnedEventId, OwnedRoomId,
652653
};
653-
use tokio::sync::broadcast::Receiver;
654+
use tokio::sync::broadcast::{Receiver, Sender};
654655
use tracing::{debug, error, instrument, trace, warn};
655656

656657
use super::{
@@ -660,7 +661,8 @@ mod private {
660661
};
661662
use crate::event_cache::{
662663
deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
663-
BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
664+
BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
665+
ThreadEventCacheUpdate,
664666
};
665667
#[cfg(feature = "experimental-search")]
666668
use crate::Room;
@@ -696,6 +698,10 @@ mod private {
696698

697699
pagination_status: SharedObservable<RoomPaginationStatus>,
698700

701+
/// See doc comment of
702+
/// [`super::super::EventCacheInner::linked_chunk_update_sender`].
703+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
704+
699705
/// An atomic count of the current number of subscriber of the
700706
/// [`super::RoomEventCache`].
701707
pub(super) subscriber_count: Arc<AtomicUsize>,
@@ -714,6 +720,7 @@ mod private {
714720
pub async fn new(
715721
room_id: OwnedRoomId,
716722
room_version_rules: RoomVersionRules,
723+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
717724
store: EventCacheStoreLock,
718725
pagination_status: SharedObservable<RoomPaginationStatus>,
719726
) -> Result<Self, EventCacheError> {
@@ -786,6 +793,7 @@ mod private {
786793
waited_for_initial_prev_token: false,
787794
subscriber_count: Default::default(),
788795
pagination_status,
796+
linked_chunk_update_sender,
789797
})
790798
}
791799

@@ -1255,20 +1263,27 @@ mod private {
12551263

12561264
let store = self.store.clone();
12571265
let room_id = self.room.clone();
1266+
let cloned_updates = updates.clone();
12581267

12591268
spawn(async move {
12601269
let store = store.lock().await?;
12611270

1262-
trace!(?updates, "sending linked chunk updates to the store");
1271+
trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
12631272
let linked_chunk_id = LinkedChunkId::Room(&room_id);
1264-
store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
1273+
store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
12651274
trace!("linked chunk updates applied");
12661275

12671276
super::Result::Ok(())
12681277
})
12691278
.await
12701279
.expect("joining failed")?;
12711280

1281+
// Forward that the store got updated to observers.
1282+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1283+
linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()),
1284+
updates,
1285+
});
1286+
12721287
Ok(())
12731288
}
12741289

@@ -1537,9 +1552,13 @@ mod private {
15371552
fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
15381553
// TODO: when there's persistent storage, try to lazily reload from disk, if
15391554
// missing from memory.
1540-
self.threads
1541-
.entry(root_event_id.clone())
1542-
.or_insert_with(|| ThreadEventCache::new(root_event_id))
1555+
self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1556+
ThreadEventCache::new(
1557+
self.room.clone(),
1558+
root_event_id,
1559+
self.linked_chunk_update_sender.clone(),
1560+
)
1561+
})
15431562
}
15441563

15451564
#[instrument(skip_all)]

crates/matrix-sdk/src/event_cache/room/threads.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use std::collections::BTreeSet;
1919
use eyeball_im::VectorDiff;
2020
use matrix_sdk_base::{
2121
event_cache::{Event, Gap},
22-
linked_chunk::{ChunkContent, Position},
22+
linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
2323
};
24-
use ruma::OwnedEventId;
24+
use ruma::{OwnedEventId, OwnedRoomId};
2525
use tokio::sync::broadcast::{Receiver, Sender};
2626
use tracing::trace;
2727

2828
use crate::event_cache::{
2929
deduplicator::DeduplicationOutcome,
3030
room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
31-
BackPaginationOutcome, EventsOrigin,
31+
BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
3232
};
3333

3434
/// An update coming from a thread event cache.
@@ -42,6 +42,9 @@ pub struct ThreadEventCacheUpdate {
4242

4343
/// All the information related to a single thread.
4444
pub(crate) struct ThreadEventCache {
45+
/// The room owning this thread.
46+
room_id: OwnedRoomId,
47+
4548
/// The ID of the thread root event, which is the first event in the thread
4649
/// (and eventually the first in the linked chunk).
4750
thread_root: OwnedEventId,
@@ -51,12 +54,24 @@ pub(crate) struct ThreadEventCache {
5154

5255
/// A sender for live events updates in this thread.
5356
sender: Sender<ThreadEventCacheUpdate>,
57+
58+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
5459
}
5560

5661
impl ThreadEventCache {
5762
/// Create a new empty thread event cache.
58-
pub fn new(thread_root: OwnedEventId) -> Self {
59-
Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root }
63+
pub fn new(
64+
room_id: OwnedRoomId,
65+
thread_root: OwnedEventId,
66+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
67+
) -> Self {
68+
Self {
69+
chunk: EventLinkedChunk::new(),
70+
sender: Sender::new(32),
71+
room_id,
72+
thread_root,
73+
linked_chunk_update_sender,
74+
}
6075
}
6176

6277
/// Subscribe to live events from this thread.
@@ -78,6 +93,22 @@ impl ThreadEventCache {
7893
}
7994
}
8095

96+
// TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the
97+
// duplication here too.
98+
fn propagate_changes(&mut self) {
99+
// This is a lie, at the moment! We're not persisting threads yet, so we're just
100+
// forwarding all updates to the linked chunk update sender.
101+
let updates = self.chunk.store_updates().take();
102+
103+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
104+
updates,
105+
linked_chunk: OwnedLinkedChunkId::Thread(
106+
self.room_id.clone(),
107+
self.thread_root.clone(),
108+
),
109+
});
110+
}
111+
81112
/// Push some live events to this thread, and propagate the updates to
82113
/// the listeners.
83114
pub fn add_live_events(&mut self, events: Vec<Event>) {
@@ -104,6 +135,8 @@ impl ThreadEventCache {
104135

105136
self.chunk.push_live_events(None, &events);
106137

138+
self.propagate_changes();
139+
107140
let diffs = self.chunk.updates_as_vector_diffs();
108141
if !diffs.is_empty() {
109142
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
@@ -249,6 +282,8 @@ impl ThreadEventCache {
249282
// Add the paginated events to the thread chunk.
250283
let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);
251284

285+
self.propagate_changes();
286+
252287
// Notify observers about the updates.
253288
let updates = self.chunk.updates_as_vector_diffs();
254289
if !updates.is_empty() {

0 commit comments

Comments
 (0)