This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit c0bebd0

Merge remote-tracking branch 'origin/erikj/async_federation_base_branch' into release-v1.37
2 parents: 88f9e8d + c54db67

11 files changed: +717, -5 lines


changelog.d/10269.misc

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Add a distributed lock implementation.
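
The lock module itself (synapse/storage/databases/main/lock.py) is not part of this excerpt. As a rough guide, the sketch below shows the acquire/release pattern the way federation_server.py in this commit uses it; the worker class, method name, and lock/key names are made up for illustration.

# Illustrative sketch only: the LockStore usage pattern as exercised by
# federation_server.py below. ExampleWorker and _EXAMPLE_LOCK_NAME are hypothetical.
from typing import Optional

from synapse.storage.databases.main.lock import Lock

_EXAMPLE_LOCK_NAME = "example_lock"  # hypothetical lock name


class ExampleWorker:
    def __init__(self, store):
        self.store = store  # a data store that mixes in LockStore

    async def process_exclusively(self, key: str) -> None:
        # try_acquire_lock returns None if another process already holds the
        # lock for this (name, key) pair, in which case we back off.
        lock: Optional[Lock] = await self.store.try_acquire_lock(
            _EXAMPLE_LOCK_NAME, key
        )
        if not lock:
            return

        # The lock is held for the duration of the async context manager and
        # released on exit; federation_server.py re-acquires it afterwards when
        # it has more work to do.
        async with lock:
            ...  # work that must not run concurrently for this key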

changelog.d/10272.bugfix

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Handle inbound events from federation asynchronously.
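
The mechanics are in the federation_server.py diff further down; as orientation, the inbound path now stages the event and hands processing to a background task, roughly as in the sketch below. The class and method names here are stand-ins (the constant and the store/handler calls are the real ones from this commit), and the fast path, error handling, and staging drain loop are omitted.

# Orientation sketch only; see the federation_server.py diff below for the real
# implementation, including the drain loop and error handling.
from synapse.metrics.background_process_metrics import wrap_as_background_process

_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"


class InboundPduSketch:  # hypothetical stand-in for FederationServer
    def __init__(self, store, handler):
        self.store = store
        self.handler = handler

    async def handle_received_pdu(self, origin, pdu, room_version):
        # The event is persisted to a staging table instead of being processed
        # inline, so the federation transaction can be acknowledged promptly.
        await self.store.insert_received_event_to_staging(origin, pdu)

        # Whichever worker wins the per-room lock drains that room's staging
        # area in the background; everyone else simply returns.
        lock = await self.store.try_acquire_lock(
            _INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
        )
        if lock:
            # Deliberately not awaited: wrap_as_background_process runs the
            # coroutine as a fire-and-forget background process.
            self._drain_room(pdu.room_id, room_version, lock)

    @wrap_as_background_process("inbound_pdu_sketch")
    async def _drain_room(self, room_id, room_version, lock):
        # In the real code this loops over get_next_staged_event_for_room,
        # calls handler.on_receive_pdu while holding the lock, and removes each
        # event from the staging table once handled.
        ...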

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
@@ -108,6 +108,7 @@
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
+from synapse.storage.databases.main.lock import LockStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -249,6 +250,7 @@ class GenericWorkerSlavedStore(
    ServerMetricsStore,
    SearchStore,
    TransactionWorkerStore,
+    LockStore,
    BaseSlavedStore,
):
    pass

synapse/federation/federation_server.py

Lines changed: 96 additions & 2 deletions
@@ -44,7 +44,7 @@
    SynapseError,
    UnsupportedRoomVersionError,
)
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
@@ -57,10 +57,12 @@
)
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
    ReplicationFederationSendEduRestServlet,
    ReplicationGetQueryRestServlet,
)
+from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
@@ -96,6 +98,11 @@
)


+# The name of the lock to use when processing events in a room received over
+# federation.
+_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"
+
+
class FederationServer(FederationBase):
    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
@@ -834,7 +841,94 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
        except SynapseError as e:
            raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)

-        await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
+        # Add the event to our staging area
+        await self.store.insert_received_event_to_staging(origin, pdu)
+
+        # Try and acquire the processing lock for the room; if we get it, start a
+        # background process for handling the events in the room.
+        lock = await self.store.try_acquire_lock(
+            _INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
+        )
+        if lock:
+            self._process_incoming_pdus_in_room_inner(
+                pdu.room_id, room_version, lock, origin, pdu
+            )
+
+    @wrap_as_background_process("_process_incoming_pdus_in_room_inner")
+    async def _process_incoming_pdus_in_room_inner(
+        self,
+        room_id: str,
+        room_version: RoomVersion,
+        lock: Lock,
+        latest_origin: str,
+        latest_event: EventBase,
+    ) -> None:
+        """Process events in the staging area for the given room.
+
+        The latest_origin and latest_event args are the latest origin and event
+        received.
+        """
+
+        # The common path is for the event we just received to be the only event in
+        # the room, so instead of pulling the event out of the DB and parsing
+        # the event we just pull out the next event ID and check if that matches.
+        next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
+            room_id
+        )
+        if next_origin == latest_origin and next_event_id == latest_event.event_id:
+            origin = latest_origin
+            event = latest_event
+        else:
+            next = await self.store.get_next_staged_event_for_room(
+                room_id, room_version
+            )
+            if not next:
+                return
+
+            origin, event = next
+
+        # We loop round until there are no more events in the room in the
+        # staging area, or we fail to get the lock (which means another process
+        # has started processing).
+        while True:
+            async with lock:
+                try:
+                    await self.handler.on_receive_pdu(
+                        origin, event, sent_to_us_directly=True
+                    )
+                except FederationError as e:
+                    # XXX: Ideally we'd inform the remote we failed to process
+                    # the event, but we can't return an error in the transaction
+                    # response (as we've already responded).
+                    logger.warning("Error handling PDU %s: %s", event.event_id, e)
+                except Exception:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event.event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+
+                await self.store.remove_received_event_from_staging(
+                    origin, event.event_id
+                )
+
+            # We need to do this check outside the lock to avoid a race between
+            # a new event being inserted by another instance and it attempting
+            # to acquire the lock.
+            next = await self.store.get_next_staged_event_for_room(
+                room_id, room_version
+            )
+            if not next:
+                break
+
+            origin, event = next
+
+            lock = await self.store.try_acquire_lock(
+                _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
+            )
+            if not lock:
+                return

    def __str__(self) -> str:
        return "<ReplicationLayer(%s)>" % self.server_name

synapse/storage/databases/main/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -46,6 +46,7 @@
from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
+from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
@@ -119,6 +120,7 @@ class DataStore(
    CacheInvalidationWorkerStore,
    ServerMetricsStore,
    EventForwardExtremitiesStore,
+    LockStore,
):
    def __init__(self, database: DatabasePool, db_conn, hs):
        self.hs = hs

synapse/storage/databases/main/event_federation.py

Lines changed: 106 additions & 3 deletions
@@ -14,18 +14,20 @@
import itertools
import logging
from queue import Empty, PriorityQueue
-from typing import Collection, Dict, Iterable, List, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
-from synapse.events import EventBase
+from synapse.api.room_versions import RoomVersion
+from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
            _delete_old_forward_extrem_cache_txn,
        )

+    async def insert_received_event_to_staging(
+        self, origin: str, event: EventBase
+    ) -> None:
+        """Insert a newly received event from federation into the staging area."""
+
+        # We use an upsert here to handle the case where we see the same event
+        # from the same server multiple times.
+        await self.db_pool.simple_upsert(
+            table="federation_inbound_events_staging",
+            keyvalues={
+                "origin": origin,
+                "event_id": event.event_id,
+            },
+            values={},
+            insertion_values={
+                "room_id": event.room_id,
+                "received_ts": self._clock.time_msec(),
+                "event_json": json_encoder.encode(event.get_dict()),
+                "internal_metadata": json_encoder.encode(
+                    event.internal_metadata.get_dict()
+                ),
+            },
+            desc="insert_received_event_to_staging",
+        )
+
+    async def remove_received_event_from_staging(
+        self,
+        origin: str,
+        event_id: str,
+    ) -> None:
+        """Remove the given event from the staging area"""
+        await self.db_pool.simple_delete(
+            table="federation_inbound_events_staging",
+            keyvalues={
+                "origin": origin,
+                "event_id": event_id,
+            },
+            desc="remove_received_event_from_staging",
+        )
+
+    async def get_next_staged_event_id_for_room(
+        self,
+        room_id: str,
+    ) -> Optional[Tuple[str, str]]:
+        """Get the next event ID in the staging area for the given room."""
+
+        def _get_next_staged_event_id_for_room_txn(txn):
+            sql = """
+                SELECT origin, event_id
+                FROM federation_inbound_events_staging
+                WHERE room_id = ?
+                ORDER BY received_ts ASC
+                LIMIT 1
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return txn.fetchone()
+
+        return await self.db_pool.runInteraction(
+            "get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn
+        )
+
+    async def get_next_staged_event_for_room(
+        self,
+        room_id: str,
+        room_version: RoomVersion,
+    ) -> Optional[Tuple[str, EventBase]]:
+        """Get the next event in the staging area for the given room."""
+
+        def _get_next_staged_event_for_room_txn(txn):
+            sql = """
+                SELECT event_json, internal_metadata, origin
+                FROM federation_inbound_events_staging
+                WHERE room_id = ?
+                ORDER BY received_ts ASC
+                LIMIT 1
+            """
+            txn.execute(sql, (room_id,))
+
+            return txn.fetchone()
+
+        row = await self.db_pool.runInteraction(
+            "get_next_staged_event_for_room", _get_next_staged_event_for_room_txn
+        )
+
+        if not row:
+            return None
+
+        event_d = db_to_json(row[0])
+        internal_metadata_d = db_to_json(row[1])
+        origin = row[2]
+
+        event = make_event_from_dict(
+            event_dict=event_d,
+            room_version=room_version,
+            internal_metadata_dict=internal_metadata_d,
+        )
+
+        return origin, event
+

class EventFederationStore(EventFederationWorkerStore):
    """Responsible for storing and serving up the various graphs associated
