Skip to content

Commit 28c9ed3

Browse files
authored
Remove unnecessary replication calls (#18564)
This should be reviewed commit by commit. Nowadays it's trivial to propagate cache invalidations, which means we can move some things off the main process, and not go through HTTP replication. `ReplicationGetQueryRestServlet` appeared to be unused, and was very weird, as it was being called if the current instance is the main one… to RPC to the main one (if no instance is set on a replication client, it makes it to the main process) The other two handlers could be relatively trivially moved to any workers, moving some methods to the worker store. **I've intentionally not removed the replication servlets yet** so that it's safe to rollout, and will do another PR that clean those up to remove on the N+1 version
1 parent 1dc2956 commit 28c9ed3

File tree

7 files changed

+97
-111
lines changed

7 files changed

+97
-111
lines changed

changelog.d/18564.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove unnecessary HTTP replication calls.

synapse/federation/federation_server.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
from synapse.metrics.background_process_metrics import wrap_as_background_process
8686
from synapse.replication.http.federation import (
8787
ReplicationFederationSendEduRestServlet,
88-
ReplicationGetQueryRestServlet,
8988
)
9089
from synapse.storage.databases.main.lock import Lock
9190
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
@@ -1380,7 +1379,6 @@ def __init__(self, hs: "HomeServer"):
13801379
# and use them. However we have guards before we use them to ensure that
13811380
# we don't route to ourselves, and in monolith mode that will always be
13821381
# the case.
1383-
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
13841382
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
13851383

13861384
self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
@@ -1469,10 +1467,6 @@ async def on_query(self, query_type: str, args: dict) -> JsonDict:
14691467
if handler:
14701468
return await handler(args)
14711469

1472-
# Check if we can route it somewhere else that isn't us
1473-
if self._instance_name == "master":
1474-
return await self._get_query_client(query_type=query_type, args=args)
1475-
14761470
# Uh oh, no handler! Let's raise an exception so the request returns an
14771471
# error.
14781472
logger.warning("No handler registered for query type %s", query_type)

synapse/handlers/federation.py

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@
7373
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
7474
from synapse.metrics.background_process_metrics import run_as_background_process
7575
from synapse.module_api import NOT_SPAM
76-
from synapse.replication.http.federation import (
77-
ReplicationCleanRoomRestServlet,
78-
ReplicationStoreRoomOnOutlierMembershipRestServlet,
79-
)
8076
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
8177
from synapse.storage.invite_rule import InviteRule
8278
from synapse.types import JsonDict, StrCollection, get_domain_from_id
@@ -163,19 +159,6 @@ def __init__(self, hs: "HomeServer"):
163159
self._notifier = hs.get_notifier()
164160
self._worker_locks = hs.get_worker_locks_handler()
165161

166-
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
167-
hs
168-
)
169-
170-
if hs.config.worker.worker_app:
171-
self._maybe_store_room_on_outlier_membership = (
172-
ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
173-
)
174-
else:
175-
self._maybe_store_room_on_outlier_membership = (
176-
self.store.maybe_store_room_on_outlier_membership
177-
)
178-
179162
self._room_backfill = Linearizer("room_backfill")
180163

181164
self._third_party_event_rules = (
@@ -647,7 +630,7 @@ async def do_invite_join(
647630
# room.
648631
# In short, the races either have an acceptable outcome or should be
649632
# impossible.
650-
await self._clean_room_for_join(room_id)
633+
await self.store.clean_room_for_join(room_id)
651634

652635
try:
653636
# Try the host we successfully got a response to /make_join/
@@ -857,7 +840,7 @@ async def do_knock(
857840
event.internal_metadata.out_of_band_membership = True
858841

859842
# Record the room ID and its version so that we have a record of the room
860-
await self._maybe_store_room_on_outlier_membership(
843+
await self.store.maybe_store_room_on_outlier_membership(
861844
room_id=event.room_id, room_version=event_format_version
862845
)
863846

@@ -1115,7 +1098,7 @@ async def on_invite_request(
11151098
# keep a record of the room version, if we don't yet know it.
11161099
# (this may get overwritten if we later get a different room version in a
11171100
# join dance).
1118-
await self._maybe_store_room_on_outlier_membership(
1101+
await self.store.maybe_store_room_on_outlier_membership(
11191102
room_id=event.room_id, room_version=room_version
11201103
)
11211104

@@ -1761,18 +1744,6 @@ async def _check_key_revocation(self, public_key: str, url: str) -> None:
17611744
if "valid" not in response or not response["valid"]:
17621745
raise AuthError(403, "Third party certificate was invalid")
17631746

1764-
async def _clean_room_for_join(self, room_id: str) -> None:
1765-
"""Called to clean up any data in DB for a given room, ready for the
1766-
server to join the room.
1767-
1768-
Args:
1769-
room_id
1770-
"""
1771-
if self.config.worker.worker_app:
1772-
await self._clean_room_for_join_client(room_id)
1773-
else:
1774-
await self.store.clean_room_for_join(room_id)
1775-
17761747
async def get_room_complexity(
17771748
self, remote_room_hosts: List[str], room_id: str
17781749
) -> Optional[dict]:

synapse/replication/http/federation.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ async def _handle_request( # type: ignore[override]
202202
return 200, {}
203203

204204

205+
# FIXME(2025-07-22): Remove this on the next release, this will only get used
206+
# during rollout to Synapse 1.135 and can be removed after that release.
205207
class ReplicationGetQueryRestServlet(ReplicationEndpoint):
206208
"""Handle responding to queries from federation.
207209
@@ -249,6 +251,8 @@ async def _handle_request( # type: ignore[override]
249251
return 200, result
250252

251253

254+
# FIXME(2025-07-22): Remove this on the next release, this will only get used
255+
# during rollout to Synapse 1.135 and can be removed after that release.
252256
class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
253257
"""Called to clean up any data in DB for a given room, ready for the
254258
server to join the room.
@@ -284,6 +288,8 @@ async def _handle_request( # type: ignore[override]
284288
return 200, {}
285289

286290

291+
# FIXME(2025-07-22): Remove this on the next release, this will only get used
292+
# during rollout to Synapse 1.135 and can be removed after that release.
287293
class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
288294
"""Called to clean up any data in DB for a given room, ready for the
289295
server to join the room.

synapse/storage/databases/main/cache.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
LoggingDatabaseConnection,
4242
LoggingTransaction,
4343
)
44-
from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET
4544
from synapse.storage.engines import PostgresEngine
4645
from synapse.storage.util.id_generators import MultiWriterIdGenerator
4746
from synapse.util.caches.descriptors import CachedFunction
@@ -284,6 +283,11 @@ def process_replication_position(
284283
super().process_replication_position(stream_name, instance_name, token)
285284

286285
def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
286+
# This is needed to avoid a circular import.
287+
from synapse.storage.databases.main.events import (
288+
SLIDING_SYNC_RELEVANT_STATE_SET,
289+
)
290+
287291
data = row.data
288292

289293
if row.type == EventsStreamEventRow.TypeId:
@@ -347,6 +351,11 @@ def _invalidate_caches_for_event(
347351
relates_to: Optional[str],
348352
backfilled: bool,
349353
) -> None:
354+
# This is needed to avoid a circular import.
355+
from synapse.storage.databases.main.events import (
356+
SLIDING_SYNC_RELEVANT_STATE_SET,
357+
)
358+
350359
# XXX: If you add something to this function make sure you add it to
351360
# `_invalidate_caches_for_room_events` as well.
352361

synapse/storage/databases/main/event_federation.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@
4646
from synapse.events import EventBase, make_event_from_dict
4747
from synapse.logging.opentracing import tag_args, trace
4848
from synapse.metrics.background_process_metrics import wrap_as_background_process
49-
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
49+
from synapse.storage._base import db_to_json, make_in_list_sql_clause
5050
from synapse.storage.background_updates import ForeignKeyConstraint
5151
from synapse.storage.database import (
5252
DatabasePool,
5353
LoggingDatabaseConnection,
5454
LoggingTransaction,
5555
)
56+
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
5657
from synapse.storage.databases.main.events_worker import EventsWorkerStore
5758
from synapse.storage.databases.main.signatures import SignatureWorkerStore
5859
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -123,7 +124,9 @@ def __init__(self, room_id: str):
123124
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
124125

125126

126-
class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
127+
class EventFederationWorkerStore(
128+
SignatureWorkerStore, EventsWorkerStore, CacheInvalidationWorkerStore
129+
):
127130
# TODO: this attribute comes from EventPushActionWorkerStore. Should we inherit from
128131
# that store so that mypy can deduce this for itself?
129132
stream_ordering_month_ago: Optional[int]
@@ -2053,6 +2056,19 @@ def _get_stats_for_federation_staging_txn(
20532056
number_pdus_in_federation_queue.set(count)
20542057
oldest_pdu_in_federation_staging.set(age)
20552058

2059+
async def clean_room_for_join(self, room_id: str) -> None:
2060+
await self.db_pool.runInteraction(
2061+
"clean_room_for_join", self._clean_room_for_join_txn, room_id
2062+
)
2063+
2064+
def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None:
2065+
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
2066+
2067+
txn.execute(query, (room_id,))
2068+
self._invalidate_cache_and_stream(
2069+
txn, self.get_latest_event_ids_in_room, (room_id,)
2070+
)
2071+
20562072

20572073
class EventFederationStore(EventFederationWorkerStore):
20582074
"""Responsible for storing and serving up the various graphs associated
@@ -2078,17 +2094,6 @@ def __init__(
20782094
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
20792095
)
20802096

2081-
async def clean_room_for_join(self, room_id: str) -> None:
2082-
await self.db_pool.runInteraction(
2083-
"clean_room_for_join", self._clean_room_for_join_txn, room_id
2084-
)
2085-
2086-
def _clean_room_for_join_txn(self, txn: LoggingTransaction, room_id: str) -> None:
2087-
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
2088-
2089-
txn.execute(query, (room_id,))
2090-
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
2091-
20922097
async def _background_delete_non_state_event_auth(
20932098
self, progress: JsonDict, batch_size: int
20942099
) -> int:

synapse/storage/databases/main/room.py

Lines changed: 59 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,65 @@ async def set_room_is_public_appservice(
19351935
desc="set_room_is_public_appservice_false",
19361936
)
19371937

1938+
async def has_auth_chain_index(self, room_id: str) -> bool:
1939+
"""Check if the room has (or can have) a chain cover index.
1940+
1941+
Defaults to True if we don't have an entry in `rooms` table nor any
1942+
events for the room.
1943+
"""
1944+
1945+
has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
1946+
table="rooms",
1947+
keyvalues={"room_id": room_id},
1948+
retcol="has_auth_chain_index",
1949+
desc="has_auth_chain_index",
1950+
allow_none=True,
1951+
)
1952+
1953+
if has_auth_chain_index:
1954+
return True
1955+
1956+
# It's possible that we already have events for the room in our DB
1957+
# without a corresponding room entry. If we do then we don't want to
1958+
# mark the room as having an auth chain cover index.
1959+
max_ordering = await self.db_pool.simple_select_one_onecol(
1960+
table="events",
1961+
keyvalues={"room_id": room_id},
1962+
retcol="MAX(stream_ordering)",
1963+
allow_none=True,
1964+
desc="has_auth_chain_index_fallback",
1965+
)
1966+
1967+
return max_ordering is None
1968+
1969+
async def maybe_store_room_on_outlier_membership(
1970+
self, room_id: str, room_version: RoomVersion
1971+
) -> None:
1972+
"""
1973+
When we receive an invite or any other event over federation that may relate to a room
1974+
we are not in, store the version of the room if we don't already know the room version.
1975+
"""
1976+
# It's possible that we already have events for the room in our DB
1977+
# without a corresponding room entry. If we do then we don't want to
1978+
# mark the room as having an auth chain cover index.
1979+
has_auth_chain_index = await self.has_auth_chain_index(room_id)
1980+
1981+
await self.db_pool.simple_upsert(
1982+
desc="maybe_store_room_on_outlier_membership",
1983+
table="rooms",
1984+
keyvalues={"room_id": room_id},
1985+
values={},
1986+
insertion_values={
1987+
"room_version": room_version.identifier,
1988+
"is_public": False,
1989+
# We don't worry about setting the `creator` here because
1990+
# we don't process any messages in a room while a user is
1991+
# invited (only after the join).
1992+
"creator": "",
1993+
"has_auth_chain_index": has_auth_chain_index,
1994+
},
1995+
)
1996+
19381997

19391998
class _BackgroundUpdates:
19401999
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -2186,37 +2245,6 @@ def _get_rooms(txn: LoggingTransaction) -> List[str]:
21862245

21872246
return len(rooms)
21882247

2189-
async def has_auth_chain_index(self, room_id: str) -> bool:
2190-
"""Check if the room has (or can have) a chain cover index.
2191-
2192-
Defaults to True if we don't have an entry in `rooms` table nor any
2193-
events for the room.
2194-
"""
2195-
2196-
has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
2197-
table="rooms",
2198-
keyvalues={"room_id": room_id},
2199-
retcol="has_auth_chain_index",
2200-
desc="has_auth_chain_index",
2201-
allow_none=True,
2202-
)
2203-
2204-
if has_auth_chain_index:
2205-
return True
2206-
2207-
# It's possible that we already have events for the room in our DB
2208-
# without a corresponding room entry. If we do then we don't want to
2209-
# mark the room as having an auth chain cover index.
2210-
max_ordering = await self.db_pool.simple_select_one_onecol(
2211-
table="events",
2212-
keyvalues={"room_id": room_id},
2213-
retcol="MAX(stream_ordering)",
2214-
allow_none=True,
2215-
desc="has_auth_chain_index_fallback",
2216-
)
2217-
2218-
return max_ordering is None
2219-
22202248
async def _background_populate_room_depth_min_depth2(
22212249
self, progress: JsonDict, batch_size: int
22222250
) -> int:
@@ -2567,34 +2595,6 @@ def _write_partial_state_rooms_join_event_id(
25672595
updatevalues={"join_event_id": join_event_id},
25682596
)
25692597

2570-
async def maybe_store_room_on_outlier_membership(
2571-
self, room_id: str, room_version: RoomVersion
2572-
) -> None:
2573-
"""
2574-
When we receive an invite or any other event over federation that may relate to a room
2575-
we are not in, store the version of the room if we don't already know the room version.
2576-
"""
2577-
# It's possible that we already have events for the room in our DB
2578-
# without a corresponding room entry. If we do then we don't want to
2579-
# mark the room as having an auth chain cover index.
2580-
has_auth_chain_index = await self.has_auth_chain_index(room_id)
2581-
2582-
await self.db_pool.simple_upsert(
2583-
desc="maybe_store_room_on_outlier_membership",
2584-
table="rooms",
2585-
keyvalues={"room_id": room_id},
2586-
values={},
2587-
insertion_values={
2588-
"room_version": room_version.identifier,
2589-
"is_public": False,
2590-
# We don't worry about setting the `creator` here because
2591-
# we don't process any messages in a room while a user is
2592-
# invited (only after the join).
2593-
"creator": "",
2594-
"has_auth_chain_index": has_auth_chain_index,
2595-
},
2596-
)
2597-
25982598
async def add_event_report(
25992599
self,
26002600
room_id: str,

0 commit comments

Comments
 (0)