Skip to content

Commit 95522bb

Browse files
committed
refactor(event cache): move the listening to linked chunk updates to its own function, and introduce a select! at the top level
1 parent 2376763 commit 95522bb

File tree

1 file changed

+104
-89
lines changed
  • crates/matrix-sdk/src/event_cache

1 file changed

+104
-89
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 104 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@ use matrix_sdk_base::{
5151
use matrix_sdk_common::executor::{spawn, JoinHandle};
5252
use room::RoomEventCacheState;
5353
use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId};
54-
use tokio::sync::{
55-
broadcast::{channel, error::RecvError, Receiver, Sender},
56-
mpsc, Mutex, RwLock,
54+
use tokio::{
55+
select,
56+
sync::{
57+
broadcast::{channel, error::RecvError, Receiver, Sender},
58+
mpsc, Mutex, RwLock,
59+
},
5760
};
5861
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
5962

@@ -417,6 +420,92 @@ impl EventCache {
417420
self.inner.room_event_cache_generic_update_sender.subscribe()
418421
}
419422

423+
async fn handle_thread_subscriber_linked_chunk_update(
424+
client: &WeakClient,
425+
thread_subscriber_sender: &Sender<()>,
426+
up: RoomEventCacheLinkedChunkUpdate,
427+
) -> bool {
428+
let Some(client) = client.get() else {
429+
// Client shutting down.
430+
debug!("Client is shutting down, exiting thread subscriber task");
431+
return false;
432+
};
433+
434+
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
435+
trace!("received an update for a non-thread linked chunk, ignoring");
436+
return true;
437+
};
438+
439+
let Some(room) = client.get_room(&room_id) else {
440+
warn!(%room_id, "unknown room");
441+
return true;
442+
};
443+
444+
let thread_root = thread_root.clone();
445+
446+
let new_events = up.events();
447+
if new_events.is_empty() {
448+
// No new events, nothing to do.
449+
return true;
450+
}
451+
452+
// This `PushContext` is going to be used to compute whether an in-thread event
453+
// would trigger a mention.
454+
//
455+
// Of course, we're not interested in an in-thread event causing a mention,
456+
// because it's part of a thread we've subscribed to. So the
457+
// `PushContext` must not include the check for thread subscriptions (otherwise
458+
// it would be impossible to subscribe to new threads).
459+
460+
let with_thread_subscriptions = false;
461+
462+
let Some(push_context) = room
463+
.push_context_internal(with_thread_subscriptions)
464+
.await
465+
.inspect_err(|err| {
466+
warn!("Failed to get push context for threads: {err}");
467+
})
468+
.ok()
469+
.flatten()
470+
else {
471+
warn!("Missing push context for thread subscriptions.");
472+
return true;
473+
};
474+
475+
let mut subscribe_up_to = None;
476+
477+
// Find if there's an event that would trigger a mention for the current
478+
// user, iterating from the end of the new events towards the oldest,
479+
for ev in new_events.into_iter().rev() {
480+
if push_context
481+
.for_event(ev.raw())
482+
.await
483+
.into_iter()
484+
.any(|action| action.should_notify())
485+
{
486+
let Some(event_id) = ev.event_id() else {
487+
// Shouldn't happen.
488+
continue;
489+
};
490+
subscribe_up_to = Some(event_id.to_owned());
491+
break;
492+
}
493+
}
494+
495+
// And if we've found such a mention, subscribe to the thread up to this
496+
// event.
497+
if let Some(event_id) = subscribe_up_to {
498+
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
499+
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
500+
warn!(%err, "Failed to subscribe to thread");
501+
} else {
502+
let _ = thread_subscriber_sender.send(());
503+
}
504+
}
505+
506+
return true;
507+
}
508+
420509
#[instrument(skip_all)]
421510
async fn thread_subscriber_task(
422511
client: WeakClient,
@@ -431,97 +520,23 @@ impl EventCache {
431520
let mut rx = linked_chunk_update_sender.subscribe();
432521

433522
loop {
434-
match rx.recv().await {
435-
Ok(up) => {
436-
let Some(client) = client.get() else {
437-
// Client shutting down.
438-
debug!("Client is shutting down, exiting thread subscriber task");
439-
break;
440-
};
441-
442-
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
443-
trace!("received an update for a non-thread linked chunk, ignoring");
444-
continue;
445-
};
446-
447-
let Some(room) = client.get_room(&room_id) else {
448-
warn!(%room_id, "unknown room");
449-
continue;
450-
};
451-
452-
let thread_root = thread_root.clone();
453-
454-
let new_events = up.events();
455-
if new_events.is_empty() {
456-
// No new events, nothing to do.
457-
continue;
458-
}
459-
460-
// This `PushContext` is going to be used to compute whether an in-thread event
461-
// would trigger a mention.
462-
//
463-
// Of course, we're not interested in an in-thread event causing a mention,
464-
// because it's part of a thread we've subscribed to. So the
465-
// `PushContext` must not include the check for thread subscriptions (otherwise
466-
// it would be impossible to subscribe to new threads).
467-
468-
let with_thread_subscriptions = false;
469-
470-
let Some(push_context) = room
471-
.push_context_internal(with_thread_subscriptions)
472-
.await
473-
.inspect_err(|err| {
474-
warn!("Failed to get push context for threads: {err}");
475-
})
476-
.ok()
477-
.flatten()
478-
else {
479-
warn!("Missing push context for thread subscriptions.");
480-
continue;
481-
};
482-
483-
let mut subscribe_up_to = None;
484-
485-
// Find if there's an event that would trigger a mention for the current
486-
// user, iterating from the end of the new events towards the oldest,
487-
for ev in new_events.into_iter().rev() {
488-
if push_context
489-
.for_event(ev.raw())
490-
.await
491-
.into_iter()
492-
.any(|action| action.should_notify())
493-
{
494-
let Some(event_id) = ev.event_id() else {
495-
// Shouldn't happen.
496-
continue;
497-
};
498-
subscribe_up_to = Some(event_id.to_owned());
523+
select! {
524+
res = rx.recv() => {
525+
match res {
526+
Ok(up) => {
527+
if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
528+
break;
529+
}
530+
}
531+
Err(RecvError::Closed) => {
532+
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
499533
break;
500534
}
501-
}
502-
503-
// And if we've found such a mention, subscribe to the thread up to this
504-
// event.
505-
if let Some(event_id) = subscribe_up_to {
506-
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
507-
if let Err(err) =
508-
room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await
509-
{
510-
warn!(%err, "Failed to subscribe to thread");
511-
} else {
512-
let _ = thread_subscriber_sender.send(());
535+
Err(RecvError::Lagged(num_skipped)) => {
536+
warn!(num_skipped, "Lagged behind linked chunk updates");
513537
}
514538
}
515539
}
516-
517-
Err(RecvError::Closed) => {
518-
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
519-
break;
520-
}
521-
522-
Err(RecvError::Lagged(num_skipped)) => {
523-
warn!(num_skipped, "Lagged behind linked chunk updates");
524-
}
525540
}
526541
}
527542
}

0 commit comments

Comments
 (0)