28
28
#![ forbid( missing_docs) ]
29
29
30
30
use std:: {
31
- collections:: BTreeMap ,
31
+ collections:: { BTreeMap , HashMap } ,
32
32
fmt,
33
33
sync:: { Arc , OnceLock } ,
34
34
} ;
@@ -44,13 +44,18 @@ use matrix_sdk_base::{
44
44
} ,
45
45
executor:: AbortOnDrop ,
46
46
linked_chunk:: { self , lazy_loader:: LazyLoaderError , OwnedLinkedChunkId } ,
47
+ store:: SerializableEventContent ,
47
48
store_locks:: LockStoreError ,
48
49
sync:: RoomUpdates ,
49
50
timer,
50
51
} ;
51
52
use matrix_sdk_common:: executor:: { spawn, JoinHandle } ;
52
53
use room:: RoomEventCacheState ;
53
- use ruma:: { events:: AnySyncEphemeralRoomEvent , serde:: Raw , OwnedEventId , OwnedRoomId , RoomId } ;
54
+ use ruma:: {
55
+ events:: { room:: encrypted, AnySyncEphemeralRoomEvent } ,
56
+ serde:: Raw ,
57
+ OwnedEventId , OwnedRoomId , OwnedTransactionId , RoomId ,
58
+ } ;
54
59
use tokio:: {
55
60
select,
56
61
sync:: {
@@ -60,7 +65,11 @@ use tokio::{
60
65
} ;
61
66
use tracing:: { debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
62
67
63
- use crate :: { client:: WeakClient , Client } ;
68
+ use crate :: {
69
+ client:: WeakClient ,
70
+ send_queue:: { LocalEchoContent , RoomSendQueueUpdate , SendQueueUpdate } ,
71
+ Client ,
72
+ } ;
64
73
65
74
mod deduplicator;
66
75
mod pagination;
@@ -420,6 +429,7 @@ impl EventCache {
420
429
self . inner . room_event_cache_generic_update_sender . subscribe ( )
421
430
}
422
431
432
+ #[ instrument( skip( client, thread_subscriber_sender) ) ]
423
433
async fn handle_thread_subscriber_linked_chunk_update (
424
434
client : & WeakClient ,
425
435
thread_subscriber_sender : & Sender < ( ) > ,
@@ -503,7 +513,106 @@ impl EventCache {
503
513
}
504
514
}
505
515
506
- return true ;
516
+ true
517
+ }
518
+
519
+ #[ instrument( skip( client, thread_subscriber_sender) ) ]
520
+ async fn handle_thread_subscriber_send_queue_update (
521
+ client : & WeakClient ,
522
+ thread_subscriber_sender : & Sender < ( ) > ,
523
+ events_being_sent : & mut HashMap < OwnedTransactionId , OwnedEventId > ,
524
+ up : SendQueueUpdate ,
525
+ ) -> bool {
526
+ let Some ( client) = client. get ( ) else {
527
+ // Client shutting down.
528
+ debug ! ( "Client is shutting down, exiting thread subscriber task" ) ;
529
+ return false ;
530
+ } ;
531
+
532
+ let room_id = up. room_id ;
533
+ let Some ( room) = client. get_room ( & room_id) else {
534
+ warn ! ( %room_id, "unknown room" ) ;
535
+ return true ;
536
+ } ;
537
+
538
+ let extract_thread_root = |serialized_event : SerializableEventContent | {
539
+ match serialized_event. deserialize ( ) {
540
+ Ok ( content) => {
541
+ if let Some ( encrypted:: Relation :: Thread ( thread) ) = content. relation ( ) {
542
+ return Some ( thread. event_id ) ;
543
+ }
544
+ }
545
+ Err ( err) => {
546
+ warn ! ( "error when deserializing content of a local echo: {err}" ) ;
547
+ }
548
+ }
549
+ None
550
+ } ;
551
+
552
+ let ( thread_root, subscribe_up_to) = match up. update {
553
+ RoomSendQueueUpdate :: NewLocalEvent ( local_echo) => {
554
+ match local_echo. content {
555
+ LocalEchoContent :: Event { serialized_event, .. } => {
556
+ if let Some ( thread_root) = extract_thread_root ( serialized_event) {
557
+ events_being_sent. insert ( local_echo. transaction_id , thread_root) ;
558
+ }
559
+ }
560
+
561
+ LocalEchoContent :: React { .. } => {
562
+ // Nothing to do, reactions don't count as a thread
563
+ // subscription.
564
+ }
565
+ }
566
+
567
+ return true ;
568
+ }
569
+
570
+ RoomSendQueueUpdate :: CancelledLocalEvent { transaction_id } => {
571
+ events_being_sent. remove ( & transaction_id) ;
572
+ return true ;
573
+ }
574
+
575
+ RoomSendQueueUpdate :: ReplacedLocalEvent { transaction_id, new_content } => {
576
+ if let Some ( thread_root) = extract_thread_root ( new_content) {
577
+ // If the new content is a thread event, we need to update the
578
+ // events being sent.
579
+ events_being_sent. insert ( transaction_id, thread_root) ;
580
+ } else {
581
+ // It could be that the thread root has been removed; handle that by removing
582
+ // the pending transaction id.
583
+ events_being_sent. remove ( & transaction_id) ;
584
+ }
585
+ return true ;
586
+ }
587
+
588
+ RoomSendQueueUpdate :: SentEvent { transaction_id, event_id } => {
589
+ if let Some ( thread_root) = events_being_sent. remove ( & transaction_id) {
590
+ ( thread_root, event_id)
591
+ } else {
592
+ // We don't know about the event that has been sent, so ignore it.
593
+ trace ! ( %transaction_id, "received a sent event that we didn't know about, ignoring" ) ;
594
+ return true ;
595
+ }
596
+ }
597
+
598
+ RoomSendQueueUpdate :: SendError { .. }
599
+ | RoomSendQueueUpdate :: RetryEvent { .. }
600
+ | RoomSendQueueUpdate :: MediaUpload { .. } => {
601
+ // Nothing to do for these bad boys.
602
+ return true ;
603
+ }
604
+ } ;
605
+
606
+ // And if we've found such a mention, subscribe to the thread up to this event.
607
+ trace ! ( thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to" ) ;
608
+ if let Err ( err) = room. subscribe_thread_if_needed ( & thread_root, Some ( subscribe_up_to) ) . await
609
+ {
610
+ warn ! ( %err, "Failed to subscribe to thread" ) ;
611
+ } else {
612
+ let _ = thread_subscriber_sender. send ( ( ) ) ;
613
+ }
614
+
615
+ true
507
616
}
508
617
509
618
#[ instrument( skip_all) ]
@@ -512,16 +621,42 @@ impl EventCache {
512
621
linked_chunk_update_sender : Sender < RoomEventCacheLinkedChunkUpdate > ,
513
622
thread_subscriber_sender : Sender < ( ) > ,
514
623
) {
515
- if client. get ( ) . map_or ( false , |client| !client. enabled_thread_subscriptions ( ) ) {
516
- trace ! ( "Not spawning the thread subscriber task, because the client is shutting down or is not interested in those" ) ;
624
+ let mut send_q_rx = if let Some ( client) = client. get ( ) {
625
+ if !client. enabled_thread_subscriptions ( ) {
626
+ trace ! ( "Thread subscriptions are not enabled, not spawning thread subscriber task" ) ;
627
+ return ;
628
+ }
629
+
630
+ client. send_queue ( ) . subscribe ( )
631
+ } else {
632
+ trace ! ( "Client is shutting down, not spawning thread subscriber task" ) ;
517
633
return ;
518
- }
634
+ } ;
519
635
520
- let mut rx = linked_chunk_update_sender. subscribe ( ) ;
636
+ let mut linked_chunk_rx = linked_chunk_update_sender. subscribe ( ) ;
637
+
638
+ let mut events_being_sent = HashMap :: new ( ) ;
521
639
522
640
loop {
523
641
select ! {
524
- res = rx. recv( ) => {
642
+ res = send_q_rx. recv( ) => {
643
+ match res {
644
+ Ok ( up) => {
645
+ if !Self :: handle_thread_subscriber_send_queue_update( & client, & thread_subscriber_sender, & mut events_being_sent, up) . await {
646
+ break ;
647
+ }
648
+ }
649
+ Err ( RecvError :: Closed ) => {
650
+ debug!( "Linked chunk update channel has been closed, exiting thread subscriber task" ) ;
651
+ break ;
652
+ }
653
+ Err ( RecvError :: Lagged ( num_skipped) ) => {
654
+ warn!( num_skipped, "Lagged behind linked chunk updates" ) ;
655
+ }
656
+ }
657
+ }
658
+
659
+ res = linked_chunk_rx. recv( ) => {
525
660
match res {
526
661
Ok ( up) => {
527
662
if !Self :: handle_thread_subscriber_linked_chunk_update( & client, & thread_subscriber_sender, up) . await {
0 commit comments