|
15 | 15 |
|
16 | 16 | import functools |
17 | 17 | import gc |
| 18 | +import itertools |
18 | 19 | import logging |
19 | 20 | import os |
20 | 21 | import platform |
|
27 | 28 | from prometheus_client.core import ( |
28 | 29 | REGISTRY, |
29 | 30 | CounterMetricFamily, |
| 31 | + GaugeHistogramMetricFamily, |
30 | 32 | GaugeMetricFamily, |
31 | | - HistogramMetricFamily, |
32 | 33 | ) |
33 | 34 |
|
34 | 35 | from twisted.internet import reactor |
|
46 | 47 | METRICS_PREFIX = "/_synapse/metrics" |
47 | 48 |
|
48 | 49 | running_on_pypy = platform.python_implementation() == "PyPy" |
49 | | -all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]] |
| 50 | +all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]] |
50 | 51 |
|
51 | 52 | HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") |
52 | 53 |
|
@@ -205,63 +206,83 @@ def _register_with_collector(self): |
205 | 206 | all_gauges[self.name] = self |
206 | 207 |
|
207 | 208 |
|
class GaugeBucketCollector:
    """Like a Histogram, but the buckets are Gauges which are updated atomically.

    The data is updated by calling `update_data` with an iterable of measurements.

    We assume that the data is updated less frequently than it is reported to
    Prometheus, and optimise for that case.
    """

    __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")

    def __init__(
        self,
        name: str,
        documentation: str,
        buckets: Iterable[float],
        registry=REGISTRY,
    ):
        """
        Args:
            name: base name of metric to be exported to Prometheus. (a _bucket suffix
               will be added.)
            documentation: help text for the metric
            buckets: The top bounds of the buckets to report
            registry: metric registry to register with

        Raises:
            ValueError: if `buckets` is empty, or is not in ascending order.
        """
        self._name = name
        self._documentation = documentation

        # the tops of the buckets
        self._bucket_bounds = [float(b) for b in buckets]

        # an empty bucket list would otherwise slip past the sort check
        # (`[] == sorted([])`) and fail later with an obscure IndexError.
        if not self._bucket_bounds:
            raise ValueError("Buckets list may not be empty")

        if self._bucket_bounds != sorted(self._bucket_bounds):
            raise ValueError("Buckets not in sorted order")

        # Prometheus requires a +Inf bucket; add one if the caller didn't.
        if self._bucket_bounds[-1] != float("inf"):
            self._bucket_bounds.append(float("inf"))

        # start with an empty snapshot so `collect` has something to yield
        # even before the first `update_data` call.
        self._metric = self._values_to_metric([])
        registry.register(self)

    def collect(self):
        """Called by the registry on each scrape: yield the latest snapshot."""
        yield self._metric

    def update_data(self, values: Iterable[float]):
        """Update the data to be reported by the metric

        The existing data is cleared, and each measurement in the input is assigned
        to the relevant bucket.
        """
        self._metric = self._values_to_metric(values)

    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
        """Build an immutable GaugeHistogramMetricFamily snapshot from `values`."""
        total = 0.0
        bucket_values = [0] * len(self._bucket_bounds)

        for v in values:
            # assign each value to the first bucket whose upper bound admits it
            for i, bound in enumerate(self._bucket_bounds):
                if v <= bound:
                    bucket_values[i] += 1
                    break

            # ... and increment the sum
            total += v

        # now, aggregate the bucket values so that they count the number of entries in
        # that bucket or below (gauge histograms are cumulative).
        accumulated_values = itertools.accumulate(bucket_values)

        return GaugeHistogramMetricFamily(
            self._name,
            self._documentation,
            buckets=list(
                zip((str(b) for b in self._bucket_bounds), accumulated_values)
            ),
            gsum_value=total,
        )
265 | 286 |
|
266 | 287 |
|
267 | 288 | # |
|
0 commit comments