|
| 1 | +use std::time::Duration; |
| 2 | + |
1 | 3 | use assert_matches2::assert_let;
|
2 | 4 | use eyeball_im::VectorDiff;
|
3 | 5 | use imbl::Vector;
|
4 | 6 | use matrix_sdk::{
|
5 | 7 | assert_let_timeout,
|
6 | 8 | deserialized_responses::{ThreadSummaryStatus, TimelineEvent},
|
7 |
| - event_cache::{RoomEventCacheUpdate, ThreadEventCacheUpdate}, |
| 9 | + event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCacheUpdate}, |
| 10 | + sleep::sleep, |
8 | 11 | test_utils::{
|
9 | 12 | assert_event_matches_msg,
|
10 | 13 | mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
|
11 | 14 | },
|
| 15 | + Client, ThreadingSupport, |
12 | 16 | };
|
13 | 17 | use matrix_sdk_test::{
|
14 | 18 | async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE,
|
15 | 19 | };
|
16 |
| -use ruma::{event_id, room_id, user_id}; |
| 20 | +use ruma::{ |
| 21 | + event_id, |
| 22 | + events::{AnySyncTimelineEvent, Mentions}, |
| 23 | + push::{ConditionalPushRule, Ruleset}, |
| 24 | + room_id, |
| 25 | + serde::Raw, |
| 26 | + user_id, OwnedEventId, OwnedRoomId, |
| 27 | +}; |
17 | 28 | use serde_json::json;
|
18 | 29 | use tokio::sync::broadcast;
|
19 | 30 |
|
@@ -433,3 +444,344 @@ async fn test_deduplication() {
|
433 | 444 | // The events were already known, so the stream is still empty.
|
434 | 445 | assert!(thread_stream.is_empty());
|
435 | 446 | }
|
| 447 | + |
/// Shared fixture for the thread auto-subscription tests below.
///
/// Built by [`thread_subscription_test_setup`]; holds the mock server, a
/// client configured with thread subscriptions enabled, and pre-built test
/// events.
struct ThreadSubscriptionTestSetup {
    // The mock homeserver the client talks to.
    server: MatrixMockServer,
    // Client built with `ThreadingSupport::Enabled { with_subscriptions: true }`.
    client: Client,
    // Event factory preconfigured with the test room and ALICE as sender.
    factory: EventFactory,
    // Identifier of the joined test room.
    room_id: OwnedRoomId,
    // Subscriber to the room's event-cache updates (already past the initial
    // member-update caused by the setup sync).
    subscriber: RoomEventCacheSubscriber,
    /// 3 events: 1 non-mention, 1 mention, and another non-mention.
    events: Vec<Raw<AnySyncTimelineEvent>>,
    // Event id of the single event (among `events`) mentioning the current user.
    mention_event_id: OwnedEventId,
    // Event id of the thread root all 3 events reply to.
    thread_root: OwnedEventId,
}
| 459 | + |
| 460 | +/// Create a new setup for a thread subscription test, with enough data so that |
| 461 | +/// a push context can be created. |
| 462 | +/// |
| 463 | +/// The setup uses custom push rules, to trigger notifications only on mentions. |
| 464 | +/// |
| 465 | +/// The setup includes 3 events (1 non-mention, 1 mention, and another |
| 466 | +/// non-mention) in the same thread, for easy testing of automated |
| 467 | +/// subscriptions. |
| 468 | +async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup { |
| 469 | + let server = MatrixMockServer::new().await; |
| 470 | + |
| 471 | + let thread_root = event_id!("$thread_root"); |
| 472 | + |
| 473 | + // Assuming a client that's interested in thread subscriptions, |
| 474 | + let client = server |
| 475 | + .client_builder() |
| 476 | + .on_builder(|builder| { |
| 477 | + builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) |
| 478 | + }) |
| 479 | + .build() |
| 480 | + .await; |
| 481 | + |
| 482 | + // Immediately subscribe the event cache to sync updates. |
| 483 | + client.event_cache().subscribe().unwrap(); |
| 484 | + |
| 485 | + let room_id = room_id!("!omelette:fromage.fr"); |
| 486 | + let room = server.sync_joined_room(&client, room_id).await; |
| 487 | + |
| 488 | + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); |
| 489 | + |
| 490 | + let (initial_events, mut subscriber) = room_event_cache.subscribe().await; |
| 491 | + assert!(initial_events.is_empty()); |
| 492 | + assert!(subscriber.is_empty()); |
| 493 | + |
| 494 | + // Provide a dummy sync with the room's member profile of the current user, so |
| 495 | + // the push context can be created. |
| 496 | + let own_user_id = client.user_id().unwrap(); |
| 497 | + let f = EventFactory::new().room(room_id).sender(*ALICE); |
| 498 | + let member = f.member(own_user_id).sender(own_user_id); |
| 499 | + |
| 500 | + // Override push rules so that only an intentional mention causes a |
| 501 | + // notification. |
| 502 | + let mut push_rules = Ruleset::default(); |
| 503 | + push_rules.override_.insert(ConditionalPushRule::is_user_mention(own_user_id)); |
| 504 | + |
| 505 | + server |
| 506 | + .mock_sync() |
| 507 | + .ok_and_run(&client, |sync_builder| { |
| 508 | + sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(member)); |
| 509 | + sync_builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({ |
| 510 | + "type": "m.push_rules", |
| 511 | + "content": { |
| 512 | + "global": push_rules |
| 513 | + } |
| 514 | + }))); |
| 515 | + }) |
| 516 | + .await; |
| 517 | + |
| 518 | + // Wait for the initial sync processing to complete; it will trigger a member |
| 519 | + // update, at the very least. |
| 520 | + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateMembers { .. }) = subscriber.recv()); |
| 521 | + |
| 522 | + let first_reply_event_id = event_id!("$first_reply"); |
| 523 | + let first_reply = f |
| 524 | + .text_msg("hey there") |
| 525 | + .in_thread(thread_root, thread_root) |
| 526 | + .event_id(first_reply_event_id) |
| 527 | + .into_raw(); |
| 528 | + |
| 529 | + let second_reply_event_id = event_id!("$second_reply"); |
| 530 | + let second_reply = f |
| 531 | + .text_msg("hoy test user!") |
| 532 | + .mentions(Mentions::with_user_ids([own_user_id.to_owned()])) |
| 533 | + .in_thread(thread_root, first_reply_event_id) |
| 534 | + .event_id(second_reply_event_id) |
| 535 | + .into_raw(); |
| 536 | + |
| 537 | + let third_reply_event_id = event_id!("$third_reply"); |
| 538 | + let third_reply = f |
| 539 | + .text_msg("ciao!") |
| 540 | + .in_thread(thread_root, second_reply_event_id) |
| 541 | + .event_id(third_reply_event_id) |
| 542 | + .into_raw(); |
| 543 | + |
| 544 | + ThreadSubscriptionTestSetup { |
| 545 | + server, |
| 546 | + client, |
| 547 | + factory: f, |
| 548 | + subscriber, |
| 549 | + events: vec![first_reply, second_reply, third_reply], |
| 550 | + mention_event_id: second_reply_event_id.to_owned(), |
| 551 | + thread_root: thread_root.to_owned(), |
| 552 | + room_id: room_id.to_owned(), |
| 553 | + } |
| 554 | +} |
| 555 | + |
| 556 | +#[async_test] |
| 557 | +async fn test_auto_subscribe_thread_via_sync() { |
| 558 | + let mut s = thread_subscription_test_setup().await; |
| 559 | + |
| 560 | + // (The endpoint will be called for the current thread, and with an automatic |
| 561 | + // subscription up to the given event ID.) |
| 562 | + s.server |
| 563 | + .mock_put_thread_subscription() |
| 564 | + .match_automatic_event_id(&s.mention_event_id) |
| 565 | + .match_thread_id(s.thread_root.to_owned()) |
| 566 | + .ok() |
| 567 | + .mock_once() |
| 568 | + .mount() |
| 569 | + .await; |
| 570 | + |
| 571 | + let mut thread_subscriber_updates = |
| 572 | + s.client.event_cache().subscribe_thread_subscriber_updates(); |
| 573 | + |
| 574 | + // When I receive 3 events (1 non mention, 1 mention, then 1 non mention again), |
| 575 | + // from sync, I'll get subscribed to the thread because of the second event. |
| 576 | + s.server |
| 577 | + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) |
| 578 | + .await; |
| 579 | + |
| 580 | + // Let the event cache process the update. |
| 581 | + assert_let_timeout!( |
| 582 | + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() |
| 583 | + ); |
| 584 | + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); |
| 585 | + |
| 586 | + // The actual check is the `mock_once` call above! |
| 587 | +} |
| 588 | + |
| 589 | +#[async_test] |
| 590 | +async fn test_dont_auto_subscribe_on_already_subscribed_thread() { |
| 591 | + let mut s = thread_subscription_test_setup().await; |
| 592 | + |
| 593 | + // Given a thread I'm already subscribed to, |
| 594 | + s.server |
| 595 | + .mock_get_thread_subscription() |
| 596 | + .match_thread_id(s.thread_root.to_owned()) |
| 597 | + .ok(false) |
| 598 | + .mock_once() |
| 599 | + .mount() |
| 600 | + .await; |
| 601 | + |
| 602 | + // The PUT endpoint (to subscribe to the thread) shouldn't be called… |
| 603 | + s.server.mock_put_thread_subscription().ok().expect(0).mount().await; |
| 604 | + |
| 605 | + // …when I receive a new in-thread mention for this thread. |
| 606 | + s.server |
| 607 | + .sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events)) |
| 608 | + .await; |
| 609 | + |
| 610 | + // Let the event cache process the update. |
| 611 | + assert_let_timeout!( |
| 612 | + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv() |
| 613 | + ); |
| 614 | + |
| 615 | + // Let a bit of time for the background thread subscriber task to process the |
| 616 | + // update. |
| 617 | + sleep(Duration::from_millis(200)).await; |
| 618 | + |
| 619 | + // The actual check is the `expect` call above! |
| 620 | +} |
| 621 | + |
| 622 | +#[async_test] |
| 623 | +async fn test_auto_subscribe_on_thread_paginate() { |
| 624 | + // In this scenario, we're back-paginating a thread and making sure that the |
| 625 | + // back-paginated events do cause a subscription. |
| 626 | + |
| 627 | + let s = thread_subscription_test_setup().await; |
| 628 | + |
| 629 | + let event_cache = s.client.event_cache(); |
| 630 | + event_cache.subscribe().unwrap(); |
| 631 | + |
| 632 | + let mut thread_subscriber_updates = |
| 633 | + s.client.event_cache().subscribe_thread_subscriber_updates(); |
| 634 | + |
| 635 | + let thread_root_id = event_id!("$thread_root"); |
| 636 | + let thread_resp_id = event_id!("$thread_resp"); |
| 637 | + |
| 638 | + // Receive an in-thread event. |
| 639 | + let room = s |
| 640 | + .server |
| 641 | + .sync_room( |
| 642 | + &s.client, |
| 643 | + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( |
| 644 | + s.factory |
| 645 | + .text_msg("that's a good point") |
| 646 | + .in_thread(thread_root_id, thread_root_id) |
| 647 | + .event_id(thread_resp_id), |
| 648 | + ), |
| 649 | + ) |
| 650 | + .await; |
| 651 | + |
| 652 | + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); |
| 653 | + |
| 654 | + let (thread_events, mut thread_stream) = |
| 655 | + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; |
| 656 | + |
| 657 | + // Sanity check: the sync event is added to the thread. |
| 658 | + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; |
| 659 | + assert_eq!(thread_events.len(), 1); |
| 660 | + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); |
| 661 | + |
| 662 | + assert!(thread_subscriber_updates.is_empty()); |
| 663 | + |
| 664 | + // It's possible to paginate the thread, and this will push the thread root |
| 665 | + // because there's no prev-batch token. |
| 666 | + let reversed_events = s.events.into_iter().rev().map(Raw::cast_unchecked).collect(); |
| 667 | + s.server |
| 668 | + .mock_room_relations() |
| 669 | + .match_target_event(thread_root_id.to_owned()) |
| 670 | + .ok(RoomRelationsResponseTemplate::default().events(reversed_events)) |
| 671 | + .mock_once() |
| 672 | + .mount() |
| 673 | + .await; |
| 674 | + |
| 675 | + s.server |
| 676 | + .mock_room_event() |
| 677 | + .match_event_id() |
| 678 | + .ok(s.factory.text_msg("Thread root").event_id(thread_root_id).into()) |
| 679 | + .mock_once() |
| 680 | + .mount() |
| 681 | + .await; |
| 682 | + |
| 683 | + // (The endpoint will be called for the current thread, and with an automatic |
| 684 | + // subscription up to the given event ID.) |
| 685 | + s.server |
| 686 | + .mock_put_thread_subscription() |
| 687 | + .match_automatic_event_id(&s.mention_event_id) |
| 688 | + .match_thread_id(s.thread_root.to_owned()) |
| 689 | + .ok() |
| 690 | + .mock_once() |
| 691 | + .mount() |
| 692 | + .await; |
| 693 | + |
| 694 | + let hit_start = |
| 695 | + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); |
| 696 | + assert!(hit_start); |
| 697 | + |
| 698 | + // Let the event cache process the update. |
| 699 | + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv()); |
| 700 | + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); |
| 701 | + assert!(thread_subscriber_updates.is_empty()); |
| 702 | +} |
| 703 | + |
| 704 | +#[async_test] |
| 705 | +async fn test_auto_subscribe_on_thread_paginate_root_event() { |
| 706 | + // In this scenario, the root of a thread is the event that would cause the |
| 707 | + // subscription. |
| 708 | + |
| 709 | + let s = thread_subscription_test_setup().await; |
| 710 | + |
| 711 | + let event_cache = s.client.event_cache(); |
| 712 | + event_cache.subscribe().unwrap(); |
| 713 | + |
| 714 | + let mut thread_subscriber_updates = |
| 715 | + s.client.event_cache().subscribe_thread_subscriber_updates(); |
| 716 | + |
| 717 | + let thread_root_id = event_id!("$thread_root"); |
| 718 | + let thread_resp_id = event_id!("$thread_resp"); |
| 719 | + |
| 720 | + // Receive an in-thread event. |
| 721 | + let room = s |
| 722 | + .server |
| 723 | + .sync_room( |
| 724 | + &s.client, |
| 725 | + JoinedRoomBuilder::new(&s.room_id).add_timeline_event( |
| 726 | + s.factory |
| 727 | + .text_msg("that's a good point") |
| 728 | + .in_thread(thread_root_id, thread_root_id) |
| 729 | + .event_id(thread_resp_id), |
| 730 | + ), |
| 731 | + ) |
| 732 | + .await; |
| 733 | + |
| 734 | + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); |
| 735 | + |
| 736 | + let (thread_events, mut thread_stream) = |
| 737 | + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; |
| 738 | + |
| 739 | + // Sanity check: the sync event is added to the thread. |
| 740 | + let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; |
| 741 | + assert_eq!(thread_events.len(), 1); |
| 742 | + assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id)); |
| 743 | + |
| 744 | + assert!(thread_subscriber_updates.is_empty()); |
| 745 | + |
| 746 | + // It's possible to paginate the thread, and this will push the thread root |
| 747 | + // because there's no prev-batch token. |
| 748 | + s.server |
| 749 | + .mock_room_relations() |
| 750 | + .match_target_event(thread_root_id.to_owned()) |
| 751 | + .ok(RoomRelationsResponseTemplate::default()) |
| 752 | + .mock_once() |
| 753 | + .mount() |
| 754 | + .await; |
| 755 | + |
| 756 | + s.server |
| 757 | + .mock_room_event() |
| 758 | + .match_event_id() |
| 759 | + .ok(s |
| 760 | + .factory |
| 761 | + .text_msg("da r00t") |
| 762 | + .event_id(thread_root_id) |
| 763 | + .mentions(Mentions::with_user_ids(s.client.user_id().map(ToOwned::to_owned))) |
| 764 | + .into()) |
| 765 | + .mock_once() |
| 766 | + .mount() |
| 767 | + .await; |
| 768 | + |
| 769 | + // (The endpoint will be called for the current thread, and with an automatic |
| 770 | + // subscription up to the given event ID.) |
| 771 | + s.server |
| 772 | + .mock_put_thread_subscription() |
| 773 | + .match_automatic_event_id(thread_root_id) |
| 774 | + .match_thread_id(thread_root_id.to_owned()) |
| 775 | + .ok() |
| 776 | + .mock_once() |
| 777 | + .mount() |
| 778 | + .await; |
| 779 | + |
| 780 | + let hit_start = |
| 781 | + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); |
| 782 | + assert!(hit_start); |
| 783 | + |
| 784 | + // Let the event cache process the update. |
| 785 | + assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv()); |
| 786 | + assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv()); |
| 787 | +} |
0 commit comments