|
2 | 2 |
|
3 | 3 | """ |
4 | 4 |
|
| 5 | +import importlib |
| 6 | +import importlib.machinery |
| 7 | +import inspect |
5 | 8 | import logging |
| 9 | +import sys |
| 10 | +from functools import wraps |
| 11 | +from importlib.abc import Loader, MetaPathFinder |
| 12 | +from importlib.machinery import ModuleSpec |
| 13 | +from types import ModuleType |
| 14 | +from typing import Callable, Sequence |
6 | 15 |
|
7 | 16 | from fastapi import FastAPI |
8 | 17 | from httpx import AsyncClient, Client |
@@ -127,3 +136,60 @@ def setup_tracing( |
127 | 136 |
|
128 | 137 | def setup_httpx_client_tracing(client: AsyncClient | Client): |
129 | 138 | HTTPXClientInstrumentor.instrument_client(client) |
| 139 | + |
| 140 | + |
| 141 | +def _create_opentelemetry_function_span(func: Callable): |
| 142 | + """Decorator that wraps a function call in an OpenTelemetry span.""" |
| 143 | + tracer = trace.get_tracer(__name__) |
| 144 | + |
| 145 | + @wraps(func) |
| 146 | + def wrapper(*args, **kwargs): |
| 147 | + with tracer.start_as_current_span(f"{func.__module__}.{func.__name__}"): |
| 148 | + return func(*args, **kwargs) |
| 149 | + |
| 150 | + @wraps(func) |
| 151 | + async def async_wrapper(*args, **kwargs): |
| 152 | + with tracer.start_as_current_span(f"{func.__module__}.{func.__name__}"): |
| 153 | + return await func(*args, **kwargs) |
| 154 | + |
| 155 | + if inspect.iscoroutinefunction(func): |
| 156 | + return async_wrapper |
| 157 | + else: |
| 158 | + return wrapper |
| 159 | + |
| 160 | + |
| 161 | +class _AddTracingSpansLoader(Loader): |
| 162 | + def __init__(self, loader: Loader): |
| 163 | + self.loader = loader |
| 164 | + |
| 165 | + def exec_module(self, module: ModuleType): |
| 166 | + # Execute the module normally |
| 167 | + self.loader.exec_module(module) |
| 168 | + for name, func in inspect.getmembers(module, inspect.isfunction): |
| 169 | + if name in module.__dict__: |
| 170 | + setattr(module, name, _create_opentelemetry_function_span(func)) |
| 171 | + |
| 172 | + |
| 173 | +class _AddTracingSpansFinder(MetaPathFinder): |
| 174 | + def find_spec( |
| 175 | + self, |
| 176 | + fullname: str, |
| 177 | + path: Sequence[str] | None, |
| 178 | + target: ModuleType | None = None, |
| 179 | + ) -> ModuleSpec | None: |
| 180 | + if fullname.startswith("simcore_service"): |
| 181 | + # Find the original spec |
| 182 | + spec = importlib.machinery.PathFinder.find_spec( |
| 183 | + fullname=fullname, path=path |
| 184 | + ) |
| 185 | + # spec = find_spec(fullname, path) |
| 186 | + if spec and spec.loader: |
| 187 | + # Wrap the loader with our DecoratingLoader |
| 188 | + spec.loader = _AddTracingSpansLoader(spec.loader) |
| 189 | + return spec |
| 190 | + |
| 191 | + return None |
| 192 | + |
| 193 | + |
| 194 | +def setup_tracing_spans_for_simcore_service_functions(): |
| 195 | + sys.meta_path.insert(0, _AddTracingSpansFinder()) |
0 commit comments