@@ -4,16 +4,24 @@ use imbl::Vector;
4
4
use matrix_sdk:: {
5
5
assert_let_timeout,
6
6
deserialized_responses:: { ThreadSummaryStatus , TimelineEvent } ,
7
- event_cache:: { RoomEventCacheUpdate , ThreadEventCacheUpdate } ,
7
+ event_cache:: { RoomEventCacheSubscriber , RoomEventCacheUpdate , ThreadEventCacheUpdate } ,
8
8
test_utils:: {
9
9
assert_event_matches_msg,
10
10
mocks:: { MatrixMockServer , RoomRelationsResponseTemplate } ,
11
11
} ,
12
+ Client , ThreadingSupport ,
12
13
} ;
13
14
use matrix_sdk_test:: {
14
15
async_test, event_factory:: EventFactory , GlobalAccountDataTestEvent , JoinedRoomBuilder , ALICE ,
15
16
} ;
16
- use ruma:: { event_id, room_id, user_id} ;
17
+ use ruma:: {
18
+ event_id,
19
+ events:: { AnySyncTimelineEvent , Mentions } ,
20
+ push:: { ConditionalPushRule , Ruleset } ,
21
+ room_id,
22
+ serde:: Raw ,
23
+ user_id, OwnedEventId , OwnedRoomId ,
24
+ } ;
17
25
use serde_json:: json;
18
26
use tokio:: sync:: broadcast;
19
27
@@ -433,3 +441,319 @@ async fn test_deduplication() {
433
441
// The events were already known, so the stream is still empty.
434
442
assert ! ( thread_stream. is_empty( ) ) ;
435
443
}
444
+
445
+ struct ThreadSubscriptionTestSetup {
446
+ server : MatrixMockServer ,
447
+ client : Client ,
448
+ factory : EventFactory ,
449
+ room_id : OwnedRoomId ,
450
+ subscriber : RoomEventCacheSubscriber ,
451
+ /// 3 events: 1 non-mention, 1 mention, and another non-mention.
452
+ events : Vec < Raw < AnySyncTimelineEvent > > ,
453
+ mention_event_id : OwnedEventId ,
454
+ thread_root : OwnedEventId ,
455
+ }
456
+
457
+ /// Create a new setup for a thread subscription test, with enough data so that
458
+ /// a push context can be created.
459
+ ///
460
+ /// The setup uses custom push rules, to trigger notifications only on mentions.
461
+ ///
462
+ /// The setup includes 3 events (1 non-mention, 1 mention, and another
463
+ /// non-mention) in the same thread, for easy testing of automated
464
+ /// subscriptions.
465
+ async fn thread_subscription_test_setup ( ) -> ThreadSubscriptionTestSetup {
466
+ let server = MatrixMockServer :: new ( ) . await ;
467
+
468
+ let thread_root = event_id ! ( "$thread_root" ) ;
469
+
470
+ // Assuming a client that's interested in thread subscriptions,
471
+ let client = server
472
+ . client_builder ( )
473
+ . on_builder ( |builder| {
474
+ builder. with_threading_support ( ThreadingSupport :: Enabled { with_subscriptions : true } )
475
+ } )
476
+ . build ( )
477
+ . await ;
478
+
479
+ // Immediately subscribe the event cache to sync updates.
480
+ client. event_cache ( ) . subscribe ( ) . unwrap ( ) ;
481
+
482
+ let room_id = room_id ! ( "!omelette:fromage.fr" ) ;
483
+
484
+ // Provide a dummy sync with the room's member profile of the current user, so
485
+ // the push context can be created.
486
+ let own_user_id = client. user_id ( ) . unwrap ( ) ;
487
+ let f = EventFactory :: new ( ) . room ( room_id) . sender ( * ALICE ) ;
488
+ let member = f. member ( own_user_id) . sender ( own_user_id) ;
489
+
490
+ // Override push rules so that only an intentional mention causes a
491
+ // notification.
492
+ let mut push_rules = Ruleset :: default ( ) ;
493
+ push_rules. override_ . insert ( ConditionalPushRule :: is_user_mention ( own_user_id) ) ;
494
+
495
+ server
496
+ . mock_sync ( )
497
+ . ok_and_run ( & client, |sync_builder| {
498
+ sync_builder. add_joined_room ( JoinedRoomBuilder :: new ( room_id) . add_state_event ( member) ) ;
499
+ sync_builder. add_global_account_data_event ( GlobalAccountDataTestEvent :: Custom ( json ! ( {
500
+ "type" : "m.push_rules" ,
501
+ "content" : {
502
+ "global" : push_rules
503
+ }
504
+ } ) ) ) ;
505
+ } )
506
+ . await ;
507
+
508
+ let room = client. get_room ( room_id) . unwrap ( ) ;
509
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
510
+
511
+ let ( initial_events, subscriber) = room_event_cache. subscribe ( ) . await ;
512
+ assert ! ( initial_events. is_empty( ) ) ;
513
+ assert ! ( subscriber. is_empty( ) ) ;
514
+
515
+ let first_reply_event_id = event_id ! ( "$first_reply" ) ;
516
+ let first_reply = f
517
+ . text_msg ( "hey there" )
518
+ . in_thread ( thread_root, thread_root)
519
+ . event_id ( first_reply_event_id)
520
+ . into_raw ( ) ;
521
+
522
+ let second_reply_event_id = event_id ! ( "$second_reply" ) ;
523
+ let second_reply = f
524
+ . text_msg ( "hoy test user!" )
525
+ . mentions ( Mentions :: with_user_ids ( [ own_user_id. to_owned ( ) ] ) )
526
+ . in_thread ( thread_root, first_reply_event_id)
527
+ . event_id ( second_reply_event_id)
528
+ . into_raw ( ) ;
529
+
530
+ let third_reply_event_id = event_id ! ( "$third_reply" ) ;
531
+ let third_reply = f
532
+ . text_msg ( "ciao!" )
533
+ . in_thread ( thread_root, second_reply_event_id)
534
+ . event_id ( third_reply_event_id)
535
+ . into_raw ( ) ;
536
+
537
+ ThreadSubscriptionTestSetup {
538
+ server,
539
+ client,
540
+ factory : f,
541
+ subscriber,
542
+ events : vec ! [ first_reply, second_reply, third_reply] ,
543
+ mention_event_id : second_reply_event_id. to_owned ( ) ,
544
+ thread_root : thread_root. to_owned ( ) ,
545
+ room_id : room_id. to_owned ( ) ,
546
+ }
547
+ }
548
+
549
+ #[ async_test]
550
+ async fn test_auto_subscribe_thread_via_sync ( ) {
551
+ let mut s = thread_subscription_test_setup ( ) . await ;
552
+
553
+ // (The endpoint will be called for the current thread, and with an automatic
554
+ // subscription up to the given event ID.)
555
+ s. server
556
+ . mock_put_thread_subscription ( )
557
+ . match_automatic_event_id ( & s. mention_event_id )
558
+ . match_thread_id ( s. thread_root . to_owned ( ) )
559
+ . ok ( )
560
+ . mock_once ( )
561
+ . mount ( )
562
+ . await ;
563
+
564
+ // When I receive 3 events (1 non mention, 1 mention, then 1 non mention again),
565
+ // from sync, I'll get subscribed to the thread because of the second event.
566
+ s. server
567
+ . sync_room ( & s. client , JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_bulk ( s. events ) )
568
+ . await ;
569
+
570
+ // Let the event cache process the update.
571
+ assert_let_timeout ! (
572
+ Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { .. } ) = s. subscriber. recv( )
573
+ ) ;
574
+
575
+ // The actual check is the `mock_once` call above!
576
+ }
577
+
578
+ #[ async_test]
579
+ async fn test_dont_auto_subscribe_on_already_subscribed_thread ( ) {
580
+ let mut s = thread_subscription_test_setup ( ) . await ;
581
+
582
+ // Given a thread I'm already subscribed to,
583
+ s. server
584
+ . mock_get_thread_subscription ( )
585
+ . match_thread_id ( s. thread_root . to_owned ( ) )
586
+ . ok ( false )
587
+ . mock_once ( )
588
+ . mount ( )
589
+ . await ;
590
+
591
+ // The PUT endpoint (to subscribe to the thread) shouldn't be called…
592
+ s. server . mock_put_thread_subscription ( ) . ok ( ) . expect ( 0 ) . mount ( ) . await ;
593
+
594
+ // …when I receive a new in-thread mention for this thread.
595
+ s. server
596
+ . sync_room ( & s. client , JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_bulk ( s. events ) )
597
+ . await ;
598
+
599
+ // Let the event cache process the update.
600
+ assert_let_timeout ! (
601
+ Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { .. } ) = s. subscriber. recv( )
602
+ ) ;
603
+
604
+ // The actual check is the `expect` call above!
605
+ }
606
+
607
+ #[ async_test]
608
+ async fn test_auto_subscribe_on_thread_paginate ( ) {
609
+ // In this scenario, we're back-paginating a thread and making sure that the
610
+ // back-paginated events do cause a subscription.
611
+
612
+ let s = thread_subscription_test_setup ( ) . await ;
613
+
614
+ let event_cache = s. client . event_cache ( ) ;
615
+ event_cache. subscribe ( ) . unwrap ( ) ;
616
+
617
+ let thread_root_id = event_id ! ( "$thread_root" ) ;
618
+ let thread_resp_id = event_id ! ( "$thread_resp" ) ;
619
+
620
+ // Receive an in-thread event.
621
+ let room = s
622
+ . server
623
+ . sync_room (
624
+ & s. client ,
625
+ JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_event (
626
+ s. factory
627
+ . text_msg ( "that's a good point" )
628
+ . in_thread ( thread_root_id, thread_root_id)
629
+ . event_id ( thread_resp_id) ,
630
+ ) ,
631
+ )
632
+ . await ;
633
+
634
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
635
+
636
+ let ( thread_events, mut thread_stream) =
637
+ room_event_cache. subscribe_to_thread ( thread_root_id. to_owned ( ) ) . await ;
638
+
639
+ // Sanity check: the sync event is added to the thread.
640
+ let mut thread_events = wait_for_initial_events ( thread_events, & mut thread_stream) . await ;
641
+ assert_eq ! ( thread_events. len( ) , 1 ) ;
642
+ assert_eq ! ( thread_events. remove( 0 ) . event_id( ) . as_deref( ) , Some ( thread_resp_id) ) ;
643
+
644
+ // It's possible to paginate the thread, and this will push the thread root
645
+ // because there's no prev-batch token.
646
+ let reversed_events = s. events . into_iter ( ) . rev ( ) . map ( Raw :: cast_unchecked) . collect ( ) ;
647
+ s. server
648
+ . mock_room_relations ( )
649
+ . match_target_event ( thread_root_id. to_owned ( ) )
650
+ . ok ( RoomRelationsResponseTemplate :: default ( ) . events ( reversed_events) )
651
+ . mock_once ( )
652
+ . mount ( )
653
+ . await ;
654
+
655
+ s. server
656
+ . mock_room_event ( )
657
+ . match_event_id ( )
658
+ . ok ( s. factory . text_msg ( "Thread root" ) . event_id ( thread_root_id) . into ( ) )
659
+ . mock_once ( )
660
+ . mount ( )
661
+ . await ;
662
+
663
+ // (The endpoint will be called for the current thread, and with an automatic
664
+ // subscription up to the given event ID.)
665
+ s. server
666
+ . mock_put_thread_subscription ( )
667
+ . match_automatic_event_id ( & s. mention_event_id )
668
+ . match_thread_id ( s. thread_root . to_owned ( ) )
669
+ . ok ( )
670
+ . mock_once ( )
671
+ . mount ( )
672
+ . await ;
673
+
674
+ let hit_start =
675
+ room_event_cache. paginate_thread_backwards ( thread_root_id. to_owned ( ) , 42 ) . await . unwrap ( ) ;
676
+ assert ! ( hit_start) ;
677
+
678
+ // Let the event cache process the update.
679
+ assert_let_timeout ! ( Ok ( ThreadEventCacheUpdate { .. } ) = thread_stream. recv( ) ) ;
680
+ }
681
+
682
+ #[ async_test]
683
+ async fn test_auto_subscribe_on_thread_paginate_root_event ( ) {
684
+ // In this scenario, the root of a thread is the event that would cause the
685
+ // subscription.
686
+
687
+ let s = thread_subscription_test_setup ( ) . await ;
688
+
689
+ let event_cache = s. client . event_cache ( ) ;
690
+ event_cache. subscribe ( ) . unwrap ( ) ;
691
+
692
+ let thread_root_id = event_id ! ( "$thread_root" ) ;
693
+ let thread_resp_id = event_id ! ( "$thread_resp" ) ;
694
+
695
+ // Receive an in-thread event.
696
+ let room = s
697
+ . server
698
+ . sync_room (
699
+ & s. client ,
700
+ JoinedRoomBuilder :: new ( & s. room_id ) . add_timeline_event (
701
+ s. factory
702
+ . text_msg ( "that's a good point" )
703
+ . in_thread ( thread_root_id, thread_root_id)
704
+ . event_id ( thread_resp_id) ,
705
+ ) ,
706
+ )
707
+ . await ;
708
+
709
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
710
+
711
+ let ( thread_events, mut thread_stream) =
712
+ room_event_cache. subscribe_to_thread ( thread_root_id. to_owned ( ) ) . await ;
713
+
714
+ // Sanity check: the sync event is added to the thread.
715
+ let mut thread_events = wait_for_initial_events ( thread_events, & mut thread_stream) . await ;
716
+ assert_eq ! ( thread_events. len( ) , 1 ) ;
717
+ assert_eq ! ( thread_events. remove( 0 ) . event_id( ) . as_deref( ) , Some ( thread_resp_id) ) ;
718
+
719
+ // It's possible to paginate the thread, and this will push the thread root
720
+ // because there's no prev-batch token.
721
+ s. server
722
+ . mock_room_relations ( )
723
+ . match_target_event ( thread_root_id. to_owned ( ) )
724
+ . ok ( RoomRelationsResponseTemplate :: default ( ) )
725
+ . mock_once ( )
726
+ . mount ( )
727
+ . await ;
728
+
729
+ s. server
730
+ . mock_room_event ( )
731
+ . match_event_id ( )
732
+ . ok ( s
733
+ . factory
734
+ . text_msg ( "da r00t" )
735
+ . event_id ( thread_root_id)
736
+ . mentions ( Mentions :: with_user_ids ( s. client . user_id ( ) . map ( ToOwned :: to_owned) ) )
737
+ . into ( ) )
738
+ . mock_once ( )
739
+ . mount ( )
740
+ . await ;
741
+
742
+ // (The endpoint will be called for the current thread, and with an automatic
743
+ // subscription up to the given event ID.)
744
+ s. server
745
+ . mock_put_thread_subscription ( )
746
+ . match_automatic_event_id ( thread_root_id)
747
+ . match_thread_id ( thread_root_id. to_owned ( ) )
748
+ . ok ( )
749
+ . mock_once ( )
750
+ . mount ( )
751
+ . await ;
752
+
753
+ let hit_start =
754
+ room_event_cache. paginate_thread_backwards ( thread_root_id. to_owned ( ) , 42 ) . await . unwrap ( ) ;
755
+ assert ! ( hit_start) ;
756
+
757
+ // Let the event cache process the update.
758
+ assert_let_timeout ! ( Ok ( ThreadEventCacheUpdate { .. } ) = thread_stream. recv( ) ) ;
759
+ }
0 commit comments