
Commit e9a649e

Add an API for listing threads in a room.

1 parent 502f075 · commit e9a649e

File tree

7 files changed, +277 −2 lines changed

changelog.d/13394.feature

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Experimental support for [MSC3856](https://github.com/matrix-org/matrix-spec-proposals/pull/3856): threads list API.

synapse/config/experimental.py

Lines changed: 3 additions & 0 deletions
@@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
 
         # MSC3848: Introduce errcodes for specific event sending failures
         self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
+
+        # MSC3856: Threads list API
+        self.msc3856_enabled: bool = experimental.get("msc3856_enabled", False)
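For context, a minimal standalone sketch of how this flag is resolved from the parsed homeserver configuration; the `config` dict and the `is_msc3856_enabled` helper below are illustrative stand-ins and not part of this commit, though the `experimental_features` section name matches the one used in the test overrides further down.

# Illustrative sketch only: resolves the MSC3856 flag the same way the
# `experimental.get(...)` call above does, defaulting to False when absent.
from typing import Any, Dict


def is_msc3856_enabled(config: Dict[str, Any]) -> bool:
    experimental = config.get("experimental_features") or {}
    return bool(experimental.get("msc3856_enabled", False))


# The flag lives under `experimental_features`, as in the test overrides below.
assert is_msc3856_enabled({"experimental_features": {"msc3856_enabled": True}})
assert not is_msc3856_enabled({})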

synapse/handlers/relations.py

Lines changed: 63 additions & 0 deletions
@@ -480,3 +480,66 @@ async def get_bundled_aggregations(
             results.setdefault(event_id, BundledAggregations()).replace = edit
 
         return results
+
+    async def get_threads(
+        self,
+        requester: Requester,
+        room_id: str,
+        limit: int = 5,
+        from_token: Optional[StreamToken] = None,
+        to_token: Optional[StreamToken] = None,
+    ) -> JsonDict:
+        """Get the threads in a room, ordered by the topological ordering of
+        their latest reply.
+
+        Args:
+            requester: The user requesting the threads.
+            room_id: The room to fetch threads from.
+            limit: Only fetch the most recent `limit` threads.
+            from_token: Fetch rows from the given token, or from the start if None.
+            to_token: Fetch rows up to the given token, or up to the end if None.
+
+        Returns:
+            The pagination chunk.
+        """
+
+        user_id = requester.user.to_string()
+
+        # TODO Properly handle a user leaving a room.
+        (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
+            room_id, user_id, allow_departed_users=True
+        )
+
+        # Note that ignored users are not passed into get_threads below. Ignored
+        # users are handled in filter_events_for_client (and by not passing them
+        # in here we should get a better cache hit rate).
+        thread_roots, next_token = await self._main_store.get_threads(
+            room_id=room_id, limit=limit, from_token=from_token, to_token=to_token
+        )
+
+        events = await self._main_store.get_events_as_list(thread_roots)
+
+        events = await filter_events_for_client(
+            self._storage_controllers,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        now = self._clock.time_msec()
+
+        aggregations = await self.get_bundled_aggregations(
+            events, requester.user.to_string()
+        )
+        serialized_events = self._event_serializer.serialize_events(
+            events, now, bundle_aggregations=aggregations
+        )
+
+        return_value: JsonDict = {"chunk": serialized_events}
+
+        if next_token:
+            return_value["next_batch"] = await next_token.to_string(self._main_store)
+
+        if from_token:
+            return_value["prev_batch"] = await from_token.to_string(self._main_store)
+
+        return return_value
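For illustration, a sketch of the shape of the JsonDict this handler returns; the event IDs, fields, and token strings below are made-up placeholders rather than output from this commit.

# Hypothetical example of the pagination chunk returned by get_threads.
example_response = {
    "chunk": [
        {
            "event_id": "$thread_root_2",  # placeholder thread root event ID
            "type": "m.room.message",
            "room_id": "!room:example.org",
            # ... remaining serialized event fields, including the bundled
            # aggregations describing the thread's latest event ...
        },
    ],
    # Only present when the store indicates more threads remain.
    "next_batch": "t123-456_0_0_0_0_0_0_0_0",  # placeholder stream token
    # Only present when a `from` token was supplied by the caller.
    # "prev_batch": "...",
}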

synapse/rest/client/relations.py

Lines changed: 44 additions & 0 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import re
 from typing import TYPE_CHECKING, Optional, Tuple
 
 from synapse.http.server import HttpServer
@@ -91,5 +92,48 @@ async def on_GET(
         return 200, result
 
 
+class ThreadsServlet(RestServlet):
+    PATTERNS = (
+        re.compile(
+            "^/_matrix/client/unstable/org.matrix.msc3856/rooms/(?P<room_id>[^/]*)/threads"
+        ),
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+        self._relations_handler = hs.get_relations_handler()
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+
+        limit = parse_integer(request, "limit", default=5)
+        from_token_str = parse_string(request, "from")
+        to_token_str = parse_string(request, "to")
+
+        # Parse the pagination tokens, if supplied.
+        from_token = None
+        if from_token_str:
+            from_token = await StreamToken.from_string(self.store, from_token_str)
+        to_token = None
+        if to_token_str:
+            to_token = await StreamToken.from_string(self.store, to_token_str)
+
+        result = await self._relations_handler.get_threads(
+            requester=requester,
+            room_id=room_id,
+            limit=limit,
+            from_token=from_token,
+            to_token=to_token,
+        )
+
+        return 200, result
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RelationPaginationServlet(hs).register(http_server)
+    if hs.config.experimental.msc3856_enabled:
+        ThreadsServlet(hs).register(http_server)
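As a usage illustration, a minimal client-side sketch of calling the unstable endpoint registered above; the homeserver URL, room ID, and access token are placeholders, and the use of the `requests` library is an assumption, not something this commit ships.

# Hypothetical client call against the unstable threads endpoint.
from urllib.parse import quote

import requests

HOMESERVER = "https://matrix.example.org"  # placeholder homeserver URL
ROOM_ID = "!abc123:example.org"            # placeholder room ID
ACCESS_TOKEN = "syt_placeholder_token"     # placeholder access token

url = (
    f"{HOMESERVER}/_matrix/client/unstable/org.matrix.msc3856"
    f"/rooms/{quote(ROOM_ID, safe='')}/threads"
)
resp = requests.get(
    url,
    params={"limit": 5},  # optional; the servlet defaults to 5
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
resp.raise_for_status()
body = resp.json()

thread_roots = [ev["event_id"] for ev in body["chunk"]]
next_batch = body.get("next_batch")  # pass back as `from` to fetch the next page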

synapse/storage/databases/main/events.py

Lines changed: 5 additions & 2 deletions
@@ -1594,7 +1594,7 @@ def _update_metadata_tables_txn(
         )
 
         # Remove from relations table.
-        self._handle_redact_relations(txn, event.redacts)
+        self._handle_redact_relations(txn, event.room_id, event.redacts)
 
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
@@ -1909,6 +1909,7 @@ def _handle_event_relations(
             self.store.get_thread_participated.invalidate,
             (relation.parent_id, event.sender),
         )
+        txn.call_after(self.store.get_threads.invalidate, (event.room_id,))
 
     def _handle_insertion_event(
         self, txn: LoggingTransaction, event: EventBase
@@ -2033,13 +2034,14 @@ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None
         txn.execute(sql, (batch_id,))
 
     def _handle_redact_relations(
-        self, txn: LoggingTransaction, redacted_event_id: str
+        self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
     ) -> None:
         """Handles receiving a redaction and checking whether the redacted event
         has any relations which must be removed from the database.
 
         Args:
             txn
+            room_id: The room ID of the event that was redacted.
             redacted_event_id: The event that was redacted.
         """
 
@@ -2068,6 +2070,7 @@ def _handle_redact_relations(
         self.store._invalidate_cache_and_stream(
             txn, self.store.get_thread_participated, (redacted_relates_to,)
         )
+        txn.call_after(self.store.get_threads.invalidate, (room_id,))
         self.store._invalidate_cache_and_stream(
             txn,
             self.store.get_mutual_event_relations_for_rel_type,
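The two new `txn.call_after(self.store.get_threads.invalidate, ...)` calls follow the usual pattern of deferring cache invalidation until the transaction has committed. A simplified, self-contained sketch of that pattern is below; the `Transaction` class and `threads_cache` dict are hypothetical stand-ins for illustration, not Synapse's actual `LoggingTransaction` or cache machinery.

# Simplified stand-in for the txn.call_after(...) pattern: queued callbacks
# (such as cache invalidations) run only once the transaction commits.
from typing import Callable, List


class Transaction:
    def __init__(self) -> None:
        self._after_callbacks: List[Callable[[], None]] = []

    def call_after(self, callback: Callable[..., None], *args: object) -> None:
        # Defer the callback until commit so a rolled-back write never
        # invalidates the cache spuriously.
        self._after_callbacks.append(lambda: callback(*args))

    def commit(self) -> None:
        for callback in self._after_callbacks:
            callback()


# Usage sketch: drop a (hypothetical) cached threads listing for a room once
# the write that changed its thread ordering has committed.
threads_cache = {("!room:example.org",): ["$old_thread_root"]}
txn = Transaction()
txn.call_after(threads_cache.pop, ("!room:example.org",))
txn.commit()
assert ("!room:example.org",) not in threads_cache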

synapse/storage/databases/main/relations.py

Lines changed: 87 additions & 0 deletions
@@ -814,6 +814,93 @@ def _get_event_relations(
             "get_event_relations", _get_event_relations
         )
 
+    @cached(tree=True)
+    async def get_threads(
+        self,
+        room_id: str,
+        limit: int = 5,
+        from_token: Optional[StreamToken] = None,
+        to_token: Optional[StreamToken] = None,
+    ) -> Tuple[List[str], Optional[StreamToken]]:
+        """Get a list of thread IDs, ordered by topological ordering of their
+        latest reply.
+
+        Args:
+            room_id: The room to fetch threads from.
+            limit: Only fetch the most recent `limit` threads.
+            from_token: Fetch rows from the given token, or from the start if None.
+            to_token: Fetch rows up to the given token, or up to the end if None.
+
+        Returns:
+            A tuple of:
+                A list of thread root event IDs.
+
+                The next stream token, if one exists.
+        """
+        pagination_clause = generate_pagination_where_clause(
+            direction="b",
+            column_names=("topological_ordering", "stream_ordering"),
+            from_token=from_token.room_key.as_historical_tuple()
+            if from_token
+            else None,
+            to_token=to_token.room_key.as_historical_tuple() if to_token else None,
+            engine=self.database_engine,
+        )
+
+        if pagination_clause:
+            pagination_clause = "AND " + pagination_clause
+
+        sql = f"""
+            SELECT relates_to_id, MAX(topological_ordering), MAX(stream_ordering)
+            FROM event_relations
+            INNER JOIN events USING (event_id)
+            WHERE
+                room_id = ? AND
+                relation_type = '{RelationTypes.THREAD}'
+                {pagination_clause}
+            GROUP BY relates_to_id
+            ORDER BY MAX(topological_ordering) DESC, MAX(stream_ordering) DESC
+            LIMIT ?
+        """
+
+        def _get_threads_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[str], Optional[StreamToken]]:
+            txn.execute(sql, [room_id, limit + 1])
+
+            last_topo_id = None
+            last_stream_id = None
+            thread_ids = []
+            for thread_id, topo_id, stream_id in txn:
+                thread_ids.append(thread_id)
+                last_topo_id = topo_id
+                last_stream_id = stream_id
+
+            # If there are more events, generate the next pagination key.
+            next_token = None
+            if len(thread_ids) > limit and last_topo_id and last_stream_id:
+                next_key = RoomStreamToken(last_topo_id, last_stream_id)
+                if from_token:
+                    next_token = from_token.copy_and_replace(
+                        StreamKeyType.ROOM, next_key
+                    )
+                else:
+                    next_token = StreamToken(
+                        room_key=next_key,
+                        presence_key=0,
+                        typing_key=0,
+                        receipt_key=0,
+                        account_data_key=0,
+                        push_rules_key=0,
+                        to_device_key=0,
+                        device_list_key=0,
+                        groups_key=0,
+                    )
+
+            return thread_ids[:limit], next_token
+
+        return await self.db_pool.runInteraction("get_threads", _get_threads_txn)
+
 
 class RelationsStore(RelationsWorkerStore):
     pass
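The `LIMIT ?` bound to `limit + 1` above is a common keyset-pagination trick: fetch one extra row to learn whether another page exists, return only `limit` rows, and derive the next token from the ordering keys seen. A simplified standalone illustration of the general pattern over an in-memory list follows; the single integer ordering key is an assumption for brevity, and the sketch is not a line-for-line translation of the SQL above.

# Minimal keyset-pagination sketch echoing the limit + 1 logic above.
from typing import List, Optional, Tuple


def paginate(
    rows: List[Tuple[str, int]],  # (thread_id, ordering_key), newest first
    limit: int,
    from_key: Optional[int] = None,
) -> Tuple[List[str], Optional[int]]:
    # Apply the "from" bound, then take limit + 1 rows to detect a next page.
    filtered = [r for r in rows if from_key is None or r[1] < from_key]
    page = filtered[: limit + 1]

    next_key: Optional[int] = None
    if len(page) > limit:
        # More rows exist: the key of the last returned row becomes the token.
        next_key = page[limit - 1][1]
        page = page[:limit]

    return [thread_id for thread_id, _ in page], next_key


rows = [("$c", 30), ("$b", 20), ("$a", 10)]
ids, token = paginate(rows, limit=2)
assert ids == ["$c", "$b"] and token == 20

ids, token = paginate(rows, limit=2, from_key=token)
assert ids == ["$a"] and token is None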

tests/rest/client/test_relations.py

Lines changed: 74 additions & 0 deletions
@@ -1677,3 +1677,77 @@ def test_redact_parent_thread(self) -> None:
             relations[RelationTypes.THREAD]["latest_event"]["event_id"],
             related_event_id,
         )
+
+
+class ThreadsTestCase(BaseRelationsTestCase):
+    @unittest.override_config({"experimental_features": {"msc3856_enabled": True}})
+    def test_threads(self) -> None:
+        """Create threads and ensure the ordering reflects their latest event."""
+        # Create 2 threads.
+        thread_1 = self.parent_id
+        res = self.helper.send(self.room, body="Thread Root!", tok=self.user_token)
+        thread_2 = res["event_id"]
+
+        self._send_relation(RelationTypes.THREAD, "m.room.test")
+        self._send_relation(RelationTypes.THREAD, "m.room.test", parent_id=thread_2)
+
+        # Request the threads in the room.
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/unstable/org.matrix.msc3856/rooms/{self.room}/threads",
+            access_token=self.user_token,
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
+        self.assertEqual(thread_roots, [thread_2, thread_1])
+
+        # Update the first thread, the ordering should swap.
+        self._send_relation(RelationTypes.THREAD, "m.room.test")
+
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/unstable/org.matrix.msc3856/rooms/{self.room}/threads",
+            access_token=self.user_token,
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
+        self.assertEqual(thread_roots, [thread_1, thread_2])
+
+    @unittest.override_config({"experimental_features": {"msc3856_enabled": True}})
+    def test_pagination(self) -> None:
+        """Create threads and paginate through them."""
+        # Create 2 threads.
+        thread_1 = self.parent_id
+        res = self.helper.send(self.room, body="Thread Root!", tok=self.user_token)
+        thread_2 = res["event_id"]
+
+        self._send_relation(RelationTypes.THREAD, "m.room.test")
+        self._send_relation(RelationTypes.THREAD, "m.room.test", parent_id=thread_2)
+
+        # Request the threads in the room.
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/unstable/org.matrix.msc3856/rooms/{self.room}/threads?limit=1",
+            access_token=self.user_token,
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
+        self.assertEqual(thread_roots, [thread_2])
+
+        # Make sure next_batch has something in it that looks like it could be a
+        # valid token.
+        next_batch = channel.json_body.get("next_batch")
+        self.assertIsInstance(next_batch, str, channel.json_body)
+
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/unstable/org.matrix.msc3856/rooms/{self.room}/threads?limit=1&from={next_batch}",
+            access_token=self.user_token,
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        thread_roots = [ev["event_id"] for ev in channel.json_body["chunk"]]
+        self.assertEqual(thread_roots, [thread_1], channel.json_body)
+
+        self.assertNotIn("next_batch", channel.json_body, channel.json_body)
+
+    # XXX Test ignoring users.
