Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 478f1ec

Browse files
committed
Allow limiting threads by participation.
1 parent 9a5ff39 commit 478f1ec

File tree

4 files changed

+86
-12
lines changed

4 files changed

+86
-12
lines changed

synapse/handlers/relations.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ async def get_threads(
485485
self,
486486
requester: Requester,
487487
room_id: str,
488+
filter: str,
488489
limit: int = 5,
489490
from_token: Optional[StreamToken] = None,
490491
to_token: Optional[StreamToken] = None,
@@ -497,7 +498,11 @@ async def get_threads(
497498
requester: The user requesting the relations.
498499
event_id: Fetch events that relate to this event ID.
499500
room_id: The room the event belongs to.
501+
filter: One of "all" or "participated" to indicate which threads should
502+
be returned.
500503
limit: Only fetch the most recent `limit` events.
504+
include_all: True if all threads should be included, false to limit
505+
to only threads the user has participated in.
501506
from_token: Fetch rows from the given token, or from the start if None.
502507
to_token: Fetch rows up to the given token, or up to the end if None.
503508
@@ -516,7 +521,11 @@ async def get_threads(
516521
# below. Ignored users are handled in filter_events_for_client (and by
517522
# not passing them in here we should get a better cache hit rate).
518523
thread_roots, next_token = await self._main_store.get_threads(
519-
room_id=room_id, limit=limit, from_token=from_token, to_token=to_token
524+
room_id=room_id,
525+
participating_user_id=None if filter == "all" else user_id,
526+
limit=limit,
527+
from_token=from_token,
528+
to_token=to_token,
520529
)
521530

522531
events = await self._main_store.get_events_as_list(thread_roots)

synapse/rest/client/relations.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ async def on_GET(
113113
limit = parse_integer(request, "limit", default=5)
114114
from_token_str = parse_string(request, "from")
115115
to_token_str = parse_string(request, "to")
116+
filter_str = parse_string(
117+
request, "filter", default="all", allowed_values=["all", "participated"]
118+
)
116119

117120
# Return the relations
118121
from_token = None
@@ -125,6 +128,7 @@ async def on_GET(
125128
result = await self._relations_handler.get_threads(
126129
requester=requester,
127130
room_id=room_id,
131+
filter=filter_str,
128132
limit=limit,
129133
from_token=from_token,
130134
to_token=to_token,

synapse/storage/databases/main/relations.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,7 @@ def _get_event_relations(
818818
async def get_threads(
819819
self,
820820
room_id: str,
821+
participating_user_id: Optional[str] = None,
821822
limit: int = 5,
822823
from_token: Optional[StreamToken] = None,
823824
to_token: Optional[StreamToken] = None,
@@ -827,6 +828,8 @@ async def get_threads(
827828
828829
Args:
829830
room_id: The room the event belongs to.
831+
participating_user_id: A user who must have participated in the thread.
832+
Provide None to return all threads.
830833
limit: Only fetch the most recent `limit` events.
831834
from_token: Fetch rows from the given token, or from the start if None.
832835
to_token: Fetch rows up to the given token, or up to the end if None.
@@ -837,9 +840,21 @@ async def get_threads(
837840
838841
The next stream token, if one exists.
839842
"""
843+
where_clause = [
844+
"child.room_id = ?",
845+
f"relation_type = '{RelationTypes.THREAD}'",
846+
]
847+
where_args: List[Union[str, int]] = [room_id]
848+
849+
# The user has participated if they sent the thread root or have any reply
850+
# to the thread.
851+
if participating_user_id:
852+
where_clause.append("parent.sender = ?")
853+
where_args.append(participating_user_id)
854+
840855
pagination_clause = generate_pagination_where_clause(
841856
direction="b",
842-
column_names=("topological_ordering", "stream_ordering"),
857+
column_names=("child.topological_ordering", "child.stream_ordering"),
843858
from_token=from_token.room_key.as_historical_tuple()
844859
if from_token
845860
else None,
@@ -848,25 +863,25 @@ async def get_threads(
848863
)
849864

850865
if pagination_clause:
851-
pagination_clause = "AND " + pagination_clause
866+
where_clause.append(pagination_clause)
852867

853868
sql = f"""
854-
SELECT relates_to_id, MAX(topological_ordering), MAX(stream_ordering)
855-
FROM event_relations
856-
INNER JOIN events USING (event_id)
857-
WHERE
858-
room_id = ? AND
859-
relation_type = '{RelationTypes.THREAD}'
860-
{pagination_clause}
869+
SELECT relates_to_id, MAX(child.topological_ordering), MAX(child.stream_ordering)
870+
FROM events AS child
871+
INNER JOIN event_relations USING (event_id)
872+
INNER JOIN events AS parent ON
873+
parent.event_id = relates_to_id
874+
AND parent.room_id = child.room_id
875+
WHERE {" AND ".join(where_clause)}
861876
GROUP BY relates_to_id
862-
ORDER BY topological_ordering DESC, stream_ordering DESC
877+
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
863878
LIMIT ?
864879
"""
865880

866881
def _get_threads_txn(
867882
txn: LoggingTransaction,
868883
) -> Tuple[List[str], Optional[StreamToken]]:
869-
txn.execute(sql, [room_id, limit + 1])
884+
txn.execute(sql, where_args + [limit + 1])
870885

871886
last_topo_id = None
872887
last_stream_id = None

tests/rest/client/test_relations.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,4 +1750,50 @@ def test_pagination(self) -> None:
17501750

17511751
self.assertNotIn("next_batch", channel.json_body, channel.json_body)
17521752

1753+
@unittest.override_config({"experimental_features": {"msc3856_enabled": True}})
1754+
def test_filter(self) -> None:
1755+
"""Create threads and ensure the ordering is due to their latest event."""
1756+
# Thread 1 has the user as the root event.
1757+
thread_1 = self.parent_id
1758+
self._send_relation(
1759+
RelationTypes.THREAD, "m.room.test", access_token=self.user2_token
1760+
)
1761+
1762+
# Thread 2 has the user replying.
1763+
res = self.helper.send(self.room, body="Thread Root!", tok=self.user2_token)
1764+
thread_2 = res["event_id"]
1765+
self._send_relation(RelationTypes.THREAD, "m.room.test", parent_id=thread_2)
1766+
1767+
# Thread 3 has the user not participating in.
1768+
res = self.helper.send(self.room, body="Another thread!", tok=self.user2_token)
1769+
thread_3 = res["event_id"]
1770+
self._send_relation(
1771+
RelationTypes.THREAD,
1772+
"m.room.test",
1773+
access_token=self.user2_token,
1774+
parent_id=thread_3,
1775+
)
1776+
1777+
# All threads in the room.
1778+
channel = self.make_request(
1779+
"GET",
1780+
f"/_matrix/client/unstable/org.matrix.msc3856/rooms/{self.room}/threads",
1781+
access_token=self.user_token,
1782+
)
1783+
self.assertEquals(200, channel.code, channel.json_body)
1784+
thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
1785+
self.assertEqual(
1786+
thread_roots, [thread_3, thread_2, thread_1], channel.json_body
1787+
)
1788+
1789+
# Only participated threads.
1790+
channel = self.make_request(
1791+
"GET",
1792+
f"/_matrix/client/unstable/org.matrix.mscxxxx/rooms/{self.room}/threads?filter=participated",
1793+
access_token=self.user_token,
1794+
)
1795+
self.assertEquals(200, channel.code, channel.json_body)
1796+
thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
1797+
self.assertEqual(thread_roots, [thread_2, thread_1], channel.json_body)
1798+
17531799
# XXX Test ignoring users.

0 commit comments

Comments
 (0)