Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..prometheus_metrics import (
PrometheusMetrics,
get_prometheus_metrics,
record_asyncio_event_looop_metrics,
record_request_metrics,
record_response_metrics,
)
Expand All @@ -33,15 +34,15 @@

def get_collector_registry(app: web.Application) -> CollectorRegistry:
    """Return the Prometheus collector registry stored on the aiohttp app.

    The app is expected to have been initialised with a ``PrometheusMetrics``
    instance under ``PROMETHEUS_METRICS_APPKEY``.
    """
    prometheus_metrics = app[PROMETHEUS_METRICS_APPKEY]
    # Programming-error guard (not input validation); nosec silences bandit's
    # assert warning, matching the file's existing convention.
    assert isinstance(prometheus_metrics, PrometheusMetrics)  # nosec
    return prometheus_metrics.registry


async def metrics_handler(request: web.Request):
    """Serve the Prometheus scrape endpoint for this aiohttp application.

    Refreshes the asyncio event-loop gauges immediately before rendering, so
    every scrape reports current loop saturation, then renders the registry in
    the default thread-pool executor to keep the event loop responsive.

    Returns a ``web.Response`` with the exposition-format payload.
    """
    metrics = request.app[PROMETHEUS_METRICS_APPKEY]
    await record_asyncio_event_looop_metrics(metrics)

    # NOTE: Cannot use ProcessPoolExecutor because registry is not pickable
    result = await request.loop.run_in_executor(
        None, generate_latest, metrics.registry
    )
    response = web.Response(body=result)
    response.content_type = CONTENT_TYPE_LATEST
    return response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ..prometheus_metrics import (
PrometheusMetrics,
get_prometheus_metrics,
record_asyncio_event_looop_metrics,
record_request_metrics,
record_response_metrics,
)
Expand Down Expand Up @@ -91,6 +92,7 @@ def _startup(app: FastAPI) -> None:
async def metrics_endpoint(request: Request) -> Response:
prometheus_metrics = request.app.state.prometheus_metrics
assert isinstance(prometheus_metrics, PrometheusMetrics) # nosec
await record_asyncio_event_looop_metrics(prometheus_metrics)

content = await asyncio.get_event_loop().run_in_executor(
None, generate_latest, prometheus_metrics.registry
Expand Down
30 changes: 30 additions & 0 deletions packages/service-library/src/servicelib/prometheus_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
Expand Down Expand Up @@ -40,6 +42,8 @@ class PrometheusMetrics:
request_count: Counter
in_flight_requests: Gauge
response_latency_with_labels: Histogram
event_loop_tasks: Gauge
event_loop_lag: Gauge


def _get_exemplar() -> dict[str, str] | None:
Expand Down Expand Up @@ -89,6 +93,20 @@ def get_prometheus_metrics() -> PrometheusMetrics:
buckets=(0.1, 1, 5, 10),
)

event_loop_tasks = Gauge(
name="asyncio_event_loop_tasks",
documentation="Total number of tasks in the asyncio event loop",
labelnames=[],
registry=registry,
)

event_loop_lag = Gauge(
name="asyncio_event_loop_lag_seconds",
documentation="Time between scheduling and execution of event loop callbacks. >10ms consistently indicates event loop saturation",
labelnames=[],
registry=registry,
)

return PrometheusMetrics(
registry=registry,
process_collector=process_collector,
Expand All @@ -97,6 +115,8 @@ def get_prometheus_metrics() -> PrometheusMetrics:
request_count=request_count,
in_flight_requests=in_flight_requests,
response_latency_with_labels=response_latency_with_labels,
event_loop_tasks=event_loop_tasks,
event_loop_lag=event_loop_lag,
)


Expand Down Expand Up @@ -141,3 +161,13 @@ def record_response_metrics(
metrics.response_latency_with_labels.labels(method, endpoint, user_agent).observe(
amount=response_latency_seconds, exemplar=exemplar
)


async def record_asyncio_event_looop_metrics(metrics: PrometheusMetrics) -> None:
    """Sample asyncio event-loop health into the given Prometheus gauges.

    Records the number of tasks known to the running loop, and the time a
    zero-delay sleep takes to resume — a proxy for callback scheduling lag.

    NOTE(review): "looop" in the name looks like a typo, but every visible
    caller uses this spelling, so renaming would break them.
    """
    # Count of all tasks on the currently running event loop.
    task_count = len(asyncio.all_tasks())
    metrics.event_loop_tasks.set(task_count)

    # A sleep(0) round-trip measures how long the loop takes to schedule us
    # again after we yield control.
    before = time.perf_counter()
    await asyncio.sleep(0)
    metrics.event_loop_lag.set(time.perf_counter() - before)
24 changes: 23 additions & 1 deletion packages/service-library/tests/aiohttp/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def _assert_metrics_contain_entry(
)
assert len(filtered_samples) == 1
sample = filtered_samples[0]
assert sample.value == value
if value is not None:
assert sample.value == value
print(f"Found {metric_name=} with expected {value=}")
return

Expand Down Expand Up @@ -135,3 +136,24 @@ async def test_request_with_simcore_user_agent(client: TestClient, faker: Faker)
},
value=1,
)


async def test_asyncio_event_loop_tasks(client: TestClient):
    """The /metrics payload exposes the asyncio task-count gauge."""
    # Drive one monitored request so the instrumentation has been exercised.
    monitored = await client.get("/monitored_request")
    assert monitored.status == status.HTTP_200_OK
    payload = await monitored.json()
    assert payload
    assert "data" in payload
    assert payload["data"] == "OK"

    scrape = await client.get("/metrics")
    assert scrape.status == status.HTTP_200_OK
    exposition = await scrape.text()

    # value=None: only assert the sample exists — the task count varies.
    _assert_metrics_contain_entry(
        exposition,
        metric_name="asyncio_event_loop_tasks",
        sample_name="asyncio_event_loop_tasks",
        labels={},
        value=None,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest
from asgi_lifespan import LifespanManager
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient
from prometheus_client.openmetrics.exposition import CONTENT_TYPE_LATEST
from servicelib.fastapi.monitoring import setup_prometheus_instrumentation
Expand All @@ -16,6 +17,11 @@ async def app(app: FastAPI) -> AsyncIterable[FastAPI]:
Fixture that sets up the Prometheus middleware in the FastAPI app.
"""
setup_prometheus_instrumentation(app)

@app.get("/dummy-endpoint")
async def dummy_endpoint() -> PlainTextResponse:
return PlainTextResponse("OK", media_type="text/plain")

async with LifespanManager(app):
yield app

Expand All @@ -29,3 +35,16 @@ async def test_metrics_endpoint(client: AsyncClient, app: FastAPI):
assert response.headers["Content-Type"] == CONTENT_TYPE_LATEST
assert "# HELP" in response.text
assert "# TYPE" in response.text


async def test_asyncio_event_loop_tasks(client: AsyncClient, app: FastAPI):
    """
    Test that the asyncio event-loop task gauge appears in the /metrics
    output after the app has served at least one request.
    """
    # Exercise the app once so the middleware has recorded a request.
    response = await client.get("/dummy-endpoint")
    assert response.status_code == 200

    response = await client.get("/metrics")
    assert response.status_code == 200
    assert response.headers["Content-Type"] == CONTENT_TYPE_LATEST
    # Gauge is refreshed by the /metrics handler on each scrape.
    assert "asyncio_event_loop_tasks" in response.text
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@
from servicelib.long_running_tasks.models import TaskStatus
from servicelib.rest_constants import X_PRODUCT_NAME_HEADER
from settings_library.tracing import TracingSettings
from tenacity import TryAgain, retry_if_exception_type
from tenacity import TryAgain, retry_if_exception_type, wait_random_exponential
from tenacity.asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

from ..core.settings import WebServerSettings
from ..exceptions.backend_errors import (
Expand Down Expand Up @@ -253,7 +252,7 @@ async def _wait_for_long_running_task_results(self, lrt_response: httpx.Response

# GET task status now until done
async for attempt in AsyncRetrying(
wait=wait_fixed(0.5),
wait=wait_random_exponential(multiplier=0.5, min=0.5, max=30),
stop=stop_after_delay(_POLL_TIMEOUT),
reraise=True,
retry=retry_if_exception_type(TryAgain),
Expand Down
Loading