Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit 70783a3

Browse files
committed
Merge commit '43f2b67e4' into anoa/dinsic_release_1_21_x
* commit '43f2b67e4': Intelligently select extremities used in backfill. (#8349) Add flags to /versions about whether new rooms are encrypted by default. (#8343) Fix ratelimiting for federation `/send` requests. (#8342) blacklist MSC2753 sytests until it's implemented in synapse (#8285)
2 parents 7176832 + 43f2b67 commit 70783a3

File tree

10 files changed

+141
-37
lines changed

10 files changed

+141
-37
lines changed

changelog.d/8342.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix ratelimiting of federation `/send` requests.

changelog.d/8343.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add flags to the `/versions` endpoint that include whether new rooms default to using E2EE.

changelog.d/8349.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a longstanding bug where back pagination over federation could get stuck if it failed to handle a received event.

synapse/federation/federation_server.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,16 @@ def __init__(self, hs):
9797
self.state = hs.get_state_handler()
9898

9999
self.device_handler = hs.get_device_handler()
100+
self._federation_ratelimiter = hs.get_federation_ratelimiter()
100101

101102
self._server_linearizer = Linearizer("fed_server")
102103
self._transaction_linearizer = Linearizer("fed_txn_handler")
103104

105+
# We cache results for transaction with the same ID
106+
self._transaction_resp_cache = ResponseCache(
107+
hs, "fed_txn_handler", timeout_ms=30000
108+
)
109+
104110
self.transaction_actions = TransactionActions(self.store)
105111

106112
self.registry = hs.get_federation_registry()
@@ -135,22 +141,44 @@ async def on_incoming_transaction(
135141
request_time = self._clock.time_msec()
136142

137143
transaction = Transaction(**transaction_data)
144+
transaction_id = transaction.transaction_id # type: ignore
138145

139-
if not transaction.transaction_id: # type: ignore
146+
if not transaction_id:
140147
raise Exception("Transaction missing transaction_id")
141148

142-
logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore
149+
logger.debug("[%s] Got transaction", transaction_id)
143150

144-
# use a linearizer to ensure that we don't process the same transaction
145-
# multiple times in parallel.
146-
with (
147-
await self._transaction_linearizer.queue(
148-
(origin, transaction.transaction_id) # type: ignore
149-
)
150-
):
151-
result = await self._handle_incoming_transaction(
152-
origin, transaction, request_time
153-
)
151+
# We wrap in a ResponseCache so that we de-duplicate retried
152+
# transactions.
153+
return await self._transaction_resp_cache.wrap(
154+
(origin, transaction_id),
155+
self._on_incoming_transaction_inner,
156+
origin,
157+
transaction,
158+
request_time,
159+
)
160+
161+
async def _on_incoming_transaction_inner(
162+
self, origin: str, transaction: Transaction, request_time: int
163+
) -> Tuple[int, Dict[str, Any]]:
164+
# Use a linearizer to ensure that transactions from a remote are
165+
# processed in order.
166+
with await self._transaction_linearizer.queue(origin):
167+
# We rate limit here *after* we've queued up the incoming requests,
168+
# so that we don't fill up the ratelimiter with blocked requests.
169+
#
170+
# This is important as the ratelimiter allows N concurrent requests
171+
# at a time, and only starts ratelimiting if there are more requests
172+
# than that being processed at a time. If we queued up requests in
173+
# the linearizer/response cache *after* the ratelimiting then those
174+
# queued up requests would count as part of the allowed limit of N
175+
# concurrent requests.
176+
with self._federation_ratelimiter.ratelimit(origin) as d:
177+
await d
178+
179+
result = await self._handle_incoming_transaction(
180+
origin, transaction, request_time
181+
)
154182

155183
return result
156184

synapse/federation/transport/server.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
)
4747
from synapse.server import HomeServer
4848
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
49-
from synapse.util.ratelimitutils import FederationRateLimiter
5049
from synapse.util.versionstring import get_version_string
5150

5251
logger = logging.getLogger(__name__)
@@ -73,9 +72,7 @@ def __init__(self, hs, servlet_groups=None):
7372
super(TransportLayerServer, self).__init__(hs, canonical_json=False)
7473

7574
self.authenticator = Authenticator(hs)
76-
self.ratelimiter = FederationRateLimiter(
77-
self.clock, config=hs.config.rc_federation
78-
)
75+
self.ratelimiter = hs.get_federation_ratelimiter()
7976

8077
self.register_servlets()
8178

@@ -273,6 +270,8 @@ class BaseFederationServlet:
273270

274271
PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version
275272

273+
RATELIMIT = True # Whether to rate limit requests or not
274+
276275
def __init__(self, handler, authenticator, ratelimiter, server_name):
277276
self.handler = handler
278277
self.authenticator = authenticator
@@ -336,7 +335,7 @@ async def new_func(request, *args, **kwargs):
336335
)
337336

338337
with scope:
339-
if origin:
338+
if origin and self.RATELIMIT:
340339
with ratelimiter.ratelimit(origin) as d:
341340
await d
342341
if request._disconnected:
@@ -373,6 +372,10 @@ def register(self, server):
373372
class FederationSendServlet(BaseFederationServlet):
374373
PATH = "/send/(?P<transaction_id>[^/]*)/?"
375374

375+
# We ratelimit manually in the handler as we queue up the requests and we
376+
# don't want to fill up the ratelimiter with blocked requests.
377+
RATELIMIT = False
378+
376379
def __init__(self, handler, server_name, **kwargs):
377380
super(FederationSendServlet, self).__init__(
378381
handler, server_name=server_name, **kwargs

synapse/handlers/federation.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -920,15 +920,26 @@ async def backfill(self, dest, room_id, limit, extremities):
920920

921921
return events
922922

923-
async def maybe_backfill(self, room_id, current_depth):
923+
async def maybe_backfill(
924+
self, room_id: str, current_depth: int, limit: int
925+
) -> bool:
924926
"""Checks the database to see if we should backfill before paginating,
925927
and if so do.
928+
929+
Args:
930+
room_id
931+
current_depth: The depth from which we're paginating from. This is
932+
used to decide if we should backfill and what extremities to
933+
use.
934+
limit: The number of events that the pagination request will
935+
return. This is used as part of the heuristic to decide if we
936+
should back paginate.
926937
"""
927938
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
928939

929940
if not extremities:
930941
logger.debug("Not backfilling as no extremeties found.")
931-
return
942+
return False
932943

933944
# We only want to paginate if we can actually see the events we'll get,
934945
# as otherwise we'll just spend a lot of resources to get redacted
@@ -981,16 +992,54 @@ async def maybe_backfill(self, room_id, current_depth):
981992
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
982993
max_depth = sorted_extremeties_tuple[0][1]
983994

995+
# If we're approaching an extremity we trigger a backfill, otherwise we
996+
# no-op.
997+
#
998+
# We chose twice the limit here as then clients paginating backwards
999+
# will send pagination requests that trigger backfill at least twice
1000+
# using the most recent extremity before it gets removed (see below). We
1001+
# chose more than one times the limit in case of failure, but choosing a
1002+
# much larger factor will result in triggering a backfill request much
1003+
# earlier than necessary.
1004+
if current_depth - 2 * limit > max_depth:
1005+
logger.debug(
1006+
"Not backfilling as we don't need to. %d < %d - 2 * %d",
1007+
max_depth,
1008+
current_depth,
1009+
limit,
1010+
)
1011+
return False
1012+
1013+
logger.debug(
1014+
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
1015+
room_id,
1016+
current_depth,
1017+
max_depth,
1018+
sorted_extremeties_tuple,
1019+
)
1020+
1021+
# We ignore extremities that have a greater depth than our current depth
1022+
# as:
1023+
# 1. we don't really care about getting events that have happened
1024+
# before our current position; and
1025+
# 2. we have likely previously tried and failed to backfill from that
1026+
# extremity, so to avoid getting "stuck" requesting the same
1027+
# backfill repeatedly we drop those extremities.
1028+
filtered_sorted_extremeties_tuple = [
1029+
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
1030+
]
1031+
1032+
# However, we need to check that the filtered extremities are non-empty.
1033+
# If they are empty then either we can a) bail or b) still attempt to
1034+
# backill. We opt to try backfilling anyway just in case we do get
1035+
# relevant events.
1036+
if filtered_sorted_extremeties_tuple:
1037+
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
1038+
9841039
# We don't want to specify too many extremities as it causes the backfill
9851040
# request URI to be too long.
9861041
extremities = dict(sorted_extremeties_tuple[:5])
9871042

988-
if current_depth > max_depth:
989-
logger.debug(
990-
"Not backfilling as we don't need to. %d < %d", max_depth, current_depth
991-
)
992-
return
993-
9941043
# Now we need to decide which hosts to hit first.
9951044

9961045
# First we try hosts that are already in the room

synapse/handlers/pagination.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ async def get_messages(
358358
# if we're going backwards, we might need to backfill. This
359359
# requires that we have a topo token.
360360
if room_token.topological:
361-
max_topo = room_token.topological
361+
curr_topo = room_token.topological
362362
else:
363-
max_topo = await self.store.get_max_topological_token(
363+
curr_topo = await self.store.get_current_topological_token(
364364
room_id, room_token.stream
365365
)
366366

@@ -379,13 +379,13 @@ async def get_messages(
379379
leave_token = RoomStreamToken.parse(leave_token_str)
380380
assert leave_token.topological is not None
381381

382-
if leave_token.topological < max_topo:
382+
if leave_token.topological < curr_topo:
383383
from_token = from_token.copy_and_replace(
384384
"room_key", leave_token
385385
)
386386

387387
await self.hs.get_handlers().federation_handler.maybe_backfill(
388-
room_id, max_topo
388+
room_id, curr_topo, limit=pagin_config.limit,
389389
)
390390

391391
to_room_key = None

synapse/rest/client/versions.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import re
2121

22+
from synapse.api.constants import RoomCreationPreset
2223
from synapse.http.servlet import RestServlet
2324

2425
logger = logging.getLogger(__name__)
@@ -31,6 +32,20 @@ def __init__(self, hs):
3132
super(VersionsRestServlet, self).__init__()
3233
self.config = hs.config
3334

35+
# Calculate these once since they shouldn't change after start-up.
36+
self.e2ee_forced_public = (
37+
RoomCreationPreset.PUBLIC_CHAT
38+
in self.config.encryption_enabled_by_default_for_room_presets
39+
)
40+
self.e2ee_forced_private = (
41+
RoomCreationPreset.PRIVATE_CHAT
42+
in self.config.encryption_enabled_by_default_for_room_presets
43+
)
44+
self.e2ee_forced_trusted_private = (
45+
RoomCreationPreset.TRUSTED_PRIVATE_CHAT
46+
in self.config.encryption_enabled_by_default_for_room_presets
47+
)
48+
3449
def on_GET(self, request):
3550
return (
3651
200,
@@ -65,6 +80,10 @@ def on_GET(self, request):
6580
"m.lazy_load_members": True,
6681
# Implements additional endpoints as described in MSC2666
6782
"uk.half-shot.msc2666": True,
83+
# Whether new rooms will be set to encrypted or not (based on presets).
84+
"io.element.e2ee_forced.public": self.e2ee_forced_public,
85+
"io.element.e2ee_forced.private": self.e2ee_forced_private,
86+
"io.element.e2ee_forced.trusted_private": self.e2ee_forced_trusted_private,
6887
},
6988
},
7089
)

synapse/server.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
from synapse.types import DomainSpecificString
115115
from synapse.util import Clock
116116
from synapse.util.distributor import Distributor
117+
from synapse.util.ratelimitutils import FederationRateLimiter
117118
from synapse.util.stringutils import random_string
118119

119120
logger = logging.getLogger(__name__)
@@ -642,6 +643,10 @@ def get_replication_data_handler(self) -> ReplicationDataHandler:
642643
def get_replication_streams(self) -> Dict[str, Stream]:
643644
return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
644645

646+
@cache_in_self
647+
def get_federation_ratelimiter(self) -> FederationRateLimiter:
648+
return FederationRateLimiter(self.clock, config=self.config.rc_federation)
649+
645650
async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
646651
return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
647652

synapse/storage/databases/main/stream.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -640,23 +640,20 @@ async def get_topological_token_for_event(self, event_id: str) -> str:
640640
)
641641
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
642642

643-
async def get_max_topological_token(self, room_id: str, stream_key: int) -> int:
644-
"""Get the max topological token in a room before the given stream
643+
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
644+
"""Gets the topological token in a room after or at the given stream
645645
ordering.
646646
647647
Args:
648648
room_id
649649
stream_key
650-
651-
Returns:
652-
The maximum topological token.
653650
"""
654651
sql = (
655-
"SELECT coalesce(max(topological_ordering), 0) FROM events"
656-
" WHERE room_id = ? AND stream_ordering < ?"
652+
"SELECT coalesce(MIN(topological_ordering), 0) FROM events"
653+
" WHERE room_id = ? AND stream_ordering >= ?"
657654
)
658655
row = await self.db_pool.execute(
659-
"get_max_topological_token", None, sql, room_id, stream_key
656+
"get_current_topological_token", None, sql, room_id, stream_key
660657
)
661658
return row[0][0] if row else 0
662659

0 commit comments

Comments
 (0)