28
28
#![ forbid( missing_docs) ]
29
29
30
30
use std:: {
31
- collections:: BTreeMap ,
31
+ collections:: { BTreeMap , HashMap } ,
32
32
fmt,
33
33
sync:: { Arc , OnceLock } ,
34
34
} ;
@@ -44,13 +44,18 @@ use matrix_sdk_base::{
44
44
} ,
45
45
executor:: AbortOnDrop ,
46
46
linked_chunk:: { self , lazy_loader:: LazyLoaderError , OwnedLinkedChunkId } ,
47
+ store:: SerializableEventContent ,
47
48
store_locks:: LockStoreError ,
48
49
sync:: RoomUpdates ,
49
50
timer,
50
51
} ;
51
52
use matrix_sdk_common:: executor:: { spawn, JoinHandle } ;
52
53
use room:: RoomEventCacheState ;
53
- use ruma:: { events:: AnySyncEphemeralRoomEvent , serde:: Raw , OwnedEventId , OwnedRoomId , RoomId } ;
54
+ use ruma:: {
55
+ events:: { room:: encrypted, AnySyncEphemeralRoomEvent } ,
56
+ serde:: Raw ,
57
+ OwnedEventId , OwnedRoomId , OwnedTransactionId , RoomId ,
58
+ } ;
54
59
use tokio:: {
55
60
select,
56
61
sync:: {
@@ -60,7 +65,11 @@ use tokio::{
60
65
} ;
61
66
use tracing:: { debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
62
67
63
- use crate :: { client:: WeakClient , Client } ;
68
+ use crate :: {
69
+ client:: WeakClient ,
70
+ send_queue:: { LocalEchoContent , RoomSendQueueUpdate , SendQueueUpdate } ,
71
+ Client ,
72
+ } ;
64
73
65
74
mod deduplicator;
66
75
mod pagination;
@@ -418,6 +427,7 @@ impl EventCache {
418
427
self . inner . generic_update_sender . subscribe ( )
419
428
}
420
429
430
+ #[ instrument( skip( client, thread_subscriber_sender) ) ]
421
431
async fn handle_thread_subscriber_linked_chunk_update (
422
432
client : & WeakClient ,
423
433
thread_subscriber_sender : & Sender < ( ) > ,
@@ -501,7 +511,102 @@ impl EventCache {
501
511
}
502
512
}
503
513
504
- return true ;
514
+ true
515
+ }
516
+
517
+ #[ instrument( skip( client, thread_subscriber_sender) ) ]
518
+ async fn handle_thread_subscriber_send_queue_update (
519
+ client : & WeakClient ,
520
+ thread_subscriber_sender : & Sender < ( ) > ,
521
+ events_being_sent : & mut HashMap < OwnedTransactionId , OwnedEventId > ,
522
+ up : SendQueueUpdate ,
523
+ ) -> bool {
524
+ let Some ( client) = client. get ( ) else {
525
+ // Client shutting down.
526
+ debug ! ( "Client is shutting down, exiting thread subscriber task" ) ;
527
+ return false ;
528
+ } ;
529
+
530
+ let room_id = up. room_id ;
531
+ let Some ( room) = client. get_room ( & room_id) else {
532
+ warn ! ( %room_id, "unknown room" ) ;
533
+ return true ;
534
+ } ;
535
+
536
+ let extract_thread_root = |serialized_event : SerializableEventContent | {
537
+ match serialized_event. deserialize ( ) {
538
+ Ok ( content) => {
539
+ if let Some ( encrypted:: Relation :: Thread ( thread) ) = content. relation ( ) {
540
+ return Some ( thread. event_id ) ;
541
+ }
542
+ }
543
+ Err ( err) => {
544
+ warn ! ( "error when deserializing content of a local echo: {err}" ) ;
545
+ }
546
+ }
547
+ None
548
+ } ;
549
+
550
+ let ( thread_root, subscribe_up_to) = match up. update {
551
+ RoomSendQueueUpdate :: NewLocalEvent ( local_echo) => {
552
+ match local_echo. content {
553
+ LocalEchoContent :: Event { serialized_event, .. } => {
554
+ if let Some ( thread_root) = extract_thread_root ( serialized_event) {
555
+ events_being_sent. insert ( local_echo. transaction_id , thread_root) ;
556
+ }
557
+ }
558
+ LocalEchoContent :: React { .. } => {
559
+ // Nothing to do, reactions don't count as a thread
560
+ // subscription.
561
+ }
562
+ }
563
+ return true ;
564
+ }
565
+
566
+ RoomSendQueueUpdate :: CancelledLocalEvent { transaction_id } => {
567
+ events_being_sent. remove ( & transaction_id) ;
568
+ return true ;
569
+ }
570
+
571
+ RoomSendQueueUpdate :: ReplacedLocalEvent { transaction_id, new_content } => {
572
+ if let Some ( thread_root) = extract_thread_root ( new_content) {
573
+ events_being_sent. insert ( transaction_id, thread_root) ;
574
+ } else {
575
+ // It could be that the event isn't part of a thread anymore; handle that by
576
+ // removing the pending transaction id.
577
+ events_being_sent. remove ( & transaction_id) ;
578
+ }
579
+ return true ;
580
+ }
581
+
582
+ RoomSendQueueUpdate :: SentEvent { transaction_id, event_id } => {
583
+ if let Some ( thread_root) = events_being_sent. remove ( & transaction_id) {
584
+ ( thread_root, event_id)
585
+ } else {
586
+ // We don't know about the event that has been sent, so ignore it.
587
+ trace ! ( %transaction_id, "received a sent event that we didn't know about, ignoring" ) ;
588
+ return true ;
589
+ }
590
+ }
591
+
592
+ RoomSendQueueUpdate :: SendError { .. }
593
+ | RoomSendQueueUpdate :: RetryEvent { .. }
594
+ | RoomSendQueueUpdate :: MediaUpload { .. } => {
595
+ // Nothing to do for these bad boys.
596
+ return true ;
597
+ }
598
+ } ;
599
+
600
+ // And if we've found such a mention, subscribe to the thread up to this event.
601
+ trace ! ( thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to" ) ;
602
+ if let Err ( err) = room. subscribe_thread_if_needed ( & thread_root, Some ( subscribe_up_to) ) . await
603
+ {
604
+ warn ! ( %err, "Failed to subscribe to thread" ) ;
605
+ } else {
606
+ let _ = thread_subscriber_sender. send ( ( ) ) ;
607
+ }
608
+
609
+ true
505
610
}
506
611
507
612
#[ instrument( skip_all) ]
@@ -510,16 +615,46 @@ impl EventCache {
510
615
linked_chunk_update_sender : Sender < RoomEventCacheLinkedChunkUpdate > ,
511
616
thread_subscriber_sender : Sender < ( ) > ,
512
617
) {
513
- if client. get ( ) . map_or ( false , |client| !client. enabled_thread_subscriptions ( ) ) {
514
- trace ! ( "Not spawning the thread subscriber task, because the client is shutting down or is not interested in those" ) ;
618
+ let mut send_q_rx = if let Some ( client) = client. get ( ) {
619
+ if !client. enabled_thread_subscriptions ( ) {
620
+ trace ! ( "Thread subscriptions are not enabled, not spawning thread subscriber task" ) ;
621
+ return ;
622
+ }
623
+
624
+ client. send_queue ( ) . subscribe ( )
625
+ } else {
626
+ trace ! ( "Client is shutting down, not spawning thread subscriber task" ) ;
515
627
return ;
516
- }
628
+ } ;
629
+
630
+ let mut linked_chunk_rx = linked_chunk_update_sender. subscribe ( ) ;
517
631
518
- let mut rx = linked_chunk_update_sender. subscribe ( ) ;
632
+ // A mapping of local echoes (events being sent), to their thread root, if
633
+ // they're in an in-thread reply.
634
+ //
635
+ // Entirely managed by `handle_thread_subscriber_send_queue_update`.
636
+ let mut events_being_sent = HashMap :: new ( ) ;
519
637
520
638
loop {
521
639
select ! {
522
- res = rx. recv( ) => {
640
+ res = send_q_rx. recv( ) => {
641
+ match res {
642
+ Ok ( up) => {
643
+ if !Self :: handle_thread_subscriber_send_queue_update( & client, & thread_subscriber_sender, & mut events_being_sent, up) . await {
644
+ break ;
645
+ }
646
+ }
647
+ Err ( RecvError :: Closed ) => {
648
+ debug!( "Linked chunk update channel has been closed, exiting thread subscriber task" ) ;
649
+ break ;
650
+ }
651
+ Err ( RecvError :: Lagged ( num_skipped) ) => {
652
+ warn!( num_skipped, "Lagged behind linked chunk updates" ) ;
653
+ }
654
+ }
655
+ }
656
+
657
+ res = linked_chunk_rx. recv( ) => {
523
658
match res {
524
659
Ok ( up) => {
525
660
if !Self :: handle_thread_subscriber_linked_chunk_update( & client, & thread_subscriber_sender, up) . await {
0 commit comments