Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit a0a1ba6

Browse files
authored
Merge pull request #8425 from matrix-org/rav/extremity_metrics
Add an improved "forward extremities" metric
2 parents 8b40843 + 32acab3 commit a0a1ba6

File tree

5 files changed

+144
-84
lines changed

5 files changed

+144
-84
lines changed

changelog.d/8425.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add experimental prometheus metric to track numbers of "large" rooms for state resolution.

synapse/metrics/__init__.py

Lines changed: 68 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import functools
1717
import gc
18+
import itertools
1819
import logging
1920
import os
2021
import platform
@@ -27,8 +28,8 @@
2728
from prometheus_client.core import (
2829
REGISTRY,
2930
CounterMetricFamily,
31+
GaugeHistogramMetricFamily,
3032
GaugeMetricFamily,
31-
HistogramMetricFamily,
3233
)
3334

3435
from twisted.internet import reactor
@@ -46,7 +47,7 @@
4647
METRICS_PREFIX = "/_synapse/metrics"
4748

4849
running_on_pypy = platform.python_implementation() == "PyPy"
49-
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
50+
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]
5051

5152
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
5253

@@ -205,63 +206,83 @@ def _register_with_collector(self):
205206
all_gauges[self.name] = self
206207

207208

208-
@attr.s(slots=True, hash=True)
209-
class BucketCollector:
210-
"""
211-
Like a Histogram, but allows buckets to be point-in-time instead of
212-
incrementally added to.
209+
class GaugeBucketCollector:
210+
"""Like a Histogram, but the buckets are Gauges which are updated atomically.
213211
214-
Args:
215-
name (str): Base name of metric to be exported to Prometheus.
216-
data_collector (callable -> dict): A synchronous callable that
217-
returns a dict mapping bucket to number of items in the
218-
bucket. If these buckets are not the same as the buckets
219-
given to this class, they will be remapped into them.
220-
buckets (list[float]): List of floats/ints of the buckets to
221-
give to Prometheus. +Inf is ignored, if given.
212+
The data is updated by calling `update_data` with an iterable of measurements.
222213
214+
We assume that the data is updated less frequently than it is reported to
215+
Prometheus, and optimise for that case.
223216
"""
224217

225-
name = attr.ib()
226-
data_collector = attr.ib()
227-
buckets = attr.ib()
218+
__slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
228219

229-
def collect(self):
220+
def __init__(
221+
self,
222+
name: str,
223+
documentation: str,
224+
buckets: Iterable[float],
225+
registry=REGISTRY,
226+
):
227+
"""
228+
Args:
229+
name: base name of metric to be exported to Prometheus. (a _bucket suffix
230+
will be added.)
231+
documentation: help text for the metric
232+
buckets: The top bounds of the buckets to report
233+
registry: metric registry to register with
234+
"""
235+
self._name = name
236+
self._documentation = documentation
230237

231-
# Fetch the data -- this must be synchronous!
232-
data = self.data_collector()
238+
# the tops of the buckets
239+
self._bucket_bounds = [float(b) for b in buckets]
240+
if self._bucket_bounds != sorted(self._bucket_bounds):
241+
raise ValueError("Buckets not in sorted order")
233242

234-
buckets = {} # type: Dict[float, int]
243+
if self._bucket_bounds[-1] != float("inf"):
244+
self._bucket_bounds.append(float("inf"))
235245

236-
res = []
237-
for x in data.keys():
238-
for i, bound in enumerate(self.buckets):
239-
if x <= bound:
240-
buckets[bound] = buckets.get(bound, 0) + data[x]
246+
self._metric = self._values_to_metric([])
247+
registry.register(self)
241248

242-
for i in self.buckets:
243-
res.append([str(i), buckets.get(i, 0)])
249+
def collect(self):
250+
yield self._metric
244251

245-
res.append(["+Inf", sum(data.values())])
252+
def update_data(self, values: Iterable[float]):
253+
"""Update the data to be reported by the metric
246254
247-
metric = HistogramMetricFamily(
248-
self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
255+
The existing data is cleared, and each measurement in the input is assigned
256+
to the relevant bucket.
257+
"""
258+
self._metric = self._values_to_metric(values)
259+
260+
def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
261+
total = 0.0
262+
bucket_values = [0 for _ in self._bucket_bounds]
263+
264+
for v in values:
265+
# assign each value to a bucket
266+
for i, bound in enumerate(self._bucket_bounds):
267+
if v <= bound:
268+
bucket_values[i] += 1
269+
break
270+
271+
# ... and increment the sum
272+
total += v
273+
274+
# now, aggregate the bucket values so that they count the number of entries in
275+
# that bucket or below.
276+
accumulated_values = itertools.accumulate(bucket_values)
277+
278+
return GaugeHistogramMetricFamily(
279+
self._name,
280+
self._documentation,
281+
buckets=list(
282+
zip((str(b) for b in self._bucket_bounds), accumulated_values)
283+
),
284+
gsum_value=total,
249285
)
250-
yield metric
251-
252-
def __attrs_post_init__(self):
253-
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
254-
if self.buckets != sorted(self.buckets):
255-
raise ValueError("Buckets not sorted")
256-
257-
self.buckets = tuple(self.buckets)
258-
259-
if self.name in all_gauges.keys():
260-
logger.warning("%s already registered, reregistering" % (self.name,))
261-
REGISTRY.unregister(all_gauges.pop(self.name))
262-
263-
REGISTRY.register(self)
264-
all_gauges[self.name] = self
265286

266287

267288
#

synapse/metrics/_exposition.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import threading
2727
from http.server import BaseHTTPRequestHandler, HTTPServer
2828
from socketserver import ThreadingMixIn
29+
from typing import Dict, List
2930
from urllib.parse import parse_qs, urlparse
3031

3132
from prometheus_client import REGISTRY
@@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False):
124125
)
125126
)
126127
output.append("# TYPE {0} {1}\n".format(mname, mtype))
127-
for sample in metric.samples:
128-
# Get rid of the OpenMetrics specific samples
128+
129+
om_samples = {} # type: Dict[str, List[str]]
130+
for s in metric.samples:
129131
for suffix in ["_created", "_gsum", "_gcount"]:
130-
if sample.name.endswith(suffix):
132+
if s.name == metric.name + suffix:
133+
# OpenMetrics specific sample, put in a gauge at the end.
134+
# (these come from gaugehistograms which don't get renamed,
135+
# so no need to faff with mnewname)
136+
om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
131137
break
132138
else:
133-
newname = sample.name.replace(mnewname, mname)
139+
newname = s.name.replace(mnewname, mname)
134140
if ":" in newname and newname.endswith("_total"):
135141
newname = newname[: -len("_total")]
136-
output.append(sample_line(sample, newname))
142+
output.append(sample_line(s, newname))
143+
144+
for suffix, lines in sorted(om_samples.items()):
145+
if emit_help:
146+
output.append(
147+
"# HELP {0}{1} {2}\n".format(
148+
metric.name,
149+
suffix,
150+
metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
151+
)
152+
)
153+
output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
154+
output.extend(lines)
137155

138156
# Get rid of the weird colon things while we're at it
139157
if mtype == "counter":
@@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False):
152170
)
153171
)
154172
output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
155-
for sample in metric.samples:
156-
# Get rid of the OpenMetrics specific samples
173+
174+
for s in metric.samples:
175+
# Get rid of the OpenMetrics specific samples (we should already have
176+
# dealt with them above anyway.)
157177
for suffix in ["_created", "_gsum", "_gcount"]:
158-
if sample.name.endswith(suffix):
178+
if s.name == metric.name + suffix:
159179
break
160180
else:
161181
output.append(
162-
sample_line(
163-
sample, sample.name.replace(":total", "").replace(":", "_")
164-
)
182+
sample_line(s, s.name.replace(":total", "").replace(":", "_"))
165183
)
166184

167185
return "".join(output).encode("utf-8")

synapse/storage/databases/main/metrics.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,35 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
import typing
16-
from collections import Counter
1715

18-
from synapse.metrics import BucketCollector
16+
from synapse.metrics import GaugeBucketCollector
1917
from synapse.metrics.background_process_metrics import run_as_background_process
2018
from synapse.storage._base import SQLBaseStore
2119
from synapse.storage.database import DatabasePool
2220
from synapse.storage.databases.main.event_push_actions import (
2321
EventPushActionsWorkerStore,
2422
)
2523

24+
# Collect metrics on the number of forward extremities that exist.
25+
_extremities_collecter = GaugeBucketCollector(
26+
"synapse_forward_extremities",
27+
"Number of rooms on the server with the given number of forward extremities"
28+
" or fewer",
29+
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
30+
)
31+
32+
# we also expose metrics on the "number of excess extremity events", which is
33+
# (E-1)*N, where E is the number of extremities and N is the number of state
34+
# events in the room. This is an approximation to the number of state events
35+
# we could remove from state resolution by reducing the graph to a single
36+
# forward extremity.
37+
_excess_state_events_collecter = GaugeBucketCollector(
38+
"synapse_excess_extremity_events",
39+
"Number of rooms on the server with the given number of excess extremity "
40+
"events, or fewer",
41+
buckets=[0] + [1 << n for n in range(12)],
42+
)
43+
2644

2745
class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
2846
"""Functions to pull various metrics from the DB, for e.g. phone home
@@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
3250
def __init__(self, database: DatabasePool, db_conn, hs):
3351
super().__init__(database, db_conn, hs)
3452

35-
# Collect metrics on the number of forward extremities that exist.
36-
# Counter of number of extremities to count
37-
self._current_forward_extremities_amount = (
38-
Counter()
39-
) # type: typing.Counter[int]
40-
41-
BucketCollector(
42-
"synapse_forward_extremities",
43-
lambda: self._current_forward_extremities_amount,
44-
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
45-
)
46-
4753
# Read the extrems every 60 minutes
4854
def read_forward_extremities():
4955
# run as a background process to make sure that the database transactions
@@ -58,14 +64,25 @@ async def _read_forward_extremities(self):
5864
def fetch(txn):
5965
txn.execute(
6066
"""
61-
select count(*) c from event_forward_extremities
62-
group by room_id
67+
SELECT t1.c, t2.c
68+
FROM (
69+
SELECT room_id, COUNT(*) c FROM event_forward_extremities
70+
GROUP BY room_id
71+
) t1 LEFT JOIN (
72+
SELECT room_id, COUNT(*) c FROM current_state_events
73+
GROUP BY room_id
74+
) t2 ON t1.room_id = t2.room_id
6375
"""
6476
)
6577
return txn.fetchall()
6678

6779
res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
68-
self._current_forward_extremities_amount = Counter([x[0] for x in res])
80+
81+
_extremities_collecter.update_data(x[0] for x in res)
82+
83+
_excess_state_events_collecter.update_data(
84+
(x[0] - 1) * x[1] for x in res if x[1]
85+
)
6986

7087
async def count_daily_messages(self):
7188
"""

tests/storage/test_event_metrics.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ def test_exposed_to_prometheus(self):
5252
self.reactor.advance(60 * 60 * 1000)
5353
self.pump(1)
5454

55-
items = set(
55+
items = list(
5656
filter(
5757
lambda x: b"synapse_forward_extremities_" in x,
58-
generate_latest(REGISTRY).split(b"\n"),
58+
generate_latest(REGISTRY, emit_help=False).split(b"\n"),
5959
)
6060
)
6161

62-
expected = {
62+
expected = [
6363
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
6464
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
6565
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@@ -72,9 +72,12 @@ def test_exposed_to_prometheus(self):
7272
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
7373
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
7474
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
75-
b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
76-
b"synapse_forward_extremities_count 3.0",
77-
b"synapse_forward_extremities_sum 10.0",
78-
}
79-
75+
# per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
76+
# "inf" is valid: "this includes variants such as inf"
77+
b'synapse_forward_extremities_bucket{le="inf"} 3.0',
78+
b"# TYPE synapse_forward_extremities_gcount gauge",
79+
b"synapse_forward_extremities_gcount 3.0",
80+
b"# TYPE synapse_forward_extremities_gsum gauge",
81+
b"synapse_forward_extremities_gsum 10.0",
82+
]
8083
self.assertEqual(items, expected)

0 commit comments

Comments
 (0)