Skip to content

Commit 705d6f8

Browse files
committed
refactor(event cache): move the listening to linked chunk updates to its own function, and introduce a select! at the top level
1 parent 4fc28c4 commit 705d6f8

File tree

1 file changed

+104
-89
lines changed
  • crates/matrix-sdk/src/event_cache

1 file changed

+104
-89
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 104 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,12 @@ use matrix_sdk_common::executor::{spawn, JoinHandle};
5353
use matrix_sdk_search::error::IndexError;
5454
use room::RoomEventCacheState;
5555
use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId};
56-
use tokio::sync::{
57-
broadcast::{channel, error::RecvError, Receiver, Sender},
58-
mpsc, Mutex, RwLock,
56+
use tokio::{
57+
select,
58+
sync::{
59+
broadcast::{channel, error::RecvError, Receiver, Sender},
60+
mpsc, Mutex, RwLock,
61+
},
5962
};
6063
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
6164

@@ -422,6 +425,92 @@ impl EventCache {
422425
self.inner.generic_update_sender.subscribe()
423426
}
424427

428+
async fn handle_thread_subscriber_linked_chunk_update(
429+
client: &WeakClient,
430+
thread_subscriber_sender: &Sender<()>,
431+
up: RoomEventCacheLinkedChunkUpdate,
432+
) -> bool {
433+
let Some(client) = client.get() else {
434+
// Client shutting down.
435+
debug!("Client is shutting down, exiting thread subscriber task");
436+
return false;
437+
};
438+
439+
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
440+
trace!("received an update for a non-thread linked chunk, ignoring");
441+
return true;
442+
};
443+
444+
let Some(room) = client.get_room(room_id) else {
445+
warn!(%room_id, "unknown room");
446+
return true;
447+
};
448+
449+
let thread_root = thread_root.clone();
450+
451+
let new_events = up.events();
452+
if new_events.is_empty() {
453+
// No new events, nothing to do.
454+
return true;
455+
}
456+
457+
// This `PushContext` is going to be used to compute whether an in-thread event
458+
// would trigger a mention.
459+
//
460+
// Of course, we're not interested in an in-thread event causing a mention,
461+
// because it's part of a thread we've subscribed to. So the
462+
// `PushContext` must not include the check for thread subscriptions (otherwise
463+
// it would be impossible to subscribe to new threads).
464+
465+
let with_thread_subscriptions = false;
466+
467+
let Some(push_context) = room
468+
.push_context_internal(with_thread_subscriptions)
469+
.await
470+
.inspect_err(|err| {
471+
warn!("Failed to get push context for threads: {err}");
472+
})
473+
.ok()
474+
.flatten()
475+
else {
476+
warn!("Missing push context for thread subscriptions.");
477+
return true;
478+
};
479+
480+
let mut subscribe_up_to = None;
481+
482+
// Find if there's an event that would trigger a mention for the current
483+
// user, iterating from the end of the new events towards the oldest,
484+
for ev in new_events.into_iter().rev() {
485+
if push_context
486+
.for_event(ev.raw())
487+
.await
488+
.into_iter()
489+
.any(|action| action.should_notify())
490+
{
491+
let Some(event_id) = ev.event_id() else {
492+
// Shouldn't happen.
493+
continue;
494+
};
495+
subscribe_up_to = Some(event_id);
496+
break;
497+
}
498+
}
499+
500+
// And if we've found such a mention, subscribe to the thread up to this
501+
// event.
502+
if let Some(event_id) = subscribe_up_to {
503+
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
504+
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
505+
warn!(%err, "Failed to subscribe to thread");
506+
} else {
507+
let _ = thread_subscriber_sender.send(());
508+
}
509+
}
510+
511+
return true;
512+
}
513+
425514
#[instrument(skip_all)]
426515
async fn thread_subscriber_task(
427516
client: WeakClient,
@@ -436,97 +525,23 @@ impl EventCache {
436525
let mut rx = linked_chunk_update_sender.subscribe();
437526

438527
loop {
439-
match rx.recv().await {
440-
Ok(up) => {
441-
let Some(client) = client.get() else {
442-
// Client shutting down.
443-
debug!("Client is shutting down, exiting thread subscriber task");
444-
break;
445-
};
446-
447-
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
448-
trace!("received an update for a non-thread linked chunk, ignoring");
449-
continue;
450-
};
451-
452-
let Some(room) = client.get_room(&room_id) else {
453-
warn!(%room_id, "unknown room");
454-
continue;
455-
};
456-
457-
let thread_root = thread_root.clone();
458-
459-
let new_events = up.events();
460-
if new_events.is_empty() {
461-
// No new events, nothing to do.
462-
continue;
463-
}
464-
465-
// This `PushContext` is going to be used to compute whether an in-thread event
466-
// would trigger a mention.
467-
//
468-
// Of course, we're not interested in an in-thread event causing a mention,
469-
// because it's part of a thread we've subscribed to. So the
470-
// `PushContext` must not include the check for thread subscriptions (otherwise
471-
// it would be impossible to subscribe to new threads).
472-
473-
let with_thread_subscriptions = false;
474-
475-
let Some(push_context) = room
476-
.push_context_internal(with_thread_subscriptions)
477-
.await
478-
.inspect_err(|err| {
479-
warn!("Failed to get push context for threads: {err}");
480-
})
481-
.ok()
482-
.flatten()
483-
else {
484-
warn!("Missing push context for thread subscriptions.");
485-
continue;
486-
};
487-
488-
let mut subscribe_up_to = None;
489-
490-
// Find if there's an event that would trigger a mention for the current
491-
// user, iterating from the end of the new events towards the oldest,
492-
for ev in new_events.into_iter().rev() {
493-
if push_context
494-
.for_event(ev.raw())
495-
.await
496-
.into_iter()
497-
.any(|action| action.should_notify())
498-
{
499-
let Some(event_id) = ev.event_id() else {
500-
// Shouldn't happen.
501-
continue;
502-
};
503-
subscribe_up_to = Some(event_id.to_owned());
528+
select! {
529+
res = rx.recv() => {
530+
match res {
531+
Ok(up) => {
532+
if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
533+
break;
534+
}
535+
}
536+
Err(RecvError::Closed) => {
537+
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
504538
break;
505539
}
506-
}
507-
508-
// And if we've found such a mention, subscribe to the thread up to this
509-
// event.
510-
if let Some(event_id) = subscribe_up_to {
511-
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
512-
if let Err(err) =
513-
room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
514-
{
515-
warn!(%err, "Failed to subscribe to thread");
516-
} else {
517-
let _ = thread_subscriber_sender.send(());
540+
Err(RecvError::Lagged(num_skipped)) => {
541+
warn!(num_skipped, "Lagged behind linked chunk updates");
518542
}
519543
}
520544
}
521-
522-
Err(RecvError::Closed) => {
523-
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
524-
break;
525-
}
526-
527-
Err(RecvError::Lagged(num_skipped)) => {
528-
warn!(num_skipped, "Lagged behind linked chunk updates");
529-
}
530545
}
531546
}
532547
}

0 commit comments

Comments
 (0)