|
| 1 | +from contextlib import asynccontextmanager |
| 2 | +from random import Random |
| 3 | +from typing import Callable, TYPE_CHECKING |
| 4 | + |
| 5 | +import sentry_sdk |
| 6 | +from sentry_sdk.consts import OP |
| 7 | +from sentry_sdk.tracing import SENTRY_TRACE_HEADER_NAME, Transaction |
| 8 | +from sentry_sdk.tracing_utils import normalize_incoming_data |
| 9 | +from sentry_sdk.utils import event_from_exception, is_valid_sample_rate |
| 10 | + |
| 11 | +from fastapi_jsonrpc import JsonRpcContext, BaseError, Entrypoint |
| 12 | +from .http import sentry_asgi_context |
| 13 | + |
| 14 | +if TYPE_CHECKING: |
| 15 | + from .integration import FastApiJsonRPCIntegration |
| 16 | + |
| 17 | +_DEFAULT_TRANSACTION_NAME = "generic JRPC request" |
| 18 | +TransactionNameGenerator = Callable[[JsonRpcContext], str] |
| 19 | + |
| 20 | + |
| 21 | +@asynccontextmanager |
| 22 | +async def jrpc_transaction_middleware(ctx: JsonRpcContext): |
| 23 | + """ |
| 24 | + Start new transaction for each JRPC request. Applies same sampling decision for every transaction in the batch. |
| 25 | + """ |
| 26 | + |
| 27 | + current_asgi_context = sentry_asgi_context.get() |
| 28 | + headers = current_asgi_context["asgi_headers"] |
| 29 | + transaction_params = dict( |
| 30 | + # this name is replaced by event processor |
| 31 | + name=_DEFAULT_TRANSACTION_NAME, |
| 32 | + op=OP.HTTP_SERVER, |
| 33 | + source=sentry_sdk.tracing.TRANSACTION_SOURCE_CUSTOM, |
| 34 | + origin="manual", |
| 35 | + ) |
| 36 | + with sentry_sdk.isolation_scope() as jrpc_request_scope: |
| 37 | + jrpc_request_scope.clear() |
| 38 | + |
| 39 | + if SENTRY_TRACE_HEADER_NAME in headers: |
| 40 | + # continue existing trace |
| 41 | + # https://github.com/getsentry/sentry-python/blob/2.19.2/sentry_sdk/scope.py#L471 |
| 42 | + jrpc_request_scope.generate_propagation_context(headers) |
| 43 | + transaction = JrpcTransaction.continue_from_headers( |
| 44 | + normalize_incoming_data(headers), |
| 45 | + **transaction_params, |
| 46 | + ) |
| 47 | + else: |
| 48 | + # no parent transaction, start a new trace |
| 49 | + transaction = JrpcTransaction( |
| 50 | + trace_id=current_asgi_context["sampled_sentry_trace_id"].hex, |
| 51 | + **transaction_params, # type: ignore |
| 52 | + ) |
| 53 | + |
| 54 | + integration: FastApiJsonRPCIntegration | None = sentry_sdk.get_client().get_integration( |
| 55 | + "FastApiJsonRPCIntegration" |
| 56 | + ) |
| 57 | + name_generator = integration.transaction_name_generator if integration else default_transaction_name_generator |
| 58 | + |
| 59 | + with jrpc_request_scope.start_transaction( |
| 60 | + transaction, |
| 61 | + scope=jrpc_request_scope, |
| 62 | + ): |
| 63 | + jrpc_request_scope.add_event_processor( |
| 64 | + make_transaction_info_event_processor(ctx, name_generator) |
| 65 | + ) |
| 66 | + try: |
| 67 | + yield |
| 68 | + except Exception as exc: |
| 69 | + if isinstance(exc, BaseError): |
| 70 | + raise |
| 71 | + |
| 72 | + # attaching event to current transaction |
| 73 | + event, hint = event_from_exception( |
| 74 | + exc, |
| 75 | + client_options=sentry_sdk.get_client().options, |
| 76 | + mechanism={"type": "asgi", "handled": False}, |
| 77 | + ) |
| 78 | + sentry_sdk.capture_event(event, hint=hint) |
| 79 | + |
| 80 | + |
| 81 | +class JrpcTransaction(Transaction): |
| 82 | + """ |
| 83 | + Overrides `_set_initial_sampling_decision` to apply same sampling decision for transactions with same `trace_id`. |
| 84 | + """ |
| 85 | + |
| 86 | + def _set_initial_sampling_decision(self, sampling_context): |
| 87 | + super()._set_initial_sampling_decision(sampling_context) |
| 88 | + # https://github.com/getsentry/sentry-python/blob/2.19.2/sentry_sdk/tracing.py#L1125 |
| 89 | + if self.sampled or not is_valid_sample_rate(self.sample_rate, source="Tracing"): |
| 90 | + return |
| 91 | + |
| 92 | + if not self.sample_rate: |
| 93 | + return |
| 94 | + |
| 95 | + # https://github.com/getsentry/sentry-python/blob/2.19.2/sentry_sdk/tracing.py#L1158 |
| 96 | + self.sampled = Random(self.trace_id).random() < self.sample_rate # noqa: S311 |
| 97 | + |
| 98 | + |
| 99 | +def make_transaction_info_event_processor(ctx: JsonRpcContext, name_generator: TransactionNameGenerator) -> Callable: |
| 100 | + def _event_processor(event, _): |
| 101 | + event["transaction_info"]["source"] = sentry_sdk.tracing.TRANSACTION_SOURCE_CUSTOM |
| 102 | + if ctx.method_route is not None: |
| 103 | + event["transaction"] = name_generator(ctx) |
| 104 | + |
| 105 | + return event |
| 106 | + |
| 107 | + return _event_processor |
| 108 | + |
| 109 | + |
| 110 | +def default_transaction_name_generator(ctx: JsonRpcContext) -> str: |
| 111 | + return f"JRPC:{ctx.method_route.name}" |
| 112 | + |
| 113 | + |
| 114 | +def prepend_jrpc_transaction_middleware(): # noqa: C901 |
| 115 | + # prepend the jrpc_sentry_transaction_middleware to the middlewares list. |
| 116 | + # we cannot patch Entrypoint _init_ directly, since objects can be created before invoking this integration |
| 117 | + |
| 118 | + def _prepend_transaction_middleware(self: Entrypoint): |
| 119 | + if not hasattr(self, "__patched_middlewares__"): |
| 120 | + original_middlewares = self.__dict__.get("middlewares", []) |
| 121 | + self.__patched_middlewares__ = original_middlewares |
| 122 | + |
| 123 | + # middleware was passed manually |
| 124 | + if any(middleware is jrpc_transaction_middleware for middleware in self.__patched_middlewares__): |
| 125 | + return self.__patched_middlewares__ |
| 126 | + |
| 127 | + self.__patched_middlewares__ = [jrpc_transaction_middleware, *self.__patched_middlewares__] |
| 128 | + return self.__patched_middlewares__ |
| 129 | + |
| 130 | + def _middleware_setter(self: Entrypoint, value): |
| 131 | + self.__patched_middlewares__ = value |
| 132 | + _prepend_transaction_middleware(self) |
| 133 | + |
| 134 | + Entrypoint.middlewares = property(_prepend_transaction_middleware, _middleware_setter) |
0 commit comments