
Commit 8b8c3b3

Merge branch 'use-generic-metrics-reporter' into 'main'

Metrics: Register metrics dynamically and add metric types

See merge request ecdc/ess-dmsc/forwarder!293

2 parents 662eb8a + 0eb1c67

14 files changed: +387 −205 lines

changes.md

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,8 @@
 
 * Remove support for deprecated ep00 schema
 * Adopt pip-tools to manage requirements.txt files
+* Refactor statistics reporter to support dynamically added metrics
+* Add latency and per-PV graphite metrics
 
 ## v2.1.0
 

forwarder/handle_config_change.py

Lines changed: 23 additions & 3 deletions
@@ -8,6 +8,7 @@
 from forwarder.common import Channel, CommandType, ConfigUpdate
 from forwarder.configuration_store import ConfigurationStore, NullConfigurationStore
 from forwarder.kafka.kafka_producer import KafkaProducer
+from forwarder.metrics import Gauge
 from forwarder.status_reporter import StatusReporter
 from forwarder.update_handlers.create_update_handler import (
     UpdateHandler,
@@ -24,6 +25,7 @@ def _subscribe_to_pv(
     logger: Logger,
     fake_pv_period: int,
     pv_update_period: Optional[int],
+    pvs_subscribed_metric: Optional[Gauge] = None,
 ):
     if new_channel in update_handlers.keys():
         logger.warning(
@@ -46,12 +48,15 @@ def _subscribe_to_pv(
     logger.info(
         f"Subscribed to PV name='{new_channel.name}', schema='{new_channel.schema}', topic='{new_channel.output_topic}'"
     )
+    if pvs_subscribed_metric:
+        pvs_subscribed_metric.inc()
 
 
 def _unsubscribe_from_pv(
     remove_channel: Channel,
     update_handlers: Dict[Channel, UpdateHandler],
     logger: Logger,
+    pvs_subscribed_metric: Optional[Gauge] = None,
 ):
     def _match_channel_field(
         field_in_remove_request: Optional[str], field_in_existing_channel: Optional[str]
@@ -88,17 +93,23 @@ def _wildcard_match_channel_field(
     for channel in channels_to_remove:
         update_handlers[channel].stop()
         del update_handlers[channel]
+        if pvs_subscribed_metric:
+            pvs_subscribed_metric.dec()
 
     logger.info(
         f"Unsubscribed from PVs matching name='{remove_channel.name}', schema='{remove_channel.schema}', topic='{remove_channel.output_topic}'"
     )
 
 
 def _unsubscribe_from_all(
-    update_handlers: Dict[Channel, UpdateHandler], logger: Logger
+    update_handlers: Dict[Channel, UpdateHandler],
+    logger: Logger,
+    pvs_subscribed_metric: Optional[Gauge] = None,
 ):
     for update_handler in update_handlers.values():
         update_handler.stop()
+        if pvs_subscribed_metric:
+            pvs_subscribed_metric.dec()
     update_handlers.clear()
     logger.info("Unsubscribed from all PVs")
 
@@ -114,12 +125,15 @@ def handle_configuration_change(
     logger: Logger,
     status_reporter: StatusReporter,
     configuration_store: ConfigurationStore = NullConfigurationStore,
+    pvs_subscribed_metric: Optional[Gauge] = None,
 ):
     """
     Add or remove update handlers according to the requested change in configuration
     """
     if configuration_change.command_type == CommandType.REMOVE_ALL:
-        _unsubscribe_from_all(update_handlers, logger)
+        _unsubscribe_from_all(
+            update_handlers, logger, pvs_subscribed_metric=pvs_subscribed_metric
+        )
     elif configuration_change.command_type == CommandType.INVALID:
         return
     else:
@@ -135,8 +149,14 @@ def handle_configuration_change(
                         logger,
                         fake_pv_period,
                         pv_update_period,
+                        pvs_subscribed_metric=pvs_subscribed_metric,
                     )
                 elif configuration_change.command_type == CommandType.REMOVE:
-                    _unsubscribe_from_pv(channel, update_handlers, logger)
+                    _unsubscribe_from_pv(
+                        channel,
+                        update_handlers,
+                        logger,
+                        pvs_subscribed_metric=pvs_subscribed_metric,
+                    )
     status_reporter.report_status()
     configuration_store.save_configuration(update_handlers)
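
The gauge is threaded through as an optional keyword argument, so existing callers keep working without metrics. A minimal sketch of the intended semantics, assuming a hypothetical metric name and description (the commit shows only the parameter, not where the gauge is created):

from forwarder.metrics import Gauge

# Hypothetical gauge; inc()/dec() mirror the calls added in the diff above.
pvs_subscribed = Gauge("pvs_subscribed", "Number of PVs currently subscribed")

pvs_subscribed.inc()          # a PV was subscribed
pvs_subscribed.dec()          # a PV was unsubscribed
print(pvs_subscribed.value)   # current count, as read by the statistics reporter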

forwarder/kafka/kafka_helpers.py

Lines changed: 28 additions & 10 deletions
@@ -4,7 +4,8 @@
 from confluent_kafka import Consumer, Producer
 from streaming_data_types.epics_connection_ep01 import ConnectionInfo, serialise_ep01
 
-from forwarder.utils import Counter
+from forwarder.metrics import Counter
+from forwarder.metrics.statistics_reporter import StatisticsReporter
 
 from .kafka_producer import KafkaProducer
 
@@ -59,9 +60,7 @@ def create_producer(
     username: Optional[str] = None,
     password: Optional[str] = None,
     ssl_ca_file: Optional[str] = None,
-    counter: Optional[Counter] = None,
-    buffer_err_counter: Optional[Counter] = None,
-    delivery_err_counter: Optional[Counter] = None,
+    statistics_reporter: Optional[StatisticsReporter] = None,
 ) -> KafkaProducer:
     producer_config = {
         "bootstrap.servers": broker_address,
@@ -74,12 +73,31 @@ def create_producer(
     if ssl_ca_file:
         producer_config["ssl.ca.location"] = ssl_ca_file
     producer = Producer(producer_config)
-    return KafkaProducer(
-        producer,
-        update_msg_counter=counter,
-        update_buffer_err_counter=buffer_err_counter,
-        update_delivery_err_counter=delivery_err_counter,
-    )
+    if not statistics_reporter:
+        return KafkaProducer(producer)
+    else:
+        update_message_counter = Counter(
+            "successful_sends_total", "Total number of updates sent to kafka"
+        )
+        buffer_err_counter = Counter(
+            "send_buffer_errors_total", "Kafka producer queue errors"
+        )
+        delivery_err_counter = Counter(
+            "send_delivery_errors_total", "Kafka delivery errors"
+        )
+        statistics_reporter.register_metric(
+            update_message_counter.name, update_message_counter
+        )
+        statistics_reporter.register_metric(buffer_err_counter.name, buffer_err_counter)
+        statistics_reporter.register_metric(
+            delivery_err_counter.name, delivery_err_counter
+        )
+        return KafkaProducer(
+            producer,
+            update_msg_counter=update_message_counter,
+            update_buffer_err_counter=buffer_err_counter,
+            update_delivery_err_counter=delivery_err_counter,
+        )
 
 
 def create_consumer(
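
With this change the caller no longer constructs the three send counters itself; passing a StatisticsReporter is enough, and create_producer registers them on it. A usage sketch, with placeholder hostnames rather than values from this commit:

from logging import getLogger

from forwarder.kafka.kafka_helpers import create_producer
from forwarder.metrics.statistics_reporter import StatisticsReporter

# "graphite-server" and "localhost:9092" are placeholders.
reporter = StatisticsReporter("graphite-server", getLogger("forwarder"))
producer = create_producer("localhost:9092", statistics_reporter=reporter)
reporter.start()  # the registered counters are now pushed on every timer tick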

forwarder/kafka/kafka_producer.py

Lines changed: 4 additions & 4 deletions
@@ -4,7 +4,7 @@
 import confluent_kafka
 
 from forwarder.application_logger import get_logger
-from forwarder.utils import Counter
+from forwarder.metrics import Counter
 
 
 class KafkaProducer:
@@ -44,12 +44,12 @@ def ack(err, _):
             if err:
                 self.logger.error(f"Message failed delivery: {err}")
                 if self._update_delivery_err_counter:
-                    self._update_delivery_err_counter.increment()
+                    self._update_delivery_err_counter.inc()
             else:
                 # increment only for PVs related updates
                 # key is None when we send commands.
                 if self._update_msg_counter and key is not None:
-                    self._update_msg_counter.increment()
+                    self._update_msg_counter.inc()
 
         try:
             self._producer.produce(
@@ -59,5 +59,5 @@ def ack(err, _):
             # Producer message buffer is full.
             # Data loss occurred as messages are produced faster than are sent to the kafka broker.
             if self._update_buffer_err_counter:
-                self._update_buffer_err_counter.increment()
+                self._update_buffer_err_counter.inc()
         self._producer.poll(0)
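
The rename from increment() to inc() follows from the switch to forwarder.metrics.Counter, which inherits the prometheus_client API. For illustration, a direct construction of the producer with a single counter (normally done through create_producer above; the broker address and metric name are placeholders, not from this commit):

from confluent_kafka import Producer

from forwarder.kafka.kafka_producer import KafkaProducer
from forwarder.metrics import Counter

producer = KafkaProducer(
    Producer({"bootstrap.servers": "localhost:9092"}),  # placeholder broker
    update_msg_counter=Counter("example_sends", "Hypothetical example counter"),
)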

forwarder/metrics/__init__.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+"""
+Classes to model monitoring metrics based on the Prometheus client library.
+
+Since we push these metrics to Graphite instead of relying on Prometheus
+standard mechanisms, we provide adapter classes to prevent coupling other
+modules with certain Prometheus internals.
+"""
+
+from prometheus_client import Counter as PrometheusCounter
+from prometheus_client import Gauge as PrometheusGauge
+from prometheus_client import Summary as PrometheusSummary
+
+
+class Gauge(PrometheusGauge):
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def value(self):
+        return self._value.get()
+
+
+class Counter(PrometheusCounter):
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def value(self):
+        return self._value.get()
+
+
+class Summary(PrometheusSummary):
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def count(self):
+        return self._count.get()
+
+    @property
+    def sum(self):
+        return self._sum.get()
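
The adapters only add read access (name, value, count, sum) on top of the Prometheus classes, which is exactly what a push-based Graphite reporter needs in order to poll them. A small sketch with invented metric names:

from forwarder.metrics import Counter, Summary

sends = Counter("example_sends", "Updates sent")
sends.inc()
print(sends.name, sends.value)  # the properties read by StatisticsReporter

latency = Summary("example_latency_seconds", "Time spent per update")
latency.observe(0.012)
print(latency.count, latency.sum)  # a Summary is reported as two values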
forwarder/metrics/statistics_reporter.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+import time
+from logging import Logger
+from typing import Dict, Union, get_args
+
+import graphyte  # type: ignore
+
+from forwarder.metrics import Counter, Gauge, Summary
+from forwarder.repeat_timer import RepeatTimer
+
+MetricType = Union[Counter, Summary, Gauge]
+
+
+class StatisticsReporter:
+    def __init__(
+        self,
+        graphyte_server: str,
+        logger: Logger,
+        prefix: str = "forwarder",
+        update_interval_s: int = 10,
+    ):
+        self._graphyte_server = graphyte_server
+        self._logger = logger
+        self._sender = graphyte.Sender(self._graphyte_server, prefix=prefix)
+        self._repeating_timer = RepeatTimer(update_interval_s, self.send_statistics)
+        self._metrics: Dict[str, MetricType] = {}
+
+    def register_metric(self, name: str, metric: MetricType):
+        if not isinstance(metric, get_args(MetricType)):
+            raise TypeError(f"Unsupported metric type: {type(metric).__name__}")
+        self._metrics[name] = metric
+
+    def deregister_metric(self, name: str):
+        if name in self._metrics:
+            del self._metrics[name]
+
+    def start(self):
+        self._repeating_timer.start()
+
+    def send_statistics(self):
+        try:
+            for metric_name, metric in self._metrics.items():
+                timestamp = time.time()
+                if isinstance(metric, (Counter, Gauge)):
+                    self._sender.send(metric_name, metric.value, timestamp)
+                elif isinstance(metric, Summary):
+                    self._sender.send(f"{metric_name}_sum", metric.sum, timestamp)
+                    self._sender.send(f"{metric_name}_count", metric.count, timestamp)
+        except Exception as ex:
+            self._logger.error(f"Could not send statistics: {ex}")
+
+    def stop(self):
+        if self._repeating_timer:
+            self._repeating_timer.cancel()
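
Putting it together, the reporter pushes whatever has been registered to Graphite on each RepeatTimer tick. A lifecycle sketch, assuming a placeholder Graphite host and an invented metric name:

import logging
import time

from forwarder.metrics import Gauge
from forwarder.metrics.statistics_reporter import StatisticsReporter

reporter = StatisticsReporter(
    "graphite-server",  # placeholder host
    logging.getLogger("forwarder"),
    update_interval_s=10,
)
queue_depth = Gauge("example_queue_depth", "Hypothetical example metric")
reporter.register_metric(queue_depth.name, queue_depth)

reporter.start()   # begin periodic send_statistics() calls
queue_depth.inc()
time.sleep(15)     # let at least one report go out
reporter.stop()    # cancel the repeating timer
reporter.deregister_metric(queue_depth.name)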
