Skip to content

Commit 601d1d6

Browse files
chrismeyersfsuAlanCoding
authored andcommitted
Add metrics
* poc
1 parent 57c78cd commit 601d1d6

File tree

2 files changed

+259
-0
lines changed

2 files changed

+259
-0
lines changed

README.md

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,147 @@ For more information about getting in touch, see the
232232
## Credits
233233

234234
Dispatcherd is sponsored by [Red Hat, Inc](https://www.redhat.com).
235+
236+
## Metrics
237+
238+
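Install `prometheus_client`, start the dispatcher, and then query the metrics endpoint, which listens on `localhost:8070`: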
239+
240+
```
241+
pip install prometheus_client
242+
243+
$ curl http://localhost:8070
244+
245+
curl http://localhost:8070
246+
# HELP dispatcher_messages_received_total Number of messages received by dispatchermain
247+
# TYPE dispatcher_messages_received_total counter
248+
dispatcher_messages_received_total 88.0
249+
# HELP dispatcher_control_messages_count_total Number of control messages received.
250+
# TYPE dispatcher_control_messages_count_total counter
251+
dispatcher_control_messages_count_total 10.0
252+
# HELP dispatcher_worker_created_at Creation time of worker
253+
# TYPE dispatcher_worker_created_at gauge
254+
dispatcher_worker_created_at{worker_index="0"} 286576.365272104
255+
dispatcher_worker_created_at{worker_index="1"} 286576.365368035
256+
dispatcher_worker_created_at{worker_index="2"} 286578.814706941
257+
dispatcher_worker_created_at{worker_index="3"} 286578.817639637
258+
dispatcher_worker_created_at{worker_index="4"} 286578.820265243
259+
dispatcher_worker_created_at{worker_index="5"} 286578.822075874
260+
dispatcher_worker_created_at{worker_index="6"} 286578.824725725
261+
dispatcher_worker_created_at{worker_index="7"} 286578.837810563
262+
dispatcher_worker_created_at{worker_index="8"} 286578.844329095
263+
dispatcher_worker_created_at{worker_index="9"} 286578.863277972
264+
dispatcher_worker_created_at{worker_index="10"} 286578.878555905
265+
dispatcher_worker_created_at{worker_index="11"} 286579.36656921
266+
# HELP dispatcher_worker_finished_count Finished count of tasks by the worker
267+
# TYPE dispatcher_worker_finished_count gauge
268+
dispatcher_worker_finished_count{worker_index="0"} 2.0
269+
dispatcher_worker_finished_count{worker_index="1"} 1.0
270+
dispatcher_worker_finished_count{worker_index="2"} 1.0
271+
dispatcher_worker_finished_count{worker_index="3"} 7.0
272+
dispatcher_worker_finished_count{worker_index="4"} 8.0
273+
dispatcher_worker_finished_count{worker_index="5"} 1.0
274+
dispatcher_worker_finished_count{worker_index="6"} 0.0
275+
dispatcher_worker_finished_count{worker_index="7"} 0.0
276+
dispatcher_worker_finished_count{worker_index="8"} 0.0
277+
dispatcher_worker_finished_count{worker_index="9"} 2.0
278+
dispatcher_worker_finished_count{worker_index="10"} 1.0
279+
dispatcher_worker_finished_count{worker_index="11"} 1.0
280+
# HELP dispatcher_worker_status Status of worker.
281+
# TYPE dispatcher_worker_status gauge
282+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="0"} 0.0
283+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="0"} 0.0
284+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="0"} 0.0
285+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="0"} 1.0
286+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="0"} 0.0
287+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="0"} 0.0
288+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="0"} 0.0
289+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="0"} 0.0
290+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="1"} 0.0
291+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="1"} 0.0
292+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="1"} 0.0
293+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="1"} 1.0
294+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="1"} 0.0
295+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="1"} 0.0
296+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="1"} 0.0
297+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="1"} 0.0
298+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="2"} 0.0
299+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="2"} 0.0
300+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="2"} 0.0
301+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="2"} 1.0
302+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="2"} 0.0
303+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="2"} 0.0
304+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="2"} 0.0
305+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="2"} 0.0
306+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="3"} 0.0
307+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="3"} 0.0
308+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="3"} 0.0
309+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="3"} 1.0
310+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="3"} 0.0
311+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="3"} 0.0
312+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="3"} 0.0
313+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="3"} 0.0
314+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="4"} 0.0
315+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="4"} 0.0
316+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="4"} 0.0
317+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="4"} 1.0
318+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="4"} 0.0
319+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="4"} 0.0
320+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="4"} 0.0
321+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="4"} 0.0
322+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="5"} 0.0
323+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="5"} 0.0
324+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="5"} 0.0
325+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="5"} 1.0
326+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="5"} 0.0
327+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="5"} 0.0
328+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="5"} 0.0
329+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="5"} 0.0
330+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="6"} 0.0
331+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="6"} 0.0
332+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="6"} 0.0
333+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="6"} 1.0
334+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="6"} 0.0
335+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="6"} 0.0
336+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="6"} 0.0
337+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="6"} 0.0
338+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="7"} 0.0
339+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="7"} 0.0
340+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="7"} 0.0
341+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="7"} 1.0
342+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="7"} 0.0
343+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="7"} 0.0
344+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="7"} 0.0
345+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="7"} 0.0
346+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="8"} 0.0
347+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="8"} 0.0
348+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="8"} 0.0
349+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="8"} 1.0
350+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="8"} 0.0
351+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="8"} 0.0
352+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="8"} 0.0
353+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="8"} 0.0
354+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="9"} 0.0
355+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="9"} 0.0
356+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="9"} 0.0
357+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="9"} 1.0
358+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="9"} 0.0
359+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="9"} 0.0
360+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="9"} 0.0
361+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="9"} 0.0
362+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="10"} 0.0
363+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="10"} 0.0
364+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="10"} 0.0
365+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="10"} 1.0
366+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="10"} 0.0
367+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="10"} 0.0
368+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="10"} 0.0
369+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="10"} 0.0
370+
dispatcher_worker_status{dispatcher_worker_status="error",worker_index="11"} 0.0
371+
dispatcher_worker_status{dispatcher_worker_status="exited",worker_index="11"} 0.0
372+
dispatcher_worker_status{dispatcher_worker_status="initialized",worker_index="11"} 0.0
373+
dispatcher_worker_status{dispatcher_worker_status="ready",worker_index="11"} 1.0
374+
dispatcher_worker_status{dispatcher_worker_status="retired",worker_index="11"} 0.0
375+
dispatcher_worker_status{dispatcher_worker_status="spawned",worker_index="11"} 0.0
376+
dispatcher_worker_status{dispatcher_worker_status="starting",worker_index="11"} 0.0
377+
dispatcher_worker_status{dispatcher_worker_status="stopping",worker_index="11"} 0.0
378+
```

dispatcherd/service/main.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
import asyncio
22
import json
33
import logging
4+
import prometheus_client
5+
from prometheus_client import (
6+
generate_latest,
7+
Gauge,
8+
Counter,
9+
Enum,
10+
CollectorRegistry,
11+
parser,
12+
)
13+
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY, StateSetMetricFamily
14+
from prometheus_client.registry import Collector
415
import signal
516
import time
617
from os import getpid
@@ -17,6 +28,53 @@
1728
logger = logging.getLogger(__name__)
1829

1930

31+
class MetricsNamespace:
    """Base class that remembers which namespace a group of metrics belongs to."""

    def __init__(self, namespace) -> None:
        # Stored privately; nothing is validated or transformed here.
        self._namespace = namespace
34+
35+
36+
class MetricsServerSettings(MetricsNamespace):
    """Settings for the metrics HTTP endpoint, layered on top of the namespace base."""

    def port(self):
        """Return the TCP port the metrics endpoint listens on (currently fixed at 8070)."""
        return 8070
39+
40+
41+
class MetricsServer(MetricsServerSettings):
    """Serve the metrics of one registry over HTTP via ``prometheus_client``.

    The server is created by ``start()`` and torn down by ``stop()``; until
    ``start()`` succeeds both ``_server`` and ``_tid`` are ``None``.
    """

    def __init__(self, namespace, registry):
        """Remember the namespace and the registry whose metrics will be served."""
        super().__init__(namespace)
        self._registry = registry

        # Populated by start(); stop() uses None to detect "not running".
        self._server = None
        self._tid = None

    def start(self):
        """Start the HTTP server on localhost at ``self.port()``.

        Raises:
            Exception: whatever ``prometheus_client.start_http_server`` raised;
                it is logged (with traceback) and then re-raised unchanged.
        """
        try:
            # TODO: addr for ipv6 ?
            self._server, self._tid = prometheus_client.start_http_server(
                self.port(), addr='localhost', registry=self._registry
            )
        except Exception:
            logger.exception(f"MetricsServer failed to start for service '{self._namespace}'.")
            raise

    def stop(self):
        """Shut the HTTP server down and wait for its thread to finish.

        Safe to call when the server was never started (or failed to start),
        and safe to call more than once.
        """
        if self._server is None:
            return
        self._server.shutdown()
        self._tid.join()
        # Reset so a repeated stop() is a no-op instead of re-shutting down.
        self._server = None
        self._tid = None
60+
61+
62+
class CustomCollector(Collector):
    """Collector that delegates metric production to a caller-supplied callable."""

    def __init__(self, do_metrics):
        # Called with no arguments on every collect(); must return an iterable of metrics.
        self._do_metrics = do_metrics

    def collect(self):
        """Yield every metric produced by a single call to the wrapped callable."""
        yield from self._do_metrics()
69+
70+
71+
class DispatcherMetricsServer(MetricsServer):
    """Metrics server for the 'dispatcherd' namespace, backed by its own registry."""

    def __init__(self, do_metrics):
        """Wrap ``do_metrics`` in a collector and register it on a dedicated registry."""
        collector = CustomCollector(do_metrics)
        # NOTE(review): with auto_describe=True the registry presumably calls
        # collect() while registering a collector that has no describe() --
        # confirm do_metrics is safe to invoke at construction time.
        dedicated_registry = CollectorRegistry(auto_describe=True)
        dedicated_registry.register(collector)
        super().__init__('dispatcherd', dedicated_registry)
76+
77+
2078
class DispatcherEvents:
2179
"Benchmark tests have to re-create this because they use same object in different event loops"
2280

@@ -61,6 +119,8 @@ def __init__(self, producers: Iterable[Producer], pool: WorkerPool, node_id: Opt
61119
self.node_id = str(uuid4())
62120

63121
self.events: DispatcherEvents = DispatcherEvents()
122+
123+
self._metrics_server = DispatcherMetricsServer(self.do_metrics)
64124

65125
self.delayed_runner = NextWakeupRunner(self.delayed_messages, self.process_delayed_task, name='delayed_task_runner')
66126
self.delayed_runner.exit_event = self.events.exit_event
@@ -268,6 +328,8 @@ async def main(self) -> None:
268328
current_task.set_name('dispatcherd_service_main')
269329

270330
await self.connect_signals()
331+
332+
self.start_metrics()
271333

272334
try:
273335
await self.start_working()
@@ -286,5 +348,58 @@ async def main(self) -> None:
286348
await self.shutdown()
287349

288350
await self.cancel_tasks()
351+
352+
self.stop_metrics()
289353

290354
logger.debug('Dispatcherd loop fully completed')
355+
356+
def start_metrics(self) -> None:
    """Start the metrics server held in ``self._metrics_server``."""
    metrics_server = self._metrics_server
    metrics_server.start()
358+
359+
def stop_metrics(self) -> None:
    """Stop the metrics server held in ``self._metrics_server``."""
    metrics_server = self._metrics_server
    metrics_server.stop()
361+
362+
def do_metrics(self) -> Iterable[prometheus_client.Metric]:
    """Build the metric families exposed on the metrics endpoint.

    Yields, in order: the received-message counter, the control-message
    counter, and three per-worker families labelled by ``worker_index``
    (creation time, finished-task count, and a state set for the status).
    """
    yield CounterMetricFamily(
        'dispatcher_messages_received_total',
        'Number of messages received by dispatchermain',
        value=self.received_count,
    )
    yield CounterMetricFamily(
        'dispatcher_control_messages_count',
        'Number of control messages received.',
        value=self.control_count,
    )
    created_at = GaugeMetricFamily(
        'dispatcher_worker_created_at',
        'Creation time of worker',
        labels=['worker_index'],
    )
    finished_count = GaugeMetricFamily(
        'dispatcher_worker_finished_count',
        'Finished count of tasks by the worker',
        labels=['worker_index'],
    )
    worker_status = StateSetMetricFamily(
        'dispatcher_worker_status',
        'Status of worker.',
        labels=['worker_index'],
    )

    # Every state is reported for every worker; exactly one is true.
    worker_states = (
        'initialized',
        'spawned',
        'starting',
        'ready',
        'stopping',
        'exited',
        'error',
        'retired',
    )

    # Iterate over a snapshot: collection is driven by the metrics HTTP server
    # thread, so the live mapping may change while a scrape is in progress.
    for worker_index, worker in list(self.pool.workers.items()):
        label_values = [f'{worker_index}']
        # Read the status once so all states in a scrape agree with each other.
        status = worker.status
        created_at.add_metric(label_values, worker.created_at)
        finished_count.add_metric(label_values, worker.finished_count)
        worker_status.add_metric(label_values, {state: status == state for state in worker_states})
    yield created_at
    yield finished_count
    yield worker_status

0 commit comments

Comments
 (0)