diff --git a/business_objects/monitor.py b/business_objects/monitor.py
index 540741a..65429c5 100644
--- a/business_objects/monitor.py
+++ b/business_objects/monitor.py
@@ -1,10 +1,10 @@
 from typing import Any, List, Optional
 import datetime
 from . import general
-from .. import enums
-from ..models import TaskQueue, Organization
-from ..util import prevent_sql_injection
-from ..session import session
+from submodules.model import enums, telemetry
+from submodules.model.models import TaskQueue, Organization
+from submodules.model.util import prevent_sql_injection
+from submodules.model.session import session
 from submodules.model.cognition_objects import (
     macro as macro_db_bo,
     markdown_file as markdown_file_db_bo,
diff --git a/daemon.py b/daemon.py
index 7b0714a..8afed18 100644
--- a/daemon.py
+++ b/daemon.py
@@ -1,6 +1,7 @@
 import threading
-from submodules.model.business_objects import general
 import traceback
+from submodules.model.business_objects import general
+from submodules.model import telemetry
 
 
 def run_without_db_token(target, *args, **kwargs):
@@ -8,10 +9,37 @@ def run_without_db_token(target, *args, **kwargs):
     DB session token isn't automatically created. You can still do this with general.get_ctx_token
     but need to return it yourself with remove_and_refresh_session.
     """
+    fn_name = f"{target.__module__}.{target.__name__}"
+
+    def wrapper():
+        telemetry.TASKS_IN_PROGRESS.labels(
+            app_name=telemetry.APP_NAME,
+            task_name=fn_name,
+        ).inc()
+
+        try:
+            target(*args, **kwargs)
+        except Exception:
+            telemetry.TASKS_ERRORS.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).inc()
+            print("=== Exception in thread ===", flush=True)
+            print(traceback.format_exc(), flush=True)
+            print("===========================", flush=True)
+        else:
+            telemetry.TASKS_PROCESSED.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).inc()
+        finally:
+            telemetry.TASKS_IN_PROGRESS.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).dec()
+
     threading.Thread(
-        target=target,
-        args=args,
-        kwargs=kwargs,
+        target=wrapper,
         daemon=True,
     ).start()
 
@@ -21,18 +49,37 @@ def run_with_db_token(target, *args, **kwargs):
     DB session token is automatically created & returned at the end.
     Long running threads needs to occasionally daemon.reset_session_token_in_thread to ensure the session doesn't get a timeout.
     """
+    fn_name = f"{target.__module__}.{target.__name__}"
 
     # this is a workaround to set the token in the actual thread context
     def wrapper():
         general.get_ctx_token()
+        telemetry.TASKS_IN_PROGRESS.labels(
+            app_name=telemetry.APP_NAME,
+            task_name=fn_name,
+        ).inc()
+
         try:
             target(*args, **kwargs)
         except Exception:
+            telemetry.TASKS_ERRORS.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).inc()
             print("=== Exception in thread ===", flush=True)
             print(traceback.format_exc(), flush=True)
             print("===========================", flush=True)
+        else:
+            telemetry.TASKS_PROCESSED.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).inc()
         finally:
             general.remove_and_refresh_session()
+            telemetry.TASKS_IN_PROGRESS.labels(
+                app_name=telemetry.APP_NAME,
+                task_name=fn_name,
+            ).dec()
 
     threading.Thread(
         target=wrapper,
diff --git a/telemetry.py b/telemetry.py
new file mode 100644
index 0000000..d8ac2b9
--- /dev/null
+++ b/telemetry.py
@@ -0,0 +1,188 @@
+from typing import Tuple
+
+import time
+import os
+
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
+from opentelemetry.instrumentation.logging import LoggingInstrumentor
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from prometheus_client import REGISTRY, Counter, Gauge, Histogram
+from prometheus_client.openmetrics.exposition import (
+    CONTENT_TYPE_LATEST,
+    generate_latest,
+)
+from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
+from starlette.requests import Request
+from starlette.responses import Response
+from starlette.routing import Match
+from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
+from starlette.types import ASGIApp
+
+
+APP_NAME = os.getenv("APP_NAME")
+ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false") == "true"
+
+INFO = Gauge("fastapi_app_info", "FastAPI application information.", ["app_name"])
+REQUESTS = Counter(
+    "fastapi_requests_total",
+    "Total count of requests by method and path.",
+    ["method", "path", "app_name"],
+)
+RESPONSES = Counter(
+    "fastapi_responses_total",
+    "Total count of responses by method, path and status codes.",
+    ["method", "path", "status_code", "app_name"],
+)
+REQUESTS_PROCESSING_TIME = Histogram(
+    "fastapi_requests_duration_seconds",
+    "Histogram of requests processing time by path (in seconds)",
+    ["method", "path", "app_name"],
+)
+EXCEPTIONS = Counter(
+    "fastapi_exceptions_total",
+    "Total count of exceptions raised by path and exception type",
+    ["method", "path", "exception_type", "app_name"],
+)
+REQUESTS_IN_PROGRESS = Gauge(
+    "fastapi_requests_in_progress",
+    "Gauge of requests by method and path currently being processed",
+    ["method", "path", "app_name"],
+)
+TASKS_IN_PROGRESS = Gauge(
+    "cognition_tasks_in_progress",
+    "Number of daemon task executions currently in progress",
+    ["task_name", "app_name"],
+)
+TASKS_PROCESSED = Counter(
+    "cognition_task_processed_total",
+    "Total daemon task executions that completed successfully",
+    ["task_name", "app_name"],
+)
+TASKS_ERRORS = Counter(
+    "cognition_task_errors_total",
+    "Total daemon task executions that raised an exception",
+    ["task_name", "app_name"],
+)
+WEBSOCKET_EXTERNAL_SUCCESS = Counter(
+    "cognition_websocket_external_success_total",
+    "Total successful external websocket connections",
+    ["app_name", "org_id", "project_id"],
+)
+WEBSOCKET_EXTERNAL_FAILURE = Counter(
+    "cognition_websocket_external_failure_total",
+    "Total failed external websocket connections",
+    ["app_name", "org_id", "project_id"],
+)
+WEBSOCKET_INTERNAL_SUCCESS = Counter(
+    "cognition_websocket_internal_success_total",
+    "Total successful internal websocket connections",
+    ["app_name", "org_id", "project_id"],
+)
+WEBSOCKET_INTERNAL_FAILURE = Counter(
+    "cognition_websocket_internal_failure_total",
+    "Total failed internal websocket connections",
+    ["app_name", "org_id", "project_id"],
+)
+
+
+class PrometheusMiddleware(BaseHTTPMiddleware):
+    def __init__(self, app: ASGIApp, app_name: str = "fastapi-app") -> None:
+        super().__init__(app)
+        self.app_name = app_name
+        INFO.labels(app_name=self.app_name).inc()
+
+    async def dispatch(
+        self, request: Request, call_next: RequestResponseEndpoint
+    ) -> Response:
+        method = request.method
+        path, is_handled_path = self.get_path(request)
+
+        if not is_handled_path:
+            return await call_next(request)
+
+        REQUESTS_IN_PROGRESS.labels(
+            method=method, path=path, app_name=self.app_name
+        ).inc()
+        REQUESTS.labels(method=method, path=path, app_name=self.app_name).inc()
+        before_time = time.perf_counter()
+        try:
+            response = await call_next(request)
+        except BaseException as e:
+            status_code = HTTP_500_INTERNAL_SERVER_ERROR
+            EXCEPTIONS.labels(
+                method=method,
+                path=path,
+                exception_type=type(e).__name__,
+                app_name=self.app_name,
+            ).inc()
+            raise e from None
+        else:
+            status_code = response.status_code
+            after_time = time.perf_counter()
+            # retrieve trace id for exemplar
+            span = trace.get_current_span()
+            trace_id = trace.format_trace_id(span.get_span_context().trace_id)
+
+            REQUESTS_PROCESSING_TIME.labels(
+                method=method, path=path, app_name=self.app_name
+            ).observe(after_time - before_time, exemplar={"TraceID": trace_id})
+        finally:
+            RESPONSES.labels(
+                method=method,
+                path=path,
+                status_code=status_code,
+                app_name=self.app_name,
+            ).inc()
+            REQUESTS_IN_PROGRESS.labels(
+                method=method, path=path, app_name=self.app_name
+            ).dec()
+
+        return response
+
+    @staticmethod
+    def get_path(request: Request) -> Tuple[str, bool]:
+        for route in request.app.routes:
+            match, child_scope = route.matches(request.scope)
+            if match == Match.FULL:
+                return route.path, True
+
+        return request.url.path, False
+
+
+def metrics(request: Request) -> Response:
+    return Response(
+        generate_latest(REGISTRY), headers={"Content-Type": CONTENT_TYPE_LATEST}
+    )
+
+
+def setting_app_name(app_name: str) -> None:
+    global APP_NAME
+    if APP_NAME is None:
+        APP_NAME = app_name
+
+
+def setting_otlp(
+    app: ASGIApp, app_name: str, endpoint: str, log_correlation: bool = True
+) -> None:
+    # Setting OpenTelemetry
+    # set the service name to show in traces
+    resource = Resource.create(
+        attributes={"service.name": app_name, "compose_service": app_name}
+    )
+
+    # set the tracer provider
+    tracer = TracerProvider(resource=resource)
+    trace.set_tracer_provider(tracer)
+
+    tracer.add_span_processor(
+        BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True))
+    )
+
+    if log_correlation:
+        LoggingInstrumentor().instrument(set_logging_format=True)
+
+    FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer)