@@ -1,4 +1,5 @@
 import logging
+from collections.abc import Iterator
 from dataclasses import dataclass, field
 from datetime import timedelta
 from typing import Final, cast
@@ -37,25 +38,29 @@ def __post_init__(self) -> None:
             "#Logs in log streaming queue",
             ["job_id"],
             namespace=METRICS_NAMESPACE,
+            registry=self.registry,
         )
         self._health_check_qauge = Gauge(
             "log_stream_health_check",
             "#Failures of log stream health check",
             namespace=METRICS_NAMESPACE,
+            registry=self.registry,
         )

     def update_metrics(
-        self, log_queue_sizes: dict[JobID, int], health_check_failure_count: PositiveInt
+        self,
+        iter_log_queue_sizes: Iterator[tuple[JobID, int]],
+        health_check_failure_count: PositiveInt,
     ):
         self._health_check_qauge.set(health_check_failure_count)
         self._logstreaming_queues.clear()
-        for job_id, length in log_queue_sizes.items():
+        for job_id, length in iter_log_queue_sizes:
             self._logstreaming_queues.labels(job_id=job_id).set(length)


 async def _collect_prometheus_metrics_task(app: FastAPI):
     get_instrumentation(app).update_metrics(
-        log_queue_sizes=get_log_distributor(app).get_log_queue_sizes,
+        iter_log_queue_sizes=get_log_distributor(app).iter_log_queue_sizes,
         health_check_failure_count=get_health_checker(app).health_check_failure_count,
     )
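For context, a minimal sketch of how the two sides of this change could fit together. Several details here are assumptions, not taken from the diff: that JobID is a string-like alias, that the distributor keeps one asyncio.Queue per job, that iter_log_queue_sizes is a property returning a lazy generator, and the gauge name log_stream_queue_length (the real name is truncated out of the hunk). The added registry=self.registry arguments bind each Gauge to an explicit CollectorRegistry instead of prometheus_client's process-global default, which keeps metrics isolated per app instance.

import asyncio
from collections.abc import Iterator
from dataclasses import dataclass, field

from prometheus_client import CollectorRegistry, Gauge

JobID = str  # assumption: stand-in for the real JobID alias


@dataclass
class LogDistributor:
    # assumption: one asyncio.Queue of buffered log lines per streaming job
    _queues: dict[JobID, asyncio.Queue] = field(default_factory=dict)

    @property
    def iter_log_queue_sizes(self) -> Iterator[tuple[JobID, int]]:
        # Yield (job_id, current queue length) pairs lazily instead of
        # materializing a dict snapshot, matching the signature change above
        return ((job_id, q.qsize()) for job_id, q in self._queues.items())


@dataclass
class Instrumentation:
    registry: CollectorRegistry

    def __post_init__(self) -> None:
        # Registering against an explicit registry (registry=self.registry in
        # the diff) avoids the global default REGISTRY, so tests and multiple
        # app instances do not collide on metric names
        self._logstreaming_queues = Gauge(
            "log_stream_queue_length",  # hypothetical name, not in the hunk
            "#Logs in log streaming queue",
            ["job_id"],
            registry=self.registry,
        )

    def update_metrics(self, iter_log_queue_sizes: Iterator[tuple[JobID, int]]) -> None:
        # clear() drops all previously-seen job_id label children, so gauges
        # for finished jobs do not linger between collection cycles
        self._logstreaming_queues.clear()
        for job_id, length in iter_log_queue_sizes:
            self._logstreaming_queues.labels(job_id=job_id).set(length)

Passing a generator rather than a dict means the metrics task never copies all queue sizes at once; each (job_id, size) pair is read directly off the live queues at collection time.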