Skip to content

Commit 67e3bee

Browse files
committed
feat(event cache): add a way to subscribe to any room's linked chunk updates
1 parent 247ec1d commit 67e3bee

File tree

3 files changed

+119
-15
lines changed

3 files changed

+119
-15
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ use eyeball_im::VectorDiff;
3838
use futures_util::future::{join_all, try_join_all};
3939
use matrix_sdk_base::{
4040
deserialized_responses::{AmbiguityChange, TimelineEvent},
41-
event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42-
linked_chunk::lazy_loader::LazyLoaderError,
41+
event_cache::{
42+
store::{EventCacheStoreError, EventCacheStoreLock},
43+
Gap,
44+
},
45+
linked_chunk::{self, lazy_loader::LazyLoaderError, OwnedLinkedChunkId},
4346
store_locks::LockStoreError,
4447
sync::RoomUpdates,
4548
timer,
@@ -168,6 +171,7 @@ impl EventCache {
168171
/// Create a new [`EventCache`] for the given client.
169172
pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
170173
let (generic_update_sender, _) = channel(32);
174+
let (linked_chunk_update_sender, _) = channel(32);
171175

172176
Self {
173177
inner: Arc::new(EventCacheInner {
@@ -178,6 +182,7 @@ impl EventCache {
178182
drop_handles: Default::default(),
179183
auto_shrink_sender: Default::default(),
180184
generic_update_sender,
185+
linked_chunk_update_sender,
181186
}),
182187
}
183188
}
@@ -428,6 +433,14 @@ struct EventCacheInner {
428433
/// See doc comment of [`RoomEventCacheGenericUpdate`] and
429434
/// [`EventCache::subscribe_to_room_generic_updates`].
430435
generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
436+
437+
/// A sender for a persisted linked chunk update.
438+
///
439+
/// This is used to notify that some linked chunk has persisted some updates
440+
/// to a store, and can be used by observers to look for new events.
441+
///
442+
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
443+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
431444
}
432445

433446
type AutoShrinkChannelPayload = OwnedRoomId;
@@ -591,6 +604,7 @@ impl EventCacheInner {
591604
let room_state = RoomEventCacheState::new(
592605
room_id.to_owned(),
593606
room_version_rules,
607+
self.linked_chunk_update_sender.clone(),
594608
self.store.clone(),
595609
pagination_status.clone(),
596610
)
@@ -657,6 +671,42 @@ pub struct RoomEventCacheGenericUpdate {
657671
pub room_id: OwnedRoomId,
658672
}
659673

674+
/// An update being triggered when events change in the persisted event cache
675+
/// for any room.
676+
#[derive(Clone, Debug)]
677+
struct RoomEventCacheLinkedChunkUpdate {
678+
/// The linked chunk affected by the update.
679+
linked_chunk: OwnedLinkedChunkId,
680+
681+
/// A vector of all the updates that happened during this update.
682+
updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
683+
}
684+
685+
impl RoomEventCacheLinkedChunkUpdate {
686+
/// Return all the new events propagated by this update, in topological
687+
/// order.
688+
pub fn events(self) -> Vec<TimelineEvent> {
689+
self.updates
690+
.into_iter()
691+
.flat_map(|update| match update {
692+
linked_chunk::Update::PushItems { items, .. } => items,
693+
linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
694+
linked_chunk::Update::RemoveItem { .. }
695+
| linked_chunk::Update::DetachLastItems { .. }
696+
| linked_chunk::Update::StartReattachItems
697+
| linked_chunk::Update::EndReattachItems
698+
| linked_chunk::Update::NewItemsChunk { .. }
699+
| linked_chunk::Update::NewGapChunk { .. }
700+
| linked_chunk::Update::RemoveChunk(..)
701+
| linked_chunk::Update::Clear => {
702+
// All these updates don't contain any new event.
703+
vec![]
704+
}
705+
})
706+
.collect()
707+
}
708+
}
709+
660710
/// An update related to events happened in a room.
661711
#[derive(Debug, Clone)]
662712
pub enum RoomEventCacheUpdate {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,8 @@ mod private {
617617
},
618618
linked_chunk::{
619619
lazy_loader::{self},
620-
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update,
620+
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
621+
OwnedLinkedChunkId, Position, Update,
621622
},
622623
serde_helpers::extract_thread_root,
623624
sync::Timeline,
@@ -632,7 +633,7 @@ mod private {
632633
serde::Raw,
633634
EventId, OwnedEventId, OwnedRoomId,
634635
};
635-
use tokio::sync::broadcast::Receiver;
636+
use tokio::sync::broadcast::{Receiver, Sender};
636637
use tracing::{debug, error, instrument, trace, warn};
637638

638639
use super::{
@@ -642,7 +643,8 @@ mod private {
642643
};
643644
use crate::event_cache::{
644645
deduplicator::filter_duplicate_events, room::threads::ThreadEventCache,
645-
BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate,
646+
BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
647+
ThreadEventCacheUpdate,
646648
};
647649

648650
/// State for a single room's event cache.
@@ -676,6 +678,10 @@ mod private {
676678

677679
pagination_status: SharedObservable<RoomPaginationStatus>,
678680

681+
/// See doc comment of
682+
/// [`super::super::EventCacheInner::linked_chunk_update_sender`].
683+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
684+
679685
/// An atomic count of the current number of subscriber of the
680686
/// [`super::RoomEventCache`].
681687
pub(super) subscriber_count: Arc<AtomicUsize>,
@@ -694,6 +700,7 @@ mod private {
694700
pub async fn new(
695701
room_id: OwnedRoomId,
696702
room_version_rules: RoomVersionRules,
703+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
697704
store: EventCacheStoreLock,
698705
pagination_status: SharedObservable<RoomPaginationStatus>,
699706
) -> Result<Self, EventCacheError> {
@@ -766,6 +773,7 @@ mod private {
766773
waited_for_initial_prev_token: false,
767774
subscriber_count: Default::default(),
768775
pagination_status,
776+
linked_chunk_update_sender,
769777
})
770778
}
771779

@@ -1235,20 +1243,27 @@ mod private {
12351243

12361244
let store = self.store.clone();
12371245
let room_id = self.room.clone();
1246+
let cloned_updates = updates.clone();
12381247

12391248
spawn(async move {
12401249
let store = store.lock().await?;
12411250

1242-
trace!(?updates, "sending linked chunk updates to the store");
1251+
trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
12431252
let linked_chunk_id = LinkedChunkId::Room(&room_id);
1244-
store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
1253+
store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
12451254
trace!("linked chunk updates applied");
12461255

12471256
super::Result::Ok(())
12481257
})
12491258
.await
12501259
.expect("joining failed")?;
12511260

1261+
// Forward that the store got updated to observers.
1262+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1263+
linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()),
1264+
updates,
1265+
});
1266+
12521267
Ok(())
12531268
}
12541269

@@ -1467,9 +1482,13 @@ mod private {
14671482
fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
14681483
// TODO: when there's persistent storage, try to lazily reload from disk, if
14691484
// missing from memory.
1470-
self.threads
1471-
.entry(root_event_id.clone())
1472-
.or_insert_with(|| ThreadEventCache::new(root_event_id))
1485+
self.threads.entry(root_event_id.clone()).or_insert_with(|| {
1486+
ThreadEventCache::new(
1487+
self.room.clone(),
1488+
root_event_id,
1489+
self.linked_chunk_update_sender.clone(),
1490+
)
1491+
})
14731492
}
14741493

14751494
#[instrument(skip_all)]

crates/matrix-sdk/src/event_cache/room/threads.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use std::collections::BTreeSet;
1919
use eyeball_im::VectorDiff;
2020
use matrix_sdk_base::{
2121
event_cache::{Event, Gap},
22-
linked_chunk::{ChunkContent, Position},
22+
linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
2323
};
24-
use ruma::OwnedEventId;
24+
use ruma::{OwnedEventId, OwnedRoomId};
2525
use tokio::sync::broadcast::{Receiver, Sender};
2626
use tracing::trace;
2727

2828
use crate::event_cache::{
2929
deduplicator::DeduplicationOutcome,
3030
room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
31-
BackPaginationOutcome, EventsOrigin,
31+
BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
3232
};
3333

3434
/// An update coming from a thread event cache.
@@ -42,6 +42,9 @@ pub struct ThreadEventCacheUpdate {
4242

4343
/// All the information related to a single thread.
4444
pub(crate) struct ThreadEventCache {
45+
/// The room owning this thread.
46+
room_id: OwnedRoomId,
47+
4548
/// The ID of the thread root event, which is the first event in the thread
4649
/// (and eventually the first in the linked chunk).
4750
thread_root: OwnedEventId,
@@ -51,12 +54,24 @@ pub(crate) struct ThreadEventCache {
5154

5255
/// A sender for live events updates in this thread.
5356
sender: Sender<ThreadEventCacheUpdate>,
57+
58+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
5459
}
5560

5661
impl ThreadEventCache {
5762
/// Create a new empty thread event cache.
58-
pub fn new(thread_root: OwnedEventId) -> Self {
59-
Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root }
63+
pub fn new(
64+
room_id: OwnedRoomId,
65+
thread_root: OwnedEventId,
66+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
67+
) -> Self {
68+
Self {
69+
chunk: EventLinkedChunk::new(),
70+
sender: Sender::new(32),
71+
room_id,
72+
thread_root,
73+
linked_chunk_update_sender,
74+
}
6075
}
6176

6277
/// Subscribe to live events from this thread.
@@ -78,6 +93,22 @@ impl ThreadEventCache {
7893
}
7994
}
8095

96+
// TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the
97+
// duplication here too.
98+
fn propagate_changes(&mut self) {
99+
// This is a lie, at the moment! We're not persisting threads yet, so we're just
100+
// forwarding all updates to the linked chunk update sender.
101+
let updates = self.chunk.store_updates().take();
102+
103+
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
104+
updates,
105+
linked_chunk: OwnedLinkedChunkId::Thread(
106+
self.room_id.clone(),
107+
self.thread_root.clone(),
108+
),
109+
});
110+
}
111+
81112
/// Push some live events to this thread, and propagate the updates to
82113
/// the listeners.
83114
pub fn add_live_events(&mut self, events: Vec<Event>) {
@@ -104,6 +135,8 @@ impl ThreadEventCache {
104135

105136
self.chunk.push_live_events(None, &events);
106137

138+
self.propagate_changes();
139+
107140
let diffs = self.chunk.updates_as_vector_diffs();
108141
if !diffs.is_empty() {
109142
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
@@ -249,6 +282,8 @@ impl ThreadEventCache {
249282
// Add the paginated events to the thread chunk.
250283
let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);
251284

285+
self.propagate_changes();
286+
252287
// Notify observers about the updates.
253288
let updates = self.chunk.updates_as_vector_diffs();
254289
if !updates.is_empty() {

0 commit comments

Comments
 (0)