@@ -4,16 +4,24 @@ use imbl::Vector;
4
4
use matrix_sdk:: {
5
5
assert_let_timeout,
6
6
deserialized_responses:: { ThreadSummaryStatus , TimelineEvent } ,
7
- event_cache:: { RoomEventCacheUpdate , ThreadEventCacheUpdate } ,
7
+ event_cache:: { RoomEventCacheSubscriber , RoomEventCacheUpdate , ThreadEventCacheUpdate } ,
8
8
test_utils:: {
9
9
assert_event_matches_msg,
10
10
mocks:: { MatrixMockServer , RoomRelationsResponseTemplate } ,
11
11
} ,
12
+ Client , ThreadingSupport ,
12
13
} ;
13
14
use matrix_sdk_test:: {
14
15
async_test, event_factory:: EventFactory , GlobalAccountDataTestEvent , JoinedRoomBuilder , ALICE ,
15
16
} ;
16
- use ruma:: { event_id, room_id, user_id} ;
17
+ use ruma:: {
18
+ event_id,
19
+ events:: { AnySyncTimelineEvent , Mentions } ,
20
+ push:: { ConditionalPushRule , Ruleset } ,
21
+ room_id,
22
+ serde:: Raw ,
23
+ user_id, OwnedEventId , OwnedRoomId ,
24
+ } ;
17
25
use serde_json:: json;
18
26
use tokio:: sync:: broadcast;
19
27
@@ -433,3 +441,323 @@ async fn test_deduplication() {
433
441
// The events were already known, so the stream is still empty.
434
442
assert ! ( thread_stream. is_empty( ) ) ;
435
443
}
444
+
445
+ struct ThreadSubscriptionTestSetup {
446
+ server : MatrixMockServer ,
447
+ client : Client ,
448
+ factory : EventFactory ,
449
+ room_id : OwnedRoomId ,
450
+ subscriber : RoomEventCacheSubscriber ,
451
+ /// 3 events: 1 non-mention, 1 mention, and another non-mention.
452
+ events : Vec < Raw < AnySyncTimelineEvent > > ,
453
+ mention_event_id : OwnedEventId ,
454
+ thread_root : OwnedEventId ,
455
+ }
456
+
457
+ /// Create a new setup for a thread subscription test, with enough data so that
458
+ /// a push context can be created.
459
+ ///
460
+ /// The setup uses custom push rules, to trigger notifications only on mentions.
461
+ ///
462
+ /// The setup includes 3 events (1 non-mention, 1 mention, and another
463
+ /// non-mention) in the same thread, for easy testing of automated
464
+ /// subscriptions.
465
+ async fn thread_subscription_test_setup ( ) -> ThreadSubscriptionTestSetup {
466
+ let server = MatrixMockServer :: new ( ) . await ;
467
+
468
+ let thread_root = event_id ! ( "$thread_root" ) ;
469
+
470
+ // Assuming a client that's interested in thread subscriptions,
471
+ let client = server
472
+ . client_builder ( )
473
+ . on_builder ( |builder| {
474
+ builder. with_threading_support ( ThreadingSupport :: Enabled { with_subscriptions : true } )
475
+ } )
476
+ . build ( )
477
+ . await ;
478
+
479
+ // Immediately subscribe the event cache to sync updates.
480
+ client. event_cache ( ) . subscribe ( ) . unwrap ( ) ;
481
+
482
+ let room_id = room_id ! ( "!omelette:fromage.fr" ) ;
483
+ let room = server. sync_joined_room ( & client, room_id) . await ;
484
+
485
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
486
+
487
+ let ( initial_events, mut subscriber) = room_event_cache. subscribe ( ) . await ;
488
+ assert ! ( initial_events. is_empty( ) ) ;
489
+ assert ! ( subscriber. is_empty( ) ) ;
490
+
491
+ // Provide a dummy sync with the room's member profile of the current user, so
492
+ // the push context can be created.
493
+ let own_user_id = client. user_id ( ) . unwrap ( ) ;
494
+ let f = EventFactory :: new ( ) . room ( room_id) . sender ( * ALICE ) ;
495
+ let member = f. member ( own_user_id) . sender ( own_user_id) ;
496
+
497
+ // Override push rules so that only an intentional mention causes a
498
+ // notification.
499
+ let mut push_rules = Ruleset :: default ( ) ;
500
+ push_rules. override_ . insert ( ConditionalPushRule :: is_user_mention ( own_user_id) ) ;
501
+
502
+ server
503
+ . mock_sync ( )
504
+ . ok_and_run ( & client, |sync_builder| {
505
+ sync_builder. add_joined_room ( JoinedRoomBuilder :: new ( room_id) . add_state_event ( member) ) ;
506
+ sync_builder. add_global_account_data_event ( GlobalAccountDataTestEvent :: Custom ( json ! ( {
507
+ "type" : "m.push_rules" ,
508
+ "content" : {
509
+ "global" : push_rules
510
+ }
511
+ } ) ) ) ;
512
+ } )
513
+ . await ;
514
+
515
+ // Wait for the initial sync processing to complete; it will trigger a member
516
+ // update, at the very least.
517
+ assert_let_timeout ! ( Ok ( RoomEventCacheUpdate :: UpdateMembers { .. } ) = subscriber. recv( ) ) ;
518
+
519
+ let first_reply_event_id = event_id ! ( "$first_reply" ) ;
520
+ let first_reply = f
521
+ . text_msg ( "hey there" )
522
+ . in_thread ( thread_root, thread_root)
523
+ . event_id ( first_reply_event_id)
524
+ . into_raw ( ) ;
525
+
526
+ let second_reply_event_id = event_id ! ( "$second_reply" ) ;
527
+ let second_reply = f
528
+ . text_msg ( "hoy test user!" )
529
+ . mentions ( Mentions :: with_user_ids ( [ own_user_id. to_owned ( ) ] ) )
530
+ . in_thread ( thread_root, first_reply_event_id)
531
+ . event_id ( second_reply_event_id)
532
+ . into_raw ( ) ;
533
+
534
+ let third_reply_event_id = event_id ! ( "$third_reply" ) ;
535
+ let third_reply = f
536
+ . text_msg ( "ciao!" )
537
+ . in_thread ( thread_root, second_reply_event_id)
538
+ . event_id ( third_reply_event_id)
539
+ . into_raw ( ) ;
540
+
541
+ ThreadSubscriptionTestSetup {
542
+ server,
543
+ client,
544
+ factory : f,
545
+ subscriber,
546
+ events : vec ! [ first_reply, second_reply, third_reply] ,
547
+ mention_event_id : second_reply_event_id. to_owned ( ) ,
548
+ thread_root : thread_root. to_owned ( ) ,
549
+ room_id : room_id. to_owned ( ) ,
550
+ }
551
+ }
552
+
553
+ #[ async_test]
554
+ async fn test_auto_subscribe_thread_via_sync ( ) {
555
+ let mut s = thread_subscription_test_setup ( ) . await ;
556
+
557
+ // (The endpoint will be called for the current thread, and with an automatic
558
+ // subscription up to the given event ID.)
559
+ s. server
560
+ . mock_put_thread_subscription ( )
561
+ . match_automatic_event_id ( & s. mention_event_id )
562
+ . match_thread_id ( s. thread_root . to_owned ( ) )
563
+ . ok ( )
564
+ . mock_once ( )
565
+ . mount ( )
566
+ . await ;
567
+
568
+ // When I receive 3 events (1 non mention, 1 mention, then 1 non mention again),
569
+ // from sync, I'll get subscribed to the thread because of the second event.
570
+ s. server
571
+ . sync_room ( & s. client , JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_bulk ( s. events ) )
572
+ . await ;
573
+
574
+ // Let the event cache process the update.
575
+ assert_let_timeout ! (
576
+ Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { .. } ) = s. subscriber. recv( )
577
+ ) ;
578
+
579
+ // The actual check is the `mock_once` call above!
580
+ }
581
+
582
+ #[ async_test]
583
+ async fn test_dont_auto_subscribe_on_already_subscribed_thread ( ) {
584
+ let mut s = thread_subscription_test_setup ( ) . await ;
585
+
586
+ // Given a thread I'm already subscribed to,
587
+ s. server
588
+ . mock_get_thread_subscription ( )
589
+ . match_thread_id ( s. thread_root . to_owned ( ) )
590
+ . ok ( false )
591
+ . mock_once ( )
592
+ . mount ( )
593
+ . await ;
594
+
595
+ // The PUT endpoint (to subscribe to the thread) shouldn't be called…
596
+ s. server . mock_put_thread_subscription ( ) . ok ( ) . expect ( 0 ) . mount ( ) . await ;
597
+
598
+ // …when I receive a new in-thread mention for this thread.
599
+ s. server
600
+ . sync_room ( & s. client , JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_bulk ( s. events ) )
601
+ . await ;
602
+
603
+ // Let the event cache process the update.
604
+ assert_let_timeout ! (
605
+ Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { .. } ) = s. subscriber. recv( )
606
+ ) ;
607
+
608
+ // The actual check is the `expect` call above!
609
+ }
610
+
611
+ #[ async_test]
612
+ async fn test_auto_subscribe_on_thread_paginate ( ) {
613
+ // In this scenario, we're back-paginating a thread and making sure that the
614
+ // back-paginated events do cause a subscription.
615
+
616
+ let s = thread_subscription_test_setup ( ) . await ;
617
+
618
+ let event_cache = s. client . event_cache ( ) ;
619
+ event_cache. subscribe ( ) . unwrap ( ) ;
620
+
621
+ let thread_root_id = event_id ! ( "$thread_root" ) ;
622
+ let thread_resp_id = event_id ! ( "$thread_resp" ) ;
623
+
624
+ // Receive an in-thread event.
625
+ let room = s
626
+ . server
627
+ . sync_room (
628
+ & s. client ,
629
+ JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_event (
630
+ s. factory
631
+ . text_msg ( "that's a good point" )
632
+ . in_thread ( thread_root_id, thread_root_id)
633
+ . event_id ( thread_resp_id) ,
634
+ ) ,
635
+ )
636
+ . await ;
637
+
638
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
639
+
640
+ let ( thread_events, mut thread_stream) =
641
+ room_event_cache. subscribe_to_thread ( thread_root_id. to_owned ( ) ) . await ;
642
+
643
+ // Sanity check: the sync event is added to the thread.
644
+ let mut thread_events = wait_for_initial_events ( thread_events, & mut thread_stream) . await ;
645
+ assert_eq ! ( thread_events. len( ) , 1 ) ;
646
+ assert_eq ! ( thread_events. remove( 0 ) . event_id( ) . as_deref( ) , Some ( thread_resp_id) ) ;
647
+
648
+ // It's possible to paginate the thread, and this will push the thread root
649
+ // because there's no prev-batch token.
650
+ let reversed_events = s. events . into_iter ( ) . rev ( ) . map ( Raw :: cast_unchecked) . collect ( ) ;
651
+ s. server
652
+ . mock_room_relations ( )
653
+ . match_target_event ( thread_root_id. to_owned ( ) )
654
+ . ok ( RoomRelationsResponseTemplate :: default ( ) . events ( reversed_events) )
655
+ . mock_once ( )
656
+ . mount ( )
657
+ . await ;
658
+
659
+ s. server
660
+ . mock_room_event ( )
661
+ . match_event_id ( )
662
+ . ok ( s. factory . text_msg ( "Thread root" ) . event_id ( thread_root_id) . into ( ) )
663
+ . mock_once ( )
664
+ . mount ( )
665
+ . await ;
666
+
667
+ // (The endpoint will be called for the current thread, and with an automatic
668
+ // subscription up to the given event ID.)
669
+ s. server
670
+ . mock_put_thread_subscription ( )
671
+ . match_automatic_event_id ( & s. mention_event_id )
672
+ . match_thread_id ( s. thread_root . to_owned ( ) )
673
+ . ok ( )
674
+ . mock_once ( )
675
+ . mount ( )
676
+ . await ;
677
+
678
+ let hit_start =
679
+ room_event_cache. paginate_thread_backwards ( thread_root_id. to_owned ( ) , 42 ) . await . unwrap ( ) ;
680
+ assert ! ( hit_start) ;
681
+
682
+ // Let the event cache process the update.
683
+ assert_let_timeout ! ( Ok ( ThreadEventCacheUpdate { .. } ) = thread_stream. recv( ) ) ;
684
+ }
685
+
686
+ #[ async_test]
687
+ async fn test_auto_subscribe_on_thread_paginate_root_event ( ) {
688
+ // In this scenario, the root of a thread is the event that would cause the
689
+ // subscription.
690
+
691
+ let s = thread_subscription_test_setup ( ) . await ;
692
+
693
+ let event_cache = s. client . event_cache ( ) ;
694
+ event_cache. subscribe ( ) . unwrap ( ) ;
695
+
696
+ let thread_root_id = event_id ! ( "$thread_root" ) ;
697
+ let thread_resp_id = event_id ! ( "$thread_resp" ) ;
698
+
699
+ // Receive an in-thread event.
700
+ let room = s
701
+ . server
702
+ . sync_room (
703
+ & s. client ,
704
+ JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_event (
705
+ s. factory
706
+ . text_msg ( "that's a good point" )
707
+ . in_thread ( thread_root_id, thread_root_id)
708
+ . event_id ( thread_resp_id) ,
709
+ ) ,
710
+ )
711
+ . await ;
712
+
713
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
714
+
715
+ let ( thread_events, mut thread_stream) =
716
+ room_event_cache. subscribe_to_thread ( thread_root_id. to_owned ( ) ) . await ;
717
+
718
+ // Sanity check: the sync event is added to the thread.
719
+ let mut thread_events = wait_for_initial_events ( thread_events, & mut thread_stream) . await ;
720
+ assert_eq ! ( thread_events. len( ) , 1 ) ;
721
+ assert_eq ! ( thread_events. remove( 0 ) . event_id( ) . as_deref( ) , Some ( thread_resp_id) ) ;
722
+
723
+ // It's possible to paginate the thread, and this will push the thread root
724
+ // because there's no prev-batch token.
725
+ s. server
726
+ . mock_room_relations ( )
727
+ . match_target_event ( thread_root_id. to_owned ( ) )
728
+ . ok ( RoomRelationsResponseTemplate :: default ( ) )
729
+ . mock_once ( )
730
+ . mount ( )
731
+ . await ;
732
+
733
+ s. server
734
+ . mock_room_event ( )
735
+ . match_event_id ( )
736
+ . ok ( s
737
+ . factory
738
+ . text_msg ( "da r00t" )
739
+ . event_id ( thread_root_id)
740
+ . mentions ( Mentions :: with_user_ids ( s. client . user_id ( ) . map ( ToOwned :: to_owned) ) )
741
+ . into ( ) )
742
+ . mock_once ( )
743
+ . mount ( )
744
+ . await ;
745
+
746
+ // (The endpoint will be called for the current thread, and with an automatic
747
+ // subscription up to the given event ID.)
748
+ s. server
749
+ . mock_put_thread_subscription ( )
750
+ . match_automatic_event_id ( thread_root_id)
751
+ . match_thread_id ( thread_root_id. to_owned ( ) )
752
+ . ok ( )
753
+ . mock_once ( )
754
+ . mount ( )
755
+ . await ;
756
+
757
+ let hit_start =
758
+ room_event_cache. paginate_thread_backwards ( thread_root_id. to_owned ( ) , 42 ) . await . unwrap ( ) ;
759
+ assert ! ( hit_start) ;
760
+
761
+ // Let the event cache process the update.
762
+ assert_let_timeout ! ( Ok ( ThreadEventCacheUpdate { .. } ) = thread_stream. recv( ) ) ;
763
+ }
0 commit comments