|
16 | 16 | from queue import Empty, PriorityQueue |
17 | 17 | from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple |
18 | 18 |
|
19 | | -from prometheus_client import Gauge |
| 19 | +from prometheus_client import Counter, Gauge |
20 | 20 |
|
21 | 21 | from synapse.api.constants import MAX_DEPTH |
22 | 22 | from synapse.api.errors import StoreError |
23 | | -from synapse.api.room_versions import RoomVersion |
| 23 | +from synapse.api.room_versions import EventFormatVersions, RoomVersion |
24 | 24 | from synapse.events import EventBase, make_event_from_dict |
25 | 25 | from synapse.metrics.background_process_metrics import wrap_as_background_process |
26 | 26 | from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause |
|
44 | 44 | "The total number of events in the inbound federation staging", |
45 | 45 | ) |
46 | 46 |
|
| 47 | +pdus_pruned_from_federation_queue = Counter( |
| 48 | + "synapse_federation_server_number_inbound_pdu_pruned", |
| 49 | + "The number of events in the inbound federation staging that have been " |
| 50 | + "pruned due to the queue getting too long", |
| 51 | +) |
| 52 | + |
47 | 53 | logger = logging.getLogger(__name__) |
48 | 54 |
|
49 | 55 |
|
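The new pdus_pruned_from_federation_queue metric follows the standard prometheus_client Counter pattern: define the counter once at module level, then increment it by the number of pruned events when pruning happens. A minimal standalone sketch of that usage, with an illustrative metric name rather than Synapse's real one:

    from prometheus_client import Counter

    # Counters are monotonic; .inc() takes an optional amount (default 1).
    example_pruned_counter = Counter(
        "example_inbound_pdu_pruned",  # hypothetical name, for illustration only
        "Events pruned from a hypothetical inbound staging queue",
    )

    example_pruned_counter.inc()    # bump by one
    example_pruned_counter.inc(25)  # or by a whole batch, as the patch does below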
@@ -1277,6 +1283,100 @@ def _get_next_staged_event_for_room_txn(txn): |
1277 | 1283 |
|
1278 | 1284 | return origin, event |
1279 | 1285 |
|
| 1286 | + async def prune_staged_events_in_room( |
| 1287 | + self, |
| 1288 | + room_id: str, |
| 1289 | + room_version: RoomVersion, |
| 1290 | + ) -> bool: |
| 1291 | + """Checks if there are lots of staged events for the room, and if so |
| 1292 | +        prunes them down.
| 1293 | +
|
| 1294 | + Returns: |
| 1295 | + Whether any events were pruned |
| 1296 | + """ |
| 1297 | + |
| 1298 | + # First check the size of the queue. |
| 1299 | + count = await self.db_pool.simple_select_one_onecol( |
| 1300 | + table="federation_inbound_events_staging", |
| 1301 | + keyvalues={"room_id": room_id}, |
| 1302 | + retcol="COALESCE(COUNT(*), 0)", |
| 1303 | + desc="prune_staged_events_in_room_count", |
| 1304 | + ) |
| 1305 | + |
| 1306 | + if count < 100: |
| 1307 | + return False |
| 1308 | + |
| 1309 | +        # If the queue is too large, then we want to clear the entire queue,
| 1310 | + # keeping only the forward extremities (i.e. the events not referenced |
| 1311 | + # by other events in the queue). We do this so that we can always |
| 1312 | +        # back-paginate to fetch all the events we have dropped.
| 1313 | + rows = await self.db_pool.simple_select_list( |
| 1314 | + table="federation_inbound_events_staging", |
| 1315 | + keyvalues={"room_id": room_id}, |
| 1316 | + retcols=("event_id", "event_json"), |
| 1317 | + desc="prune_staged_events_in_room_fetch", |
| 1318 | + ) |
| 1319 | + |
| 1320 | + # Find the set of events referenced by those in the queue, as well as |
| 1321 | + # collecting all the event IDs in the queue. |
| 1322 | + referenced_events: Set[str] = set() |
| 1323 | + seen_events: Set[str] = set() |
| 1324 | + for row in rows: |
| 1325 | + event_id = row["event_id"] |
| 1326 | + seen_events.add(event_id) |
| 1327 | + event_d = db_to_json(row["event_json"]) |
| 1328 | + |
| 1329 | + # We don't bother parsing the dicts into full blown event objects, |
| 1330 | + # as that is needlessly expensive. |
| 1331 | + |
| 1332 | + # We haven't checked that the `prev_events` have the right format |
| 1333 | + # yet, so we check as we go. |
| 1334 | + prev_events = event_d.get("prev_events", []) |
| 1335 | + if not isinstance(prev_events, list): |
| 1336 | + logger.info("Invalid prev_events for %s", event_id) |
| 1337 | + continue |
| 1338 | + |
| 1339 | + if room_version.event_format == EventFormatVersions.V1: |
| 1340 | + for prev_event_tuple in prev_events: |
| 1341 | +                    if not isinstance(prev_event_tuple, list) or len(prev_event_tuple) != 2:
| 1342 | + logger.info("Invalid prev_events for %s", event_id) |
| 1343 | + break |
| 1344 | + |
| 1345 | + prev_event_id = prev_event_tuple[0] |
| 1346 | + if not isinstance(prev_event_id, str): |
| 1347 | + logger.info("Invalid prev_events for %s", event_id) |
| 1348 | + break |
| 1349 | + |
| 1350 | + referenced_events.add(prev_event_id) |
| 1351 | + else: |
| 1352 | + for prev_event_id in prev_events: |
| 1353 | + if not isinstance(prev_event_id, str): |
| 1354 | + logger.info("Invalid prev_events for %s", event_id) |
| 1355 | + break |
| 1356 | + |
| 1357 | + referenced_events.add(prev_event_id) |
| 1358 | + |
| 1359 | + to_delete = referenced_events & seen_events |
| 1360 | + if not to_delete: |
| 1361 | + return False |
| 1362 | + |
| 1363 | + pdus_pruned_from_federation_queue.inc(len(to_delete)) |
| 1364 | + logger.info( |
| 1365 | + "Pruning %d events in room %s from federation queue", |
| 1366 | + len(to_delete), |
| 1367 | + room_id, |
| 1368 | + ) |
| 1369 | + |
| 1370 | + await self.db_pool.simple_delete_many( |
| 1371 | + table="federation_inbound_events_staging", |
| 1372 | + keyvalues={"room_id": room_id}, |
| 1373 | + iterable=to_delete, |
| 1374 | + column="event_id", |
| 1375 | + desc="prune_staged_events_in_room_delete", |
| 1376 | + ) |
| 1377 | + |
| 1378 | + return True |
| 1379 | + |
1280 | 1380 | async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: |
1281 | 1381 | """Get the room IDs of all events currently staged.""" |
1282 | 1382 | return await self.db_pool.simple_select_onecol( |
|
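The pruning logic in prune_staged_events_in_room keeps only the forward extremities of the staged queue: any staged event named in another staged event's prev_events is dropped, so the retained events still let the server back-paginate to everything that was discarded. A self-contained sketch of that set computation, using plain dicts in place of Synapse's storage layer (the event shapes and helper name here are simplified assumptions, not Synapse APIs):

    from typing import Dict, List, Set, Union

    # prev_events is either a list of event ID strings (newer event formats)
    # or a list of [event_id, hashes] pairs (the v1 event format handled above).
    PrevEvents = List[Union[str, list]]

    def events_to_prune(staged: Dict[str, PrevEvents]) -> Set[str]:
        """Return the staged event IDs referenced by other staged events,
        i.e. everything except the forward extremities."""
        seen = set(staged)
        referenced: Set[str] = set()
        for prev_events in staged.values():
            for prev in prev_events:
                # v1 format: [event_id, hashes]; otherwise a bare event ID.
                prev_event_id = prev[0] if isinstance(prev, list) else prev
                if isinstance(prev_event_id, str):
                    referenced.add(prev_event_id)
        return referenced & seen

    # $C points at $B, which points at $A (shown in v1 pair form): only the
    # forward extremity $C survives; $A and $B are the ones to delete.
    staged = {"$A": [], "$B": [["$A", {}]], "$C": ["$B"]}
    assert events_to_prune(staged) == {"$A", "$B"}

Intersecting with the set of staged IDs matters because prev_events can also point at events the server already has outside the queue; those must not be deleted.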