Skip to content

Commit 5e33f7f

Browse files
🎨 Add prometheus metrics for logstreaming (ITISFoundation#5594)
1 parent 11b7724 commit 5e33f7f

File tree

9 files changed

+156
-38
lines changed

9 files changed

+156
-38
lines changed

‎services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py‎

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1-
from typing import Annotated, cast
1+
import logging
2+
from typing import Annotated, Final, cast
23

34
from fastapi import Depends, FastAPI
45
from pydantic import NonNegativeInt
6+
from servicelib.aiohttp.application_setup import ApplicationSetupError
57
from servicelib.fastapi.dependencies import get_app
68
from servicelib.rabbitmq import RabbitMQClient
9+
from tenacity import before_sleep_log, retry, stop_after_delay, wait_fixed
710

811
from ...services.log_streaming import LogDistributor
912

13+
_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: Final[int] = 10
14+
15+
_logger = logging.getLogger(__name__)
16+
1017

1118
def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient:
1219
assert app.state.rabbitmq_client # nosec
@@ -18,6 +25,20 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
1825
return cast(LogDistributor, app.state.log_distributor)
1926

2027

28+
@retry(
    wait=wait_fixed(2),
    stop=stop_after_delay(_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS),
    before_sleep=before_sleep_log(_logger, logging.WARNING),
    reraise=True,
)
async def wait_till_log_distributor_ready(app: FastAPI) -> None:
    """Block until the LogDistributor has been attached to ``app.state``.

    The log distributor is created by a separate startup handler, so this
    polls every 2 seconds for up to ``_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS``
    seconds (tenacity re-raises the last failure after the deadline).

    Raises:
        ApplicationSetupError: if ``app.state.log_distributor`` never appears
            within the allowed window.
    """
    if not hasattr(app.state, "log_distributor"):
        raise ApplicationSetupError(
            f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds"
        )
40+
41+
2142
def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
    """FastAPI dependency returning the configured log-check timeout (seconds)."""
    settings = app.state.settings
    assert settings  # nosec
    return cast(NonNegativeInt, settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)

‎services/api-server/src/simcore_service_api_server/api/errors/custom_errors.py‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ class MissingWallet(CustomBaseError):
1919
pass
2020

2121

22+
class ApplicationSetupError(CustomBaseError):
    # Raised when a piece of application state expected at startup
    # (e.g. the log distributor) is not initialized in time.
    pass
24+
25+
2226
async def custom_error_handler(_: Request, exc: CustomBaseError):
2327
if isinstance(exc, InsufficientCredits):
2428
return JSONResponse(
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from dataclasses import dataclass, field
2+
from datetime import timedelta
3+
from typing import Final, cast
4+
5+
from fastapi import FastAPI
6+
from prometheus_client import CollectorRegistry, Gauge
7+
from servicelib.background_task import start_periodic_task, stop_periodic_task
8+
from servicelib.fastapi.prometheus_instrumentation import (
9+
setup_prometheus_instrumentation as setup_rest_instrumentation,
10+
)
11+
from simcore_service_api_server.api.dependencies.rabbitmq import (
12+
get_log_distributor,
13+
wait_till_log_distributor_ready,
14+
)
15+
from simcore_service_api_server.models.schemas.jobs import JobID
16+
17+
from .._meta import PROJECT_NAME
18+
19+
METRICS_NAMESPACE: Final[str] = PROJECT_NAME.replace("-", "_")
20+
21+
22+
@dataclass(slots=True, kw_only=True)
class ApiServerPrometheusInstrumentation:
    """Holds the api-server's custom prometheus collectors."""

    registry: CollectorRegistry
    # Gauge is created in __post_init__; one time series per job_id label.
    _logstreaming_queues: Gauge = field(init=False)

    def __post_init__(self) -> None:
        self._logstreaming_queues = Gauge(
            "log_stream_queue_length",
            "#Logs in log streaming queue",
            ["job_id"],
            namespace=METRICS_NAMESPACE,
        )

    def update_metrics(self, log_queue_sizes: dict[JobID, int]):
        """Replace all gauge samples with the current queue sizes."""
        gauge = self._logstreaming_queues
        # Clear first so jobs that disappeared do not leave stale series.
        gauge.clear()
        for job_id, size in log_queue_sizes.items():
            gauge.labels(job_id=job_id).set(size)
39+
40+
41+
async def _collect_prometheus_metrics_task(app: FastAPI):
    # Periodic task body: snapshot the per-job log-queue sizes into the gauge.
    queue_sizes = get_log_distributor(app).get_log_queue_sizes()
    get_instrumentation(app).update_metrics(log_queue_sizes=queue_sizes)
45+
46+
47+
def setup_prometheus_instrumentation(app: FastAPI):
    # Instrument the REST endpoints first; its registry is reused below for
    # the api-server's custom collectors.
    instrumentator = setup_rest_instrumentation(app)

    async def on_startup() -> None:
        app.state.instrumentation = ApiServerPrometheusInstrumentation(
            registry=instrumentator.registry
        )
        # The log distributor is attached by a separate startup handler;
        # wait (with retries) for it before starting the collection loop.
        await wait_till_log_distributor_ready(app)
        app.state.instrumentation_task = start_periodic_task(
            task=_collect_prometheus_metrics_task,
            interval=timedelta(
                seconds=app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS
            ),
            task_name="prometheus_metrics_collection_task",
            app=app,
        )

    async def on_shutdown() -> None:
        # Startup must have run (and created the task) before shutdown fires.
        assert app.state.instrumentation_task  # nosec
        await stop_periodic_task(app.state.instrumentation_task)

    app.add_event_handler("startup", on_startup)
    app.add_event_handler("shutdown", on_shutdown)
70+
71+
72+
def get_instrumentation(app: FastAPI) -> ApiServerPrometheusInstrumentation:
    """Return the instrumentation object previously stored on ``app.state``."""
    instrumentation = app.state.instrumentation
    assert instrumentation, "Instrumentation not setup. Please check the configuration"  # nosec
    return cast(ApiServerPrometheusInstrumentation, instrumentation)

‎services/api-server/src/simcore_service_api_server/core/application.py‎

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
from fastapi_pagination import add_pagination
66
from httpx import HTTPError as HttpxException
77
from models_library.basic_types import BootModeEnum
8-
from servicelib.fastapi.prometheus_instrumentation import (
9-
setup_prometheus_instrumentation,
10-
)
118
from servicelib.logging_utils import config_all_loggers
129
from simcore_service_api_server.api.errors.log_handling_error import (
1310
log_handling_error_handler,
@@ -30,6 +27,7 @@
3027
from ..api.routes.health import router as health_router
3128
from ..services import catalog, director_v2, storage, webserver
3229
from ..services.rabbitmq import setup_rabbitmq
30+
from ._prometheus_instrumentation import setup_prometheus_instrumentation
3331
from .events import create_start_app_handler, create_stop_app_handler
3432
from .openapi import override_openapi_method, use_route_names_as_operation_ids
3533
from .settings import ApplicationSettings

‎services/api-server/src/simcore_service_api_server/core/settings.py‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from typing import Any
55

66
from models_library.basic_types import BootModeEnum, LogLevel
7-
from pydantic import Field, NonNegativeInt, SecretStr, parse_obj_as
7+
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, parse_obj_as
88
from pydantic.class_validators import validator
99
from settings_library.base import BaseCustomSettings
1010
from settings_library.basic_types import PortInt, VersionTag
@@ -137,6 +137,7 @@ class ApplicationSettings(BasicSettings):
137137
)
138138
API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
139139
API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
140+
API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS: PositiveInt = 5
140141
# DEV-TOOLS
141142
API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
142143
default=None,

‎services/api-server/src/simcore_service_api_server/services/log_streaming.py‎

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import logging
33
from asyncio import Queue
4-
from typing import AsyncIterable, Awaitable, Callable, Final
4+
from typing import AsyncIterable, Final
55

66
from models_library.rabbitmq_messages import LoggerRabbitMessage
77
from models_library.users import UserID
@@ -31,7 +31,7 @@ class LogStreamerRegistionConflict(LogDistributionBaseException):
3131
class LogDistributor:
3232
def __init__(self, rabbitmq_client: RabbitMQClient):
3333
self._rabbit_client = rabbitmq_client
34-
self._log_streamers: dict[JobID, Callable[[JobLog], Awaitable[None]]] = {}
34+
self._log_streamers: dict[JobID, Queue] = {}
3535
self._queue_name: str
3636

3737
async def setup(self):
@@ -72,22 +72,20 @@ async def _distribute_logs(self, data: bytes):
7272
log_level=got.log_level,
7373
messages=got.messages,
7474
)
75-
callback = self._log_streamers.get(item.job_id)
76-
if callback is None:
75+
queue = self._log_streamers.get(item.job_id)
76+
if queue is None:
7777
raise LogStreamerNotRegistered(
7878
f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered"
7979
)
80-
await callback(item)
80+
await queue.put(item)
8181
return True
8282

83-
async def register(
84-
self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]]
85-
):
83+
async def register(self, job_id: JobID, queue: Queue):
8684
if job_id in self._log_streamers:
8785
raise LogStreamerRegistionConflict(
8886
f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time"
8987
)
90-
self._log_streamers[job_id] = callback
88+
self._log_streamers[job_id] = queue
9189
await self._rabbit_client.add_topics(
9290
LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
9391
)
@@ -100,6 +98,9 @@ async def deregister(self, job_id: JobID):
10098
)
10199
del self._log_streamers[job_id]
102100

101+
def get_log_queue_sizes(self) -> dict[JobID, int]:
102+
return {k: v.qsize() for k, v in self._log_streamers.items()}
103+
103104

104105
class LogStreamer:
105106
def __init__(
@@ -120,7 +121,7 @@ def __init__(
120121
self._log_check_timeout: NonNegativeInt = log_check_timeout
121122

122123
async def setup(self):
123-
await self._log_distributor.register(self._job_id, self._queue.put)
124+
await self._log_distributor.register(self._job_id, self._queue)
124125
self._is_registered = True
125126

126127
async def teardown(self):

‎services/api-server/tests/unit/_with_db/conftest.py‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ def app_environment(
154154
) -> EnvVarsDict:
155155
"""app environments WITH database settings"""
156156
mocker.patch("simcore_service_api_server.core.application.setup_rabbitmq")
157+
mocker.patch(
158+
"simcore_service_api_server.core.application.setup_prometheus_instrumentation"
159+
)
157160

158161
envs = setenvs_from_dict(monkeypatch, default_app_env_vars)
159162
assert "API_SERVER_POSTGRES" not in envs

‎services/api-server/tests/unit/conftest.py‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ def mock_missing_plugins(app_environment: EnvVarsDict, mocker: MockerFixture):
8787
settings = ApplicationSettings.create_from_envs()
8888
if settings.API_SERVER_RABBITMQ is None:
8989
mocker.patch("simcore_service_api_server.core.application.setup_rabbitmq")
90+
mocker.patch(
91+
"simcore_service_api_server.core.application.setup_prometheus_instrumentation"
92+
)
9093
return app_environment
9194

9295

‎services/api-server/tests/unit/test_services_rabbitmq.py‎

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# pylint: disable=too-many-arguments
44
# pylint: disable=unused-argument
55
# pylint: disable=unused-variable
6+
# pylint: disable=R6301
67

78
import asyncio
89
import logging
@@ -16,6 +17,7 @@
1617
import httpx
1718
import pytest
1819
import respx
20+
from attr import dataclass
1921
from faker import Faker
2022
from fastapi import FastAPI, status
2123
from fastapi.encoders import jsonable_encoder
@@ -113,12 +115,18 @@ async def test_subscribe_publish_receive_logs(
113115
log_distributor: LogDistributor,
114116
mocker: MockerFixture,
115117
):
116-
async def _consumer_message_handler(job_log: JobLog):
117-
_consumer_message_handler.called = True
118-
_consumer_message_handler.job_log = job_log
119-
assert isinstance(job_log, JobLog)
118+
@dataclass
119+
class MockQueue:
120+
called: bool = False
121+
job_log: JobLog | None = None
120122

121-
await log_distributor.register(project_id, _consumer_message_handler)
123+
async def put(self, job_log: JobLog):
124+
self.called = True
125+
self.job_log = job_log
126+
assert isinstance(job_log, JobLog)
127+
128+
mock_queue = MockQueue()
129+
await log_distributor.register(project_id, mock_queue) # type: ignore
122130

123131
# log producer
124132
rabbitmq_producer = create_rabbitmq_client("pytest_producer")
@@ -128,16 +136,14 @@ async def _consumer_message_handler(job_log: JobLog):
128136
node_id=node_id,
129137
messages=[faker.text() for _ in range(10)],
130138
)
131-
_consumer_message_handler.called = False
132-
_consumer_message_handler.job_log = None
133139
await rabbitmq_producer.publish(log_message.channel_name, log_message)
134140

135141
# check it received
136142
await asyncio.sleep(1)
137143
await log_distributor.deregister(project_id)
138144

139-
assert _consumer_message_handler.called
140-
job_log = _consumer_message_handler.job_log
145+
assert mock_queue.called
146+
job_log = mock_queue.job_log
141147
assert isinstance(job_log, JobLog)
142148
assert job_log.job_id == log_message.project_id
143149

@@ -147,12 +153,13 @@ async def rabbit_consuming_context(
147153
app: FastAPI,
148154
project_id: ProjectID,
149155
) -> AsyncIterable[AsyncMock]:
150-
consumer_message_handler = AsyncMock()
151156

157+
queue = asyncio.Queue()
158+
queue.put = AsyncMock()
152159
log_distributor: LogDistributor = get_log_distributor(app)
153-
await log_distributor.register(project_id, consumer_message_handler)
160+
await log_distributor.register(project_id, queue)
154161

155-
yield consumer_message_handler
162+
yield queue.put
156163

157164
await log_distributor.deregister(project_id)
158165

@@ -233,10 +240,12 @@ async def test_log_distributor_register_deregister(
233240
):
234241
collected_logs: list[str] = []
235242

236-
async def callback(job_log: JobLog):
237-
for msg in job_log.messages:
238-
collected_logs.append(msg)
243+
class MockQueue:
244+
async def put(self, job_log: JobLog):
245+
for msg in job_log.messages:
246+
collected_logs.append(msg)
239247

248+
queue = MockQueue()
240249
published_logs: list[str] = []
241250

242251
async def _log_publisher():
@@ -246,12 +255,12 @@ async def _log_publisher():
246255
await produce_logs("expected", project_id, node_id, [msg], logging.DEBUG)
247256
published_logs.append(msg)
248257

249-
await log_distributor.register(project_id, callback)
258+
await log_distributor.register(project_id, queue) # type: ignore
250259
publisher_task = asyncio.create_task(_log_publisher())
251260
await asyncio.sleep(0.1)
252261
await log_distributor.deregister(project_id)
253262
await asyncio.sleep(0.1)
254-
await log_distributor.register(project_id, callback)
263+
await log_distributor.register(project_id, queue) # type: ignore
255264
await asyncio.gather(publisher_task)
256265
await asyncio.sleep(0.5)
257266
await log_distributor.deregister(project_id)
@@ -274,12 +283,14 @@ async def test_log_distributor_multiple_streams(
274283

275284
collected_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids}
276285

277-
async def callback(job_log: JobLog):
278-
job_id = job_log.job_id
279-
assert (msgs := collected_logs.get(job_id)) is not None
280-
for msg in job_log.messages:
281-
msgs.append(msg)
286+
class MockQueue:
287+
async def put(self, job_log: JobLog):
288+
job_id = job_log.job_id
289+
assert (msgs := collected_logs.get(job_id)) is not None
290+
for msg in job_log.messages:
291+
msgs.append(msg)
282292

293+
queue = MockQueue()
283294
published_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids}
284295

285296
async def _log_publisher():
@@ -291,7 +302,7 @@ async def _log_publisher():
291302
published_logs[job_id].append(msg)
292303

293304
for job_id in job_ids:
294-
await log_distributor.register(job_id, callback)
305+
await log_distributor.register(job_id, queue) # type: ignore
295306
publisher_task = asyncio.create_task(_log_publisher())
296307
await asyncio.gather(publisher_task)
297308
await asyncio.sleep(0.5)

0 commit comments

Comments
 (0)