|
| 1 | +# ------------------------------------ |
| 2 | +# Copyright (c) Microsoft Corporation. |
| 3 | +# Licensed under the MIT License. |
| 4 | +# ------------------------------------ |
| 5 | +"""The decorator to apply if you want the given method traced.""" |
| 6 | +from contextvars import ContextVar |
| 7 | +import functools |
| 8 | +from typing import Awaitable, Any, TypeVar, overload, Optional, Callable, TYPE_CHECKING |
| 9 | +from typing_extensions import ParamSpec |
| 10 | + |
| 11 | +from ._models import SpanKind |
| 12 | +from ._tracer import get_tracer |
| 13 | +from ...settings import settings |
| 14 | + |
| 15 | +if TYPE_CHECKING: |
| 16 | + from ._models import TracingOptions |
| 17 | + |
| 18 | + |
| 19 | +P = ParamSpec("P") |
| 20 | +T = TypeVar("T") |
| 21 | + |
| 22 | + |
| 23 | +# This context variable is used to determine if we are already in the span context of a decorated function. |
| 24 | +_in_span_context = ContextVar("in_span_context", default=False) |
| 25 | + |
| 26 | + |
| 27 | +@overload |
| 28 | +def distributed_trace(__func: Callable[P, T]) -> Callable[P, T]: |
| 29 | + pass |
| 30 | + |
| 31 | + |
| 32 | +@overload |
| 33 | +def distributed_trace(**kwargs: Any) -> Callable[[Callable[P, T]], Callable[P, T]]: |
| 34 | + pass |
| 35 | + |
| 36 | + |
| 37 | +def distributed_trace(__func: Optional[Callable[P, T]] = None, **kwargs: Any) -> Any: # pylint: disable=unused-argument |
| 38 | + """Decorator to apply to an SDK method to have it traced automatically. |
| 39 | +
|
| 40 | + Span will use the method's qualified name. |
| 41 | +
|
| 42 | + Note: This decorator SHOULD NOT be used by application developers. It's intended to be called by client |
| 43 | + libraries only. Application developers should use OpenTelemetry directly to instrument their applications. |
| 44 | +
|
| 45 | + :param callable __func: A function to decorate |
| 46 | +
|
| 47 | + :return: The decorated function |
| 48 | + :rtype: Any |
| 49 | + """ |
| 50 | + |
| 51 | + def decorator(func: Callable[P, T]) -> Callable[P, T]: |
| 52 | + |
| 53 | + @functools.wraps(func) |
| 54 | + def wrapper_use_tracer(*args: Any, **kwargs: Any) -> T: |
| 55 | + # If we are already in the span context of a decorated function, don't trace. |
| 56 | + if _in_span_context.get(): |
| 57 | + return func(*args, **kwargs) |
| 58 | + |
| 59 | + # This will be popped in the pipeline or transport runner. |
| 60 | + tracing_options: TracingOptions = kwargs.get("tracing_options", {}) |
| 61 | + |
| 62 | + # User can explicitly disable tracing for this call |
| 63 | + user_enabled = tracing_options.get("enabled") |
| 64 | + if user_enabled is False: |
| 65 | + return func(*args, **kwargs) |
| 66 | + |
| 67 | + # If tracing is disabled globally and user didn't explicitly enable it, don't trace. |
| 68 | + if not settings.tracing_enabled and user_enabled is None: |
| 69 | + return func(*args, **kwargs) |
| 70 | + |
| 71 | + config = {} |
| 72 | + if args and hasattr(args[0], "_instrumentation_config"): |
| 73 | + config = args[0]._instrumentation_config # pylint: disable=protected-access |
| 74 | + |
| 75 | + method_tracer = get_tracer( |
| 76 | + library_name=config.get("library_name"), |
| 77 | + library_version=config.get("library_version"), |
| 78 | + schema_url=config.get("schema_url"), |
| 79 | + attributes=config.get("attributes"), |
| 80 | + ) |
| 81 | + if not method_tracer: |
| 82 | + return func(*args, **kwargs) |
| 83 | + |
| 84 | + name = func.__qualname__ |
| 85 | + span_suppression_token = _in_span_context.set(True) |
| 86 | + try: |
| 87 | + with method_tracer.start_as_current_span( |
| 88 | + name=name, |
| 89 | + kind=SpanKind.INTERNAL, |
| 90 | + attributes=tracing_options.get("attributes"), |
| 91 | + ) as span: |
| 92 | + try: |
| 93 | + return func(*args, **kwargs) |
| 94 | + except Exception as err: # pylint: disable=broad-except |
| 95 | + ex_type = type(err) |
| 96 | + module = ex_type.__module__ if ex_type.__module__ != "builtins" else "" |
| 97 | + error_type = f"{module}.{ex_type.__qualname__}" if module else ex_type.__qualname__ |
| 98 | + span.set_attribute("error.type", error_type) |
| 99 | + raise |
| 100 | + finally: |
| 101 | + _in_span_context.reset(span_suppression_token) |
| 102 | + |
| 103 | + return wrapper_use_tracer |
| 104 | + |
| 105 | + return decorator if __func is None else decorator(__func) |
| 106 | + |
| 107 | + |
| 108 | +@overload |
| 109 | +def distributed_trace_async(__func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: |
| 110 | + pass |
| 111 | + |
| 112 | + |
| 113 | +@overload |
| 114 | +def distributed_trace_async(**kwargs: Any) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]: |
| 115 | + pass |
| 116 | + |
| 117 | + |
| 118 | +def distributed_trace_async( # pylint: disable=unused-argument |
| 119 | + __func: Optional[Callable[P, Awaitable[T]]] = None, |
| 120 | + **kwargs: Any, |
| 121 | +) -> Any: |
| 122 | + """Decorator to apply to an SDK method to have it traced automatically. |
| 123 | +
|
| 124 | + Span will use the method's qualified name. |
| 125 | +
|
| 126 | + Note: This decorator SHOULD NOT be used by application developers. It's intended to be called by client |
| 127 | + libraries only. Application developers should use OpenTelemetry directly to instrument their applications. |
| 128 | +
|
| 129 | + :param callable __func: A function to decorate |
| 130 | +
|
| 131 | + :return: The decorated function |
| 132 | + :rtype: Any |
| 133 | + """ |
| 134 | + |
| 135 | + def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: |
| 136 | + |
| 137 | + @functools.wraps(func) |
| 138 | + async def wrapper_use_tracer(*args: Any, **kwargs: Any) -> T: |
| 139 | + # If we are already in the span context of a decorated function, don't trace. |
| 140 | + if _in_span_context.get(): |
| 141 | + return await func(*args, **kwargs) |
| 142 | + |
| 143 | + # This will be popped in the pipeline or transport runner. |
| 144 | + tracing_options: TracingOptions = kwargs.get("tracing_options", {}) |
| 145 | + |
| 146 | + # User can explicitly disable tracing for this call |
| 147 | + user_enabled = tracing_options.get("enabled") |
| 148 | + if user_enabled is False: |
| 149 | + return await func(*args, **kwargs) |
| 150 | + |
| 151 | + # If tracing is disabled globally and user didn't explicitly enable it, don't trace. |
| 152 | + if not settings.tracing_enabled and user_enabled is None: |
| 153 | + return await func(*args, **kwargs) |
| 154 | + |
| 155 | + config = {} |
| 156 | + if args and hasattr(args[0], "_instrumentation_config"): |
| 157 | + config = args[0]._instrumentation_config # pylint: disable=protected-access |
| 158 | + |
| 159 | + method_tracer = get_tracer( |
| 160 | + library_name=config.get("library_name"), |
| 161 | + library_version=config.get("library_version"), |
| 162 | + schema_url=config.get("schema_url"), |
| 163 | + attributes=config.get("attributes"), |
| 164 | + ) |
| 165 | + if not method_tracer: |
| 166 | + return await func(*args, **kwargs) |
| 167 | + |
| 168 | + name = func.__qualname__ |
| 169 | + span_suppression_token = _in_span_context.set(True) |
| 170 | + try: |
| 171 | + with method_tracer.start_as_current_span( |
| 172 | + name=name, |
| 173 | + kind=SpanKind.INTERNAL, |
| 174 | + attributes=tracing_options.get("attributes"), |
| 175 | + ) as span: |
| 176 | + try: |
| 177 | + return await func(*args, **kwargs) |
| 178 | + except Exception as err: # pylint: disable=broad-except |
| 179 | + ex_type = type(err) |
| 180 | + module = ex_type.__module__ if ex_type.__module__ != "builtins" else "" |
| 181 | + error_type = f"{module}.{ex_type.__qualname__}" if module else ex_type.__qualname__ |
| 182 | + span.set_attribute("error.type", error_type) |
| 183 | + raise |
| 184 | + finally: |
| 185 | + _in_span_context.reset(span_suppression_token) |
| 186 | + |
| 187 | + return wrapper_use_tracer |
| 188 | + |
| 189 | + return decorator if __func is None else decorator(__func) |
0 commit comments