Skip to content

Commit 3665ed0

Browse files
committed
test(event cache): add tests for automatic thread subscriptions
1 parent 78c4a71 commit 3665ed0

File tree

2 files changed

+361
-2
lines changed
  • crates/matrix-sdk

2 files changed

+361
-2
lines changed

crates/matrix-sdk/src/test_utils/mocks/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3916,6 +3916,13 @@ impl<'a> MockEndpoint<'a, PutThreadSubscriptionEndpoint> {
39163916
self.endpoint.matchers = self.endpoint.matchers.match_thread_id(thread_root);
39173917
self
39183918
}
3919+
/// Match the request body's `automatic` field against a specific event id.
3920+
pub fn match_automatic_event_id(mut self, up_to_event_id: &EventId) -> Self {
3921+
self.mock = self.mock.and(body_json(json!({
3922+
"automatic": up_to_event_id
3923+
})));
3924+
self
3925+
}
39193926
}
39203927

39213928
/// A prebuilt mock for `DELETE

crates/matrix-sdk/tests/integration/event_cache/threads.rs

Lines changed: 354 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
1+
use std::time::Duration;
2+
13
use assert_matches2::assert_let;
24
use eyeball_im::VectorDiff;
35
use imbl::Vector;
46
use matrix_sdk::{
57
assert_let_timeout,
68
deserialized_responses::{ThreadSummaryStatus, TimelineEvent},
7-
event_cache::{RoomEventCacheUpdate, ThreadEventCacheUpdate},
9+
event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate, ThreadEventCacheUpdate},
10+
sleep::sleep,
811
test_utils::{
912
assert_event_matches_msg,
1013
mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
1114
},
15+
Client, ThreadingSupport,
1216
};
1317
use matrix_sdk_test::{
1418
async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, ALICE,
1519
};
16-
use ruma::{event_id, room_id, user_id};
20+
use ruma::{
21+
event_id,
22+
events::{AnySyncTimelineEvent, Mentions},
23+
push::{ConditionalPushRule, Ruleset},
24+
room_id,
25+
serde::Raw,
26+
user_id, OwnedEventId, OwnedRoomId,
27+
};
1728
use serde_json::json;
1829
use tokio::sync::broadcast;
1930

@@ -433,3 +444,344 @@ async fn test_deduplication() {
433444
// The events were already known, so the stream is still empty.
434445
assert!(thread_stream.is_empty());
435446
}
447+
448+
struct ThreadSubscriptionTestSetup {
449+
server: MatrixMockServer,
450+
client: Client,
451+
factory: EventFactory,
452+
room_id: OwnedRoomId,
453+
subscriber: RoomEventCacheSubscriber,
454+
/// 3 events: 1 non-mention, 1 mention, and another non-mention.
455+
events: Vec<Raw<AnySyncTimelineEvent>>,
456+
mention_event_id: OwnedEventId,
457+
thread_root: OwnedEventId,
458+
}
459+
460+
/// Create a new setup for a thread subscription test, with enough data so that
461+
/// a push context can be created.
462+
///
463+
/// The setup uses custom push rules, to trigger notifications only on mentions.
464+
///
465+
/// The setup includes 3 events (1 non-mention, 1 mention, and another
466+
/// non-mention) in the same thread, for easy testing of automated
467+
/// subscriptions.
468+
async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup {
469+
let server = MatrixMockServer::new().await;
470+
471+
let thread_root = event_id!("$thread_root");
472+
473+
// Assuming a client that's interested in thread subscriptions,
474+
let client = server
475+
.client_builder()
476+
.on_builder(|builder| {
477+
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
478+
})
479+
.build()
480+
.await;
481+
482+
// Immediately subscribe the event cache to sync updates.
483+
client.event_cache().subscribe().unwrap();
484+
485+
let room_id = room_id!("!omelette:fromage.fr");
486+
let room = server.sync_joined_room(&client, room_id).await;
487+
488+
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
489+
490+
let (initial_events, mut subscriber) = room_event_cache.subscribe().await;
491+
assert!(initial_events.is_empty());
492+
assert!(subscriber.is_empty());
493+
494+
// Provide a dummy sync with the room's member profile of the current user, so
495+
// the push context can be created.
496+
let own_user_id = client.user_id().unwrap();
497+
let f = EventFactory::new().room(room_id).sender(*ALICE);
498+
let member = f.member(own_user_id).sender(own_user_id);
499+
500+
// Override push rules so that only an intentional mention causes a
501+
// notification.
502+
let mut push_rules = Ruleset::default();
503+
push_rules.override_.insert(ConditionalPushRule::is_user_mention(own_user_id));
504+
505+
server
506+
.mock_sync()
507+
.ok_and_run(&client, |sync_builder| {
508+
sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(member));
509+
sync_builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
510+
"type": "m.push_rules",
511+
"content": {
512+
"global": push_rules
513+
}
514+
})));
515+
})
516+
.await;
517+
518+
// Wait for the initial sync processing to complete; it will trigger a member
519+
// update, at the very least.
520+
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateMembers { .. }) = subscriber.recv());
521+
522+
let first_reply_event_id = event_id!("$first_reply");
523+
let first_reply = f
524+
.text_msg("hey there")
525+
.in_thread(thread_root, thread_root)
526+
.event_id(first_reply_event_id)
527+
.into_raw();
528+
529+
let second_reply_event_id = event_id!("$second_reply");
530+
let second_reply = f
531+
.text_msg("hoy test user!")
532+
.mentions(Mentions::with_user_ids([own_user_id.to_owned()]))
533+
.in_thread(thread_root, first_reply_event_id)
534+
.event_id(second_reply_event_id)
535+
.into_raw();
536+
537+
let third_reply_event_id = event_id!("$third_reply");
538+
let third_reply = f
539+
.text_msg("ciao!")
540+
.in_thread(thread_root, second_reply_event_id)
541+
.event_id(third_reply_event_id)
542+
.into_raw();
543+
544+
ThreadSubscriptionTestSetup {
545+
server,
546+
client,
547+
factory: f,
548+
subscriber,
549+
events: vec![first_reply, second_reply, third_reply],
550+
mention_event_id: second_reply_event_id.to_owned(),
551+
thread_root: thread_root.to_owned(),
552+
room_id: room_id.to_owned(),
553+
}
554+
}
555+
556+
#[async_test]
557+
async fn test_auto_subscribe_thread_via_sync() {
558+
let mut s = thread_subscription_test_setup().await;
559+
560+
// (The endpoint will be called for the current thread, and with an automatic
561+
// subscription up to the given event ID.)
562+
s.server
563+
.mock_put_thread_subscription()
564+
.match_automatic_event_id(&s.mention_event_id)
565+
.match_thread_id(s.thread_root.to_owned())
566+
.ok()
567+
.mock_once()
568+
.mount()
569+
.await;
570+
571+
let mut thread_subscriber_updates =
572+
s.client.event_cache().subscribe_thread_subscriber_updates();
573+
574+
// When I receive 3 events (1 non mention, 1 mention, then 1 non mention again),
575+
// from sync, I'll get subscribed to the thread because of the second event.
576+
s.server
577+
.sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events))
578+
.await;
579+
580+
// Let the event cache process the update.
581+
assert_let_timeout!(
582+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv()
583+
);
584+
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
585+
586+
// The actual check is the `mock_once` call above!
587+
}
588+
589+
#[async_test]
590+
async fn test_dont_auto_subscribe_on_already_subscribed_thread() {
591+
let mut s = thread_subscription_test_setup().await;
592+
593+
// Given a thread I'm already subscribed to,
594+
s.server
595+
.mock_get_thread_subscription()
596+
.match_thread_id(s.thread_root.to_owned())
597+
.ok(false)
598+
.mock_once()
599+
.mount()
600+
.await;
601+
602+
// The PUT endpoint (to subscribe to the thread) shouldn't be called…
603+
s.server.mock_put_thread_subscription().ok().expect(0).mount().await;
604+
605+
// …when I receive a new in-thread mention for this thread.
606+
s.server
607+
.sync_room(&s.client, JoinedRoomBuilder::new(&s.room_id).add_timeline_bulk(s.events))
608+
.await;
609+
610+
// Let the event cache process the update.
611+
assert_let_timeout!(
612+
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = s.subscriber.recv()
613+
);
614+
615+
// Let a bit of time for the background thread subscriber task to process the
616+
// update.
617+
sleep(Duration::from_millis(200)).await;
618+
619+
// The actual check is the `expect` call above!
620+
}
621+
622+
#[async_test]
623+
async fn test_auto_subscribe_on_thread_paginate() {
624+
// In this scenario, we're back-paginating a thread and making sure that the
625+
// back-paginated events do cause a subscription.
626+
627+
let s = thread_subscription_test_setup().await;
628+
629+
let event_cache = s.client.event_cache();
630+
event_cache.subscribe().unwrap();
631+
632+
let mut thread_subscriber_updates =
633+
s.client.event_cache().subscribe_thread_subscriber_updates();
634+
635+
let thread_root_id = event_id!("$thread_root");
636+
let thread_resp_id = event_id!("$thread_resp");
637+
638+
// Receive an in-thread event.
639+
let room = s
640+
.server
641+
.sync_room(
642+
&s.client,
643+
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
644+
s.factory
645+
.text_msg("that's a good point")
646+
.in_thread(thread_root_id, thread_root_id)
647+
.event_id(thread_resp_id),
648+
),
649+
)
650+
.await;
651+
652+
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
653+
654+
let (thread_events, mut thread_stream) =
655+
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await;
656+
657+
// Sanity check: the sync event is added to the thread.
658+
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
659+
assert_eq!(thread_events.len(), 1);
660+
assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id));
661+
662+
assert!(thread_subscriber_updates.is_empty());
663+
664+
// It's possible to paginate the thread, and this will push the thread root
665+
// because there's no prev-batch token.
666+
let reversed_events = s.events.into_iter().rev().map(Raw::cast_unchecked).collect();
667+
s.server
668+
.mock_room_relations()
669+
.match_target_event(thread_root_id.to_owned())
670+
.ok(RoomRelationsResponseTemplate::default().events(reversed_events))
671+
.mock_once()
672+
.mount()
673+
.await;
674+
675+
s.server
676+
.mock_room_event()
677+
.match_event_id()
678+
.ok(s.factory.text_msg("Thread root").event_id(thread_root_id).into())
679+
.mock_once()
680+
.mount()
681+
.await;
682+
683+
// (The endpoint will be called for the current thread, and with an automatic
684+
// subscription up to the given event ID.)
685+
s.server
686+
.mock_put_thread_subscription()
687+
.match_automatic_event_id(&s.mention_event_id)
688+
.match_thread_id(s.thread_root.to_owned())
689+
.ok()
690+
.mock_once()
691+
.mount()
692+
.await;
693+
694+
let hit_start =
695+
room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap();
696+
assert!(hit_start);
697+
698+
// Let the event cache process the update.
699+
assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv());
700+
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
701+
assert!(thread_subscriber_updates.is_empty());
702+
}
703+
704+
#[async_test]
705+
async fn test_auto_subscribe_on_thread_paginate_root_event() {
706+
// In this scenario, the root of a thread is the event that would cause the
707+
// subscription.
708+
709+
let s = thread_subscription_test_setup().await;
710+
711+
let event_cache = s.client.event_cache();
712+
event_cache.subscribe().unwrap();
713+
714+
let mut thread_subscriber_updates =
715+
s.client.event_cache().subscribe_thread_subscriber_updates();
716+
717+
let thread_root_id = event_id!("$thread_root");
718+
let thread_resp_id = event_id!("$thread_resp");
719+
720+
// Receive an in-thread event.
721+
let room = s
722+
.server
723+
.sync_room(
724+
&s.client,
725+
JoinedRoomBuilder::new(&s.room_id).add_timeline_event(
726+
s.factory
727+
.text_msg("that's a good point")
728+
.in_thread(thread_root_id, thread_root_id)
729+
.event_id(thread_resp_id),
730+
),
731+
)
732+
.await;
733+
734+
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
735+
736+
let (thread_events, mut thread_stream) =
737+
room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await;
738+
739+
// Sanity check: the sync event is added to the thread.
740+
let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await;
741+
assert_eq!(thread_events.len(), 1);
742+
assert_eq!(thread_events.remove(0).event_id().as_deref(), Some(thread_resp_id));
743+
744+
assert!(thread_subscriber_updates.is_empty());
745+
746+
// It's possible to paginate the thread, and this will push the thread root
747+
// because there's no prev-batch token.
748+
s.server
749+
.mock_room_relations()
750+
.match_target_event(thread_root_id.to_owned())
751+
.ok(RoomRelationsResponseTemplate::default())
752+
.mock_once()
753+
.mount()
754+
.await;
755+
756+
s.server
757+
.mock_room_event()
758+
.match_event_id()
759+
.ok(s
760+
.factory
761+
.text_msg("da r00t")
762+
.event_id(thread_root_id)
763+
.mentions(Mentions::with_user_ids(s.client.user_id().map(ToOwned::to_owned)))
764+
.into())
765+
.mock_once()
766+
.mount()
767+
.await;
768+
769+
// (The endpoint will be called for the current thread, and with an automatic
770+
// subscription up to the given event ID.)
771+
s.server
772+
.mock_put_thread_subscription()
773+
.match_automatic_event_id(thread_root_id)
774+
.match_thread_id(thread_root_id.to_owned())
775+
.ok()
776+
.mock_once()
777+
.mount()
778+
.await;
779+
780+
let hit_start =
781+
room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap();
782+
assert!(hit_start);
783+
784+
// Let the event cache process the update.
785+
assert_let_timeout!(Ok(ThreadEventCacheUpdate { .. }) = thread_stream.recv());
786+
assert_let_timeout!(Ok(()) = thread_subscriber_updates.recv());
787+
}

0 commit comments

Comments
 (0)