48
48
49
49
mod error;
50
50
mod latest_event;
51
+ mod room_latest_events;
51
52
52
53
use std:: {
53
54
collections:: { HashMap , HashSet } ,
@@ -61,19 +62,17 @@ use futures_util::FutureExt;
61
62
use latest_event:: LatestEvent ;
62
63
pub use latest_event:: { LatestEventValue , LocalLatestEventValue , RemoteLatestEventValue } ;
63
64
use matrix_sdk_common:: executor:: { AbortOnDrop , JoinHandleExt as _, spawn} ;
64
- use ruma:: { EventId , OwnedEventId , OwnedRoomId , RoomId } ;
65
+ use room_latest_events:: RoomLatestEvents ;
66
+ use ruma:: { EventId , OwnedRoomId , RoomId } ;
65
67
use tokio:: {
66
68
select,
67
- sync:: {
68
- OwnedRwLockReadGuard , OwnedRwLockWriteGuard , RwLock , RwLockReadGuard , RwLockWriteGuard ,
69
- broadcast, mpsc,
70
- } ,
69
+ sync:: { RwLock , RwLockReadGuard , RwLockWriteGuard , broadcast, mpsc} ,
71
70
} ;
72
71
use tracing:: { error, warn} ;
73
72
74
73
use crate :: {
75
74
client:: WeakClient ,
76
- event_cache:: { EventCache , EventCacheError , RoomEventCache , RoomEventCacheGenericUpdate } ,
75
+ event_cache:: { EventCache , RoomEventCacheGenericUpdate } ,
77
76
room:: WeakRoom ,
78
77
send_queue:: { RoomSendQueueUpdate , SendQueue , SendQueueUpdate } ,
79
78
} ;
@@ -447,200 +446,6 @@ enum LatestEventQueueUpdate {
447
446
} ,
448
447
}
449
448
450
- /// Type holding the [`LatestEvent`] for a room and for all its threads.
451
- #[ derive( Debug ) ]
452
- struct RoomLatestEvents {
453
- /// The state of this type.
454
- state : Arc < RwLock < RoomLatestEventsState > > ,
455
- }
456
-
457
- impl RoomLatestEvents {
458
- /// Create a new [`RoomLatestEvents`].
459
- async fn new (
460
- weak_room : WeakRoom ,
461
- event_cache : & EventCache ,
462
- ) -> Result < Option < Self > , LatestEventsError > {
463
- let room_id = weak_room. room_id ( ) ;
464
- let room_event_cache = match event_cache. for_room ( room_id) . await {
465
- // It's fine to drop the `EventCacheDropHandles` here as the caller
466
- // (`LatestEventState`) owns a clone of the `EventCache`.
467
- Ok ( ( room_event_cache, _drop_handles) ) => room_event_cache,
468
- Err ( EventCacheError :: RoomNotFound { .. } ) => return Ok ( None ) ,
469
- Err ( err) => return Err ( LatestEventsError :: EventCache ( err) ) ,
470
- } ;
471
-
472
- Ok ( Some ( Self {
473
- state : Arc :: new ( RwLock :: new ( RoomLatestEventsState {
474
- for_the_room : Self :: create_latest_event_for_inner (
475
- & weak_room,
476
- None ,
477
- & room_event_cache,
478
- )
479
- . await ,
480
- per_thread : HashMap :: new ( ) ,
481
- weak_room,
482
- room_event_cache,
483
- } ) ) ,
484
- } ) )
485
- }
486
-
487
- async fn create_latest_event_for_inner (
488
- weak_room : & WeakRoom ,
489
- thread_id : Option < & EventId > ,
490
- room_event_cache : & RoomEventCache ,
491
- ) -> LatestEvent {
492
- LatestEvent :: new ( weak_room, thread_id, room_event_cache) . await
493
- }
494
-
495
- /// Lock this type with shared read access, and return an owned lock guard.
496
- async fn read ( & self ) -> RoomLatestEventsReadGuard {
497
- RoomLatestEventsReadGuard { inner : self . state . clone ( ) . read_owned ( ) . await }
498
- }
499
-
500
- /// Lock this type with exclusive write access, and return an owned lock
501
- /// guard.
502
- async fn write ( & self ) -> RoomLatestEventsWriteGuard {
503
- RoomLatestEventsWriteGuard { inner : self . state . clone ( ) . write_owned ( ) . await }
504
- }
505
- }
506
-
507
- /// The state of [`RoomLatestEvents`].
508
- #[ derive( Debug ) ]
509
- struct RoomLatestEventsState {
510
- /// The latest event of the room.
511
- for_the_room : LatestEvent ,
512
-
513
- /// The latest events for each thread.
514
- per_thread : HashMap < OwnedEventId , LatestEvent > ,
515
-
516
- /// The room event cache associated to this room.
517
- room_event_cache : RoomEventCache ,
518
-
519
- /// The (weak) room.
520
- ///
521
- /// It used to to get the power-levels of the user for this room when
522
- /// computing the latest events.
523
- weak_room : WeakRoom ,
524
- }
525
-
526
- /// The owned lock guard returned by [`RoomLatestEvents::read`].
527
- struct RoomLatestEventsReadGuard {
528
- inner : OwnedRwLockReadGuard < RoomLatestEventsState > ,
529
- }
530
-
531
- impl RoomLatestEventsReadGuard {
532
- /// Get the [`LatestEvent`] for the room.
533
- fn for_room ( & self ) -> & LatestEvent {
534
- & self . inner . for_the_room
535
- }
536
-
537
- /// Get the [`LatestEvent`] for the thread if it exists.
538
- fn for_thread ( & self , thread_id : & EventId ) -> Option < & LatestEvent > {
539
- self . inner . per_thread . get ( thread_id)
540
- }
541
- }
542
-
543
- /// The owned lock guard returned by [`RoomLatestEvents::write`].
544
- struct RoomLatestEventsWriteGuard {
545
- inner : OwnedRwLockWriteGuard < RoomLatestEventsState > ,
546
- }
547
-
548
- impl RoomLatestEventsWriteGuard {
549
- async fn create_latest_event_for ( & self , thread_id : Option < & EventId > ) -> LatestEvent {
550
- RoomLatestEvents :: create_latest_event_for_inner (
551
- & self . inner . weak_room ,
552
- thread_id,
553
- & self . inner . room_event_cache ,
554
- )
555
- . await
556
- }
557
-
558
- /// Check whether this [`RoomLatestEvents`] has a latest event for a
559
- /// particular thread.
560
- fn has_thread ( & self , thread_id : & EventId ) -> bool {
561
- self . inner . per_thread . contains_key ( thread_id)
562
- }
563
-
564
- /// Create the [`LatestEvent`] for thread `thread_id` and insert it in this
565
- /// [`RoomLatestEvents`].
566
- async fn create_and_insert_latest_event_for_thread ( & mut self , thread_id : & EventId ) {
567
- let latest_event = self . create_latest_event_for ( Some ( thread_id) ) . await ;
568
-
569
- self . inner . per_thread . insert ( thread_id. to_owned ( ) , latest_event) ;
570
- }
571
-
572
- /// Forget the thread `thread_id`.
573
- fn forget_thread ( & mut self , thread_id : & EventId ) {
574
- self . inner . per_thread . remove ( thread_id) ;
575
- }
576
-
577
- /// Update the latest events for the room and its threads, based on the
578
- /// event cache data.
579
- async fn update_with_event_cache ( & mut self ) {
580
- // Get the power levels of the user for the current room if the `WeakRoom` is
581
- // still valid.
582
- //
583
- // Get it once for all the updates of all the latest events for this room (be
584
- // the room and its threads).
585
- let room = self . inner . weak_room . get ( ) ;
586
- let power_levels = match & room {
587
- Some ( room) => {
588
- let power_levels = room. power_levels ( ) . await . ok ( ) ;
589
-
590
- Some ( room. own_user_id ( ) ) . zip ( power_levels)
591
- }
592
-
593
- None => None ,
594
- } ;
595
-
596
- let inner = & mut * self . inner ;
597
- let for_the_room = & mut inner. for_the_room ;
598
- let per_thread = & mut inner. per_thread ;
599
- let room_event_cache = & inner. room_event_cache ;
600
-
601
- for_the_room. update_with_event_cache ( room_event_cache, & power_levels) . await ;
602
-
603
- for latest_event in per_thread. values_mut ( ) {
604
- latest_event. update_with_event_cache ( room_event_cache, & power_levels) . await ;
605
- }
606
- }
607
-
608
- /// Update the latest events for the room and its threads, based on the
609
- /// send queue update.
610
- async fn update_with_send_queue ( & mut self , send_queue_update : & RoomSendQueueUpdate ) {
611
- // Get the power levels of the user for the current room if the `WeakRoom` is
612
- // still valid.
613
- //
614
- // Get it once for all the updates of all the latest events for this room (be
615
- // the room and its threads).
616
- let room = self . inner . weak_room . get ( ) ;
617
- let power_levels = match & room {
618
- Some ( room) => {
619
- let power_levels = room. power_levels ( ) . await . ok ( ) ;
620
-
621
- Some ( room. own_user_id ( ) ) . zip ( power_levels)
622
- }
623
-
624
- None => None ,
625
- } ;
626
-
627
- let inner = & mut * self . inner ;
628
- let for_the_room = & mut inner. for_the_room ;
629
- let per_thread = & mut inner. per_thread ;
630
- let room_event_cache = & inner. room_event_cache ;
631
-
632
- for_the_room
633
- . update_with_send_queue ( send_queue_update, room_event_cache, & power_levels)
634
- . await ;
635
-
636
- for latest_event in per_thread. values_mut ( ) {
637
- latest_event
638
- . update_with_send_queue ( send_queue_update, room_event_cache, & power_levels)
639
- . await ;
640
- }
641
- }
642
- }
643
-
644
449
/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
645
450
/// When an update is received and is considered relevant, a message is sent to
646
451
/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
@@ -880,9 +685,9 @@ mod tests {
880
685
assert ! ( rooms. contains_key( room_id_1) ) ;
881
686
882
687
// Room 0 contains zero thread latest events.
883
- assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . inner . per_thread. is_empty( ) ) ;
688
+ assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . per_thread( ) . is_empty( ) ) ;
884
689
// Room 1 contains zero thread latest events.
885
- assert ! ( rooms. get( room_id_1) . unwrap( ) . read( ) . await . inner . per_thread. is_empty( ) ) ;
690
+ assert ! ( rooms. get( room_id_1) . unwrap( ) . read( ) . await . per_thread( ) . is_empty( ) ) ;
886
691
}
887
692
888
693
// Now let's listen to one thread respectively for two rooms.
@@ -899,17 +704,17 @@ mod tests {
899
704
assert ! ( rooms. contains_key( room_id_2) ) ;
900
705
901
706
// Room 0 contains zero thread latest events.
902
- assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . inner . per_thread. is_empty( ) ) ;
707
+ assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . per_thread( ) . is_empty( ) ) ;
903
708
// Room 1 contains one thread latest event…
904
709
let room_1 = rooms. get ( room_id_1) . unwrap ( ) . read ( ) . await ;
905
- assert_eq ! ( room_1. inner . per_thread. len( ) , 1 ) ;
710
+ assert_eq ! ( room_1. per_thread( ) . len( ) , 1 ) ;
906
711
// … which is thread 1.0.
907
- assert ! ( room_1. inner . per_thread. contains_key( thread_id_1_0) ) ;
712
+ assert ! ( room_1. per_thread( ) . contains_key( thread_id_1_0) ) ;
908
713
// Room 2 contains one thread latest event…
909
714
let room_2 = rooms. get ( room_id_2) . unwrap ( ) . read ( ) . await ;
910
- assert_eq ! ( room_2. inner . per_thread. len( ) , 1 ) ;
715
+ assert_eq ! ( room_2. per_thread( ) . len( ) , 1 ) ;
911
716
// … which is thread 2.0.
912
- assert ! ( room_2. inner . per_thread. contains_key( thread_id_2_0) ) ;
717
+ assert ! ( room_2. per_thread( ) . contains_key( thread_id_2_0) ) ;
913
718
}
914
719
}
915
720
@@ -939,7 +744,7 @@ mod tests {
939
744
assert ! ( rooms. contains_key( room_id_0) ) ;
940
745
941
746
// Room 0 contains zero thread latest events.
942
- assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . inner . per_thread. is_empty( ) ) ;
747
+ assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . per_thread( ) . is_empty( ) ) ;
943
748
}
944
749
945
750
// Now let's forget about room 0.
@@ -980,9 +785,9 @@ mod tests {
980
785
981
786
// Room 0 contains one thread latest event…
982
787
let room_0 = rooms. get ( room_id_0) . unwrap ( ) . read ( ) . await ;
983
- assert_eq ! ( room_0. inner . per_thread. len( ) , 1 ) ;
788
+ assert_eq ! ( room_0. per_thread( ) . len( ) , 1 ) ;
984
789
// … which is thread 0.0.
985
- assert ! ( room_0. inner . per_thread. contains_key( thread_id_0_0) ) ;
790
+ assert ! ( room_0. per_thread( ) . contains_key( thread_id_0_0) ) ;
986
791
}
987
792
988
793
// Now let's forget about the thread.
@@ -996,7 +801,7 @@ mod tests {
996
801
assert ! ( rooms. contains_key( room_id_0) ) ;
997
802
998
803
// But the thread has been removed.
999
- assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . inner . per_thread. is_empty( ) ) ;
804
+ assert ! ( rooms. get( room_id_0) . unwrap( ) . read( ) . await . per_thread( ) . is_empty( ) ) ;
1000
805
}
1001
806
}
1002
807
0 commit comments