@@ -57,7 +57,7 @@ use std::{
57
57
58
58
pub use error:: LatestEventsError ;
59
59
use eyeball:: { AsyncLock , Subscriber } ;
60
- use futures_util:: { select , FutureExt } ;
60
+ use futures_util:: { select_biased , FutureExt } ;
61
61
use latest_event:: LatestEvent ;
62
62
pub use latest_event:: LatestEventValue ;
63
63
use matrix_sdk_common:: executor:: { spawn, AbortOnDrop , JoinHandleExt as _} ;
@@ -69,7 +69,7 @@ use crate::{
69
69
client:: WeakClient ,
70
70
event_cache:: { EventCache , EventCacheError , RoomEventCache , RoomEventCacheGenericUpdate } ,
71
71
room:: WeakRoom ,
72
- send_queue:: SendQueue ,
72
+ send_queue:: { RoomSendQueueUpdate , SendQueue , SendQueueUpdate } ,
73
73
} ;
74
74
75
75
/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
@@ -408,6 +408,25 @@ enum RoomRegistration {
408
408
Remove ( OwnedRoomId ) ,
409
409
}
410
410
411
+ /// Represents the kind of updates the [`compute_latest_events_task`] will have
412
+ /// to deal with.
413
+ enum LatestEventQueueUpdate {
414
+ /// An update from the [`EventCache`] happened.
415
+ EventCache {
416
+ /// The ID of the room that has triggered the update.
417
+ room_id : OwnedRoomId ,
418
+ } ,
419
+
420
+ /// An update from the [`SendQueue`] happened.
421
+ SendQueue {
422
+ /// The ID of the room that has triggered the update.
423
+ room_id : OwnedRoomId ,
424
+
425
+ /// The update itself.
426
+ update : RoomSendQueueUpdate ,
427
+ } ,
428
+ }
429
+
411
430
/// Type holding the [`LatestEvent`] for a room and for all its threads.
412
431
#[ derive( Debug ) ]
413
432
struct RoomLatestEvents {
@@ -529,11 +548,12 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
529
548
registered_rooms : Arc < RegisteredRooms > ,
530
549
mut room_registration_receiver : mpsc:: Receiver < RoomRegistration > ,
531
550
event_cache : EventCache ,
532
- _send_queue : SendQueue ,
533
- latest_event_queue_sender : mpsc:: UnboundedSender < OwnedRoomId > ,
551
+ send_queue : SendQueue ,
552
+ latest_event_queue_sender : mpsc:: UnboundedSender < LatestEventQueueUpdate > ,
534
553
) {
535
554
let mut event_cache_generic_updates_subscriber =
536
555
event_cache. subscribe_to_room_generic_updates ( ) ;
556
+ let mut send_queue_generic_updates_subscriber = send_queue. subscribe ( ) ;
537
557
538
558
// Initialise the list of rooms that are listened.
539
559
//
@@ -547,6 +567,7 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
547
567
if listen_to_event_cache_and_send_queue_updates (
548
568
& mut room_registration_receiver,
549
569
& mut event_cache_generic_updates_subscriber,
570
+ & mut send_queue_generic_updates_subscriber,
550
571
& mut listened_rooms,
551
572
& latest_event_queue_sender,
552
573
)
@@ -567,10 +588,13 @@ async fn listen_to_event_cache_and_send_queue_updates_task(
567
588
async fn listen_to_event_cache_and_send_queue_updates (
568
589
room_registration_receiver : & mut mpsc:: Receiver < RoomRegistration > ,
569
590
event_cache_generic_updates_subscriber : & mut broadcast:: Receiver < RoomEventCacheGenericUpdate > ,
591
+ send_queue_generic_updates_subscriber : & mut broadcast:: Receiver < SendQueueUpdate > ,
570
592
listened_rooms : & mut HashSet < OwnedRoomId > ,
571
- latest_event_queue_sender : & mpsc:: UnboundedSender < OwnedRoomId > ,
593
+ latest_event_queue_sender : & mpsc:: UnboundedSender < LatestEventQueueUpdate > ,
572
594
) -> ControlFlow < ( ) > {
573
- select ! {
595
+ // We need a biased select here: `room_registration_receiver` must have the
596
+ // priority over other futures.
597
+ select_biased ! {
574
598
update = room_registration_receiver. recv( ) . fuse( ) => {
575
599
match update {
576
600
Some ( RoomRegistration :: Add ( room_id) ) => {
@@ -592,14 +616,31 @@ async fn listen_to_event_cache_and_send_queue_updates(
592
616
let room_id = room_event_cache_generic_update. room_id( ) ;
593
617
594
618
if listened_rooms. contains( room_id) {
595
- let _ = latest_event_queue_sender. send( room_id. to_owned( ) ) ;
619
+ let _ = latest_event_queue_sender. send( LatestEventQueueUpdate :: EventCache {
620
+ room_id: room_id. to_owned( )
621
+ } ) ;
596
622
}
597
623
} else {
598
624
error!( "`event_cache_generic_updates` channel has been closed" ) ;
599
625
600
626
return ControlFlow :: Break ( ( ) ) ;
601
627
}
602
628
}
629
+
630
+ send_queue_generic_update = send_queue_generic_updates_subscriber. recv( ) . fuse( ) => {
631
+ if let Ok ( SendQueueUpdate { room_id, update } ) = send_queue_generic_update {
632
+ if listened_rooms. contains( & room_id) {
633
+ let _ = latest_event_queue_sender. send( LatestEventQueueUpdate :: SendQueue {
634
+ room_id,
635
+ update
636
+ } ) ;
637
+ }
638
+ } else {
639
+ error!( "`send_queue_generic_updates` channel has been closed" ) ;
640
+
641
+ return ControlFlow :: Break ( ( ) ) ;
642
+ }
643
+ }
603
644
}
604
645
605
646
ControlFlow :: Continue ( ( ) )
@@ -612,7 +653,7 @@ async fn listen_to_event_cache_and_send_queue_updates(
612
653
/// [`listen_to_event_cache_and_send_queue_updates_task`].
613
654
async fn compute_latest_events_task (
614
655
registered_rooms : Arc < RegisteredRooms > ,
615
- mut latest_event_queue_receiver : mpsc:: UnboundedReceiver < OwnedRoomId > ,
656
+ mut latest_event_queue_receiver : mpsc:: UnboundedReceiver < LatestEventQueueUpdate > ,
616
657
) {
617
658
const BUFFER_SIZE : usize = 16 ;
618
659
@@ -626,16 +667,29 @@ async fn compute_latest_events_task(
626
667
error ! ( "`compute_latest_events_task` has stopped" ) ;
627
668
}
628
669
629
- async fn compute_latest_events ( registered_rooms : & RegisteredRooms , for_rooms : & [ OwnedRoomId ] ) {
630
- for room_id in for_rooms {
631
- let mut rooms = registered_rooms. rooms . write ( ) . await ;
670
+ async fn compute_latest_events (
671
+ registered_rooms : & RegisteredRooms ,
672
+ latest_event_queue_updates : & [ LatestEventQueueUpdate ] ,
673
+ ) {
674
+ for latest_event_queue_update in latest_event_queue_updates {
675
+ match latest_event_queue_update {
676
+ LatestEventQueueUpdate :: EventCache { room_id } => {
677
+ let mut rooms = registered_rooms. rooms . write ( ) . await ;
678
+
679
+ if let Some ( room_latest_events) = rooms. get_mut ( room_id) {
680
+ room_latest_events. update ( ) . await ;
681
+ } else {
682
+ error ! ( ?room_id, "Failed to find the room" ) ;
683
+
684
+ continue ;
685
+ }
686
+ }
632
687
633
- if let Some ( room_latest_events) = rooms. get_mut ( room_id) {
634
- room_latest_events. update ( ) . await ;
635
- } else {
636
- error ! ( ?room_id, "Failed to find the room" ) ;
688
+ LatestEventQueueUpdate :: SendQueue { room_id, update } => {
689
+ // let mut rooms = registered_rooms.rooms.write().await;
637
690
638
- continue ;
691
+ todo ! ( )
692
+ }
639
693
}
640
694
}
641
695
}
@@ -650,12 +704,12 @@ mod tests {
650
704
RoomState ,
651
705
} ;
652
706
use matrix_sdk_test:: { async_test, event_factory:: EventFactory , JoinedRoomBuilder } ;
653
- use ruma:: { event_id, owned_room_id, room_id, user_id} ;
707
+ use ruma:: { event_id, owned_room_id, room_id, user_id, OwnedTransactionId } ;
654
708
use stream_assert:: assert_pending;
655
709
656
710
use super :: {
657
711
broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet , LatestEventValue ,
658
- RoomEventCacheGenericUpdate , RoomRegistration ,
712
+ RoomEventCacheGenericUpdate , RoomRegistration , RoomSendQueueUpdate , SendQueueUpdate ,
659
713
} ;
660
714
use crate :: test_utils:: mocks:: MatrixMockServer ;
661
715
@@ -819,6 +873,8 @@ mod tests {
819
873
let ( room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
820
874
let ( _room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
821
875
broadcast:: channel ( 1 ) ;
876
+ let ( _send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
877
+ broadcast:: channel ( 1 ) ;
822
878
let mut listened_rooms = HashSet :: new ( ) ;
823
879
let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
824
880
@@ -831,6 +887,7 @@ mod tests {
831
887
assert ! ( listen_to_event_cache_and_send_queue_updates(
832
888
& mut room_registration_receiver,
833
889
& mut room_event_cache_generic_update_receiver,
890
+ & mut send_queue_generic_update_receiver,
834
891
& mut listened_rooms,
835
892
& latest_event_queue_sender,
836
893
)
@@ -850,6 +907,7 @@ mod tests {
850
907
assert ! ( listen_to_event_cache_and_send_queue_updates(
851
908
& mut room_registration_receiver,
852
909
& mut room_event_cache_generic_update_receiver,
910
+ & mut send_queue_generic_update_receiver,
853
911
& mut listened_rooms,
854
912
& latest_event_queue_sender,
855
913
)
@@ -873,6 +931,7 @@ mod tests {
873
931
assert ! ( listen_to_event_cache_and_send_queue_updates(
874
932
& mut room_registration_receiver,
875
933
& mut room_event_cache_generic_update_receiver,
934
+ & mut send_queue_generic_update_receiver,
876
935
& mut listened_rooms,
877
936
& latest_event_queue_sender,
878
937
)
@@ -892,6 +951,8 @@ mod tests {
892
951
let ( _room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
893
952
let ( _room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
894
953
broadcast:: channel ( 1 ) ;
954
+ let ( _send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
955
+ broadcast:: channel ( 1 ) ;
895
956
let mut listened_rooms = HashSet :: new ( ) ;
896
957
let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
897
958
@@ -902,6 +963,7 @@ mod tests {
902
963
assert ! ( listen_to_event_cache_and_send_queue_updates(
903
964
& mut room_registration_receiver,
904
965
& mut room_event_cache_generic_update_receiver,
966
+ & mut send_queue_generic_update_receiver,
905
967
& mut listened_rooms,
906
968
& latest_event_queue_sender,
907
969
)
@@ -920,6 +982,8 @@ mod tests {
920
982
let ( room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
921
983
let ( room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
922
984
broadcast:: channel ( 1 ) ;
985
+ let ( _send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
986
+ broadcast:: channel ( 1 ) ;
923
987
let mut listened_rooms = HashSet :: new ( ) ;
924
988
let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
925
989
@@ -933,6 +997,7 @@ mod tests {
933
997
assert ! ( listen_to_event_cache_and_send_queue_updates(
934
998
& mut room_registration_receiver,
935
999
& mut room_event_cache_generic_update_receiver,
1000
+ & mut send_queue_generic_update_receiver,
936
1001
& mut listened_rooms,
937
1002
& latest_event_queue_sender,
938
1003
)
@@ -958,6 +1023,82 @@ mod tests {
958
1023
assert ! ( listen_to_event_cache_and_send_queue_updates(
959
1024
& mut room_registration_receiver,
960
1025
& mut room_event_cache_generic_update_receiver,
1026
+ & mut send_queue_generic_update_receiver,
1027
+ & mut listened_rooms,
1028
+ & latest_event_queue_sender,
1029
+ )
1030
+ . await
1031
+ . is_continue( ) ) ;
1032
+ }
1033
+
1034
+ assert_eq ! ( listened_rooms. len( ) , 1 ) ;
1035
+ assert ! ( listened_rooms. contains( & room_id) ) ;
1036
+
1037
+ // A latest event computation has been triggered!
1038
+ assert ! ( latest_event_queue_receiver. is_empty( ) . not( ) ) ;
1039
+ }
1040
+ }
1041
+
1042
+ #[ async_test]
1043
+ async fn test_inputs_task_can_listen_to_send_queue ( ) {
1044
+ let room_id = owned_room_id ! ( "!r0" ) ;
1045
+
1046
+ let ( room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
1047
+ let ( _room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1048
+ broadcast:: channel ( 1 ) ;
1049
+ let ( send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1050
+ broadcast:: channel ( 1 ) ;
1051
+ let mut listened_rooms = HashSet :: new ( ) ;
1052
+ let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
1053
+
1054
+ // New send queue update, but the `LatestEvents` isn't listening to it.
1055
+ {
1056
+ send_queue_generic_update_sender
1057
+ . send ( SendQueueUpdate {
1058
+ room_id : room_id. clone ( ) ,
1059
+ update : RoomSendQueueUpdate :: SentEvent {
1060
+ transaction_id : OwnedTransactionId :: from ( "txnid0" ) ,
1061
+ event_id : event_id ! ( "$ev0" ) . to_owned ( ) ,
1062
+ } ,
1063
+ } )
1064
+ . unwrap ( ) ;
1065
+
1066
+ // Run the task.
1067
+ assert ! ( listen_to_event_cache_and_send_queue_updates(
1068
+ & mut room_registration_receiver,
1069
+ & mut room_event_cache_generic_update_receiver,
1070
+ & mut send_queue_generic_update_receiver,
1071
+ & mut listened_rooms,
1072
+ & latest_event_queue_sender,
1073
+ )
1074
+ . await
1075
+ . is_continue( ) ) ;
1076
+
1077
+ assert ! ( listened_rooms. is_empty( ) ) ;
1078
+
1079
+ // No latest event computation has been triggered.
1080
+ assert ! ( latest_event_queue_receiver. is_empty( ) ) ;
1081
+ }
1082
+
1083
+ // New send queue update, but this time, the `LatestEvents` is listening to it.
1084
+ {
1085
+ room_registration_sender. send ( RoomRegistration :: Add ( room_id. clone ( ) ) ) . await . unwrap ( ) ;
1086
+ send_queue_generic_update_sender
1087
+ . send ( SendQueueUpdate {
1088
+ room_id : room_id. clone ( ) ,
1089
+ update : RoomSendQueueUpdate :: SentEvent {
1090
+ transaction_id : OwnedTransactionId :: from ( "txnid1" ) ,
1091
+ event_id : event_id ! ( "$ev1" ) . to_owned ( ) ,
1092
+ } ,
1093
+ } )
1094
+ . unwrap ( ) ;
1095
+
1096
+ // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`.
1097
+ for _ in 0 ..2 {
1098
+ assert ! ( listen_to_event_cache_and_send_queue_updates(
1099
+ & mut room_registration_receiver,
1100
+ & mut room_event_cache_generic_update_receiver,
1101
+ & mut send_queue_generic_update_receiver,
961
1102
& mut listened_rooms,
962
1103
& latest_event_queue_sender,
963
1104
)
@@ -978,6 +1119,8 @@ mod tests {
978
1119
let ( _room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
979
1120
let ( room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
980
1121
broadcast:: channel ( 1 ) ;
1122
+ let ( _send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1123
+ broadcast:: channel ( 1 ) ;
981
1124
let mut listened_rooms = HashSet :: new ( ) ;
982
1125
let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
983
1126
@@ -988,6 +1131,36 @@ mod tests {
988
1131
assert ! ( listen_to_event_cache_and_send_queue_updates(
989
1132
& mut room_registration_receiver,
990
1133
& mut room_event_cache_generic_update_receiver,
1134
+ & mut send_queue_generic_update_receiver,
1135
+ & mut listened_rooms,
1136
+ & latest_event_queue_sender,
1137
+ )
1138
+ . await
1139
+ // It breaks!
1140
+ . is_break( ) ) ;
1141
+
1142
+ assert_eq ! ( listened_rooms. len( ) , 0 ) ;
1143
+ assert ! ( latest_event_queue_receiver. is_empty( ) ) ;
1144
+ }
1145
+
1146
+ #[ async_test]
1147
+ async fn test_inputs_task_stops_when_send_queue_channel_is_closed ( ) {
1148
+ let ( _room_registration_sender, mut room_registration_receiver) = mpsc:: channel ( 1 ) ;
1149
+ let ( _room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1150
+ broadcast:: channel ( 1 ) ;
1151
+ let ( send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1152
+ broadcast:: channel ( 1 ) ;
1153
+ let mut listened_rooms = HashSet :: new ( ) ;
1154
+ let ( latest_event_queue_sender, latest_event_queue_receiver) = mpsc:: unbounded_channel ( ) ;
1155
+
1156
+ // Drop the sender to close the channel.
1157
+ drop ( send_queue_generic_update_sender) ;
1158
+
1159
+ // Run the task.
1160
+ assert ! ( listen_to_event_cache_and_send_queue_updates(
1161
+ & mut room_registration_receiver,
1162
+ & mut room_event_cache_generic_update_receiver,
1163
+ & mut send_queue_generic_update_receiver,
991
1164
& mut listened_rooms,
992
1165
& latest_event_queue_sender,
993
1166
)
0 commit comments