8 changes: 4 additions & 4 deletions business_objects/monitor.py
@@ -1,10 +1,10 @@
from typing import Any, List, Optional
import datetime
from . import general
from .. import enums
from ..models import TaskQueue, Organization
from ..util import prevent_sql_injection
from ..session import session
from submodules.model import enums, telemetry
from submodules.model.models import TaskQueue, Organization
from submodules.model.util import prevent_sql_injection
from submodules.model.session import session
from submodules.model.cognition_objects import (
macro as macro_db_bo,
markdown_file as markdown_file_db_bo,
55 changes: 51 additions & 4 deletions daemon.py
@@ -1,17 +1,45 @@
import threading
from submodules.model.business_objects import general
import traceback
from submodules.model.business_objects import general
from submodules.model import telemetry


def run_without_db_token(target, *args, **kwargs):
"""
A DB session token is not created automatically.
You can still create one with general.get_ctx_token, but you then need to return it yourself via general.remove_and_refresh_session.
"""
fn_name = f"{target.__module__}.{target.__name__}"

def wrapper():
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()

try:
target(*args, **kwargs)
except Exception:
telemetry.TASKS_ERRORS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
print("=== Exception in thread ===", flush=True)
print(traceback.format_exc(), flush=True)
print("===========================", flush=True)
else:
telemetry.TASKS_PROCESSED.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
finally:
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).dec()

threading.Thread(
target=target,
args=args,
kwargs=kwargs,
target=wrapper,
daemon=True,
).start()

@@ -21,18 +49,37 @@ def run_with_db_token(target, *args, **kwargs):
DB session token is automatically created & returned at the end.
Long-running threads need to call daemon.reset_session_token_in_thread occasionally so the session doesn't time out.
"""
fn_name = f"{target.__module__}.{target.__name__}"

# this is a workaround to set the token in the actual thread context
def wrapper():
general.get_ctx_token()
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()

try:
target(*args, **kwargs)
except Exception:
telemetry.TASKS_ERRORS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
print("=== Exception in thread ===", flush=True)
print(traceback.format_exc(), flush=True)
print("===========================", flush=True)
else:
telemetry.TASKS_PROCESSED.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).inc()
finally:
general.remove_and_refresh_session()
telemetry.TASKS_IN_PROGRESS.labels(
app_name=telemetry.APP_NAME,
task_name=fn_name,
).dec()

threading.Thread(
target=wrapper,
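For context, a minimal sketch of how these helpers would be used from a consuming service; the call sites below are not part of this diff, and the job function and import path are illustrative only:

from submodules.model import daemon


def refresh_markdown_files():
    # hypothetical long-running job that needs a DB session
    print("refreshing markdown files", flush=True)


# spawns a daemon thread; a session token is created before the call and the
# session is removed/refreshed afterwards, with telemetry recorded per task name
daemon.run_with_db_token(refresh_markdown_files)

# spawns a daemon thread without a session token; the target must manage
# general.get_ctx_token() / general.remove_and_refresh_session() itself if it needs one
daemon.run_without_db_token(refresh_markdown_files)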
188 changes: 188 additions & 0 deletions telemetry.py
@@ -0,0 +1,188 @@
from typing import Tuple

import time
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import REGISTRY, Counter, Gauge, Histogram
from prometheus_client.openmetrics.exposition import (
CONTENT_TYPE_LATEST,
generate_latest,
)
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Match
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from starlette.types import ASGIApp


APP_NAME = os.getenv("APP_NAME")
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false") == "true"
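# APP_NAME labels every metric below and is also read by daemon.py; ENABLE_TELEMETRY
# is not used inside this module and is presumably checked by the consuming service
# before installing the middleware / OTLP exporter.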

INFO = Gauge("fastapi_app_info", "FastAPI application information.", ["app_name"])
REQUESTS = Counter(
"fastapi_requests_total",
"Total count of requests by method and path.",
["method", "path", "app_name"],
)
RESPONSES = Counter(
"fastapi_responses_total",
"Total count of responses by method, path and status codes.",
["method", "path", "status_code", "app_name"],
)
REQUESTS_PROCESSING_TIME = Histogram(
"fastapi_requests_duration_seconds",
"Histogram of requests processing time by path (in seconds)",
["method", "path", "app_name"],
)
EXCEPTIONS = Counter(
"fastapi_exceptions_total",
"Total count of exceptions raised by path and exception type",
["method", "path", "exception_type", "app_name"],
)
REQUESTS_IN_PROGRESS = Gauge(
"fastapi_requests_in_progress",
"Gauge of requests by method and path currently being processed",
["method", "path", "app_name"],
)
TASKS_IN_PROGRESS = Gauge(
"cognition_tasks_in_progress",
"Indicates if the task master thread is running (1) or not (0)",
["task_name", "app_name"],
)
TASKS_PROCESSED = Counter(
"cognition_task_processed_total",
"Total items processed by the task",
["task_name", "app_name"],
)
TASKS_ERRORS = Counter(
"cognition_task_errors_total",
"Total errors encountered by the task",
["task_name", "app_name"],
)
WEBSOCKET_EXTERNAL_SUCCESS = Counter(
"cognition_websocket_external_success_total",
"Total successful external websocket connections",
["app_name", "org_id", "project_id"],
)
WEBSOCKET_EXTERNAL_FAILURE = Counter(
"cognition_websocket_external_failure_total",
"Total failed external websocket connections",
["app_name", "org_id", "project_id"],
)
WEBSOCKET_INTERNAL_SUCCESS = Counter(
"cognition_websocket_internal_success_total",
"Total successful internal websocket connections",
["app_name", "org_id", "project_id"],
)
WEBSOCKET_INTERNAL_FAILURE = Counter(
"cognition_websocket_internal_failure_total",
"Total failed internal websocket connections",
["app_name", "org_id", "project_id"],
)


class PrometheusMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp, app_name: str = "fastapi-app") -> None:
super().__init__(app)
self.app_name = app_name
INFO.labels(app_name=self.app_name).inc()

async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
method = request.method
path, is_handled_path = self.get_path(request)

if not is_handled_path:
return await call_next(request)

REQUESTS_IN_PROGRESS.labels(
method=method, path=path, app_name=self.app_name
).inc()
REQUESTS.labels(method=method, path=path, app_name=self.app_name).inc()
before_time = time.perf_counter()
try:
response = await call_next(request)
except BaseException as e:
status_code = HTTP_500_INTERNAL_SERVER_ERROR
EXCEPTIONS.labels(
method=method,
path=path,
exception_type=type(e).__name__,
app_name=self.app_name,
).inc()
raise e from None
else:
status_code = response.status_code
after_time = time.perf_counter()
# retrieve trace id for exemplar
span = trace.get_current_span()
trace_id = trace.format_trace_id(span.get_span_context().trace_id)

REQUESTS_PROCESSING_TIME.labels(
method=method, path=path, app_name=self.app_name
).observe(after_time - before_time, exemplar={"TraceID": trace_id})
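            # exemplars are only exported when metrics are rendered in the OpenMetrics
            # format, which is why metrics() below uses the openmetrics generate_latest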
finally:
RESPONSES.labels(
method=method,
path=path,
status_code=status_code,
app_name=self.app_name,
).inc()
REQUESTS_IN_PROGRESS.labels(
method=method, path=path, app_name=self.app_name
).dec()

return response

@staticmethod
def get_path(request: Request) -> Tuple[str, bool]:
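        # resolve the request against the app's registered routes so metric labels use
        # the templated path (e.g. "/items/{item_id}") instead of the raw URL, keeping
        # label cardinality bounded; fall back to the raw path for unhandled routes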
for route in request.app.routes:
match, child_scope = route.matches(request.scope)
if match == Match.FULL:
return route.path, True

return request.url.path, False


def metrics(request: Request) -> Response:
return Response(
generate_latest(REGISTRY), headers={"Content-Type": CONTENT_TYPE_LATEST}
)


def setting_app_name(app_name: str) -> None:
global APP_NAME
if APP_NAME is None:
APP_NAME = app_name


def setting_otlp(
app: ASGIApp, app_name: str, endpoint: str, log_correlation: bool = True
) -> None:
# Setting OpenTelemetry
# set the service name to show in traces
resource = Resource.create(
attributes={"service.name": app_name, "compose_service": app_name}
)

# set the tracer provider
tracer = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer)

tracer.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True))
)

if log_correlation:
LoggingInstrumentor().instrument(set_logging_format=True)

FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer)
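
A minimal sketch of how a service could wire this module into its FastAPI app; this wiring is not part of the diff, and the service name, metrics route, and OTLP endpoint variable are assumptions:

import os

from fastapi import FastAPI

from submodules.model import telemetry

app = FastAPI()

if telemetry.ENABLE_TELEMETRY:
    telemetry.setting_app_name("cognition-gateway")  # assumed service name
    # expose per-request metrics and a Prometheus scrape endpoint
    app.add_middleware(telemetry.PrometheusMiddleware, app_name="cognition-gateway")
    app.add_route("/metrics", telemetry.metrics)
    # export traces via OTLP gRPC; the endpoint variable name is an assumption
    telemetry.setting_otlp(
        app, "cognition-gateway", os.getenv("OTLP_GRPC_ENDPOINT", "tempo:4317")
    )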