Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c54db67

Browse files
authored
Handle inbound events from federation asynchronously (#10272)
Fixes #9490. This will break a couple of SyTests that expect failures to be added to the response of a federation /send, which obviously doesn't happen now that things are asynchronous. Two drawbacks: currently there is no logic to handle any events left in the staging area after a restart, so they'll only be handled on the next incoming event in that room — that can be fixed separately. We also now only process one event per room at a time; this can be fixed up further down the line.
1 parent 85d237e commit c54db67

File tree

5 files changed

+241
-5
lines changed

5 files changed

+241
-5
lines changed

changelog.d/10272.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Handle inbound events from federation asynchronously.

synapse/federation/federation_server.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
SynapseError,
4545
UnsupportedRoomVersionError,
4646
)
47-
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
47+
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
4848
from synapse.events import EventBase
4949
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
5050
from synapse.federation.persistence import TransactionActions
@@ -57,10 +57,12 @@
5757
)
5858
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
5959
from synapse.logging.utils import log_function
60+
from synapse.metrics.background_process_metrics import wrap_as_background_process
6061
from synapse.replication.http.federation import (
6162
ReplicationFederationSendEduRestServlet,
6263
ReplicationGetQueryRestServlet,
6364
)
65+
from synapse.storage.databases.main.lock import Lock
6466
from synapse.types import JsonDict
6567
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
6668
from synapse.util.async_helpers import Linearizer, concurrently_execute
@@ -96,6 +98,11 @@
9698
)
9799

98100

101+
# The name of the lock to use when process events in a room received over
102+
# federation.
103+
_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"
104+
105+
99106
class FederationServer(FederationBase):
100107
def __init__(self, hs: "HomeServer"):
101108
super().__init__(hs)
@@ -834,7 +841,94 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
834841
except SynapseError as e:
835842
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
836843

837-
await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
844+
# Add the event to our staging area
845+
await self.store.insert_received_event_to_staging(origin, pdu)
846+
847+
# Try and acquire the processing lock for the room, if we get it start a
848+
# background process for handling the events in the room.
849+
lock = await self.store.try_acquire_lock(
850+
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
851+
)
852+
if lock:
853+
self._process_incoming_pdus_in_room_inner(
854+
pdu.room_id, room_version, lock, origin, pdu
855+
)
856+
857+
@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
    self,
    room_id: str,
    room_version: RoomVersion,
    lock: Lock,
    latest_origin: str,
    latest_event: EventBase,
) -> None:
    """Process events in the staging area for the given room.

    The latest_origin and latest_event args are the latest origin and event
    received.

    Args:
        room_id: The room whose staged events should be drained.
        room_version: The version of the room, needed to parse staged events.
        lock: The per-room processing lock, already acquired by the caller.
        latest_origin: Origin of the most recently received event.
        latest_event: The most recently received event itself.
    """

    # The common path is for the event we just received to be the only event in
    # the room, so instead of pulling the event out of the DB and parsing
    # the event we just pull out the next event ID and check if that matches.
    next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
        room_id
    )
    if next_origin == latest_origin and next_event_id == latest_event.event_id:
        origin = latest_origin
        event = latest_event
    else:
        # NB: named `next_event` rather than `next` to avoid shadowing the
        # `next` builtin.
        next_event = await self.store.get_next_staged_event_for_room(
            room_id, room_version
        )
        if not next_event:
            return

        origin, event = next_event

    # We loop round until there are no more events in the room in the
    # staging area, or we fail to get the lock (which means another process
    # has started processing).
    while True:
        async with lock:
            try:
                await self.handler.on_receive_pdu(
                    origin, event, sent_to_us_directly=True
                )
            except FederationError as e:
                # XXX: Ideally we'd inform the remote we failed to process
                # the event, but we can't return an error in the transaction
                # response (as we've already responded).
                logger.warning("Error handling PDU %s: %s", event.event_id, e)
            except Exception:
                f = failure.Failure()
                logger.error(
                    "Failed to handle PDU %s",
                    event.event_id,
                    exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                )

            # Whether handling succeeded or failed, the event is done with —
            # remove it so we don't retry it forever.
            await self.store.remove_received_event_from_staging(
                origin, event.event_id
            )

        # We need to do this check outside the lock to avoid a race between
        # a new event being inserted by another instance and it attempting
        # to acquire the lock.
        next_event = await self.store.get_next_staged_event_for_room(
            room_id, room_version
        )
        if not next_event:
            break

        origin, event = next_event

        # Re-take the lock for the next iteration; if another process has
        # grabbed it, that process is now responsible for draining the room.
        lock = await self.store.try_acquire_lock(
            _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
        )
        if not lock:
            return
838932

839933
def __str__(self) -> str:
840934
return "<ReplicationLayer(%s)>" % self.server_name

synapse/storage/databases/main/event_federation.py

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,20 @@
1414
import itertools
1515
import logging
1616
from queue import Empty, PriorityQueue
17-
from typing import Collection, Dict, Iterable, List, Set, Tuple
17+
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
1818

1919
from synapse.api.constants import MAX_DEPTH
2020
from synapse.api.errors import StoreError
21-
from synapse.events import EventBase
21+
from synapse.api.room_versions import RoomVersion
22+
from synapse.events import EventBase, make_event_from_dict
2223
from synapse.metrics.background_process_metrics import wrap_as_background_process
23-
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
24+
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
2425
from synapse.storage.database import DatabasePool, LoggingTransaction
2526
from synapse.storage.databases.main.events_worker import EventsWorkerStore
2627
from synapse.storage.databases.main.signatures import SignatureWorkerStore
2728
from synapse.storage.engines import PostgresEngine
2829
from synapse.storage.types import Cursor
30+
from synapse.util import json_encoder
2931
from synapse.util.caches.descriptors import cached
3032
from synapse.util.caches.lrucache import LruCache
3133
from synapse.util.iterutils import batch_iter
@@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
10441046
_delete_old_forward_extrem_cache_txn,
10451047
)
10461048

1049+
async def insert_received_event_to_staging(
    self, origin: str, event: EventBase
) -> None:
    """Add a newly received federation event to the staging area.

    Uses an upsert so that seeing the same event from the same server more
    than once leaves the originally staged row untouched.
    """
    now = self._clock.time_msec()
    await self.db_pool.simple_upsert(
        table="federation_inbound_events_staging",
        keyvalues={"origin": origin, "event_id": event.event_id},
        values={},
        insertion_values={
            "room_id": event.room_id,
            "received_ts": now,
            "event_json": json_encoder.encode(event.get_dict()),
            "internal_metadata": json_encoder.encode(
                event.internal_metadata.get_dict()
            ),
        },
        desc="insert_received_event_to_staging",
    )
1073+
1074+
async def remove_received_event_from_staging(
    self,
    origin: str,
    event_id: str,
) -> None:
    """Remove the given event from the staging area."""
    keyvalues = {"origin": origin, "event_id": event_id}
    await self.db_pool.simple_delete(
        table="federation_inbound_events_staging",
        keyvalues=keyvalues,
        desc="remove_received_event_from_staging",
    )
1088+
1089+
async def get_next_staged_event_id_for_room(
    self,
    room_id: str,
) -> Optional[Tuple[str, str]]:
    """Look up the oldest staged event for the given room.

    Returns:
        An (origin, event_id) tuple for the staged event with the earliest
        received_ts, or None if the room has nothing staged.
    """

    def _get_next_staged_event_id_for_room_txn(txn):
        txn.execute(
            """
            SELECT origin, event_id
            FROM federation_inbound_events_staging
            WHERE room_id = ?
            ORDER BY received_ts ASC
            LIMIT 1
            """,
            (room_id,),
        )
        return txn.fetchone()

    return await self.db_pool.runInteraction(
        "get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn
    )
1111+
1112+
async def get_next_staged_event_for_room(
    self,
    room_id: str,
    room_version: RoomVersion,
) -> Optional[Tuple[str, EventBase]]:
    """Fetch and parse the oldest staged event for the given room.

    Returns:
        An (origin, event) tuple for the staged event with the earliest
        received_ts, or None if the room has nothing staged.
    """

    def _get_next_staged_event_for_room_txn(txn):
        txn.execute(
            """
            SELECT event_json, internal_metadata, origin
            FROM federation_inbound_events_staging
            WHERE room_id = ?
            ORDER BY received_ts ASC
            LIMIT 1
            """,
            (room_id,),
        )
        return txn.fetchone()

    row = await self.db_pool.runInteraction(
        "get_next_staged_event_for_room", _get_next_staged_event_for_room_txn
    )
    if row is None:
        return None

    event_json, internal_metadata_json, origin = row

    event = make_event_from_dict(
        event_dict=db_to_json(event_json),
        room_version=room_version,
        internal_metadata_dict=db_to_json(internal_metadata_json),
    )

    return origin, event
1149+
10471150

10481151
class EventFederationStore(EventFederationWorkerStore):
10491152
"""Responsible for storing and serving up the various graphs associated
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/* Copyright 2021 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


-- A staging area for newly received events over federation.
--
-- Note we may store the same event multiple times if it comes from different
-- servers; this is to handle the case where we get redacted and non-redacted
-- versions of the event.
CREATE TABLE federation_inbound_events_staging (
    origin TEXT NOT NULL,
    room_id TEXT NOT NULL,
    event_id TEXT NOT NULL,
    received_ts BIGINT NOT NULL,
    event_json TEXT NOT NULL,
    internal_metadata TEXT NOT NULL
);

-- Used to find the next staged event to process for a room (oldest first).
CREATE INDEX federation_inbound_events_staging_room ON federation_inbound_events_staging(room_id, received_ts);

-- At most one staged copy of a given event per origin; this is the upsert target.
CREATE UNIQUE INDEX federation_inbound_events_staging_instance_event ON federation_inbound_events_staging(origin, event_id);

sytest-blacklist

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,9 @@ We can't peek into rooms with invited history_visibility
4141
We can't peek into rooms with joined history_visibility
4242
Local users can peek by room alias
4343
Peeked rooms only turn up in the sync for the device who peeked them
44+
45+
46+
# Blacklisted due to changes made in #10272
47+
Outbound federation will ignore a missing event with bad JSON for room version 6
48+
Backfilled events whose prev_events are in a different room do not allow cross-room back-pagination
49+
Federation rejects inbound events where the prev_events cannot be found

0 commit comments

Comments
 (0)