@@ -51,9 +51,12 @@ use matrix_sdk_base::{
51
51
use matrix_sdk_common:: executor:: { spawn, JoinHandle } ;
52
52
use room:: RoomEventCacheState ;
53
53
use ruma:: { events:: AnySyncEphemeralRoomEvent , serde:: Raw , OwnedEventId , OwnedRoomId , RoomId } ;
54
- use tokio:: sync:: {
55
- broadcast:: { channel, error:: RecvError , Receiver , Sender } ,
56
- mpsc, Mutex , RwLock ,
54
+ use tokio:: {
55
+ select,
56
+ sync:: {
57
+ broadcast:: { channel, error:: RecvError , Receiver , Sender } ,
58
+ mpsc, Mutex , RwLock ,
59
+ } ,
57
60
} ;
58
61
use tracing:: { debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
59
62
@@ -415,6 +418,92 @@ impl EventCache {
415
418
self . inner . generic_update_sender . subscribe ( )
416
419
}
417
420
421
+ async fn handle_thread_subscriber_linked_chunk_update (
422
+ client : & WeakClient ,
423
+ thread_subscriber_sender : & Sender < ( ) > ,
424
+ up : RoomEventCacheLinkedChunkUpdate ,
425
+ ) -> bool {
426
+ let Some ( client) = client. get ( ) else {
427
+ // Client shutting down.
428
+ debug ! ( "Client is shutting down, exiting thread subscriber task" ) ;
429
+ return false ;
430
+ } ;
431
+
432
+ let OwnedLinkedChunkId :: Thread ( room_id, thread_root) = & up. linked_chunk else {
433
+ trace ! ( "received an update for a non-thread linked chunk, ignoring" ) ;
434
+ return true ;
435
+ } ;
436
+
437
+ let Some ( room) = client. get_room ( room_id) else {
438
+ warn ! ( %room_id, "unknown room" ) ;
439
+ return true ;
440
+ } ;
441
+
442
+ let thread_root = thread_root. clone ( ) ;
443
+
444
+ let new_events = up. events ( ) ;
445
+ if new_events. is_empty ( ) {
446
+ // No new events, nothing to do.
447
+ return true ;
448
+ }
449
+
450
+ // This `PushContext` is going to be used to compute whether an in-thread event
451
+ // would trigger a mention.
452
+ //
453
+ // Of course, we're not interested in an in-thread event causing a mention,
454
+ // because it's part of a thread we've subscribed to. So the
455
+ // `PushContext` must not include the check for thread subscriptions (otherwise
456
+ // it would be impossible to subscribe to new threads).
457
+
458
+ let with_thread_subscriptions = false ;
459
+
460
+ let Some ( push_context) = room
461
+ . push_context_internal ( with_thread_subscriptions)
462
+ . await
463
+ . inspect_err ( |err| {
464
+ warn ! ( "Failed to get push context for threads: {err}" ) ;
465
+ } )
466
+ . ok ( )
467
+ . flatten ( )
468
+ else {
469
+ warn ! ( "Missing push context for thread subscriptions." ) ;
470
+ return true ;
471
+ } ;
472
+
473
+ let mut subscribe_up_to = None ;
474
+
475
+ // Find if there's an event that would trigger a mention for the current
476
+ // user, iterating from the end of the new events towards the oldest,
477
+ for ev in new_events. into_iter ( ) . rev ( ) {
478
+ if push_context
479
+ . for_event ( ev. raw ( ) )
480
+ . await
481
+ . into_iter ( )
482
+ . any ( |action| action. should_notify ( ) )
483
+ {
484
+ let Some ( event_id) = ev. event_id ( ) else {
485
+ // Shouldn't happen.
486
+ continue ;
487
+ } ;
488
+ subscribe_up_to = Some ( event_id) ;
489
+ break ;
490
+ }
491
+ }
492
+
493
+ // And if we've found such a mention, subscribe to the thread up to this
494
+ // event.
495
+ if let Some ( event_id) = subscribe_up_to {
496
+ trace ! ( thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to" ) ;
497
+ if let Err ( err) = room. subscribe_thread_if_needed ( & thread_root, Some ( event_id) ) . await {
498
+ warn ! ( %err, "Failed to subscribe to thread" ) ;
499
+ } else {
500
+ let _ = thread_subscriber_sender. send ( ( ) ) ;
501
+ }
502
+ }
503
+
504
+ return true ;
505
+ }
506
+
418
507
#[ instrument( skip_all) ]
419
508
async fn thread_subscriber_task (
420
509
client : WeakClient ,
@@ -429,97 +518,23 @@ impl EventCache {
429
518
let mut rx = linked_chunk_update_sender. subscribe ( ) ;
430
519
431
520
loop {
432
- match rx. recv ( ) . await {
433
- Ok ( up) => {
434
- let Some ( client) = client. get ( ) else {
435
- // Client shutting down.
436
- debug ! ( "Client is shutting down, exiting thread subscriber task" ) ;
437
- break ;
438
- } ;
439
-
440
- let OwnedLinkedChunkId :: Thread ( room_id, thread_root) = & up. linked_chunk else {
441
- trace ! ( "received an update for a non-thread linked chunk, ignoring" ) ;
442
- continue ;
443
- } ;
444
-
445
- let Some ( room) = client. get_room ( & room_id) else {
446
- warn ! ( %room_id, "unknown room" ) ;
447
- continue ;
448
- } ;
449
-
450
- let thread_root = thread_root. clone ( ) ;
451
-
452
- let new_events = up. events ( ) ;
453
- if new_events. is_empty ( ) {
454
- // No new events, nothing to do.
455
- continue ;
456
- }
457
-
458
- // This `PushContext` is going to be used to compute whether an in-thread event
459
- // would trigger a mention.
460
- //
461
- // Of course, we're not interested in an in-thread event causing a mention,
462
- // because it's part of a thread we've subscribed to. So the
463
- // `PushContext` must not include the check for thread subscriptions (otherwise
464
- // it would be impossible to subscribe to new threads).
465
-
466
- let with_thread_subscriptions = false ;
467
-
468
- let Some ( push_context) = room
469
- . push_context_internal ( with_thread_subscriptions)
470
- . await
471
- . inspect_err ( |err| {
472
- warn ! ( "Failed to get push context for threads: {err}" ) ;
473
- } )
474
- . ok ( )
475
- . flatten ( )
476
- else {
477
- warn ! ( "Missing push context for thread subscriptions." ) ;
478
- continue ;
479
- } ;
480
-
481
- let mut subscribe_up_to = None ;
482
-
483
- // Find if there's an event that would trigger a mention for the current
484
- // user, iterating from the end of the new events towards the oldest,
485
- for ev in new_events. into_iter ( ) . rev ( ) {
486
- if push_context
487
- . for_event ( ev. raw ( ) )
488
- . await
489
- . into_iter ( )
490
- . any ( |action| action. should_notify ( ) )
491
- {
492
- let Some ( event_id) = ev. event_id ( ) else {
493
- // Shouldn't happen.
494
- continue ;
495
- } ;
496
- subscribe_up_to = Some ( event_id. to_owned ( ) ) ;
521
+ select ! {
522
+ res = rx. recv( ) => {
523
+ match res {
524
+ Ok ( up) => {
525
+ if !Self :: handle_thread_subscriber_linked_chunk_update( & client, & thread_subscriber_sender, up) . await {
526
+ break ;
527
+ }
528
+ }
529
+ Err ( RecvError :: Closed ) => {
530
+ debug!( "Linked chunk update channel has been closed, exiting thread subscriber task" ) ;
497
531
break ;
498
532
}
499
- }
500
-
501
- // And if we've found such a mention, subscribe to the thread up to this
502
- // event.
503
- if let Some ( event_id) = subscribe_up_to {
504
- trace ! ( thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to" ) ;
505
- if let Err ( err) =
506
- room. subscribe_thread_if_needed ( & thread_root, Some ( event_id) ) . await
507
- {
508
- warn ! ( %err, "Failed to subscribe to thread" ) ;
509
- } else {
510
- let _ = thread_subscriber_sender. send ( ( ) ) ;
533
+ Err ( RecvError :: Lagged ( num_skipped) ) => {
534
+ warn!( num_skipped, "Lagged behind linked chunk updates" ) ;
511
535
}
512
536
}
513
537
}
514
-
515
- Err ( RecvError :: Closed ) => {
516
- debug ! ( "Linked chunk update channel has been closed, exiting thread subscriber task" ) ;
517
- break ;
518
- }
519
-
520
- Err ( RecvError :: Lagged ( num_skipped) ) => {
521
- warn ! ( num_skipped, "Lagged behind linked chunk updates" ) ;
522
- }
523
538
}
524
539
}
525
540
}
0 commit comments