Skip to content

Commit 8848aa3

Browse files
committed
feat(event cache): add a way to subscribe to any room's linked chunk updates
1 parent 9d90a92 commit 8848aa3

File tree

3 files changed

+119
-15
lines changed

3 files changed

+119
-15
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ use eyeball_im::VectorDiff;
3838
use futures_util::future::{join_all, try_join_all};
3939
use matrix_sdk_base::{
4040
deserialized_responses::{AmbiguityChange, TimelineEvent},
41-
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42-
linked_chunk::lazy_loader::LazyLoaderError,
41+
event_cache::{
42+
store::{EventCacheStoreError, EventCacheStoreLock},
43+
Gap,
44+
},
45+
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
4346
store_locks::LockStoreError,
4447
sync::RoomUpdates,
4548
timer,
@@ -168,6 +171,7 @@ impl EventCache {
168171
/// Create a new [`EventCache`] for the given client.
169172
pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
170173
let (room_event_cache_generic_update_sender, _) = channel(32);
174+
let (linked_chunk_update_sender, _) = channel(32);
171175

172176
Self {
173177
inner: Arc::new(EventCacheInner {
@@ -178,6 +182,7 @@ impl EventCache {
178182
drop_handles: Default::default(),
179183
auto_shrink_sender: Default::default(),
180184
room_event_cache_generic_update_sender,
185+
linked_chunk_update_sender,
181186
}),
182187
}
183188
}
@@ -430,6 +435,14 @@ struct EventCacheInner {
430435
/// See doc comment of [`RoomEventCacheGenericUpdate`] and
431436
/// [`EventCache::subscribe_to_room_generic_updates`].
432437
room_event_cache_generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
438+
439+
/// A sender for a persisted linked chunk update.
440+
///
441+
/// This is used to notify that some linked chunk has persisted some updates
442+
/// to a store, and can be used by observers to look for new events.
443+
///
444+
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
445+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
433446
}
434447

435448
type AutoShrinkChannelPayload = OwnedRoomId;
@@ -593,6 +606,7 @@ impl EventCacheInner {
593606
let room_state = RoomEventCacheState::new(
594607
room_id.to_owned(),
595608
room_version_rules,
609+
self.linked_chunk_update_sender.clone(),
596610
self.store.clone(),
597611
pagination_status.clone(),
598612
)
@@ -679,6 +693,42 @@ impl RoomEventCacheGenericUpdate {
679693
}
680694
}
681695

696+
/// An update being triggered when events change in the persisted event cache
697+
/// for any room.
698+
#[derive(Clone, Debug)]
699+
struct RoomEventCacheLinkedChunkUpdate {
700+
/// The linked chunk affected by the update.
701+
linked_chunk: OwnedLinkedChunkId,
702+
703+
/// A vector of all the updates that happened during this update.
704+
updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
705+
}
706+
707+
impl RoomEventCacheLinkedChunkUpdate {
708+
/// Return all the new events propagated by this update, in topological
709+
/// order.
710+
pub fn events(self) -> Vec<TimelineEvent> {
711+
self.updates
712+
.into_iter()
713+
.flat_map(|update| match update {
714+
linked_chunk::Update::PushItems { items, .. } => items,
715+
linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
716+
linked_chunk::Update::RemoveItem { .. }
717+
| linked_chunk::Update::DetachLastItems { .. }
718+
| linked_chunk::Update::StartReattachItems
719+
| linked_chunk::Update::EndReattachItems
720+
| linked_chunk::Update::NewItemsChunk { .. }
721+
| linked_chunk::Update::NewGapChunk { .. }
722+
| linked_chunk::Update::RemoveChunk(..)
723+
| linked_chunk::Update::Clear => {
724+
// All these updates don't contain any new event.
725+
vec![]
726+
}
727+
})
728+
.collect()
729+
}
730+
}
731+
682732
/// An update related to events happened in a room.
683733
#[derive(Debug, Clone)]
684734
pub enum RoomEventCacheUpdate {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,8 @@ mod private {
624624
},
625625
linked_chunk::{
626626
lazy_loader::{self},
627-
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update,
627+
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
628+
OwnedLinkedChunkId, Position, Update,
628629
},
629630
serde_helpers::extract_thread_root,
630631
sync::Timeline,
@@ -639,7 +640,7 @@ mod private {
639640
serde::Raw,
640641
EventId, OwnedEventId, OwnedRoomId,
641642
};
642-
use tokio::sync::broadcast::Receiver;
643+
use tokio::sync::broadcast::{Receiver, Sender};
643644
use tracing::{debug, error, instrument, trace, warn};
644645

645646
use super::{
@@ -649,7 +650,8 @@ mod private {
649650
};
650651
use crate::event_cache::{
651652
deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
652-
BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
653+
BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
654+
ThreadEventCacheUpdate,
653655
};
654656

655657
/// State for a single room's event cache.
@@ -683,6 +685,10 @@ mod private {
683685

684686
pagination_status: SharedObservable<RoomPaginationStatus>,
685687

688+
/// See doc comment of
689+
/// [`super::super::EventCacheInner::linked_chunk_update_sender`].
690+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
691+
686692
/// An atomic count of the current number of subscriber of the
687693
/// [`super::RoomEventCache`].
688694
pub(super) subscriber_count: Arc<AtomicUsize>,
@@ -701,6 +707,7 @@ mod private {
701707
pub async fn new(
702708
room_id: OwnedRoomId,
703709
room_version_rules: RoomVersionRules,
710+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
704711
store: EventCacheStoreLock,
705712
pagination_status: SharedObservable<RoomPaginationStatus>,
706713
) -> Result<Self, EventCacheError> {
@@ -773,6 +780,7 @@ mod private {
773780
waited_for_initial_prev_token: false,
774781
subscriber_count: Default::default(),
775782
pagination_status,
783+
linked_chunk_update_sender,
776784
})
777785
}
778786

@@ -1242,20 +1250,27 @@ mod private {
12421250

12431251
let store = self.store.clone();
12441252
let room_id = self.room.clone();
1253+
let cloned_updates = updates.clone();
12451254

12461255
spawn(async move {
12471256
let store = store.lock().await?;
12481257

1249-
trace!(?updates, "sending linked chunk updates to the store");
1258+
trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
12501259
let linked_chunk_id = LinkedChunkId::Room(&room_id);
1251-
store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
1260+
store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
12521261
trace!("linked chunk updates applied");
12531262

12541263
super::Result::Ok(())
12551264
})
12561265
.await
12571266
.expect("joining failed")?;
12581267

1268+
// Forward that the store got updated to observers.
1269+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1270+
linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()),
1271+
updates,
1272+
});
1273+
12591274
Ok(())
12601275
}
12611276

@@ -1474,9 +1489,13 @@ mod private {
14741489
fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
14751490
// TODO: when there's persistent storage, try to lazily reload from disk, if
14761491
// missing from memory.
1477-
self.threads
1478-
.entry(root_event_id.clone())
1479-
.or_insert_with(|| ThreadEventCache::new(root_event_id))
1492+
self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1493+
ThreadEventCache::new(
1494+
self.room.clone(),
1495+
root_event_id,
1496+
self.linked_chunk_update_sender.clone(),
1497+
)
1498+
})
14801499
}
14811500

14821501
#[instrument(skip_all)]

crates/matrix-sdk/src/event_cache/room/threads.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use std::collections::BTreeSet;
1919
use eyeball_im::VectorDiff;
2020
use matrix_sdk_base::{
2121
event_cache::{Event, Gap},
22-
linked_chunk::{ChunkContent, Position},
22+
linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
2323
};
24-
use ruma::OwnedEventId;
24+
use ruma::{OwnedEventId, OwnedRoomId};
2525
use tokio::sync::broadcast::{Receiver, Sender};
2626
use tracing::trace;
2727

2828
use crate::event_cache::{
2929
deduplicator::DeduplicationOutcome,
3030
room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
31-
BackPaginationOutcome, EventsOrigin,
31+
BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
3232
};
3333

3434
/// An update coming from a thread event cache.
@@ -42,6 +42,9 @@ pub struct ThreadEventCacheUpdate {
4242

4343
/// All the information related to a single thread.
4444
pub(crate) struct ThreadEventCache {
45+
/// The room owning this thread.
46+
room_id: OwnedRoomId,
47+
4548
/// The ID of the thread root event, which is the first event in the thread
4649
/// (and eventually the first in the linked chunk).
4750
thread_root: OwnedEventId,
@@ -51,12 +54,24 @@ pub(crate) struct ThreadEventCache {
5154

5255
/// A sender for live events updates in this thread.
5356
sender: Sender<ThreadEventCacheUpdate>,
57+
58+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
5459
}
5560

5661
impl ThreadEventCache {
5762
/// Create a new empty thread event cache.
58-
pub fn new(thread_root: OwnedEventId) -> Self {
59-
Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root }
63+
pub fn new(
64+
room_id: OwnedRoomId,
65+
thread_root: OwnedEventId,
66+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
67+
) -> Self {
68+
Self {
69+
chunk: EventLinkedChunk::new(),
70+
sender: Sender::new(32),
71+
room_id,
72+
thread_root,
73+
linked_chunk_update_sender,
74+
}
6075
}
6176

6277
/// Subscribe to live events from this thread.
@@ -78,6 +93,22 @@ impl ThreadEventCache {
7893
}
7994
}
8095

96+
// TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the
97+
// duplication here too.
98+
fn propagate_changes(&mut self) {
99+
// This is a lie, at the moment! We're not persisting threads yet, so we're just
100+
// forwarding all updates to the linked chunk update sender.
101+
let updates = self.chunk.store_updates().take();
102+
103+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
104+
updates,
105+
linked_chunk: OwnedLinkedChunkId::Thread(
106+
self.room_id.clone(),
107+
self.thread_root.clone(),
108+
),
109+
});
110+
}
111+
81112
/// Push some live events to this thread, and propagate the updates to
82113
/// the listeners.
83114
pub fn add_live_events(&mut self, events: Vec<Event>) {
@@ -104,6 +135,8 @@ impl ThreadEventCache {
104135

105136
self.chunk.push_live_events(None, &events);
106137

138+
self.propagate_changes();
139+
107140
let diffs = self.chunk.updates_as_vector_diffs();
108141
if !diffs.is_empty() {
109142
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
@@ -249,6 +282,8 @@ impl ThreadEventCache {
249282
// Add the paginated events to the thread chunk.
250283
let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);
251284

285+
self.propagate_changes();
286+
252287
// Notify observers about the updates.
253288
let updates = self.chunk.updates_as_vector_diffs();
254289
if !updates.is_empty() {

0 commit comments

Comments
 (0)