Skip to content

Commit b6433de

Browse files
refactor(sdk): Move index handling to EventCache and use linked_chunk_update_sender.
Remove previous code that got updates in `RoomEventCacheState::post_process_new_events`. Add new task in `EventCache` that subscribes to `linked_chunk_update_sender` and forwards new events to `client::search::SearchIndex` same as before. Signed-off-by: Shrey Patel [email protected]
1 parent 385f1a8 commit b6433de

File tree

4 files changed

+119
-121
lines changed

4 files changed

+119
-121
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ use matrix_sdk_base::{
5050
timer,
5151
};
5252
use matrix_sdk_common::executor::{spawn, JoinHandle};
53-
#[cfg(feature = "experimental-search")]
54-
use matrix_sdk_search::error::IndexError;
5553
use room::RoomEventCacheState;
54+
#[cfg(feature = "experimental-search")]
55+
use ruma::events::AnySyncMessageLikeEvent;
5656
use ruma::{
5757
events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, OwnedTransactionId,
5858
RoomId,
@@ -133,11 +133,6 @@ pub enum EventCacheError {
133133
/// A string containing details about the error.
134134
details: String,
135135
},
136-
137-
/// An error occurred in the index.
138-
#[cfg(feature = "experimental-search")]
139-
#[error(transparent)]
140-
IndexError(#[from] IndexError),
141136
}
142137

143138
/// A result using the [`EventCacheError`].
@@ -199,6 +194,12 @@ impl EventCache {
199194
thread_subscriber_sender,
200195
)));
201196

197+
#[cfg(feature = "experimental-search")]
198+
let search_indexing_task = AbortOnDrop::new(spawn(Self::search_indexing_task(
199+
client.clone(),
200+
linked_chunk_update_sender.clone(),
201+
)));
202+
202203
Self {
203204
inner: Arc::new(EventCacheInner {
204205
client,
@@ -210,6 +211,8 @@ impl EventCache {
210211
generic_update_sender,
211212
linked_chunk_update_sender,
212213
_thread_subscriber_task: thread_subscriber_task,
214+
#[cfg(feature = "experimental-search")]
215+
_search_indexing_task: search_indexing_task,
213216
thread_subscriber_receiver,
214217
}),
215218
}
@@ -689,6 +692,82 @@ impl EventCache {
689692
}
690693
}
691694
}
695+
696+
/// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
697+
/// given room which will add/remove/edit an event in the index based on
698+
/// the event type.
699+
#[cfg(feature = "experimental-search")]
700+
#[instrument(skip_all)]
701+
async fn search_indexing_task(
702+
client: WeakClient,
703+
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
704+
) {
705+
let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();
706+
707+
loop {
708+
match linked_chunk_update_receiver.recv().await {
709+
Ok(room_ec_lc_update) => {
710+
let OwnedLinkedChunkId::Room(room_id) =
711+
room_ec_lc_update.linked_chunk_id.clone()
712+
else {
713+
trace!("Received non-room updates, ignoring.");
714+
continue;
715+
};
716+
717+
let mut timeline_events = room_ec_lc_update.events().peekable();
718+
719+
if timeline_events.peek().is_none() {
720+
continue;
721+
}
722+
723+
let Some(client) = client.get() else {
724+
trace!("Client is shutting down, not spawning thread subscriber task");
725+
return;
726+
};
727+
728+
let mut search_index_guard = client.search_index().lock().await;
729+
730+
for event in timeline_events {
731+
if let Some(message_event) = parse_timeline_event_for_search_index(&event) {
732+
if let Err(err) =
733+
search_index_guard.handle_event(message_event, &room_id)
734+
{
735+
warn!("Failed to handle event for indexing: {err}")
736+
}
737+
}
738+
}
739+
}
740+
Err(RecvError::Closed) => {
741+
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
742+
break;
743+
}
744+
Err(RecvError::Lagged(num_skipped)) => {
745+
warn!(num_skipped, "Lagged behind linked chunk updates");
746+
}
747+
}
748+
}
749+
}
750+
}
751+
752+
#[cfg(feature = "experimental-search")]
753+
fn parse_timeline_event_for_search_index(event: &TimelineEvent) -> Option<AnySyncMessageLikeEvent> {
754+
use ruma::events::AnySyncTimelineEvent;
755+
756+
if event.kind.is_utd() {
757+
return None;
758+
}
759+
760+
match event.raw().deserialize() {
761+
Ok(event) => match event {
762+
AnySyncTimelineEvent::MessageLike(event) => Some(event),
763+
AnySyncTimelineEvent::State(_) => None,
764+
},
765+
766+
Err(e) => {
767+
warn!("failed to parse event: {e:?}");
768+
None
769+
}
770+
}
692771
}
693772

694773
struct EventCacheInner {
@@ -744,6 +823,15 @@ struct EventCacheInner {
744823
/// [`EventCache`], so it does listen to *all* rooms at the same time.
745824
_thread_subscriber_task: AbortOnDrop<()>,
746825

826+
/// A background task listening to room updates, and
827+
/// automatically handling search index operations add/remove/edit
828+
/// depending on the event type.
829+
///
830+
/// One important constraint is that there is only one such task per
831+
/// [`EventCache`], so it does listen to *all* rooms at the same time.
832+
#[cfg(feature = "experimental-search")]
833+
_search_indexing_task: AbortOnDrop<()>,
834+
747835
/// A test helper receiver that will be emitted every time the thread
748836
/// subscriber task subscribed to a new thread.
749837
///

crates/matrix-sdk/src/event_cache/pagination.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -260,34 +260,32 @@ impl RoomPagination {
260260
batch_size: u16,
261261
prev_token: Option<String>,
262262
) -> Result<Option<BackPaginationOutcome>> {
263-
let Some(room) = self.inner.weak_room.get() else {
264-
// The client is shutting down, return an empty default response.
265-
return Ok(Some(BackPaginationOutcome {
266-
reached_start: false,
267-
events: Default::default(),
268-
}));
263+
let (events, new_token) = {
264+
let Some(room) = self.inner.weak_room.get() else {
265+
// The client is shutting down, return an empty default response.
266+
return Ok(Some(BackPaginationOutcome {
267+
reached_start: false,
268+
events: Default::default(),
269+
}));
270+
};
271+
272+
let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
273+
options.limit = batch_size.into();
274+
275+
let response = room
276+
.messages(options)
277+
.await
278+
.map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
279+
280+
(response.chunk, response.end)
269281
};
270282

271-
let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
272-
options.limit = batch_size.into();
273-
274-
let response = room
275-
.messages(options)
276-
.await
277-
.map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
278-
279283
if let Some((outcome, timeline_event_diffs)) = self
280284
.inner
281285
.state
282286
.write()
283287
.await
284-
.handle_backpagination(
285-
response.chunk,
286-
response.end,
287-
prev_token,
288-
#[cfg(feature = "experimental-search")]
289-
&room,
290-
)
288+
.handle_backpagination(events, new_token, prev_token)
291289
.await?
292290
{
293291
if !timeline_event_diffs.is_empty() {

crates/matrix-sdk/src/event_cache/room/mod.rs

Lines changed: 5 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -542,22 +542,8 @@ impl RoomEventCacheInner {
542542
// Add all the events to the backend.
543543
trace!("adding new events");
544544

545-
#[cfg(feature = "experimental-search")]
546-
let Some(room) = self.weak_room.get() else {
547-
trace!("Couldn't get room while handling timeline");
548-
return Ok(());
549-
};
550-
551-
let (stored_prev_batch_token, timeline_event_diffs) = self
552-
.state
553-
.write()
554-
.await
555-
.handle_sync(
556-
timeline,
557-
#[cfg(feature = "experimental-search")]
558-
&room,
559-
)
560-
.await?;
545+
let (stored_prev_batch_token, timeline_event_diffs) =
546+
self.state.write().await.handle_sync(timeline).await?;
561547

562548
// Now that all events have been added, we can trigger the
563549
// `pagination_token_notifier`.
@@ -622,8 +608,6 @@ mod private {
622608

623609
use eyeball::SharedObservable;
624610
use eyeball_im::VectorDiff;
625-
#[cfg(feature = "experimental-search")]
626-
use matrix_sdk_base::deserialized_responses::TimelineEvent;
627611
use matrix_sdk_base::{
628612
apply_redaction,
629613
deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
@@ -662,8 +646,6 @@ mod private {
662646
BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus,
663647
ThreadEventCacheUpdate,
664648
};
665-
#[cfg(feature = "experimental-search")]
666-
use crate::Room;
667649

668650
/// State for a single room's event cache.
669651
///
@@ -1460,41 +1442,6 @@ mod private {
14601442
Ok(Some((target, related)))
14611443
}
14621444

1463-
#[cfg(feature = "experimental-search")]
1464-
fn parse_timeline_event(&self, event: &TimelineEvent) -> Option<AnySyncMessageLikeEvent> {
1465-
if event.kind.is_utd() {
1466-
return None;
1467-
}
1468-
1469-
match event.raw().deserialize() {
1470-
Ok(event) => match event {
1471-
AnySyncTimelineEvent::MessageLike(event) => Some(event),
1472-
AnySyncTimelineEvent::State(_) => None,
1473-
},
1474-
1475-
Err(e) => {
1476-
warn!("failed to parse event: {e:?}");
1477-
None
1478-
}
1479-
}
1480-
}
1481-
1482-
/// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
1483-
/// given room which will add/remove/edit an event in the index based on
1484-
/// the event type.
1485-
#[cfg(feature = "experimental-search")]
1486-
async fn index_event(
1487-
&self,
1488-
event: &TimelineEvent,
1489-
room: &Room,
1490-
) -> Result<(), EventCacheError> {
1491-
if let Some(message_event) = self.parse_timeline_event(event) {
1492-
room.index_event(message_event).await.map_err(EventCacheError::from)
1493-
} else {
1494-
Ok(())
1495-
}
1496-
}
1497-
14981445
/// Post-process new events, after they have been added to the in-memory
14991446
/// linked chunk.
15001447
///
@@ -1503,21 +1450,14 @@ mod private {
15031450
&mut self,
15041451
events: Vec<Event>,
15051452
is_sync: bool,
1506-
#[cfg(feature = "experimental-search")] room: &Room,
15071453
) -> Result<(), EventCacheError> {
15081454
// Update the store before doing the post-processing.
15091455
self.propagate_changes().await?;
15101456

15111457
let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
15121458

15131459
for event in events {
1514-
self.maybe_apply_new_redaction(&event).await?; // TODO: Handle redaction for search index
1515-
1516-
// We can also add the event to the index.
1517-
#[cfg(feature = "experimental-search")]
1518-
if let Err(err) = self.index_event(&event, room).await {
1519-
warn!("error while trying to index event: {err:?}");
1520-
}
1460+
self.maybe_apply_new_redaction(&event).await?;
15211461

15221462
if let Some(thread_root) = extract_thread_root(event.raw()) {
15231463
new_events_by_thread.entry(thread_root).or_default().push(event.clone());
@@ -1760,7 +1700,6 @@ mod private {
17601700
pub async fn handle_sync(
17611701
&mut self,
17621702
mut timeline: Timeline,
1763-
#[cfg(feature = "experimental-search")] room: &Room,
17641703
) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
17651704
let mut prev_batch = timeline.prev_batch.take();
17661705

@@ -1853,13 +1792,7 @@ mod private {
18531792
self.room_linked_chunk
18541793
.push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
18551794

1856-
self.post_process_new_events(
1857-
events,
1858-
true,
1859-
#[cfg(feature = "experimental-search")]
1860-
room,
1861-
)
1862-
.await?;
1795+
self.post_process_new_events(events, true).await?;
18631796

18641797
if timeline.limited && has_new_gap {
18651798
// If there was a previous batch token for a limited timeline, unload the chunks
@@ -1889,7 +1822,6 @@ mod private {
18891822
events: Vec<Event>,
18901823
mut new_token: Option<String>,
18911824
prev_token: Option<String>,
1892-
#[cfg(feature = "experimental-search")] room: &Room,
18931825
) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
18941826
{
18951827
// Check that the previous token still exists; otherwise it's a sign that the
@@ -1964,13 +1896,7 @@ mod private {
19641896
);
19651897

19661898
// Note: this flushes updates to the store.
1967-
self.post_process_new_events(
1968-
topo_ordered_events,
1969-
false,
1970-
#[cfg(feature = "experimental-search")]
1971-
room,
1972-
)
1973-
.await?;
1899+
self.post_process_new_events(topo_ordered_events, false).await?;
19741900

19751901
let event_diffs = self.room_linked_chunk.updates_as_vector_diffs();
19761902

crates/matrix-sdk/src/room/mod.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ use matrix_sdk_common::{
5555
timeout::timeout,
5656
};
5757
#[cfg(feature = "experimental-search")]
58-
use matrix_sdk_search::error::IndexError;
59-
#[cfg(feature = "experimental-search")]
6058
#[cfg(doc)]
6159
use matrix_sdk_search::index::RoomIndex;
6260
use mime::Mime;
@@ -3752,18 +3750,6 @@ impl Room {
37523750
opts.send(self, event_id).await
37533751
}
37543752

3755-
/// Handle an [`AnySyncMessageLikeEvent`] in this room's [`RoomIndex`].
3756-
///
3757-
/// This which will add/remove/edit an event in the index based on the
3758-
/// event type.
3759-
#[cfg(feature = "experimental-search")]
3760-
pub(crate) async fn index_event(
3761-
&self,
3762-
event: AnySyncMessageLikeEvent,
3763-
) -> Result<(), IndexError> {
3764-
self.client.search_index().lock().await.handle_event(event, self.room_id())
3765-
}
3766-
37673753
/// Search this room's [`RoomIndex`] for query and return at most
37683754
/// max_number_of_results results.
37693755
#[cfg(feature = "experimental-search")]

0 commit comments

Comments
 (0)