@@ -143,64 +143,23 @@ def is_agent_node(
143143 return isinstance (node , AgentNode )
144144
145145
def _create_thinking_retry_request(parts: list[_messages.ModelResponsePart]) -> _messages.ModelRequest | None:
    """Build a retry request for a thinking-only model response.

    Models that support thinking mode sometimes return *only* `ThinkingPart`
    instances, without any actionable output alongside the thinking content.
    In that case, return a `ModelRequest` that prompts the model to produce a
    concrete answer or a tool call; otherwise return `None`.
    """
    # Only retry when the response actually contains thinking content.
    thinking_parts = [p for p in parts if isinstance(p, _messages.ThinkingPart)]
    if not thinking_parts:
        return None

    retry_prompt = (
        'Based on your thinking above, you MUST now provide '
        'a specific answer or use the available tools to complete the task. '
        'Do not respond with only thinking content. Provide actionable output.'
    )

    # Use a UserPromptPart for API compatibility, with a special content
    # marker so a thinking retry can be recognised in the message history.
    retry_part = _messages.UserPromptPart(f'[THINKING_RETRY] {retry_prompt}')
    return _messages.ModelRequest(parts=[retry_part])
204163
205164
206165@dataclasses .dataclass
@@ -490,67 +449,69 @@ async def stream(
490449 async for _event in stream :
491450 pass
492451
async def _run_stream(  # noqa: C901
    self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
) -> AsyncIterator[_messages.HandleResponseEvent]:
    """Process the model response parts exactly once, yielding handle-response events.

    The nested generator is created on first call and cached on
    `self._events_iterator` so the response is only processed a single time;
    subsequent calls just continue draining the same iterator.
    """
    if self._events_iterator is None:
        # Ensure that the stream is only run once

        async def _run_stream() -> AsyncIterator[_messages.HandleResponseEvent]:  # noqa: C901
            texts: list[str] = []
            tool_calls: list[_messages.ToolCallPart] = []
            for part in self.model_response.parts:
                if isinstance(part, _messages.TextPart):
                    # ignore empty content for text parts, see #437
                    if part.content:
                        texts.append(part.content)
                elif isinstance(part, _messages.ToolCallPart):
                    tool_calls.append(part)
                elif isinstance(part, _messages.BuiltinToolCallPart):
                    yield _messages.BuiltinToolCallEvent(part)
                elif isinstance(part, _messages.BuiltinToolReturnPart):
                    yield _messages.BuiltinToolResultEvent(part)
                elif isinstance(part, _messages.ThinkingPart):
                    # We don't need to do anything with thinking parts in this tool-calling node.
                    # We need to handle text parts in case there are no tool calls and/or the desired
                    # output comes from the text, but thinking parts should not directly influence the
                    # execution of tools or determination of the next node of graph execution here.
                    pass
                else:
                    assert_never(part)

            # At the moment, we prioritize at least executing tool calls if they are present.
            # In the future, we'd consider making this configurable at the agent or run level.
            # This accounts for cases like anthropic returns that might contain a text response
            # and a tool call response, where the text response just indicates the tool call will happen.
            if tool_calls:
                async for event in self._handle_tool_calls(ctx, tool_calls):
                    yield event
            elif texts:
                # No events are emitted during the handling of text responses, so we don't need to yield anything
                self._next_node = await self._handle_text_response(ctx, texts)
            else:
                # we've got an empty response, this sometimes happens with anthropic (and perhaps other models)
                # when the model has already returned text along side tool calls
                # in this scenario, if text responses are allowed, we return text from the most recent model
                # response, if any
                if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
                    for message in reversed(ctx.state.message_history):
                        if isinstance(message, _messages.ModelResponse):
                            last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
                            if last_texts:
                                self._next_node = await self._handle_text_response(ctx, last_texts)
                                return

                # If there are no preceding model responses with text, prompt the model to try again
                # and provide actionable output instead of a thinking-only response.
                if retry_request := _create_thinking_retry_request(self.model_response.parts):
                    self._next_node = ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)
                    return

                raise exceptions.UnexpectedModelBehavior('Received empty model response')

        self._events_iterator = _run_stream()

    async for event in self._events_iterator:
        yield event
554515 async def _handle_tool_calls (
555516 self ,
556517 ctx : GraphRunContext [GraphAgentState , GraphAgentDeps [DepsT , NodeRunEndT ]],
0 commit comments