Skip to content

Commit 7247ccd

Browse files
committed
add handling for thinking-only requests (currently causes UnexpectedModelBehavior)
1 parent 1a22d68 commit 7247ccd

File tree

2 files changed

+117
-52
lines changed

2 files changed

+117
-52
lines changed

pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 113 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,66 @@ def is_agent_node(
143143
return isinstance(node, AgentNode)
144144

145145

146+
def _is_retry_attempt(ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]) -> bool:
147+
# Check if we've already attempted a thinking-only retry to prevent infinite loops
148+
recent_messages = (
149+
ctx.state.message_history[-3:] if len(ctx.state.message_history) >= 3 else ctx.state.message_history
150+
)
151+
for msg in recent_messages:
152+
if isinstance(msg, _messages.ModelRequest):
153+
for part in msg.parts:
154+
if (
155+
isinstance(part, _messages.UserPromptPart)
156+
and isinstance(part.content, str)
157+
and part.content.startswith('[THINKING_RETRY]')
158+
):
159+
return True
160+
return False
161+
162+
163+
async def _create_thinking_retry(
164+
ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
165+
) -> ModelRequestNode[DepsT, NodeRunEndT]:
166+
# Create retry prompt
167+
retry_prompt = (
168+
'Based on your thinking above, you MUST now provide '
169+
'a specific answer or use the available tools to complete the task. '
170+
'Do not respond with only thinking content. Provide actionable output.'
171+
)
172+
173+
# Create the retry request using UserPromptPart for API compatibility
174+
# We'll use a special content marker to detect this is a thinking retry
175+
retry_part = _messages.UserPromptPart(f'[THINKING_RETRY] {retry_prompt}')
176+
retry_request = _messages.ModelRequest(parts=[retry_part])
177+
178+
# Create new ModelRequestNode for retry (it will add the request to message history)
179+
return ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)
180+
181+
182+
async def _process_response_parts(
183+
parts: list[_messages.ModelResponsePart], texts: list[str], tool_calls: list[_messages.ToolCallPart]
184+
) -> AsyncIterator[_messages.HandleResponseEvent]:
185+
for part in parts:
186+
if isinstance(part, _messages.TextPart):
187+
# ignore empty content for text parts, see #437
188+
if part.content:
189+
texts.append(part.content)
190+
elif isinstance(part, _messages.ToolCallPart):
191+
tool_calls.append(part)
192+
elif isinstance(part, _messages.BuiltinToolCallPart):
193+
yield _messages.BuiltinToolCallEvent(part)
194+
elif isinstance(part, _messages.BuiltinToolReturnPart):
195+
yield _messages.BuiltinToolResultEvent(part)
196+
elif isinstance(part, _messages.ThinkingPart):
197+
# We don't need to do anything with thinking parts in this tool-calling node.
198+
# We need to handle text parts in case there are no tool calls and/or the desired output comes
199+
# from the text, but thinking parts should not directly influence the execution of tools or
200+
# determination of the next node of graph execution here.
201+
pass
202+
else:
203+
assert_never(part)
204+
205+
146206
@dataclasses.dataclass
147207
class UserPromptNode(AgentNode[DepsT, NodeRunEndT]):
148208
"""The node that handles the user prompt and instructions."""
@@ -430,65 +490,67 @@ async def stream(
430490
async for _event in stream:
431491
pass
432492

433-
async def _run_stream( # noqa: C901
493+
async def _run_stream(
434494
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
435495
) -> AsyncIterator[_messages.HandleResponseEvent]:
436496
if self._events_iterator is None:
437497
# Ensure that the stream is only run once
438-
439-
async def _run_stream() -> AsyncIterator[_messages.HandleResponseEvent]:
440-
texts: list[str] = []
441-
tool_calls: list[_messages.ToolCallPart] = []
442-
for part in self.model_response.parts:
443-
if isinstance(part, _messages.TextPart):
444-
# ignore empty content for text parts, see #437
445-
if part.content:
446-
texts.append(part.content)
447-
elif isinstance(part, _messages.ToolCallPart):
448-
tool_calls.append(part)
449-
elif isinstance(part, _messages.BuiltinToolCallPart):
450-
yield _messages.BuiltinToolCallEvent(part)
451-
elif isinstance(part, _messages.BuiltinToolReturnPart):
452-
yield _messages.BuiltinToolResultEvent(part)
453-
elif isinstance(part, _messages.ThinkingPart):
454-
# We don't need to do anything with thinking parts in this tool-calling node.
455-
# We need to handle text parts in case there are no tool calls and/or the desired output comes
456-
# from the text, but thinking parts should not directly influence the execution of tools or
457-
# determination of the next node of graph execution here.
458-
pass
459-
else:
460-
assert_never(part)
461-
462-
# At the moment, we prioritize at least executing tool calls if they are present.
463-
# In the future, we'd consider making this configurable at the agent or run level.
464-
# This accounts for cases like anthropic returns that might contain a text response
465-
# and a tool call response, where the text response just indicates the tool call will happen.
466-
if tool_calls:
467-
async for event in self._handle_tool_calls(ctx, tool_calls):
468-
yield event
469-
elif texts:
470-
# No events are emitted during the handling of text responses, so we don't need to yield anything
471-
self._next_node = await self._handle_text_response(ctx, texts)
472-
else:
473-
# we've got an empty response, this sometimes happens with anthropic (and perhaps other models)
474-
# when the model has already returned text along side tool calls
475-
# in this scenario, if text responses are allowed, we return text from the most recent model
476-
# response, if any
477-
if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
478-
for message in reversed(ctx.state.message_history):
479-
if isinstance(message, _messages.ModelResponse):
480-
last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
481-
if last_texts:
482-
self._next_node = await self._handle_text_response(ctx, last_texts)
483-
return
484-
485-
raise exceptions.UnexpectedModelBehavior('Received empty model response')
486-
487-
self._events_iterator = _run_stream()
498+
self._events_iterator = self._create_stream_iterator(ctx)
488499

489500
async for event in self._events_iterator:
490501
yield event
491502

503+
async def _create_stream_iterator(
504+
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
505+
) -> AsyncIterator[_messages.HandleResponseEvent]:
506+
texts: list[str] = []
507+
tool_calls: list[_messages.ToolCallPart] = []
508+
509+
# Process all parts in the model response
510+
async for event in _process_response_parts(self.model_response.parts, texts, tool_calls):
511+
yield event
512+
513+
# Handle the response based on what we found
514+
if tool_calls:
515+
async for event in self._handle_tool_calls(ctx, tool_calls):
516+
yield event
517+
elif texts:
518+
# No events are emitted during the handling of text responses, so we don't need to yield anything
519+
self._next_node = await self._handle_text_response(ctx, texts)
520+
else:
521+
self._next_node = await self._handle_empty_response(ctx)
522+
523+
async def _handle_empty_response(
524+
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
525+
) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]]:
526+
# Handle thinking-only responses (responses that contain only ThinkingPart instances)
527+
# This can happen with models that support thinking mode when they don't provide
528+
# actionable output alongside their thinking content.
529+
thinking_parts = [p for p in self.model_response.parts if isinstance(p, _messages.ThinkingPart)]
530+
531+
if thinking_parts and not _is_retry_attempt(ctx):
532+
return await _create_thinking_retry(ctx)
533+
534+
# Original recovery logic - this sometimes happens with anthropic (and perhaps other models)
535+
# when the model has already returned text along side tool calls
536+
# in this scenario, if text responses are allowed, we return text from the most recent model
537+
# response, if any
538+
if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
539+
if next_node := await self._try_recover_from_history(ctx):
540+
return next_node
541+
542+
raise exceptions.UnexpectedModelBehavior('Received empty model response')
543+
544+
async def _try_recover_from_history(
545+
self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
546+
) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]] | None:
547+
for message in reversed(ctx.state.message_history):
548+
if isinstance(message, _messages.ModelResponse):
549+
last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
550+
if last_texts:
551+
return await self._handle_text_response(ctx, last_texts)
552+
return None
553+
492554
async def _handle_tool_calls(
493555
self,
494556
ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],

pydantic_ai_slim/pydantic_ai/models/google.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,10 @@ async def _map_messages(self, messages: list[ModelMessage]) -> tuple[ContentDict
393393
message_parts = [{'text': ''}]
394394
contents.append({'role': 'user', 'parts': message_parts})
395395
elif isinstance(m, ModelResponse):
396-
contents.append(_content_model_response(m))
396+
model_content = _content_model_response(m)
397+
# Skip model responses with empty parts (e.g., thinking-only responses)
398+
if model_content['parts']:
399+
contents.append(model_content)
397400
else:
398401
assert_never(m)
399402
if instructions := self._get_instructions(messages):

0 commit comments

Comments
 (0)