|
28 | 28 | Union, |
29 | 29 | ) |
30 | 30 |
|
31 | | -from prometheus_client import Counter, Histogram |
| 31 | +from prometheus_client import Counter, Gauge, Histogram |
32 | 32 |
|
33 | 33 | from twisted.internet import defer |
34 | 34 | from twisted.internet.abstract import isIPAddress |
|
88 | 88 | ) |
89 | 89 |
|
90 | 90 |
|
| 91 | +last_pdu_age_metric = Gauge( |
| 92 | + "synapse_federation_last_received_pdu_age", |
| 93 | + "The age (in seconds) of the last PDU successfully received from the given domain", |
| 94 | + labelnames=("server_name",), |
| 95 | +) |
| 96 | + |
| 97 | + |
91 | 98 | class FederationServer(FederationBase): |
92 | 99 | def __init__(self, hs): |
93 | 100 | super().__init__(hs) |
@@ -118,6 +125,10 @@ def __init__(self, hs): |
118 | 125 | hs, "state_ids_resp", timeout_ms=30000 |
119 | 126 | ) |
120 | 127 |
|
| 128 | + self._federation_metrics_domains = ( |
| 129 | + hs.get_config().federation.federation_metrics_domains |
| 130 | + ) |
| 131 | + |
121 | 132 | async def on_backfill_request( |
122 | 133 | self, origin: str, room_id: str, versions: List[str], limit: int |
123 | 134 | ) -> Tuple[int, Dict[str, Any]]: |
@@ -262,7 +273,11 @@ async def _handle_pdus_in_txn( |
262 | 273 |
|
263 | 274 | pdus_by_room = {} # type: Dict[str, List[EventBase]] |
264 | 275 |
|
| 276 | + newest_pdu_ts = 0 |
| 277 | + |
265 | 278 | for p in transaction.pdus: # type: ignore |
| 279 | + # FIXME (richardv): I don't think this works: |
| 280 | + # https://github.com/matrix-org/synapse/issues/8429 |
266 | 281 | if "unsigned" in p: |
267 | 282 | unsigned = p["unsigned"] |
268 | 283 | if "age" in unsigned: |
@@ -300,6 +315,9 @@ async def _handle_pdus_in_txn( |
300 | 315 | event = event_from_pdu_json(p, room_version) |
301 | 316 | pdus_by_room.setdefault(room_id, []).append(event) |
302 | 317 |
|
| 318 | + if event.origin_server_ts > newest_pdu_ts: |
| 319 | + newest_pdu_ts = event.origin_server_ts |
| 320 | + |
303 | 321 | pdu_results = {} |
304 | 322 |
|
305 | 323 | # we can process different rooms in parallel (which is useful if they |
@@ -340,6 +358,10 @@ async def process_pdus_for_room(room_id: str): |
340 | 358 | process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT |
341 | 359 | ) |
342 | 360 |
|
| 361 | + if newest_pdu_ts and origin in self._federation_metrics_domains: |
| 362 | + newest_pdu_age = self._clock.time_msec() - newest_pdu_ts |
| 363 | + last_pdu_age_metric.labels(server_name=origin).set(newest_pdu_age / 1000) |
| 364 | + |
343 | 365 | return pdu_results |
344 | 366 |
|
345 | 367 | async def _handle_edus_in_txn(self, origin: str, transaction: Transaction): |
|
0 commit comments