
Commit 785bcee: Merge branch 'release-v1.37' into develop
2 parents: f558369 + ba9b744

File tree: 12 files changed, 718 additions and 6 deletions

changelog.d/10269.bugfix

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Handle inbound events from federation asynchronously.

changelog.d/10272.bugfix

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Handle inbound events from federation asynchronously.
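Both changelog entries describe the same change, which the remaining diffs implement: instead of handling each inbound PDU inline while the sending server's transaction request waits, the event is written to a staging table and a background process drains that table one room at a time. A condensed sketch of the new flow, using the method names introduced in the diffs below (illustrative only, with the surrounding plumbing simplified; it is not a drop-in replacement for the real code):

# Rough shape of the new inbound path; see the federation_server.py diff
# below for the actual implementation.
async def handle_received_pdu(server, origin, pdu, room_version):
    # Persist the PDU first so it cannot be lost while processing is deferred.
    await server.store.insert_received_event_to_staging(origin, pdu)

    # Only one worker at a time may process staged events for a given room.
    lock = await server.store.try_acquire_lock("federation_inbound_pdu", pdu.room_id)
    if lock:
        # Fire-and-forget: drain the room's staging queue in the background,
        # so the federation transaction can be acknowledged immediately.
        server._process_incoming_pdus_in_room_inner(
            pdu.room_id, room_version, lock, origin, pdu
        )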

synapse/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@
 except ImportError:
     pass

-__version__ = "1.37.0"
+__version__ = "1.37.1a1"

 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
@@ -108,6 +108,7 @@
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
 from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
+from synapse.storage.databases.main.lock import LockStore
 from synapse.storage.databases.main.media_repository import MediaRepositoryStore
 from synapse.storage.databases.main.metrics import ServerMetricsStore
 from synapse.storage.databases.main.monthly_active_users import (
@@ -249,6 +250,7 @@ class GenericWorkerSlavedStore(
     ServerMetricsStore,
     SearchStore,
     TransactionWorkerStore,
+    LockStore,
     BaseSlavedStore,
 ):
     pass

synapse/federation/federation_server.py

Lines changed: 96 additions & 2 deletions
@@ -44,7 +44,7 @@
     SynapseError,
     UnsupportedRoomVersionError,
 )
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
@@ -58,10 +58,12 @@
 )
 from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
 from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
 )
+from synapse.storage.databases.main.lock import Lock
 from synapse.types import JsonDict
 from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
@@ -97,6 +99,11 @@
 )


+# The name of the lock to use when processing events in a room received over
+# federation.
+_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"
+
+
 class FederationServer(FederationBase):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
@@ -857,7 +864,94 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
         except SynapseError as e:
             raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)

-        await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
+        # Add the event to our staging area
+        await self.store.insert_received_event_to_staging(origin, pdu)
+
+        # Try and acquire the processing lock for the room; if we get it, start a
+        # background process for handling the events in the room.
+        lock = await self.store.try_acquire_lock(
+            _INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
+        )
+        if lock:
+            self._process_incoming_pdus_in_room_inner(
+                pdu.room_id, room_version, lock, origin, pdu
+            )
+
+    @wrap_as_background_process("_process_incoming_pdus_in_room_inner")
+    async def _process_incoming_pdus_in_room_inner(
+        self,
+        room_id: str,
+        room_version: RoomVersion,
+        lock: Lock,
+        latest_origin: str,
+        latest_event: EventBase,
+    ) -> None:
+        """Process events in the staging area for the given room.
+
+        The latest_origin and latest_event args are the latest origin and event
+        received.
+        """
+
+        # The common path is for the event we just received to be the only event in
+        # the room, so instead of pulling the event out of the DB and parsing
+        # the event we just pull out the next event ID and check if that matches.
+        next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
+            room_id
+        )
+        if next_origin == latest_origin and next_event_id == latest_event.event_id:
+            origin = latest_origin
+            event = latest_event
+        else:
+            next = await self.store.get_next_staged_event_for_room(
+                room_id, room_version
+            )
+            if not next:
+                return
+
+            origin, event = next
+
+        # We loop round until there are no more events in the room in the
+        # staging area, or we fail to get the lock (which means another process
+        # has started processing).
+        while True:
+            async with lock:
+                try:
+                    await self.handler.on_receive_pdu(
+                        origin, event, sent_to_us_directly=True
+                    )
+                except FederationError as e:
+                    # XXX: Ideally we'd inform the remote we failed to process
+                    # the event, but we can't return an error in the transaction
+                    # response (as we've already responded).
+                    logger.warning("Error handling PDU %s: %s", event.event_id, e)
+                except Exception:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event.event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+
+                await self.store.remove_received_event_from_staging(
+                    origin, event.event_id
+                )
+
+            # We need to do this check outside the lock to avoid a race between
+            # a new event being inserted by another instance and it attempting
+            # to acquire the lock.
+            next = await self.store.get_next_staged_event_for_room(
+                room_id, room_version
+            )
+            if not next:
+                break
+
+            origin, event = next
+
+            lock = await self.store.try_acquire_lock(
+                _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
+            )
+            if not lock:
+                return

     def __str__(self) -> str:
         return "<ReplicationLayer(%s)>" % self.server_name

synapse/storage/databases/main/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -46,6 +46,7 @@
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
+from .lock import LockStore
 from .media_repository import MediaRepositoryStore
 from .metrics import ServerMetricsStore
 from .monthly_active_users import MonthlyActiveUsersStore
@@ -119,6 +120,7 @@ class DataStore(
     CacheInvalidationWorkerStore,
     ServerMetricsStore,
     EventForwardExtremitiesStore,
+    LockStore,
 ):
     def __init__(self, database: DatabasePool, db_conn, hs):
         self.hs = hs

synapse/storage/databases/main/event_federation.py

Lines changed: 106 additions & 3 deletions
@@ -14,18 +14,20 @@
 import itertools
 import logging
 from queue import Empty, PriorityQueue
-from typing import Collection, Dict, Iterable, List, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
-from synapse.events import EventBase
+from synapse.api.room_versions import RoomVersion
+from synapse.events import EventBase, make_event_from_dict
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
@@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
             _delete_old_forward_extrem_cache_txn,
         )

+    async def insert_received_event_to_staging(
+        self, origin: str, event: EventBase
+    ) -> None:
+        """Insert a newly received event from federation into the staging area."""
+
+        # We use an upsert here to handle the case where we see the same event
+        # from the same server multiple times.
+        await self.db_pool.simple_upsert(
+            table="federation_inbound_events_staging",
+            keyvalues={
+                "origin": origin,
+                "event_id": event.event_id,
+            },
+            values={},
+            insertion_values={
+                "room_id": event.room_id,
+                "received_ts": self._clock.time_msec(),
+                "event_json": json_encoder.encode(event.get_dict()),
+                "internal_metadata": json_encoder.encode(
+                    event.internal_metadata.get_dict()
+                ),
+            },
+            desc="insert_received_event_to_staging",
+        )
+
+    async def remove_received_event_from_staging(
+        self,
+        origin: str,
+        event_id: str,
+    ) -> None:
+        """Remove the given event from the staging area"""
+        await self.db_pool.simple_delete(
+            table="federation_inbound_events_staging",
+            keyvalues={
+                "origin": origin,
+                "event_id": event_id,
+            },
+            desc="remove_received_event_from_staging",
+        )
+
+    async def get_next_staged_event_id_for_room(
+        self,
+        room_id: str,
+    ) -> Optional[Tuple[str, str]]:
+        """Get the next event ID in the staging area for the given room."""
+
+        def _get_next_staged_event_id_for_room_txn(txn):
+            sql = """
+                SELECT origin, event_id
+                FROM federation_inbound_events_staging
+                WHERE room_id = ?
+                ORDER BY received_ts ASC
+                LIMIT 1
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return txn.fetchone()
+
+        return await self.db_pool.runInteraction(
+            "get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn
+        )
+
+    async def get_next_staged_event_for_room(
+        self,
+        room_id: str,
+        room_version: RoomVersion,
+    ) -> Optional[Tuple[str, EventBase]]:
+        """Get the next event in the staging area for the given room."""
+
+        def _get_next_staged_event_for_room_txn(txn):
+            sql = """
+                SELECT event_json, internal_metadata, origin
+                FROM federation_inbound_events_staging
+                WHERE room_id = ?
+                ORDER BY received_ts ASC
+                LIMIT 1
+            """
+            txn.execute(sql, (room_id,))
+
+            return txn.fetchone()
+
+        row = await self.db_pool.runInteraction(
+            "get_next_staged_event_for_room", _get_next_staged_event_for_room_txn
+        )
+
+        if not row:
+            return None
+
+        event_d = db_to_json(row[0])
+        internal_metadata_d = db_to_json(row[1])
+        origin = row[2]
+
+        event = make_event_from_dict(
+            event_dict=event_d,
+            room_version=room_version,
+            internal_metadata_dict=internal_metadata_d,
+        )
+
+        return origin, event
+

 class EventFederationStore(EventFederationWorkerStore):
     """Responsible for storing and serving up the various graphs associated
