Skip to content

Commit 4b40d2d

Browse files
committed
fix: significantly simplifying instrumentation
Removing the workflow spans, using better suited hook functions to patch into
1 parent 0929e58 commit 4b40d2d

File tree

7 files changed

+112
-207
lines changed

7 files changed

+112
-207
lines changed

sentry_sdk/integrations/pydantic_ai/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ def setup_once():
3535
Set up the pydantic-ai integration.
3636
3737
This patches the key methods in pydantic-ai to create Sentry spans for:
38-
- Agent workflow execution (root span)
39-
- Individual agent invocations
38+
- Agent invocations (Agent.run methods)
4039
- Model requests (AI client calls)
4140
- Tool executions
4241
"""

sentry_sdk/integrations/pydantic_ai/patches/agent_run.py

Lines changed: 79 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from sentry_sdk.tracing_utils import set_span_errored
55
from sentry_sdk.utils import event_from_exception
66

7-
from ..spans import agent_workflow_span
7+
from ..spans import invoke_agent_span, update_invoke_agent_span
88

99
from typing import TYPE_CHECKING
1010
from pydantic_ai.agent import Agent
@@ -28,17 +28,29 @@ def _capture_exception(exc):
2828
class _StreamingContextManagerWrapper:
2929
"""Wrapper for streaming methods that return async context managers."""
3030

31-
def __init__(self, agent, original_ctx_manager, is_streaming=True):
32-
# type: (Any, Any, bool) -> None
31+
def __init__(
32+
self,
33+
agent,
34+
original_ctx_manager,
35+
user_prompt,
36+
model,
37+
model_settings,
38+
is_streaming=True,
39+
):
40+
# type: (Any, Any, Any, Any, Any, bool) -> None
3341
self.agent = agent
3442
self.original_ctx_manager = original_ctx_manager
43+
self.user_prompt = user_prompt
44+
self.model = model
45+
self.model_settings = model_settings
3546
self.is_streaming = is_streaming
3647
self._isolation_scope = None # type: Any
37-
self._workflow_span = None # type: Optional[sentry_sdk.tracing.Span]
48+
self._span = None # type: Optional[sentry_sdk.tracing.Span]
49+
self._result = None # type: Any
3850

3951
async def __aenter__(self):
4052
# type: () -> Any
41-
# Set up isolation scope and workflow span
53+
# Set up isolation scope and invoke_agent span
4254
self._isolation_scope = sentry_sdk.isolation_scope()
4355
self._isolation_scope.__enter__()
4456

@@ -47,23 +59,33 @@ async def __aenter__(self):
4759
"pydantic_ai_agent", {"_agent": self.agent, "_streaming": self.is_streaming}
4860
)
4961

50-
# Create workflow span
51-
self._workflow_span = agent_workflow_span(self.agent)
52-
self._workflow_span.__enter__()
62+
# Create invoke_agent span (will be closed in __aexit__)
63+
self._span = invoke_agent_span(
64+
self.user_prompt, self.agent, self.model, self.model_settings
65+
)
66+
self._span.__enter__()
5367

5468
# Enter the original context manager
5569
result = await self.original_ctx_manager.__aenter__()
70+
self._result = result
5671
return result
5772

5873
async def __aexit__(self, exc_type, exc_val, exc_tb):
5974
# type: (Any, Any, Any) -> None
6075
try:
6176
# Exit the original context manager first
6277
await self.original_ctx_manager.__aexit__(exc_type, exc_val, exc_tb)
78+
79+
# Update span with output if successful
80+
if exc_type is None and self._result and hasattr(self._result, "output"):
81+
output = (
82+
self._result.output if hasattr(self._result, "output") else None
83+
)
84+
update_invoke_agent_span(self._span, output)
6385
finally:
64-
# Clean up workflow span
65-
if self._workflow_span:
66-
self._workflow_span.__exit__(exc_type, exc_val, exc_tb)
86+
# Clean up invoke span
87+
if self._span:
88+
self._span.__exit__(exc_type, exc_val, exc_tb)
6789

6890
# Clean up isolation scope
6991
if self._isolation_scope:
@@ -73,7 +95,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7395
def _create_run_wrapper(original_func, is_streaming=False):
7496
# type: (Callable[..., Any], bool) -> Callable[..., Any]
7597
"""
76-
Wraps the Agent.run method to create a root span for the agent workflow.
98+
Wraps the Agent.run method to create an invoke_agent span.
7799
78100
Args:
79101
original_func: The original run method
@@ -92,19 +114,23 @@ async def wrapper(self, *args, **kwargs):
92114
"pydantic_ai_agent", {"_agent": self, "_streaming": is_streaming}
93115
)
94116

95-
with agent_workflow_span(self):
96-
result = None
117+
# Extract parameters for the span
118+
user_prompt = kwargs.get("user_prompt") or (args[0] if args else None)
119+
model = kwargs.get("model")
120+
model_settings = kwargs.get("model_settings")
121+
122+
# Create invoke_agent span
123+
with invoke_agent_span(user_prompt, self, model, model_settings) as span:
97124
try:
98125
result = await original_func(self, *args, **kwargs)
126+
127+
# Update span with output
128+
output = result.output if hasattr(result, "output") else None
129+
update_invoke_agent_span(span, output)
130+
99131
return result
100132
except Exception as exc:
101133
_capture_exception(exc)
102-
103-
# It could be that there is an "invoke agent" span still open
104-
current_span = sentry_sdk.get_current_span()
105-
if current_span is not None and current_span.timestamp is None:
106-
current_span.__exit__(None, None, None)
107-
108134
raise exc from None
109135

110136
return wrapper
@@ -113,36 +139,22 @@ async def wrapper(self, *args, **kwargs):
113139
def _create_run_sync_wrapper(original_func):
114140
# type: (Callable[..., Any]) -> Callable[..., Any]
115141
"""
116-
Wraps the Agent.run_sync method to create a root span for the agent workflow.
117-
Note: run_sync is always non-streaming.
142+
Wraps the Agent.run_sync method - no span needed as it delegates to run().
143+
144+
Note: run_sync just calls self.run() via run_until_complete, so the
145+
invoke_agent span will be created by the run() wrapper.
118146
"""
119147

120148
@wraps(original_func)
121149
def wrapper(self, *args, **kwargs):
122150
# type: (Any, *Any, **Any) -> Any
123-
# Isolate each workflow so that when agents are run they
124-
# don't touch each other's scopes
125-
with sentry_sdk.isolation_scope():
126-
# Store agent reference and streaming flag in Sentry scope for access in nested spans
127-
# We store the full agent to allow access to tools and system prompts
128-
sentry_sdk.get_current_scope().set_context(
129-
"pydantic_ai_agent", {"_agent": self, "_streaming": False}
130-
)
131-
132-
with agent_workflow_span(self):
133-
result = None
134-
try:
135-
result = original_func(self, *args, **kwargs)
136-
return result
137-
except Exception as exc:
138-
_capture_exception(exc)
139-
140-
# It could be that there is an "invoke agent" span still open
141-
current_span = sentry_sdk.get_current_span()
142-
if current_span is not None and current_span.timestamp is None:
143-
current_span.__exit__(None, None, None)
144-
145-
raise exc from None
151+
# Just call the original function - it will call run() which has the instrumentation
152+
try:
153+
result = original_func(self, *args, **kwargs)
154+
return result
155+
except Exception as exc:
156+
_capture_exception(exc)
157+
raise exc from None
146158

147159
return wrapper
148160

@@ -156,12 +168,22 @@ def _create_streaming_wrapper(original_func):
156168
@wraps(original_func)
157169
def wrapper(self, *args, **kwargs):
158170
# type: (Any, *Any, **Any) -> Any
171+
# Extract parameters for the span
172+
user_prompt = kwargs.get("user_prompt") or (args[0] if args else None)
173+
model = kwargs.get("model")
174+
model_settings = kwargs.get("model_settings")
175+
159176
# Call original function to get the context manager
160177
original_ctx_manager = original_func(self, *args, **kwargs)
161178

162179
# Wrap it with our instrumentation
163180
return _StreamingContextManagerWrapper(
164-
agent=self, original_ctx_manager=original_ctx_manager, is_streaming=True
181+
agent=self,
182+
original_ctx_manager=original_ctx_manager,
183+
user_prompt=user_prompt,
184+
model=model,
185+
model_settings=model_settings,
186+
is_streaming=True,
165187
)
166188

167189
return wrapper
@@ -170,34 +192,22 @@ def wrapper(self, *args, **kwargs):
170192
def _create_streaming_events_wrapper(original_func):
171193
# type: (Callable[..., Any]) -> Callable[..., Any]
172194
"""
173-
Wraps run_stream_events method that returns an async generator/iterator.
195+
Wraps run_stream_events method - no span needed as it delegates to run().
196+
197+
Note: run_stream_events internally calls self.run() with an event_stream_handler,
198+
so the invoke_agent span will be created by the run() wrapper.
174199
"""
175200

176201
@wraps(original_func)
177202
async def wrapper(self, *args, **kwargs):
178203
# type: (Any, *Any, **Any) -> Any
179-
# Isolate each workflow so that when agents are run in asyncio tasks they
180-
# don't touch each other's scopes
181-
with sentry_sdk.isolation_scope():
182-
# Store agent reference and streaming flag in Sentry scope for access in nested spans
183-
sentry_sdk.get_current_scope().set_context(
184-
"pydantic_ai_agent", {"_agent": self, "_streaming": True}
185-
)
186-
187-
with agent_workflow_span(self):
188-
try:
189-
# Call the original generator and yield all events
190-
async for event in original_func(self, *args, **kwargs):
191-
yield event
192-
except Exception as exc:
193-
_capture_exception(exc)
194-
195-
# It could be that there is an "invoke agent" span still open
196-
current_span = sentry_sdk.get_current_span()
197-
if current_span is not None and current_span.timestamp is None:
198-
current_span.__exit__(None, None, None)
199-
200-
raise exc from None
204+
# Just call the original generator - it will call run() which has the instrumentation
205+
try:
206+
async for event in original_func(self, *args, **kwargs):
207+
yield event
208+
except Exception as exc:
209+
_capture_exception(exc)
210+
raise exc from None
201211

202212
return wrapper
203213

0 commit comments

Comments
 (0)