diff --git a/packages/service-library/src/servicelib/aiohttp/monitoring.py b/packages/service-library/src/servicelib/aiohttp/monitoring.py index dae94750569..07e0b08437a 100644 --- a/packages/service-library/src/servicelib/aiohttp/monitoring.py +++ b/packages/service-library/src/servicelib/aiohttp/monitoring.py @@ -20,6 +20,7 @@ from ..prometheus_metrics import ( PrometheusMetrics, get_prometheus_metrics, + record_asyncio_event_looop_metrics, record_request_metrics, record_response_metrics, ) @@ -33,15 +34,15 @@ def get_collector_registry(app: web.Application) -> CollectorRegistry: metrics = app[PROMETHEUS_METRICS_APPKEY] - assert isinstance(metrics, PrometheusMetrics) # nosec return metrics.registry async def metrics_handler(request: web.Request): - registry = get_collector_registry(request.app) + metrics = request.app[PROMETHEUS_METRICS_APPKEY] + await record_asyncio_event_looop_metrics(metrics) # NOTE: Cannot use ProcessPoolExecutor because registry is not pickable - result = await request.loop.run_in_executor(None, generate_latest, registry) + result = await request.loop.run_in_executor(None, generate_latest, metrics.registry) response = web.Response(body=result) response.content_type = CONTENT_TYPE_LATEST return response diff --git a/packages/service-library/src/servicelib/fastapi/monitoring.py b/packages/service-library/src/servicelib/fastapi/monitoring.py index a9c33f0d216..cce3c1c9480 100644 --- a/packages/service-library/src/servicelib/fastapi/monitoring.py +++ b/packages/service-library/src/servicelib/fastapi/monitoring.py @@ -23,6 +23,7 @@ from ..prometheus_metrics import ( PrometheusMetrics, get_prometheus_metrics, + record_asyncio_event_looop_metrics, record_request_metrics, record_response_metrics, ) @@ -91,6 +92,7 @@ def _startup(app: FastAPI) -> None: async def metrics_endpoint(request: Request) -> Response: prometheus_metrics = request.app.state.prometheus_metrics assert isinstance(prometheus_metrics, PrometheusMetrics) # nosec + await 
record_asyncio_event_looop_metrics(prometheus_metrics) content = await asyncio.get_event_loop().run_in_executor( None, generate_latest, prometheus_metrics.registry diff --git a/packages/service-library/src/servicelib/prometheus_metrics.py b/packages/service-library/src/servicelib/prometheus_metrics.py index 7f24b4e004b..8a42de99d31 100644 --- a/packages/service-library/src/servicelib/prometheus_metrics.py +++ b/packages/service-library/src/servicelib/prometheus_metrics.py @@ -1,3 +1,5 @@ +import asyncio +import time from collections.abc import Iterator from contextlib import contextmanager from dataclasses import dataclass @@ -40,6 +42,8 @@ class PrometheusMetrics: request_count: Counter in_flight_requests: Gauge response_latency_with_labels: Histogram + event_loop_tasks: Gauge + event_loop_lag: Gauge def _get_exemplar() -> dict[str, str] | None: @@ -89,6 +93,20 @@ def get_prometheus_metrics() -> PrometheusMetrics: buckets=(0.1, 1, 5, 10), ) + event_loop_tasks = Gauge( + name="asyncio_event_loop_tasks", + documentation="Total number of tasks in the asyncio event loop", + labelnames=[], + registry=registry, + ) + + event_loop_lag = Gauge( + name="asyncio_event_loop_lag_seconds", + documentation="Time between scheduling and execution of event loop callbacks. 
>10ms consistently indicates event loop saturation", + labelnames=[], + registry=registry, + ) + return PrometheusMetrics( registry=registry, process_collector=process_collector, @@ -97,6 +115,8 @@ def get_prometheus_metrics() -> PrometheusMetrics: request_count=request_count, in_flight_requests=in_flight_requests, response_latency_with_labels=response_latency_with_labels, + event_loop_tasks=event_loop_tasks, + event_loop_lag=event_loop_lag, ) @@ -141,3 +161,13 @@ def record_response_metrics( metrics.response_latency_with_labels.labels(method, endpoint, user_agent).observe( amount=response_latency_seconds, exemplar=exemplar ) + + +async def record_asyncio_event_looop_metrics(metrics: PrometheusMetrics) -> None: + + metrics.event_loop_tasks.set(len(asyncio.all_tasks())) + + start_time = time.perf_counter() + await asyncio.sleep(0) # Yield control to event loop + lag = time.perf_counter() - start_time + metrics.event_loop_lag.set(lag) diff --git a/packages/service-library/tests/aiohttp/test_monitoring.py b/packages/service-library/tests/aiohttp/test_monitoring.py index a7ad0d003d8..2ae46d4549c 100644 --- a/packages/service-library/tests/aiohttp/test_monitoring.py +++ b/packages/service-library/tests/aiohttp/test_monitoring.py @@ -62,7 +62,8 @@ def _assert_metrics_contain_entry( ) assert len(filtered_samples) == 1 sample = filtered_samples[0] - assert sample.value == value + if value is not None: + assert sample.value == value print(f"Found {metric_name=} with expected {value=}") return @@ -135,3 +136,24 @@ async def test_request_with_simcore_user_agent(client: TestClient, faker: Faker) }, value=1, ) + + +async def test_asyncio_event_loop_tasks(client: TestClient): + response = await client.get("/monitored_request") + assert response.status == status.HTTP_200_OK + data = await response.json() + assert data + assert "data" in data + assert data["data"] == "OK" + + response = await client.get("/metrics") + assert response.status == status.HTTP_200_OK + metrics_as_text = 
await response.text() + + _assert_metrics_contain_entry( + metrics_as_text, + metric_name="asyncio_event_loop_tasks", + sample_name="asyncio_event_loop_tasks", + labels={}, + value=None, + ) diff --git a/packages/service-library/tests/fastapi/test_prometheus_middleware.py b/packages/service-library/tests/fastapi/test_prometheus_middleware.py index 9d67cc8ee0d..92027dbbed9 100644 --- a/packages/service-library/tests/fastapi/test_prometheus_middleware.py +++ b/packages/service-library/tests/fastapi/test_prometheus_middleware.py @@ -5,6 +5,7 @@ import pytest from asgi_lifespan import LifespanManager from fastapi import FastAPI +from fastapi.responses import PlainTextResponse from httpx import AsyncClient from prometheus_client.openmetrics.exposition import CONTENT_TYPE_LATEST from servicelib.fastapi.monitoring import setup_prometheus_instrumentation @@ -16,6 +17,11 @@ async def app(app: FastAPI) -> AsyncIterable[FastAPI]: Fixture that sets up the Prometheus middleware in the FastAPI app. """ setup_prometheus_instrumentation(app) + + @app.get("/dummy-endpoint") + async def dummy_endpoint() -> PlainTextResponse: + return PlainTextResponse("OK", media_type="text/plain") + async with LifespanManager(app): yield app @@ -29,3 +35,16 @@ async def test_metrics_endpoint(client: AsyncClient, app: FastAPI): assert response.headers["Content-Type"] == CONTENT_TYPE_LATEST assert "# HELP" in response.text assert "# TYPE" in response.text + + +async def test_asyncio_event_loop_tasks(client: AsyncClient, app: FastAPI): + """ + Test that the /metrics endpoint exposes the asyncio_event_loop_tasks gauge. 
+ """ + response = await client.get("/dummy-endpoint") + assert response.status_code == 200 + + response = await client.get("/metrics") + assert response.status_code == 200 + assert response.headers["Content-Type"] == CONTENT_TYPE_LATEST + assert "asyncio_event_loop_tasks" in response.text diff --git a/services/api-server/src/simcore_service_api_server/services_http/webserver.py b/services/api-server/src/simcore_service_api_server/services_http/webserver.py index c564c29cbd0..3289b247d08 100644 --- a/services/api-server/src/simcore_service_api_server/services_http/webserver.py +++ b/services/api-server/src/simcore_service_api_server/services_http/webserver.py @@ -50,11 +50,10 @@ from servicelib.long_running_tasks.models import TaskStatus from servicelib.rest_constants import X_PRODUCT_NAME_HEADER from settings_library.tracing import TracingSettings -from tenacity import TryAgain, retry_if_exception_type +from tenacity import TryAgain, retry_if_exception_type, wait_random_exponential from tenacity.asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed from ..core.settings import WebServerSettings from ..exceptions.backend_errors import ( @@ -253,7 +252,7 @@ async def _wait_for_long_running_task_results(self, lrt_response: httpx.Response # GET task status now until done async for attempt in AsyncRetrying( - wait=wait_fixed(0.5), + wait=wait_random_exponential(multiplier=0.5, min=0.5, max=30), stop=stop_after_delay(_POLL_TIMEOUT), reraise=True, retry=retry_if_exception_type(TryAgain),