Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 2b6d41e

Browse files
authored
Recursively fetch the thread for receipts & notifications. (#13824)
Consider an event to be part of a thread if you can follow a chain of relations up to a thread root. Part of MSC3773 & MSC3771.
1 parent 3e74ad2 commit 2b6d41e

File tree

5 files changed

+162
-2
lines changed

5 files changed

+162
-2
lines changed

changelog.d/13824.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).

synapse/push/bulk_push_rule_evaluator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,13 @@ async def action_for_event_by_user(
286286
relation.parent_id,
287287
itertools.chain(*(r.rules() for r in rules_by_user.values())),
288288
)
289+
# Recursively attempt to find the thread this event relates to.
289290
if relation.rel_type == RelationTypes.THREAD:
290291
thread_id = relation.parent_id
292+
else:
293+
# Since the event has not yet been persisted we check whether
294+
# the parent is part of a thread.
295+
thread_id = await self.store.get_thread_id(relation.parent_id) or "main"
291296

292297
evaluator = PushRuleEvaluator(
293298
_flatten_dict(event),

synapse/rest/client/receipts.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from typing import TYPE_CHECKING, Tuple
1717

1818
from synapse.api.constants import ReceiptTypes
19-
from synapse.api.errors import SynapseError
19+
from synapse.api.errors import Codes, SynapseError
2020
from synapse.http.server import HttpServer
2121
from synapse.http.servlet import RestServlet, parse_json_object_from_request
2222
from synapse.http.site import SynapseRequest
@@ -43,6 +43,7 @@ def __init__(self, hs: "HomeServer"):
4343
self.receipts_handler = hs.get_receipts_handler()
4444
self.read_marker_handler = hs.get_read_marker_handler()
4545
self.presence_handler = hs.get_presence_handler()
46+
self._main_store = hs.get_datastores().main
4647

4748
self._known_receipt_types = {
4849
ReceiptTypes.READ,
@@ -71,7 +72,24 @@ async def on_POST(
7172
thread_id = body.get("thread_id")
7273
if not thread_id or not isinstance(thread_id, str):
7374
raise SynapseError(
74-
400, "thread_id field must be a non-empty string"
75+
400,
76+
"thread_id field must be a non-empty string",
77+
Codes.INVALID_PARAM,
78+
)
79+
80+
if receipt_type == ReceiptTypes.FULLY_READ:
81+
raise SynapseError(
82+
400,
83+
f"thread_id is not compatible with {ReceiptTypes.FULLY_READ} receipts.",
84+
Codes.INVALID_PARAM,
85+
)
86+
87+
# Ensure the event ID roughly correlates to the thread ID.
88+
if thread_id != await self._main_store.get_thread_id(event_id):
89+
raise SynapseError(
90+
400,
91+
f"event_id {event_id} is not related to thread {thread_id}",
92+
Codes.INVALID_PARAM,
7593
)
7694

7795
await self.presence_handler.bump_presence_active_time(requester.user)

synapse/storage/databases/main/relations.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,42 @@ def _get_event_relations(
832832
"get_event_relations", _get_event_relations
833833
)
834834

835+
@cached()
836+
async def get_thread_id(self, event_id: str) -> Optional[str]:
837+
"""
838+
Get the thread ID for an event. This considers multi-level relations,
839+
e.g. an annotation to an event which is part of a thread.
840+
841+
Args:
842+
event_id: The event ID to fetch the thread ID for.
843+
844+
Returns:
845+
The event ID of the root event in the thread, if this event is part
846+
of a thread. None, otherwise.
847+
"""
848+
# Since event relations form a tree, we should only ever find 0 or 1
849+
# results from the below query.
850+
sql = """
851+
WITH RECURSIVE related_events AS (
852+
SELECT event_id, relates_to_id, relation_type
853+
FROM event_relations
854+
WHERE event_id = ?
855+
UNION SELECT e.event_id, e.relates_to_id, e.relation_type
856+
FROM event_relations e
857+
INNER JOIN related_events r ON r.relates_to_id = e.event_id
858+
) SELECT relates_to_id FROM related_events WHERE relation_type = 'm.thread';
859+
"""
860+
861+
def _get_thread_id(txn: LoggingTransaction) -> Optional[str]:
862+
txn.execute(sql, (event_id,))
863+
# TODO Should we ensure there's only a single result here?
864+
row = txn.fetchone()
865+
if row:
866+
return row[0]
867+
return None
868+
869+
return await self.db_pool.runInteraction("get_thread_id", _get_thread_id)
870+
835871

836872
class RelationsStore(RelationsWorkerStore):
837873
pass

tests/storage/test_event_push_actions.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,106 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
588588
_rotate()
589589
_assert_counts(0, 0, 0, 0)
590590

591+
def test_recursive_thread(self) -> None:
592+
"""
593+
Events related to events in a thread should still be considered part of
594+
that thread.
595+
"""
596+
597+
# Create a user to receive notifications and send receipts.
598+
user_id = self.register_user("user1235", "pass")
599+
token = self.login("user1235", "pass")
600+
601+
# And another users to send events.
602+
other_id = self.register_user("other", "pass")
603+
other_token = self.login("other", "pass")
604+
605+
# Create a room and put both users in it.
606+
room_id = self.helper.create_room_as(user_id, tok=token)
607+
self.helper.join(room_id, other_id, tok=other_token)
608+
609+
# Update the user's push rules to care about reaction events.
610+
self.get_success(
611+
self.store.add_push_rule(
612+
user_id,
613+
"related_events",
614+
priority_class=5,
615+
conditions=[
616+
{"kind": "event_match", "key": "type", "pattern": "m.reaction"}
617+
],
618+
actions=["notify"],
619+
)
620+
)
621+
622+
def _create_event(type: str, content: JsonDict) -> str:
623+
result = self.helper.send_event(
624+
room_id, type=type, content=content, tok=other_token
625+
)
626+
return result["event_id"]
627+
628+
def _assert_counts(noitf_count: int, thread_notif_count: int) -> None:
629+
counts = self.get_success(
630+
self.store.db_pool.runInteraction(
631+
"get-unread-counts",
632+
self.store._get_unread_counts_by_receipt_txn,
633+
room_id,
634+
user_id,
635+
)
636+
)
637+
self.assertEqual(
638+
counts.main_timeline,
639+
NotifCounts(
640+
notify_count=noitf_count, unread_count=0, highlight_count=0
641+
),
642+
)
643+
if thread_notif_count:
644+
self.assertEqual(
645+
counts.threads,
646+
{
647+
thread_id: NotifCounts(
648+
notify_count=thread_notif_count,
649+
unread_count=0,
650+
highlight_count=0,
651+
),
652+
},
653+
)
654+
else:
655+
self.assertEqual(counts.threads, {})
656+
657+
# Create a root event.
658+
thread_id = _create_event(
659+
"m.room.message", {"msgtype": "m.text", "body": "msg"}
660+
)
661+
_assert_counts(1, 0)
662+
663+
# Reply, creating a thread.
664+
reply_id = _create_event(
665+
"m.room.message",
666+
{
667+
"msgtype": "m.text",
668+
"body": "msg",
669+
"m.relates_to": {
670+
"rel_type": "m.thread",
671+
"event_id": thread_id,
672+
},
673+
},
674+
)
675+
_assert_counts(1, 1)
676+
677+
# Create an event related to a thread event, this should still appear in
678+
# the thread.
679+
_create_event(
680+
type="m.reaction",
681+
content={
682+
"m.relates_to": {
683+
"rel_type": "m.annotation",
684+
"event_id": reply_id,
685+
"key": "A",
686+
}
687+
},
688+
)
689+
_assert_counts(1, 2)
690+
591691
def test_find_first_stream_ordering_after_ts(self) -> None:
592692
def add_event(so: int, ts: int) -> None:
593693
self.get_success(

0 commit comments

Comments
 (0)