Skip to content

Commit df9a5fa

Browse files
eliasoledomora97
andauthored
Prometheus Exporter (#1303)
Add a new command: cmsPrometheusExporter This will start a new HTTP server that listens on a port (defaults to 8811) for requests to /metrics. At this endpoint many metrics about a contest (or all contests) are exposed for Prometheus to be collected. Those metrics consider both system values (queue status, connected workers, ...) and contest values (submissions, users, questions, ...) This exporter works by executing simple queries to the database, and obtains the queue information asking Evaluation Service. The metrics exposed may leak information about the contest, so it's important to secure this endpoint (for example using a reverse proxy, or binding localhost). With this exporter it's possible to build interactive and real-time dashboard for monitoring the status of the contest. --------- Co-authored-by: Edoardo Morassutto <[email protected]>
1 parent 6aa76fc commit df9a5fa

File tree

4 files changed

+363
-1
lines changed

4 files changed

+363
-1
lines changed

cmscontrib/PrometheusExporter.py

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
#!/usr/bin/env python3
2+
3+
# Contest Management System - http://cms-dev.github.io/
4+
# Copyright © 2020 Edoardo Morassutto <[email protected]>
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU Affero General Public License as
8+
# published by the Free Software Foundation, either version 3 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# This program is distributed in the hope that it will be useful,
12+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
# GNU Affero General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU Affero General Public License
17+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
19+
# We enable monkey patching to make many libraries gevent-friendly
20+
# (for instance, urllib3, used by requests)
21+
import gevent.monkey
22+
23+
from cms.db.contest import Contest
24+
25+
gevent.monkey.patch_all() # noqa
26+
27+
import argparse
28+
import logging
29+
30+
from prometheus_client import start_http_server
31+
from prometheus_client.core import REGISTRY, CounterMetricFamily, GaugeMetricFamily
32+
from sqlalchemy import func, distinct
33+
34+
from cms import ServiceCoord
35+
from cms import config
36+
from cms.db import (
37+
Announcement,
38+
Dataset,
39+
Message,
40+
Participation,
41+
Question,
42+
SessionGen,
43+
Submission,
44+
SubmissionResult,
45+
Task,
46+
)
47+
from cms.io.service import Service
48+
from cms.io.rpc import RPCError
49+
from cms.server.admin.server import AdminWebServer
50+
51+
logger = logging.getLogger(__name__)
52+
53+
class PrometheusExporter(Service):
54+
def __init__(self, args):
55+
super().__init__()
56+
57+
self.host = args.host
58+
self.port = args.port
59+
60+
self.export_submissions = not args.no_submissions
61+
self.export_workers = not args.no_workers
62+
self.export_queue = not args.no_queue
63+
self.export_communiactions = not args.no_communications
64+
self.export_users = not args.no_users
65+
self.evaluation_service = None
66+
67+
def run(self):
68+
REGISTRY.register(self)
69+
start_http_server(self.port, addr=self.host)
70+
logger.info("Started at http://%s:%s/metric", self.host, self.port)
71+
super().run()
72+
73+
def collect(self):
74+
with SessionGen() as session:
75+
if self.export_submissions:
76+
yield from self._collect_submissions(session)
77+
if self.export_communiactions:
78+
yield from self._collect_communications(session)
79+
if self.export_users:
80+
yield from self._collect_users(session)
81+
82+
if self.evaluation_service is None:
83+
try:
84+
self.evaluation_service = self.connect_to(ServiceCoord("EvaluationService", 0))
85+
except RPCError:
86+
pass
87+
if self.evaluation_service is not None:
88+
try:
89+
if self.export_workers:
90+
yield from self._collect_workers()
91+
if self.export_queue:
92+
yield from self._collect_queue()
93+
except RPCError:
94+
self.evaluation_service = None
95+
96+
metric = GaugeMetricFamily("cms_es_is_up", "Whether the Evaluation Service is currently up")
97+
metric.add_metric([], 1 if self.evaluation_service is not None else 0)
98+
yield metric
99+
100+
def _collect_submissions(self, session):
101+
# compiling / max_compilations / compilation_fail / evaluating /
102+
# max_evaluations / scoring / scored / total
103+
stats = AdminWebServer.submissions_status(None)
104+
metric = GaugeMetricFamily(
105+
"cms_submissions",
106+
"Number of submissions per category",
107+
labels=["status"],
108+
)
109+
for status, count in stats.items():
110+
metric.add_metric([status], count)
111+
yield metric
112+
113+
metric = CounterMetricFamily(
114+
"cms_task_submissions",
115+
"Number of submissions per task",
116+
labels=["task"],
117+
)
118+
data = (
119+
session.query(Task.name, func.count(SubmissionResult.submission_id))
120+
.select_from(SubmissionResult)
121+
.join(Dataset)
122+
.join(Task, Dataset.task_id == Task.id)
123+
.filter(Task.active_dataset_id == SubmissionResult.dataset_id)
124+
.group_by(Task.name)
125+
.all()
126+
)
127+
for task_name, count in data:
128+
metric.add_metric([task_name], count)
129+
yield metric
130+
131+
metric = CounterMetricFamily(
132+
"cms_submissions_language",
133+
"Number of submissions per language",
134+
labels=["language"],
135+
)
136+
data = (
137+
session.query(Submission.language, func.count(Submission.id))
138+
.select_from(Submission)
139+
.group_by(Submission.language)
140+
.all()
141+
)
142+
for language, count in data:
143+
metric.add_metric([language], count)
144+
yield metric
145+
146+
def _collect_workers(self):
147+
status = self.evaluation_service.workers_status().get()
148+
metric = GaugeMetricFamily(
149+
"cms_workers",
150+
"Number of cmsWorker instances",
151+
labels=["status"],
152+
)
153+
metric.add_metric(["total"], len(status))
154+
metric.add_metric(
155+
["connected"], sum(1 for worker in status.values() if worker["connected"])
156+
)
157+
metric.add_metric(
158+
["working"], sum(1 for worker in status.values() if worker["operations"])
159+
)
160+
yield metric
161+
162+
def _collect_queue(self):
163+
status = self.evaluation_service.queue_status().get()
164+
165+
metric = GaugeMetricFamily("cms_queue_length", "Number of entries in the queue")
166+
metric.add_metric([], len(status))
167+
yield metric
168+
169+
metric = GaugeMetricFamily(
170+
"cms_queue_item_types",
171+
"Types of items in the queue",
172+
labels=["type"],
173+
)
174+
types = {}
175+
for item in status:
176+
typ = item["item"]["type"]
177+
types.setdefault(typ, 0)
178+
types[typ] += 1
179+
for typ, count in types.items():
180+
metric.add_metric([typ], count)
181+
yield metric
182+
183+
metric = GaugeMetricFamily(
184+
"cms_queue_oldest_job",
185+
"Timestamp of the oldest job in the queue",
186+
)
187+
if status:
188+
oldest = min(status, key=lambda x: x["timestamp"])
189+
metric.add_metric([], oldest["timestamp"])
190+
yield metric
191+
192+
def _collect_communications(self, session):
193+
metric = CounterMetricFamily(
194+
"cms_questions",
195+
"Number of questions",
196+
labels=["status"],
197+
)
198+
data = session.query(func.count(Question.id)).select_from(Question).all()
199+
metric.add_metric(["total"], data[0][0])
200+
data = (
201+
session.query(func.count(Question.id))
202+
.select_from(Question)
203+
.filter(Question.ignored == True)
204+
.all()
205+
)
206+
metric.add_metric(["ignored"], data[0][0])
207+
data = (
208+
session.query(func.count(Question.id))
209+
.select_from(Question)
210+
.filter(Question.reply_timestamp != None)
211+
.all()
212+
)
213+
metric.add_metric(["answered"], data[0][0])
214+
yield metric
215+
216+
metric = CounterMetricFamily("cms_messages", "Number of private messages")
217+
data = session.query(func.count(Message.id)).select_from(Message).all()
218+
metric.add_metric([], data[0][0])
219+
yield metric
220+
221+
metric = CounterMetricFamily("cms_announcements", "Number of announcements")
222+
data = (
223+
session.query(func.count(Announcement.id)).select_from(Announcement).all()
224+
)
225+
metric.add_metric([], data[0][0])
226+
yield metric
227+
228+
def _collect_users(self, session):
229+
metric = GaugeMetricFamily(
230+
"cms_participations",
231+
"Number of participations grouped by category and contest",
232+
labels=["category", "contest"],
233+
)
234+
data = (
235+
session.query(Participation.contest_id, func.count(Participation.id))
236+
.select_from(Participation)
237+
.group_by(Participation.contest_id)
238+
.all()
239+
)
240+
for contest_id, count in data:
241+
metric.add_metric(["total", str(contest_id)], count)
242+
data = (
243+
session.query(Participation.contest_id, func.count(Participation.id))
244+
.select_from(Participation)
245+
.filter(Participation.hidden == True)
246+
.group_by(Participation.contest_id)
247+
.all()
248+
)
249+
for contest_id, count in data:
250+
metric.add_metric(["hidden", str(contest_id)], count)
251+
data = (
252+
session.query(Participation.contest_id, func.count(Participation.id))
253+
.select_from(Participation)
254+
.filter(Participation.unrestricted == True)
255+
.group_by(Participation.contest_id)
256+
.all()
257+
)
258+
for contest_id, count in data:
259+
metric.add_metric(["unrestricted", str(contest_id)], count)
260+
data = (
261+
session.query(Participation.contest_id, func.count(Participation.id))
262+
.select_from(Participation)
263+
.filter(Participation.starting_time != None)
264+
.group_by(Participation.contest_id)
265+
.all()
266+
)
267+
for contest_id, count in data:
268+
metric.add_metric(["started", str(contest_id)], count)
269+
data = (
270+
session.query(
271+
Participation.contest_id, func.count(distinct(Participation.id))
272+
)
273+
.select_from(Participation)
274+
.join(Submission)
275+
.group_by(Participation.contest_id)
276+
.all()
277+
)
278+
for contest_id, count in data:
279+
metric.add_metric(["submitted", str(contest_id)], count)
280+
data = (
281+
session.query(
282+
Participation.contest_id, func.count(distinct(Participation.id))
283+
)
284+
.select_from(Participation)
285+
.join(Submission)
286+
.join(SubmissionResult)
287+
.join(Dataset)
288+
.join(Task, Dataset.task_id == Task.id)
289+
.filter(Task.active_dataset_id == SubmissionResult.dataset_id)
290+
.filter(SubmissionResult.score > 0)
291+
.group_by(Participation.contest_id)
292+
.all()
293+
)
294+
for contest_id, count in data:
295+
metric.add_metric(["non_zero", str(contest_id)], count)
296+
yield metric
297+
298+
299+
def main():
300+
"""Parse arguments and launch process."""
301+
parser = argparse.ArgumentParser(description="Prometheus exporter.")
302+
parser.add_argument(
303+
"--host",
304+
help="IP address to bind to",
305+
default=config.prometheus_listen_address,
306+
)
307+
parser.add_argument(
308+
"--port",
309+
help="Port to use",
310+
default=config.prometheus_listen_port,
311+
type=int,
312+
)
313+
parser.add_argument(
314+
"--no-submissions",
315+
help="Do not export submissions metrics",
316+
action="store_true",
317+
)
318+
parser.add_argument(
319+
"--no-workers",
320+
help="Do not export workers metrics",
321+
action="store_true",
322+
)
323+
parser.add_argument(
324+
"--no-queue",
325+
help="Do not export queue metrics",
326+
action="store_true",
327+
)
328+
parser.add_argument(
329+
"--no-communications",
330+
help="Do not export communications metrics",
331+
action="store_true",
332+
)
333+
parser.add_argument(
334+
"--no-users",
335+
help="Do not export users metrics",
336+
action="store_true",
337+
)
338+
339+
# unsed, but passed by ResourceService
340+
parser.add_argument("shard", default="", help="unused")
341+
parser.add_argument("-c", "--contest", default="", help="unused")
342+
343+
args = parser.parse_args()
344+
345+
service = PrometheusExporter(args)
346+
service.run()
347+
348+
349+
if __name__ == "__main__":
350+
main()

config/cms.conf.sample

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
"ContestWebServer": [["localhost", 21000]],
4444
"AdminWebServer": [["localhost", 21100]],
4545
"ProxyService": [["localhost", 28600]],
46-
"PrintingService": [["localhost", 25123]]
46+
"PrintingService": [["localhost", 25123]],
47+
"PrometheusExporter": []
4748
},
4849

4950
"other_services":
@@ -189,6 +190,13 @@
189190
"pdf_printing_allowed": false,
190191

191192

193+
"_section": "PrometheusExporter",
194+
195+
"_help": "Listening HTTP address and port for the exporter. If exposed",
196+
"_help": "this may leak private information, make sure to secure this endpoint.",
197+
"prometheus_listen_address": "127.0.0.1",
198+
"prometheus_listen_port": 8811,
199+
192200

193201
"_help": "This is the end of this file."
194202
}

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@ pyyaml>=5.3,<6.1 # http://pyyaml.org/wiki/PyYAML
2626
# Only for printing:
2727
pycups==2.0.4 # https://pypi.python.org/pypi/pycups
2828
PyPDF2>=1.26,<3.1 # https://github.com/mstamy2/PyPDF2/blob/master/CHANGELOG
29+
30+
# Only for cmsPrometheusExporter
31+
prometheus-client>=0.7,<0.8 # https://pypi.org/project/prometheus-client

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ def run(self):
168168
"cmsRemoveUser=cmscontrib.RemoveUser:main",
169169
"cmsSpoolExporter=cmscontrib.SpoolExporter:main",
170170
"cmsMake=cmstaskenv.cmsMake:main",
171+
"cmsPrometheusExporter=cmscontrib.PrometheusExporter:main",
171172
],
172173
"cms.grading.tasktypes": [
173174
"Batch=cms.grading.tasktypes.Batch:Batch",

0 commit comments

Comments
 (0)