|
| 1 | +""" |
| 2 | +This module provides classes and functions to enable distributed tracing and logging |
| 3 | +using OpenTelemetry. |
| 4 | +
|
| 5 | +Additional dependencies: |
| 6 | + pip install opentelemetry-distro |
| 7 | + opentelemetry-bootstrap --action=install |
| 8 | +
|
| 9 | +Features: |
| 10 | +
|
| 11 | +- An `use_open_telemetry` function that can be used to apply useful configuration. |
| 12 | +- OTELMiddleware: Middleware for automatic tracing of HTTP requests. |
| 13 | +- Environment-based configuration for OpenTelemetry resource attributes. |
| 14 | +- Logging and tracing setup using user-provided exporters. |
| 15 | +- Context manager and decorator utilities for tracing custom operations and function |
| 16 | + calls. |
| 17 | +
|
| 18 | +Usage: |
| 19 | + from blacksheep.server.otel import use_open_telemetry |
| 20 | +
|
| 21 | + # Configure log_exporter and span_exporter as needed |
| 22 | + use_open_telemetry(app, log_exporter, span_exporter) |
| 23 | +""" |
| 24 | + |
| 25 | +import logging |
| 26 | +import os |
| 27 | +from contextlib import contextmanager |
| 28 | +from functools import wraps |
| 29 | +from typing import Awaitable, Callable, Dict, Optional |
| 30 | + |
| 31 | +from opentelemetry import trace |
| 32 | +from opentelemetry.instrumentation.logging import LoggingInstrumentor |
| 33 | +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler |
| 34 | +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter |
| 35 | +from opentelemetry.sdk.trace import TracerProvider |
| 36 | +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter |
| 37 | +from opentelemetry.trace import SpanKind |
| 38 | + |
| 39 | +from blacksheep import Application |
| 40 | +from blacksheep.messages import Request, Response |
| 41 | +from blacksheep.server.env import get_env |
| 42 | + |
| 43 | +ExceptionHandler = Callable[[Request, Exception], Awaitable[Response]] |
| 44 | + |
| 45 | + |
| 46 | +class OTELMiddleware: |
| 47 | + """ |
| 48 | + Middleware configuring OpenTelemetry for all web requests. |
| 49 | + """ |
| 50 | + |
| 51 | + def __init__(self, exc_handler: ExceptionHandler) -> None: |
| 52 | + self._exc_handler = exc_handler |
| 53 | + self._tracer = trace.get_tracer(__name__) |
| 54 | + |
| 55 | + async def __call__(self, request: Request, handler): |
| 56 | + path = request.url.path.decode("utf8") |
| 57 | + method = request.method |
| 58 | + with self._tracer.start_as_current_span( |
| 59 | + f"{method} {path}", kind=SpanKind.SERVER |
| 60 | + ) as span: |
| 61 | + try: |
| 62 | + response = await handler(request) |
| 63 | + except Exception as exc: |
| 64 | + # This approach is correct because it supports controlling the response |
| 65 | + # using exceptions. Unhandled exceptions are handled by the Span. |
| 66 | + response = await self._exc_handler(request, exc) |
| 67 | + |
| 68 | + self.set_span_attributes(span, request, response, path) |
| 69 | + return response |
| 70 | + |
| 71 | + def set_span_attributes( |
| 72 | + self, span: trace.Span, request: Request, response: Response, path: str |
| 73 | + ) -> None: |
| 74 | + """ |
| 75 | + Configure the attributes on the span for a given request-response cycle. |
| 76 | + """ |
| 77 | + # To reduce cardinality, update the span name to use the |
| 78 | + # route that matched the request |
| 79 | + route = request.route # type: ignore |
| 80 | + span.update_name(f"{request.method} {route}") |
| 81 | + |
| 82 | + span.set_attribute("http.status_code", response.status) |
| 83 | + span.set_attribute("http.method", request.method) |
| 84 | + span.set_attribute("http.path", path) |
| 85 | + span.set_attribute("http.url", request.url.value.decode()) |
| 86 | + span.set_attribute("http.route", route) |
| 87 | + span.set_attribute("http.status_code", response.status) |
| 88 | + span.set_attribute("client.ip", request.original_client_ip) |
| 89 | + |
| 90 | + if response.status >= 400: |
| 91 | + span.set_status(trace.Status(trace.StatusCode.ERROR)) |
| 92 | + |
| 93 | + |
| 94 | +def _configure_logging(log_exporter: LogExporter, span_exporter: SpanExporter): |
| 95 | + """ |
| 96 | + - Set up a custom LoggerProvider and attach a BatchLogRecordProcessor with the |
| 97 | + provided log_exporter. |
| 98 | + - Set the log level for the "opentelemetry" logger to WARNING to reduce noise. |
| 99 | + - Add a LoggingHandler to the root logger, ensuring OpenTelemetry logs are |
| 100 | + processed |
| 101 | + - Instrument logging with LoggingInstrumentor().instrument(set_logging_format=True) |
| 102 | + to ensure logs are formatted and correlated with traces. |
| 103 | + - Set up the tracer provider and attaches a BatchSpanProcessor for the given |
| 104 | + span_exporter. |
| 105 | + """ |
| 106 | + log_provider = LoggerProvider() |
| 107 | + log_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) |
| 108 | + logging.getLogger("opentelemetry").setLevel(logging.WARNING) |
| 109 | + logging.getLogger().addHandler( |
| 110 | + LoggingHandler(level=logging.NOTSET, logger_provider=log_provider) |
| 111 | + ) |
| 112 | + |
| 113 | + LoggingInstrumentor().instrument(set_logging_format=True) |
| 114 | + |
| 115 | + trace.set_tracer_provider(TracerProvider()) |
| 116 | + trace.get_tracer_provider().add_span_processor( |
| 117 | + BatchSpanProcessor(span_exporter) |
| 118 | + ) # type: ignore |
| 119 | + |
| 120 | + |
| 121 | +def set_attributes( |
| 122 | + service_name: str, |
| 123 | + service_namespace: str = "default", |
| 124 | + env: str = "", |
| 125 | +): |
| 126 | + """ |
| 127 | + Sets the OTEL_RESOURCE_ATTRIBUTES environment variable with service metadata |
| 128 | + for OpenTelemetry. |
| 129 | +
|
| 130 | + Args: |
| 131 | + service_name (str): The name of the service. |
| 132 | + service_namespace (str, optional): The namespace of the service. Defaults to |
| 133 | + "default". |
| 134 | + env (str, optional): The deployment environment. If not provided, it is |
| 135 | + determined from the environment. |
| 136 | +
|
| 137 | + Returns: |
| 138 | + None |
| 139 | + """ |
| 140 | + if not env: |
| 141 | + env = get_env() |
| 142 | + os.environ["OTEL_RESOURCE_ATTRIBUTES"] = ( |
| 143 | + f"service.name={service_name}," |
| 144 | + f"service.namespace={service_namespace}," |
| 145 | + f"deployment.environment={env}" |
| 146 | + ) |
| 147 | + |
| 148 | + |
| 149 | +def use_open_telemetry( |
| 150 | + app: Application, |
| 151 | + log_exporter: LogExporter, |
| 152 | + span_exporter: SpanExporter, |
| 153 | + middleware: Optional[OTELMiddleware] = None, |
| 154 | +): |
| 155 | + """ |
| 156 | + Configures OpenTelemetry tracing and logging for a BlackSheep application. |
| 157 | +
|
| 158 | + This function sets up OpenTelemetry log and span exporters, configures resource |
| 159 | + attributes, and injects OTEL middleware for automatic tracing of HTTP requests. |
| 160 | + It also patches the router to track matched route patterns and ensures proper |
| 161 | + shutdown of the tracer provider on application stop. |
| 162 | +
|
| 163 | + Args: |
| 164 | + app (Application): The BlackSheep application instance. |
| 165 | + log_exporter (LogExporter): The OpenTelemetry log exporter to use. |
| 166 | + span_exporter (SpanExporter): The OpenTelemetry span exporter to use. |
| 167 | + middleware (optional OTELMiddleware): Custom OTEL middleware instance. |
| 168 | + If not provided, the default OTELMiddleware is used. |
| 169 | +
|
| 170 | + Returns: |
| 171 | + None |
| 172 | + """ |
| 173 | + if os.getenv("OTEL_RESOURCE_ATTRIBUTES") is None: |
| 174 | + # set a default value |
| 175 | + set_attributes("blacksheep-app") |
| 176 | + |
| 177 | + _configure_logging(log_exporter, span_exporter) |
| 178 | + |
| 179 | + # Insert the middleware at the beginning of the middlewares list |
| 180 | + @app.on_middlewares_configuration |
| 181 | + def add_otel_middleware(app): |
| 182 | + app.middlewares.insert( |
| 183 | + 0, middleware or OTELMiddleware(app.handle_request_handler_exception) |
| 184 | + ) |
| 185 | + |
| 186 | + @app.on_start |
| 187 | + async def on_start(app): |
| 188 | + # Patch the router to keep track of the route pattern that matched the request, |
| 189 | + # if any. |
| 190 | + # https://www.neoteroi.dev/blacksheep/routing/#how-to-track-routes-that-matched-a-request |
| 191 | + def wrap_get_route_match(fn): |
| 192 | + @wraps(fn) |
| 193 | + def get_route_match(request): |
| 194 | + match = fn(request) |
| 195 | + request.route = match.pattern.decode() if match else "Not Found" |
| 196 | + return match |
| 197 | + |
| 198 | + return get_route_match |
| 199 | + |
| 200 | + app.router.get_match = wrap_get_route_match(app.router.get_match) |
| 201 | + |
| 202 | + @app.on_stop |
| 203 | + async def on_stop(app): |
| 204 | + # Try calling shutdown() on app stop to flush all remaining spans. |
| 205 | + try: |
| 206 | + trace.get_tracer_provider().shutdown() |
| 207 | + except TypeError: |
| 208 | + pass |
| 209 | + |
| 210 | + |
| 211 | +@contextmanager |
| 212 | +def client_span_context( |
| 213 | + operation_name: str, attributes: Dict[str, str], *args, **kwargs |
| 214 | +): |
| 215 | + tracer = trace.get_tracer(__name__) |
| 216 | + with tracer.start_as_current_span(operation_name, kind=SpanKind.CLIENT) as span: |
| 217 | + span.set_attributes(attributes) |
| 218 | + for i, value in enumerate(args): |
| 219 | + span.set_attribute(f"@arg{i}", str(value)) |
| 220 | + for key, value in kwargs.items(): |
| 221 | + span.set_attribute(f"@{key}", str(value)) |
| 222 | + try: |
| 223 | + yield |
| 224 | + except Exception as ex: |
| 225 | + span.record_exception(ex) |
| 226 | + span.set_attribute("ERROR", str(ex)) |
| 227 | + span.set_attribute("http.status_code", 500) |
| 228 | + span.set_status(trace.Status(trace.StatusCode.ERROR)) |
| 229 | + raise |
| 230 | + |
| 231 | + |
| 232 | +def logcall(component="Service"): |
| 233 | + """ |
| 234 | + Wraps a function to log each call using OpenTelemetry, as SpanKind.CLIENT. |
| 235 | + """ |
| 236 | + |
| 237 | + def log_decorator(fn): |
| 238 | + @wraps(fn) |
| 239 | + async def wrapper(*args, **kwargs): |
| 240 | + with client_span_context( |
| 241 | + fn.__name__, {"component": component}, *args, **kwargs |
| 242 | + ): |
| 243 | + return await fn(*args, **kwargs) |
| 244 | + |
| 245 | + return wrapper |
| 246 | + |
| 247 | + return log_decorator |
0 commit comments