@@ -433,6 +433,12 @@ impl EventCache {
433
433
self . inner . generic_update_sender . subscribe ( )
434
434
}
435
435
436
+ /// React to a given linked chunk update by subscribing the user to a
437
+ /// thread, if needs be (when the user got mentioned in a thread reply, for
438
+ /// a thread they were not subscribed to).
439
+ ///
440
+ /// Returns a boolean indicating whether the task should keep on running or
441
+ /// not.
436
442
#[ instrument( skip( client, thread_subscriber_sender) ) ]
437
443
async fn handle_thread_subscriber_linked_chunk_update (
438
444
client : & WeakClient ,
@@ -445,7 +451,7 @@ impl EventCache {
445
451
return false ;
446
452
} ;
447
453
448
- let OwnedLinkedChunkId :: Thread ( room_id, thread_root) = & up. linked_chunk else {
454
+ let OwnedLinkedChunkId :: Thread ( room_id, thread_root) = & up. linked_chunk_id else {
449
455
trace ! ( "received an update for a non-thread linked chunk, ignoring" ) ;
450
456
return true ;
451
457
} ;
@@ -457,8 +463,9 @@ impl EventCache {
457
463
458
464
let thread_root = thread_root. clone ( ) ;
459
465
460
- let new_events = up. events ( ) ;
461
- if new_events. is_empty ( ) {
466
+ let mut new_events = up. events ( ) . peekable ( ) ;
467
+
468
+ if new_events. peek ( ) . is_none ( ) {
462
469
// No new events, nothing to do.
463
470
return true ;
464
471
}
@@ -489,8 +496,9 @@ impl EventCache {
489
496
let mut subscribe_up_to = None ;
490
497
491
498
// Find if there's an event that would trigger a mention for the current
492
- // user, iterating from the end of the new events towards the oldest,
493
- for ev in new_events. into_iter ( ) . rev ( ) {
499
+ // user, iterating from the end of the new events towards the oldest, so we can
500
+ // find the most recent event to subscribe to.
501
+ for ev in new_events. rev ( ) {
494
502
if push_context
495
503
. for_event ( ev. raw ( ) )
496
504
. await
@@ -520,6 +528,12 @@ impl EventCache {
520
528
true
521
529
}
522
530
531
+ /// React to a given send queue update by subscribing the user to a
532
+ /// thread, if needs be (when the user sent an event in a thread they were
533
+ /// not subscribed to).
534
+ ///
535
+ /// Returns a boolean indicating whether the task should keep on running or
536
+ /// not.
523
537
#[ instrument( skip( client, thread_subscriber_sender) ) ]
524
538
async fn handle_thread_subscriber_send_queue_update (
525
539
client : & WeakClient ,
@@ -710,7 +724,8 @@ struct EventCacheInner {
710
724
/// A sender for a persisted linked chunk update.
711
725
///
712
726
/// This is used to notify that some linked chunk has persisted some updates
713
- /// to a store, and can be used by observers to look for new events.
727
+ /// to a store, during sync or a back-pagination of *any* linked chunk.
728
+ /// This can be used by observers to look for new events.
714
729
///
715
730
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
716
731
linked_chunk_update_sender : Sender < RoomEventCacheLinkedChunkUpdate > ,
@@ -964,34 +979,37 @@ pub struct RoomEventCacheGenericUpdate {
964
979
#[ derive( Clone , Debug ) ]
965
980
struct RoomEventCacheLinkedChunkUpdate {
966
981
/// The linked chunk affected by the update.
967
- linked_chunk : OwnedLinkedChunkId ,
982
+ linked_chunk_id : OwnedLinkedChunkId ,
968
983
969
- /// A vector of all the updates that happened during this update.
984
+ /// A vector of all the linked chunk updates that happened during this event
985
+ /// cache update.
970
986
updates : Vec < linked_chunk:: Update < TimelineEvent , Gap > > ,
971
987
}
972
988
973
989
impl RoomEventCacheLinkedChunkUpdate {
974
990
/// Return all the new events propagated by this update, in topological
975
991
/// order.
976
- pub fn events ( self ) -> Vec < TimelineEvent > {
977
- self . updates
978
- . into_iter ( )
979
- . flat_map ( |update| match update {
980
- linked_chunk:: Update :: PushItems { items, .. } => items,
981
- linked_chunk:: Update :: ReplaceItem { item, .. } => vec ! [ item] ,
982
- linked_chunk:: Update :: RemoveItem { .. }
983
- | linked_chunk:: Update :: DetachLastItems { .. }
984
- | linked_chunk:: Update :: StartReattachItems
985
- | linked_chunk:: Update :: EndReattachItems
986
- | linked_chunk:: Update :: NewItemsChunk { .. }
987
- | linked_chunk:: Update :: NewGapChunk { .. }
988
- | linked_chunk:: Update :: RemoveChunk ( ..)
989
- | linked_chunk:: Update :: Clear => {
990
- // All these updates don't contain any new event.
991
- vec ! [ ]
992
- }
993
- } )
994
- . collect ( )
992
+ pub fn events ( self ) -> impl DoubleEndedIterator < Item = TimelineEvent > {
993
+ use itertools:: Either ;
994
+ self . updates . into_iter ( ) . flat_map ( |update| match update {
995
+ linked_chunk:: Update :: PushItems { items, .. } => {
996
+ Either :: Left ( Either :: Left ( items. into_iter ( ) ) )
997
+ }
998
+ linked_chunk:: Update :: ReplaceItem { item, .. } => {
999
+ Either :: Left ( Either :: Right ( std:: iter:: once ( item) ) )
1000
+ }
1001
+ linked_chunk:: Update :: RemoveItem { .. }
1002
+ | linked_chunk:: Update :: DetachLastItems { .. }
1003
+ | linked_chunk:: Update :: StartReattachItems
1004
+ | linked_chunk:: Update :: EndReattachItems
1005
+ | linked_chunk:: Update :: NewItemsChunk { .. }
1006
+ | linked_chunk:: Update :: NewGapChunk { .. }
1007
+ | linked_chunk:: Update :: RemoveChunk ( ..)
1008
+ | linked_chunk:: Update :: Clear => {
1009
+ // All these updates don't contain any new event.
1010
+ Either :: Right ( std:: iter:: empty ( ) )
1011
+ }
1012
+ } )
995
1013
}
996
1014
}
997
1015
0 commit comments