Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 329ef5c

Browse files
authored
Fix the inbound PDU metric (#10279)
This broke in #10272
1 parent: bc5589a · commit: 329ef5c

File tree

6 files changed, with 93 additions (+93) and 27 deletions (-27).

changelog.d/10279.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1.

synapse/federation/federation_server.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -369,22 +369,21 @@ async def process_pdus_for_room(room_id: str):
369369

370370
async def process_pdu(pdu: EventBase) -> JsonDict:
371371
event_id = pdu.event_id
372-
with pdu_process_time.time():
373-
with nested_logging_context(event_id):
374-
try:
375-
await self._handle_received_pdu(origin, pdu)
376-
return {}
377-
except FederationError as e:
378-
logger.warning("Error handling PDU %s: %s", event_id, e)
379-
return {"error": str(e)}
380-
except Exception as e:
381-
f = failure.Failure()
382-
logger.error(
383-
"Failed to handle PDU %s",
384-
event_id,
385-
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
386-
)
387-
return {"error": str(e)}
372+
with nested_logging_context(event_id):
373+
try:
374+
await self._handle_received_pdu(origin, pdu)
375+
return {}
376+
except FederationError as e:
377+
logger.warning("Error handling PDU %s: %s", event_id, e)
378+
return {"error": str(e)}
379+
except Exception as e:
380+
f = failure.Failure()
381+
logger.error(
382+
"Failed to handle PDU %s",
383+
event_id,
384+
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
385+
)
386+
return {"error": str(e)}
388387

389388
await concurrently_execute(
390389
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ async def _process_incoming_pdus_in_room_inner(
932931
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
933932
)
934933

935-
await self.store.remove_received_event_from_staging(
934+
received_ts = await self.store.remove_received_event_from_staging(
936935
origin, event.event_id
937936
)
937+
if received_ts is not None:
938+
pdu_process_time.observe(
939+
(self._clock.time_msec() - received_ts) / 1000
940+
)
938941

939942
# We need to do this check outside the lock to avoid a race between
940943
# a new event being inserted by another instance and it attempting

synapse/storage/databases/main/event_federation.py

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,16 +1075,62 @@ async def remove_received_event_from_staging(
10751075
self,
10761076
origin: str,
10771077
event_id: str,
1078-
) -> None:
1079-
"""Remove the given event from the staging area"""
1080-
await self.db_pool.simple_delete(
1081-
table="federation_inbound_events_staging",
1082-
keyvalues={
1083-
"origin": origin,
1084-
"event_id": event_id,
1085-
},
1086-
desc="remove_received_event_from_staging",
1087-
)
1078+
) -> Optional[int]:
1079+
"""Remove the given event from the staging area.
1080+
1081+
Returns:
1082+
The received_ts of the row that was deleted, if any.
1083+
"""
1084+
if self.db_pool.engine.supports_returning:
1085+
1086+
def _remove_received_event_from_staging_txn(txn):
1087+
sql = """
1088+
DELETE FROM federation_inbound_events_staging
1089+
WHERE origin = ? AND event_id = ?
1090+
RETURNING received_ts
1091+
"""
1092+
1093+
txn.execute(sql, (origin, event_id))
1094+
return txn.fetchone()
1095+
1096+
row = await self.db_pool.runInteraction(
1097+
"remove_received_event_from_staging",
1098+
_remove_received_event_from_staging_txn,
1099+
db_autocommit=True,
1100+
)
1101+
if row is None:
1102+
return None
1103+
1104+
return row[0]
1105+
1106+
else:
1107+
1108+
def _remove_received_event_from_staging_txn(txn):
1109+
received_ts = self.db_pool.simple_select_one_onecol_txn(
1110+
txn,
1111+
table="federation_inbound_events_staging",
1112+
keyvalues={
1113+
"origin": origin,
1114+
"event_id": event_id,
1115+
},
1116+
retcol="received_ts",
1117+
allow_none=True,
1118+
)
1119+
self.db_pool.simple_delete_txn(
1120+
txn,
1121+
table="federation_inbound_events_staging",
1122+
keyvalues={
1123+
"origin": origin,
1124+
"event_id": event_id,
1125+
},
1126+
)
1127+
1128+
return received_ts
1129+
1130+
return await self.db_pool.runInteraction(
1131+
"remove_received_event_from_staging",
1132+
_remove_received_event_from_staging_txn,
1133+
)
10881134

10891135
async def get_next_staged_event_id_for_room(
10901136
self,

synapse/storage/engines/_base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ def supports_using_any_list(self) -> bool:
4949
"""
5050
...
5151

52+
@property
53+
@abc.abstractmethod
54+
def supports_returning(self) -> bool:
55+
"""Do we support the `RETURNING` clause in insert/update/delete?"""
56+
...
57+
5258
@abc.abstractmethod
5359
def check_database(
5460
self, db_conn: ConnectionType, allow_outdated_version: bool = False

synapse/storage/engines/postgres.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ def supports_using_any_list(self):
133133
"""Do we support using `a = ANY(?)` and passing a list"""
134134
return True
135135

136+
@property
137+
def supports_returning(self) -> bool:
138+
"""Do we support the `RETURNING` clause in insert/update/delete?"""
139+
return True
140+
136141
def is_deadlock(self, error):
137142
if isinstance(error, self.module.DatabaseError):
138143
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html

synapse/storage/engines/sqlite.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ def supports_using_any_list(self):
6060
"""Do we support using `a = ANY(?)` and passing a list"""
6161
return False
6262

63+
@property
64+
def supports_returning(self) -> bool:
65+
"""Do we support the `RETURNING` clause in insert/update/delete?"""
66+
return self.module.sqlite_version_info >= (3, 35, 0)
67+
6368
def check_database(self, db_conn, allow_outdated_version: bool = False):
6469
if not allow_outdated_version:
6570
version = self.module.sqlite_version_info

0 commit comments

Comments
 (0)