Skip to content

Commit a06e745

Browse files
committed
refactor to make diff smaller
1 parent b9a125c commit a06e745

File tree

1 file changed

+71
-110
lines changed

1 file changed

+71
-110
lines changed

pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 71 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -143,64 +143,22 @@ def is_agent_node(
143143
return isinstance(node, AgentNode)
144144

145145

146-
def _is_retry_attempt(ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]) -> bool:
147-
# Check if we've already attempted a thinking-only retry to prevent infinite loops
148-
recent_messages = (
149-
ctx.state.message_history[-3:] if len(ctx.state.message_history) >= 3 else ctx.state.message_history
150-
)
151-
for msg in recent_messages:
152-
if isinstance(msg, _messages.ModelRequest):
153-
for part in msg.parts:
154-
if (
155-
isinstance(part, _messages.UserPromptPart)
156-
and isinstance(part.content, str)
157-
and part.content.startswith('[THINKING_RETRY]')
158-
):
159-
return True
160-
return False
161-
162-
163-
async def _create_thinking_retry(
164-
ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
165-
) -> ModelRequestNode[DepsT, NodeRunEndT]:
166-
# Create retry prompt
167-
retry_prompt = (
168-
'Based on your thinking above, you MUST now provide '
169-
'a specific answer or use the available tools to complete the task. '
170-
'Do not respond with only thinking content. Provide actionable output.'
171-
)
172-
173-
# Create the retry request using UserPromptPart for API compatibility
174-
# We'll use a special content marker to detect this is a thinking retry
175-
retry_part = _messages.UserPromptPart(f'[THINKING_RETRY] {retry_prompt}')
176-
retry_request = _messages.ModelRequest(parts=[retry_part])
146+
def _create_thinking_retry_request(parts: list[_messages.ModelResponsePart]) -> _messages.ModelRequest | None:
147+
"""Handle thinking-only responses (responses that contain only ThinkingPart instances).
177148
178-
# Create new ModelRequestNode for retry (it will add the request to message history)
179-
return ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)
180-
181-
182-
async def _process_response_parts(
183-
parts: list[_messages.ModelResponsePart], texts: list[str], tool_calls: list[_messages.ToolCallPart]
184-
) -> AsyncIterator[_messages.HandleResponseEvent]:
185-
for part in parts:
186-
if isinstance(part, _messages.TextPart):
187-
# ignore empty content for text parts, see #437
188-
if part.content:
189-
texts.append(part.content)
190-
elif isinstance(part, _messages.ToolCallPart):
191-
tool_calls.append(part)
192-
elif isinstance(part, _messages.BuiltinToolCallPart):
193-
yield _messages.BuiltinToolCallEvent(part)
194-
elif isinstance(part, _messages.BuiltinToolReturnPart):
195-
yield _messages.BuiltinToolResultEvent(part)
196-
elif isinstance(part, _messages.ThinkingPart):
197-
# We don't need to do anything with thinking parts in this tool-calling node.
198-
# We need to handle text parts in case there are no tool calls and/or the desired output comes
199-
# from the text, but thinking parts should not directly influence the execution of tools or
200-
# determination of the next node of graph execution here.
201-
pass
202-
else:
203-
assert_never(part)
149+
This can happen with models that support thinking mode when they don't provide
150+
actionable output alongside their thinking content.
151+
"""
152+
thinking_parts = [p for p in parts if isinstance(p, _messages.ThinkingPart)]
153+
if thinking_parts:
154+
# Create the retry request using UserPromptPart for API compatibility
155+
# We'll use a special content marker to detect this is a thinking retry
156+
retry_part = _messages.UserPromptPart(
157+
'Based on your thinking above, you MUST now provide '
158+
'a specific answer or use the available tools to complete the task. '
159+
'Do not respond with only thinking content. Provide actionable output.'
160+
)
161+
return _messages.ModelRequest(parts=[retry_part])
204162

205163

206164
@dataclasses.dataclass
@@ -490,67 +448,70 @@ async def stream(
490448
async for _event in stream:
491449
pass
492450

493-
async def _run_stream(
451+
async def _run_stream( # noqa: C901
494452
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
495453
) -> AsyncIterator[_messages.HandleResponseEvent]:
496454
if self._events_iterator is None:
497455
# Ensure that the stream is only run once
498-
self._events_iterator = self._create_stream_iterator(ctx)
456+
async def _run_stream() -> AsyncIterator[_messages.HandleResponseEvent]: # noqa: C901
457+
texts: list[str] = []
458+
tool_calls: list[_messages.ToolCallPart] = []
459+
for part in self.model_response.parts:
460+
if isinstance(part, _messages.TextPart):
461+
# ignore empty content for text parts, see #437
462+
if part.content:
463+
texts.append(part.content)
464+
elif isinstance(part, _messages.ToolCallPart):
465+
tool_calls.append(part)
466+
elif isinstance(part, _messages.BuiltinToolCallPart):
467+
yield _messages.BuiltinToolCallEvent(part)
468+
elif isinstance(part, _messages.BuiltinToolReturnPart):
469+
yield _messages.BuiltinToolResultEvent(part)
470+
elif isinstance(part, _messages.ThinkingPart):
471+
# We don't need to do anything with thinking parts in this tool-calling node.
472+
# We need to handle text parts in case there are no tool calls and/or the desired output comes
473+
# from the text, but thinking parts should not directly influence the execution of tools or
474+
# determination of the next node of graph execution here.
475+
pass
476+
else:
477+
assert_never(part)
478+
479+
# At the moment, we prioritize at least executing tool calls if they are present.
480+
# In the future, we'd consider making this configurable at the agent or run level.
481+
# This accounts for cases like anthropic returns that might contain a text response
482+
# and a tool call response, where the text response just indicates the tool call will happen.
483+
if tool_calls:
484+
async for event in self._handle_tool_calls(ctx, tool_calls):
485+
yield event
486+
elif texts:
487+
# No events are emitted during the handling of text responses, so we don't need to yield anything
488+
self._next_node = await self._handle_text_response(ctx, texts)
489+
else:
490+
# we've got an empty response, this sometimes happens with anthropic (and perhaps other models)
491+
# when the model has already returned text along side tool calls
492+
# in this scenario, if text responses are allowed, we return text from the most recent model
493+
# response, if any
494+
if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
495+
for message in reversed(ctx.state.message_history):
496+
if isinstance(message, _messages.ModelResponse):
497+
last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
498+
if last_texts:
499+
self._next_node = await self._handle_text_response(ctx, last_texts)
500+
return
501+
502+
# If there are no preceding model responses, we prompt the model to try again and provide actionable output.
503+
breakpoint()
504+
if retry_request := _create_thinking_retry_request(self.model_response.parts):
505+
self._next_node = ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)
506+
return
507+
508+
raise exceptions.UnexpectedModelBehavior('Received empty model response')
509+
510+
self._events_iterator = _run_stream()
499511

500512
async for event in self._events_iterator:
501513
yield event
502514

503-
async def _create_stream_iterator(
504-
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
505-
) -> AsyncIterator[_messages.HandleResponseEvent]:
506-
texts: list[str] = []
507-
tool_calls: list[_messages.ToolCallPart] = []
508-
509-
# Process all parts in the model response
510-
async for event in _process_response_parts(self.model_response.parts, texts, tool_calls):
511-
yield event
512-
513-
# Handle the response based on what we found
514-
if tool_calls:
515-
async for event in self._handle_tool_calls(ctx, tool_calls):
516-
yield event
517-
elif texts:
518-
# No events are emitted during the handling of text responses, so we don't need to yield anything
519-
self._next_node = await self._handle_text_response(ctx, texts)
520-
else:
521-
self._next_node = await self._handle_empty_response(ctx)
522-
523-
async def _handle_empty_response(
524-
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
525-
) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]]:
526-
# Handle thinking-only responses (responses that contain only ThinkingPart instances)
527-
# This can happen with models that support thinking mode when they don't provide
528-
# actionable output alongside their thinking content.
529-
thinking_parts = [p for p in self.model_response.parts if isinstance(p, _messages.ThinkingPart)]
530-
531-
if thinking_parts and not _is_retry_attempt(ctx):
532-
return await _create_thinking_retry(ctx)
533-
534-
# Original recovery logic - this sometimes happens with anthropic (and perhaps other models)
535-
# when the model has already returned text along side tool calls
536-
# in this scenario, if text responses are allowed, we return text from the most recent model
537-
# response, if any
538-
if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
539-
if next_node := await self._try_recover_from_history(ctx):
540-
return next_node
541-
542-
raise exceptions.UnexpectedModelBehavior('Received empty model response')
543-
544-
async def _try_recover_from_history(
545-
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
546-
) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]] | None:
547-
for message in reversed(ctx.state.message_history):
548-
if isinstance(message, _messages.ModelResponse):
549-
last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
550-
if last_texts:
551-
return await self._handle_text_response(ctx, last_texts)
552-
return None
553-
554515
async def _handle_tool_calls(
555516
self,
556517
ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],

0 commit comments

Comments
 (0)