Skip to content

Commit fa52e17

Browse files
OpenTelemetry monitoring (#197)
* perf: add telemetry * perf: toggle telemetry * perf: update resource attributes * perf(prometheus): add task monitors * perf: daemon monitoring * perf: update daemon + telemetry * perf: add total tasks metric * perf: remove metrics failure updates * perf: remove unnecessary metric
1 parent 6691db0 commit fa52e17

File tree

3 files changed

+243
-8
lines changed

3 files changed

+243
-8
lines changed

business_objects/monitor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from typing import Any, List, Optional
22
import datetime
33
from . import general
4-
from .. import enums
5-
from ..models import TaskQueue, Organization
6-
from ..util import prevent_sql_injection
7-
from ..session import session
4+
from submodules.model import enums, telemetry
5+
from submodules.model.models import TaskQueue, Organization
6+
from submodules.model.util import prevent_sql_injection
7+
from submodules.model.session import session
88
from submodules.model.cognition_objects import (
99
macro as macro_db_bo,
1010
markdown_file as markdown_file_db_bo,

daemon.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,45 @@
11
import threading
2-
from submodules.model.business_objects import general
32
import traceback
3+
from submodules.model.business_objects import general
4+
from submodules.model import telemetry
45

56

67
def run_without_db_token(target, *args, **kwargs):
78
"""
89
DB session token isn't automatically created.
910
You can still do this with general.get_ctx_token but need to return it yourself with remove_and_refresh_session.
1011
"""
12+
fn_name = f"{target.__module__}.{target.__name__}"
13+
14+
def wrapper():
15+
telemetry.TASKS_IN_PROGRESS.labels(
16+
app_name=telemetry.APP_NAME,
17+
task_name=fn_name,
18+
).inc()
19+
20+
try:
21+
target(*args, **kwargs)
22+
except Exception:
23+
telemetry.TASKS_ERRORS.labels(
24+
app_name=telemetry.APP_NAME,
25+
task_name=fn_name,
26+
).inc()
27+
print("=== Exception in thread ===", flush=True)
28+
print(traceback.format_exc(), flush=True)
29+
print("===========================", flush=True)
30+
else:
31+
telemetry.TASKS_PROCESSED.labels(
32+
app_name=telemetry.APP_NAME,
33+
task_name=fn_name,
34+
).inc()
35+
finally:
36+
telemetry.TASKS_IN_PROGRESS.labels(
37+
app_name=telemetry.APP_NAME,
38+
task_name=fn_name,
39+
).dec()
40+
1141
threading.Thread(
12-
target=target,
13-
args=args,
14-
kwargs=kwargs,
42+
target=wrapper,
1543
daemon=True,
1644
).start()
1745

@@ -21,18 +49,37 @@ def run_with_db_token(target, *args, **kwargs):
2149
DB session token is automatically created & returned at the end.
2250
Long running threads needs to occasionally daemon.reset_session_token_in_thread to ensure the session doesn't get a timeout.
2351
"""
52+
fn_name = f"{target.__module__}.{target.__name__}"
2453

2554
# this is a workaround to set the token in the actual thread context
2655
def wrapper():
2756
general.get_ctx_token()
57+
telemetry.TASKS_IN_PROGRESS.labels(
58+
app_name=telemetry.APP_NAME,
59+
task_name=fn_name,
60+
).inc()
61+
2862
try:
2963
target(*args, **kwargs)
3064
except Exception:
65+
telemetry.TASKS_ERRORS.labels(
66+
app_name=telemetry.APP_NAME,
67+
task_name=fn_name,
68+
).inc()
3169
print("=== Exception in thread ===", flush=True)
3270
print(traceback.format_exc(), flush=True)
3371
print("===========================", flush=True)
72+
else:
73+
telemetry.TASKS_PROCESSED.labels(
74+
app_name=telemetry.APP_NAME,
75+
task_name=fn_name,
76+
).inc()
3477
finally:
3578
general.remove_and_refresh_session()
79+
telemetry.TASKS_IN_PROGRESS.labels(
80+
app_name=telemetry.APP_NAME,
81+
task_name=fn_name,
82+
).dec()
3683

3784
threading.Thread(
3885
target=wrapper,

telemetry.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
from typing import Tuple
2+
3+
import time
4+
import os
5+
6+
from opentelemetry import trace
7+
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
8+
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
9+
from opentelemetry.instrumentation.logging import LoggingInstrumentor
10+
from opentelemetry.sdk.resources import Resource
11+
from opentelemetry.sdk.trace import TracerProvider
12+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
13+
from prometheus_client import REGISTRY, Counter, Gauge, Histogram
14+
from prometheus_client.openmetrics.exposition import (
15+
CONTENT_TYPE_LATEST,
16+
generate_latest,
17+
)
18+
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
19+
from starlette.requests import Request
20+
from starlette.responses import Response
21+
from starlette.routing import Match
22+
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
23+
from starlette.types import ASGIApp
24+
25+
26+
APP_NAME = os.getenv("APP_NAME")
27+
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false") == "true"
28+
29+
INFO = Gauge("fastapi_app_info", "FastAPI application information.", ["app_name"])
30+
REQUESTS = Counter(
31+
"fastapi_requests_total",
32+
"Total count of requests by method and path.",
33+
["method", "path", "app_name"],
34+
)
35+
RESPONSES = Counter(
36+
"fastapi_responses_total",
37+
"Total count of responses by method, path and status codes.",
38+
["method", "path", "status_code", "app_name"],
39+
)
40+
REQUESTS_PROCESSING_TIME = Histogram(
41+
"fastapi_requests_duration_seconds",
42+
"Histogram of requests processing time by path (in seconds)",
43+
["method", "path", "app_name"],
44+
)
45+
EXCEPTIONS = Counter(
46+
"fastapi_exceptions_total",
47+
"Total count of exceptions raised by path and exception type",
48+
["method", "path", "exception_type", "app_name"],
49+
)
50+
REQUESTS_IN_PROGRESS = Gauge(
51+
"fastapi_requests_in_progress",
52+
"Gauge of requests by method and path currently being processed",
53+
["method", "path", "app_name"],
54+
)
55+
TASKS_IN_PROGRESS = Gauge(
56+
"cognition_tasks_in_progress",
57+
"Indicates if the task master thread is running (1) or not (0)",
58+
["task_name", "app_name"],
59+
)
60+
TASKS_PROCESSED = Counter(
61+
"cognition_task_processed_total",
62+
"Total items processed by the task",
63+
["task_name", "app_name"],
64+
)
65+
TASKS_ERRORS = Counter(
66+
"cognition_task_errors_total",
67+
"Total errors encountered by the task",
68+
["task_name", "app_name"],
69+
)
70+
WEBSOCKET_EXTERNAL_SUCCESS = Counter(
71+
"cognition_websocket_external_success_total",
72+
"Total successful external websocket connections",
73+
["app_name", "org_id", "project_id"],
74+
)
75+
WEBSOCKET_EXTERNAL_FAILURE = Counter(
76+
"cognition_websocket_external_failure_total",
77+
"Total failed external websocket connections",
78+
["app_name", "org_id", "project_id"],
79+
)
80+
WEBSOCKET_INTERNAL_SUCCESS = Counter(
81+
"cognition_websocket_internal_success_total",
82+
"Total successful internal websocket connections",
83+
["app_name", "org_id", "project_id"],
84+
)
85+
WEBSOCKET_INTERNAL_FAILURE = Counter(
86+
"cognition_websocket_internal_failure_total",
87+
"Total failed internal websocket connections",
88+
["app_name", "org_id", "project_id"],
89+
)
90+
91+
92+
class PrometheusMiddleware(BaseHTTPMiddleware):
93+
def __init__(self, app: ASGIApp, app_name: str = "fastapi-app") -> None:
94+
super().__init__(app)
95+
self.app_name = app_name
96+
INFO.labels(app_name=self.app_name).inc()
97+
98+
async def dispatch(
99+
self, request: Request, call_next: RequestResponseEndpoint
100+
) -> Response:
101+
method = request.method
102+
path, is_handled_path = self.get_path(request)
103+
104+
if not is_handled_path:
105+
return await call_next(request)
106+
107+
REQUESTS_IN_PROGRESS.labels(
108+
method=method, path=path, app_name=self.app_name
109+
).inc()
110+
REQUESTS.labels(method=method, path=path, app_name=self.app_name).inc()
111+
before_time = time.perf_counter()
112+
try:
113+
response = await call_next(request)
114+
except BaseException as e:
115+
status_code = HTTP_500_INTERNAL_SERVER_ERROR
116+
EXCEPTIONS.labels(
117+
method=method,
118+
path=path,
119+
exception_type=type(e).__name__,
120+
app_name=self.app_name,
121+
).inc()
122+
raise e from None
123+
else:
124+
status_code = response.status_code
125+
after_time = time.perf_counter()
126+
# retrieve trace id for exemplar
127+
span = trace.get_current_span()
128+
trace_id = trace.format_trace_id(span.get_span_context().trace_id)
129+
130+
REQUESTS_PROCESSING_TIME.labels(
131+
method=method, path=path, app_name=self.app_name
132+
).observe(after_time - before_time, exemplar={"TraceID": trace_id})
133+
finally:
134+
RESPONSES.labels(
135+
method=method,
136+
path=path,
137+
status_code=status_code,
138+
app_name=self.app_name,
139+
).inc()
140+
REQUESTS_IN_PROGRESS.labels(
141+
method=method, path=path, app_name=self.app_name
142+
).dec()
143+
144+
return response
145+
146+
@staticmethod
147+
def get_path(request: Request) -> Tuple[str, bool]:
148+
for route in request.app.routes:
149+
match, child_scope = route.matches(request.scope)
150+
if match == Match.FULL:
151+
return route.path, True
152+
153+
return request.url.path, False
154+
155+
156+
def metrics(request: Request) -> Response:
157+
return Response(
158+
generate_latest(REGISTRY), headers={"Content-Type": CONTENT_TYPE_LATEST}
159+
)
160+
161+
162+
def setting_app_name(app_name: str) -> None:
163+
global APP_NAME
164+
if APP_NAME is None:
165+
APP_NAME = app_name
166+
167+
168+
def setting_otlp(
169+
app: ASGIApp, app_name: str, endpoint: str, log_correlation: bool = True
170+
) -> None:
171+
# Setting OpenTelemetry
172+
# set the service name to show in traces
173+
resource = Resource.create(
174+
attributes={"service.name": app_name, "compose_service": app_name}
175+
)
176+
177+
# set the tracer provider
178+
tracer = TracerProvider(resource=resource)
179+
trace.set_tracer_provider(tracer)
180+
181+
tracer.add_span_processor(
182+
BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True))
183+
)
184+
185+
if log_correlation:
186+
LoggingInstrumentor().instrument(set_logging_format=True)
187+
188+
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer)

0 commit comments

Comments
 (0)