Skip to content

Commit e84e547

Browse files
committed
feat: Integrate with Prometheus and Grafana
Introduce a basic observability stack using Prometheus for metrics collection and Grafana for visualisation. Prometheus is configured to scrape metrics from the CogStack Model Gateway, Ripper, and Scheduler, while Grafana is set up with some basic dashboards for each service. At this stage, our instrumentation (and visualisations) focus on shedding some light on the total number of requests received and processed, along with information about their outcome (success or failure) and the model and task requested. In the future, we should expand our setup to provide more detailed insights, such as the time taken to process requests. Relevant metadata are already provided as labels in the collected metrics (e.g. 'created_at', 'started_at', and 'finished_at' timestamps for each task), so we can easily extend our dashboards to integrate these. Signed-off-by: Phoevos Kalemkeris <[email protected]>
1 parent 8e3332a commit e84e547

File tree

22 files changed

+909
-17
lines changed

22 files changed

+909
-17
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,3 @@ The project is still under active development. In the future we will be focusing
171171
* **Smart scheduling**: Implement a more sophisticated scheduling algorithm that takes into account
172172
the state of the cluster.
173173
* **CI/CD**: Set up a continuous integration and deployment pipeline for the project.
174-
* **Monitoring**: Integrate with Prometheus and Grafana.

cogstack_model_gateway/common/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
"CMG_QUEUE_NAME": "cmg_tasks",
3131
"CMG_QUEUE_HOST": "rabbitmq",
3232
"CMG_QUEUE_PORT": "5672",
33+
"CMG_SCHEDULER_METRICS_PORT": "8001",
3334
"CMG_SCHEDULER_MAX_CONCURRENT_TASKS": "1",
35+
"CMG_RIPPER_METRICS_PORT": "8002",
3436
},
3537
}
3638

cogstack_model_gateway/gateway/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ RUN curl -sSL https://install.python-poetry.org | python3 -
1919

2020
WORKDIR /app
2121

22+
COPY cogstack_model_gateway/gateway/gunicorn.conf.py ./
2223
COPY pyproject.toml poetry.lock ./
2324
RUN poetry install --only main --no-root --no-directory
2425

@@ -30,4 +31,4 @@ RUN poetry install --only main
3031

3132
EXPOSE 8000
3233

33-
CMD ["poetry", "run", "fastapi", "run", "--workers", "4", "cogstack_model_gateway/gateway/main.py"]
34+
CMD ["poetry", "run", "gunicorn", "-c", "gunicorn.conf.py", "cogstack_model_gateway.gateway.main:app"]
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from prometheus_client import multiprocess
2+
3+
# Gunicorn settings for the gateway server.
bind = "0.0.0.0:8000"  # address:port the server listens on
workers = 4  # number of worker processes
worker_class = "uvicorn.workers.UvicornWorker"  # worker implementation to use


def child_exit(server, worker):
    """Flag the metrics of an exited worker as dead.

    Gunicorn invokes this hook when a worker process exits. The worker's PID is handed to the
    Prometheus multiprocess registry so that MultiProcessCollector can ignore the files that the
    worker left behind.
    """
    exited_pid = worker.pid
    multiprocess.mark_process_dead(exited_pid)

cogstack_model_gateway/gateway/main.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,36 @@
11
import logging
2+
import os
23
from contextlib import asynccontextmanager
34

45
import urllib3
56
from fastapi import FastAPI
7+
from prometheus_client import CollectorRegistry, make_asgi_app, multiprocess
68

79
from cogstack_model_gateway.common.config import load_config
810
from cogstack_model_gateway.common.db import DatabaseManager
911
from cogstack_model_gateway.common.logging import configure_logging
1012
from cogstack_model_gateway.common.object_store import ObjectStoreManager
1113
from cogstack_model_gateway.common.queue import QueueManager
1214
from cogstack_model_gateway.common.tasks import TaskManager
15+
from cogstack_model_gateway.gateway.prometheus.metrics import gateway_requests_total
1316
from cogstack_model_gateway.gateway.routers import models, tasks
1417

1518
log = logging.getLogger("cmg.gateway")
1619

1720

21+
def make_metrics_app():
    """Create the ASGI app that serves metrics aggregated across worker processes.

    A dedicated registry is created and handed to MultiProcessCollector, which aggregates the
    metrics of every process. The directory named by PROMETHEUS_MULTIPROC_DIR is created if it
    does not exist yet.

    Returns:
        The ASGI application produced by `make_asgi_app` for the aggregating registry.

    Raises:
        RuntimeError: If PROMETHEUS_MULTIPROC_DIR is unset or empty.
    """
    # An empty value is treated like a missing one: it cannot name a directory and would
    # otherwise surface as an unrelated FileNotFoundError from os.makedirs("").
    multiproc_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
    if not multiproc_dir:
        raise RuntimeError(
            "Environment variable PROMETHEUS_MULTIPROC_DIR is not set. Please set it to a directory"
            " where Prometheus can store metrics data for multiprocess mode, e.g. /tmp/prometheus/"
        )
    os.makedirs(multiproc_dir, exist_ok=True)
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    return make_asgi_app(registry=registry)
32+
33+
1834
@asynccontextmanager
1935
async def lifespan(app: FastAPI):
2036
"""Setup gateway and initialize database, object store, queue, and task manager connections."""
@@ -73,6 +89,15 @@ async def lifespan(app: FastAPI):
7389
app.include_router(models.router)
7490
app.include_router(tasks.router)
7591

92+
app.mount("/metrics", make_metrics_app())
93+
94+
95+
@app.middleware("http")
async def prometheus_request_counter(request, call_next):
    """Count every HTTP request handled by the gateway, labelled by method and endpoint.

    The `endpoint` label is the path template of the matched route (e.g. a path containing
    `{model_name}`) rather than the concrete URL, so that per-model or per-task URLs do not each
    create a new time series. When no route object is available in the request scope (e.g. for
    unmatched paths), the raw URL path is used as before.

    The counter is incremented in a `finally` block so that requests whose handler raises are
    counted as well; the exception still propagates to the caller.
    """
    try:
        return await call_next(request)
    finally:
        # Routing happens inside call_next, so the matched route is only known afterwards.
        matched_route = request.scope.get("route")
        endpoint = getattr(matched_route, "path", None) or request.url.path
        gateway_requests_total.labels(method=request.method, endpoint=endpoint).inc()
100+
76101

77102
@app.get("/")
78103
async def root():
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from prometheus_client import Counter
2+
3+
# HTTP requests received by the gateway, by HTTP method and endpoint.
gateway_requests_total = Counter(
    name="gateway_requests_total",
    documentation="Total number of HTTP requests to the CogStack Model Gateway",
    labelnames=["method", "endpoint"],
)

# Model deployments performed through the gateway, by model name and model URI.
gateway_models_deployed_total = Counter(
    name="gateway_models_deployed_total",
    documentation="Total number of models deployed via the CogStack Model Gateway",
    labelnames=["model", "model_uri"],
)

# Tasks handled through the gateway, by target model and task type.
gateway_tasks_processed_total = Counter(
    name="gateway_tasks_processed_total",
    documentation="Total number of tasks processed through the CogStack Model Gateway",
    labelnames=["model", "task"],
)

cogstack_model_gateway/gateway/routers/models.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
from cogstack_model_gateway.common.tracking import TrackingClient
1515
from cogstack_model_gateway.gateway.core.models import get_running_models, run_model_container
1616
from cogstack_model_gateway.gateway.core.priority import calculate_task_priority
17+
from cogstack_model_gateway.gateway.prometheus.metrics import (
18+
gateway_models_deployed_total,
19+
gateway_tasks_processed_total,
20+
)
1721
from cogstack_model_gateway.gateway.routers.utils import (
1822
get_cms_url,
1923
get_content_type,
@@ -133,6 +137,7 @@ async def get_models(
133137
)
134138
async def get_model_info(model_name: str):
135139
"""Get information about a running model server through its `/info` API."""
140+
gateway_tasks_processed_total.labels(model=model_name, task="info").inc()
136141
# FIXME: Enable SSL verification when certificates are properly set up
137142
response = requests.get(get_cms_url(model_name, "info"), verify=False)
138143
if response.status_code == 404:
@@ -220,6 +225,8 @@ async def deploy_model(
220225
status_code=500, detail=f"Failed to deploy model '{model_name}': {str(e)}"
221226
)
222227

228+
gateway_models_deployed_total.labels(model=model_name, model_uri=model_uri).inc()
229+
223230
log.info(f"Model '{model_name}' deployed successfully with container ID {container.id}")
224231
return {
225232
"message": f"Model '{model_name}' deployed successfully",
@@ -335,4 +342,6 @@ async def execute_task(
335342
qm: QueueManager = config.queue_manager
336343
qm.publish(task_dict, priority)
337344

345+
gateway_tasks_processed_total.labels(model=model_name, task=task).inc()
346+
338347
return {"uuid": task_uuid, "status": "Task submitted successfully"}

cogstack_model_gateway/ripper/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import docker
99
from dateutil import parser
1010
from docker.models.containers import Container
11+
from prometheus_client import start_http_server
1112

1213
from cogstack_model_gateway.common.containers import (
1314
IS_MODEL_LABEL,
@@ -16,8 +17,10 @@
1617
TTL_LABEL,
1718
)
1819
from cogstack_model_gateway.common.logging import configure_logging
20+
from cogstack_model_gateway.ripper.prometheus.metrics import containers_purged_total
1921

2022
PURGE_INTERVAL = int(os.getenv("CMG_RIPPER_INTERVAL") or 60)
23+
METRICS_PORT = int(os.getenv("CMG_RIPPER_METRICS_PORT") or 8002)
2124

2225
log = logging.getLogger("cmg.ripper")
2326

@@ -27,6 +30,7 @@ def stop_and_remove_container(container: Container):
2730
log.info(f"Stopping and removing expired container: {container.name}")
2831
container.stop()
2932
container.remove()
33+
containers_purged_total.inc()
3034

3135

3236
def purge_expired_containers():
@@ -71,6 +75,9 @@ def purge_expired_containers():
7175
def main():
7276
"""Run the ripper service."""
7377
configure_logging()
78+
79+
start_http_server(METRICS_PORT)
80+
7481
purge_expired_containers()
7582

7683

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from prometheus_client import Counter
2+
3+
# Docker containers purged by the Ripper.
containers_purged_total = Counter(
    name="ripper_containers_purged_total",
    documentation="Total number of Docker containers purged by the Ripper",
)

cogstack_model_gateway/scheduler/main.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22
import sys
33

4-
from cogstack_model_gateway.common.config import load_config
4+
from prometheus_client import start_http_server
5+
6+
from cogstack_model_gateway.common.config import Config, load_config
57
from cogstack_model_gateway.common.db import DatabaseManager
68
from cogstack_model_gateway.common.logging import configure_logging
79
from cogstack_model_gateway.common.object_store import ObjectStoreManager
@@ -12,12 +14,11 @@
1214
log = logging.getLogger("cmg.scheduler")
1315

1416

15-
def initialize_connections() -> tuple[
16-
DatabaseManager, ObjectStoreManager, QueueManager, TaskManager
17-
]:
17+
def initialize_connections(
18+
config: Config,
19+
) -> tuple[DatabaseManager, ObjectStoreManager, QueueManager, TaskManager]:
1820
"""Initialize database, object store, queue, and task manager connections for the scheduler."""
1921
log.info("Initializing database and queue connections")
20-
config = load_config()
2122
dbm = DatabaseManager(
2223
user=config.cmg.db_user,
2324
password=config.cmg.db_password,
@@ -65,7 +66,10 @@ def main():
6566
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
6667

6768
configure_logging()
68-
connections = initialize_connections()
69+
config = load_config()
70+
connections = initialize_connections(config)
71+
72+
start_http_server(int(config.cmg.scheduler_metrics_port))
6973

7074
scheduler = Scheduler(
7175
task_object_store_manager=connections[1],

0 commit comments

Comments
 (0)