|  | 
|  | 1 | +from functools import wraps | 
|  | 2 | + | 
|  | 3 | +import sentry_sdk | 
|  | 4 | +from sentry_sdk.tracing_utils import set_span_errored | 
|  | 5 | +from sentry_sdk.utils import event_from_exception | 
|  | 6 | + | 
|  | 7 | +from ..spans import invoke_agent_span, update_invoke_agent_span | 
|  | 8 | + | 
|  | 9 | +from typing import TYPE_CHECKING | 
|  | 10 | +from pydantic_ai.agent import Agent  # type: ignore | 
|  | 11 | + | 
|  | 12 | +if TYPE_CHECKING: | 
|  | 13 | +    from typing import Any, Callable, Optional | 
|  | 14 | + | 
|  | 15 | + | 
|  | 16 | +def _capture_exception(exc): | 
|  | 17 | +    # type: (Any) -> None | 
|  | 18 | +    set_span_errored() | 
|  | 19 | + | 
|  | 20 | +    event, hint = event_from_exception( | 
|  | 21 | +        exc, | 
|  | 22 | +        client_options=sentry_sdk.get_client().options, | 
|  | 23 | +        mechanism={"type": "pydantic_ai", "handled": False}, | 
|  | 24 | +    ) | 
|  | 25 | +    sentry_sdk.capture_event(event, hint=hint) | 
|  | 26 | + | 
|  | 27 | + | 
|  | 28 | +class _StreamingContextManagerWrapper: | 
|  | 29 | +    """Wrapper for streaming methods that return async context managers.""" | 
|  | 30 | + | 
|  | 31 | +    def __init__( | 
|  | 32 | +        self, | 
|  | 33 | +        agent, | 
|  | 34 | +        original_ctx_manager, | 
|  | 35 | +        user_prompt, | 
|  | 36 | +        model, | 
|  | 37 | +        model_settings, | 
|  | 38 | +        is_streaming=True, | 
|  | 39 | +    ): | 
|  | 40 | +        # type: (Any, Any, Any, Any, Any, bool) -> None | 
|  | 41 | +        self.agent = agent | 
|  | 42 | +        self.original_ctx_manager = original_ctx_manager | 
|  | 43 | +        self.user_prompt = user_prompt | 
|  | 44 | +        self.model = model | 
|  | 45 | +        self.model_settings = model_settings | 
|  | 46 | +        self.is_streaming = is_streaming | 
|  | 47 | +        self._isolation_scope = None  # type: Any | 
|  | 48 | +        self._span = None  # type: Optional[sentry_sdk.tracing.Span] | 
|  | 49 | +        self._result = None  # type: Any | 
|  | 50 | + | 
|  | 51 | +    async def __aenter__(self): | 
|  | 52 | +        # type: () -> Any | 
|  | 53 | +        # Set up isolation scope and invoke_agent span | 
|  | 54 | +        self._isolation_scope = sentry_sdk.isolation_scope() | 
|  | 55 | +        self._isolation_scope.__enter__() | 
|  | 56 | + | 
|  | 57 | +        # Store agent reference and streaming flag | 
|  | 58 | +        sentry_sdk.get_current_scope().set_context( | 
|  | 59 | +            "pydantic_ai_agent", {"_agent": self.agent, "_streaming": self.is_streaming} | 
|  | 60 | +        ) | 
|  | 61 | + | 
|  | 62 | +        # Create invoke_agent span (will be closed in __aexit__) | 
|  | 63 | +        self._span = invoke_agent_span( | 
|  | 64 | +            self.user_prompt, self.agent, self.model, self.model_settings | 
|  | 65 | +        ) | 
|  | 66 | +        self._span.__enter__() | 
|  | 67 | + | 
|  | 68 | +        # Enter the original context manager | 
|  | 69 | +        result = await self.original_ctx_manager.__aenter__() | 
|  | 70 | +        self._result = result | 
|  | 71 | +        return result | 
|  | 72 | + | 
|  | 73 | +    async def __aexit__(self, exc_type, exc_val, exc_tb): | 
|  | 74 | +        # type: (Any, Any, Any) -> None | 
|  | 75 | +        try: | 
|  | 76 | +            # Exit the original context manager first | 
|  | 77 | +            await self.original_ctx_manager.__aexit__(exc_type, exc_val, exc_tb) | 
|  | 78 | + | 
|  | 79 | +            # Update span with output if successful | 
|  | 80 | +            if exc_type is None and self._result and hasattr(self._result, "output"): | 
|  | 81 | +                output = ( | 
|  | 82 | +                    self._result.output if hasattr(self._result, "output") else None | 
|  | 83 | +                ) | 
|  | 84 | +                if self._span is not None: | 
|  | 85 | +                    update_invoke_agent_span(self._span, output) | 
|  | 86 | +        finally: | 
|  | 87 | +            sentry_sdk.get_current_scope().remove_context("pydantic_ai_agent") | 
|  | 88 | +            # Clean up invoke span | 
|  | 89 | +            if self._span: | 
|  | 90 | +                self._span.__exit__(exc_type, exc_val, exc_tb) | 
|  | 91 | + | 
|  | 92 | +            # Clean up isolation scope | 
|  | 93 | +            if self._isolation_scope: | 
|  | 94 | +                self._isolation_scope.__exit__(exc_type, exc_val, exc_tb) | 
|  | 95 | + | 
|  | 96 | + | 
|  | 97 | +def _create_run_wrapper(original_func, is_streaming=False): | 
|  | 98 | +    # type: (Callable[..., Any], bool) -> Callable[..., Any] | 
|  | 99 | +    """ | 
|  | 100 | +    Wraps the Agent.run method to create an invoke_agent span. | 
|  | 101 | +
 | 
|  | 102 | +    Args: | 
|  | 103 | +        original_func: The original run method | 
|  | 104 | +        is_streaming: Whether this is a streaming method (for future use) | 
|  | 105 | +    """ | 
|  | 106 | + | 
|  | 107 | +    @wraps(original_func) | 
|  | 108 | +    async def wrapper(self, *args, **kwargs): | 
|  | 109 | +        # type: (Any, *Any, **Any) -> Any | 
|  | 110 | +        # Isolate each workflow so that when agents are run in asyncio tasks they | 
|  | 111 | +        # don't touch each other's scopes | 
|  | 112 | +        with sentry_sdk.isolation_scope(): | 
|  | 113 | +            # Store agent reference and streaming flag in Sentry scope for access in nested spans | 
|  | 114 | +            # We store the full agent to allow access to tools and system prompts | 
|  | 115 | +            sentry_sdk.get_current_scope().set_context( | 
|  | 116 | +                "pydantic_ai_agent", {"_agent": self, "_streaming": is_streaming} | 
|  | 117 | +            ) | 
|  | 118 | + | 
|  | 119 | +            # Extract parameters for the span | 
|  | 120 | +            user_prompt = kwargs.get("user_prompt") or (args[0] if args else None) | 
|  | 121 | +            model = kwargs.get("model") | 
|  | 122 | +            model_settings = kwargs.get("model_settings") | 
|  | 123 | + | 
|  | 124 | +            # Create invoke_agent span | 
|  | 125 | +            with invoke_agent_span(user_prompt, self, model, model_settings) as span: | 
|  | 126 | +                try: | 
|  | 127 | +                    result = await original_func(self, *args, **kwargs) | 
|  | 128 | + | 
|  | 129 | +                    # Update span with output | 
|  | 130 | +                    output = result.output if hasattr(result, "output") else None | 
|  | 131 | +                    update_invoke_agent_span(span, output) | 
|  | 132 | + | 
|  | 133 | +                    return result | 
|  | 134 | +                except Exception as exc: | 
|  | 135 | +                    _capture_exception(exc) | 
|  | 136 | +                    raise exc from None | 
|  | 137 | +                finally: | 
|  | 138 | +                    sentry_sdk.get_current_scope().remove_context("pydantic_ai_agent") | 
|  | 139 | + | 
|  | 140 | +    return wrapper | 
|  | 141 | + | 
|  | 142 | + | 
|  | 143 | +def _create_streaming_wrapper(original_func): | 
|  | 144 | +    # type: (Callable[..., Any]) -> Callable[..., Any] | 
|  | 145 | +    """ | 
|  | 146 | +    Wraps run_stream method that returns an async context manager. | 
|  | 147 | +    """ | 
|  | 148 | + | 
|  | 149 | +    @wraps(original_func) | 
|  | 150 | +    def wrapper(self, *args, **kwargs): | 
|  | 151 | +        # type: (Any, *Any, **Any) -> Any | 
|  | 152 | +        # Extract parameters for the span | 
|  | 153 | +        user_prompt = kwargs.get("user_prompt") or (args[0] if args else None) | 
|  | 154 | +        model = kwargs.get("model") | 
|  | 155 | +        model_settings = kwargs.get("model_settings") | 
|  | 156 | + | 
|  | 157 | +        # Call original function to get the context manager | 
|  | 158 | +        original_ctx_manager = original_func(self, *args, **kwargs) | 
|  | 159 | + | 
|  | 160 | +        # Wrap it with our instrumentation | 
|  | 161 | +        return _StreamingContextManagerWrapper( | 
|  | 162 | +            agent=self, | 
|  | 163 | +            original_ctx_manager=original_ctx_manager, | 
|  | 164 | +            user_prompt=user_prompt, | 
|  | 165 | +            model=model, | 
|  | 166 | +            model_settings=model_settings, | 
|  | 167 | +            is_streaming=True, | 
|  | 168 | +        ) | 
|  | 169 | + | 
|  | 170 | +    return wrapper | 
|  | 171 | + | 
|  | 172 | + | 
|  | 173 | +def _create_streaming_events_wrapper(original_func): | 
|  | 174 | +    # type: (Callable[..., Any]) -> Callable[..., Any] | 
|  | 175 | +    """ | 
|  | 176 | +    Wraps run_stream_events method - no span needed as it delegates to run(). | 
|  | 177 | +
 | 
|  | 178 | +    Note: run_stream_events internally calls self.run() with an event_stream_handler, | 
|  | 179 | +    so the invoke_agent span will be created by the run() wrapper. | 
|  | 180 | +    """ | 
|  | 181 | + | 
|  | 182 | +    @wraps(original_func) | 
|  | 183 | +    async def wrapper(self, *args, **kwargs): | 
|  | 184 | +        # type: (Any, *Any, **Any) -> Any | 
|  | 185 | +        # Just call the original generator - it will call run() which has the instrumentation | 
|  | 186 | +        try: | 
|  | 187 | +            async for event in original_func(self, *args, **kwargs): | 
|  | 188 | +                yield event | 
|  | 189 | +        except Exception as exc: | 
|  | 190 | +            _capture_exception(exc) | 
|  | 191 | +            raise exc from None | 
|  | 192 | + | 
|  | 193 | +    return wrapper | 
|  | 194 | + | 
|  | 195 | + | 
|  | 196 | +def _patch_agent_run(): | 
|  | 197 | +    # type: () -> None | 
|  | 198 | +    """ | 
|  | 199 | +    Patches the Agent run methods to create spans for agent execution. | 
|  | 200 | +
 | 
|  | 201 | +    This patches both non-streaming (run, run_sync) and streaming | 
|  | 202 | +    (run_stream, run_stream_events) methods. | 
|  | 203 | +    """ | 
|  | 204 | + | 
|  | 205 | +    # Store original methods | 
|  | 206 | +    original_run = Agent.run | 
|  | 207 | +    original_run_stream = Agent.run_stream | 
|  | 208 | +    original_run_stream_events = Agent.run_stream_events | 
|  | 209 | + | 
|  | 210 | +    # Wrap and apply patches for non-streaming methods | 
|  | 211 | +    Agent.run = _create_run_wrapper(original_run, is_streaming=False) | 
|  | 212 | + | 
|  | 213 | +    # Wrap and apply patches for streaming methods | 
|  | 214 | +    Agent.run_stream = _create_streaming_wrapper(original_run_stream) | 
|  | 215 | +    Agent.run_stream_events = _create_streaming_events_wrapper( | 
|  | 216 | +        original_run_stream_events | 
|  | 217 | +    ) | 
0 commit comments