@@ -143,66 +143,18 @@ def is_agent_node(
     return isinstance(node, AgentNode)


-def _is_retry_attempt(ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]) -> bool:
-    # Check if we've already attempted a thinking-only retry to prevent infinite loops
-    recent_messages = (
-        ctx.state.message_history[-3:] if len(ctx.state.message_history) >= 3 else ctx.state.message_history
-    )
-    for msg in recent_messages:
-        if isinstance(msg, _messages.ModelRequest):
-            for part in msg.parts:
-                if (
-                    isinstance(part, _messages.UserPromptPart)
-                    and isinstance(part.content, str)
-                    and part.content.startswith('[THINKING_RETRY]')
-                ):
-                    return True
-    return False
-
-
 async def _create_thinking_retry(
     ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
 ) -> ModelRequestNode[DepsT, NodeRunEndT]:
     # Create retry prompt
-    retry_prompt = (
-        'Based on your thinking above, you MUST now provide '
-        'a specific answer or use the available tools to complete the task. '
-        'Do not respond with only thinking content. Provide actionable output.'
-    )
-
-    # Create the retry request using UserPromptPart for API compatibility
-    # We'll use a special content marker to detect this is a thinking retry
-    retry_part = _messages.UserPromptPart(f'[THINKING_RETRY] {retry_prompt}')
+    retry_prompt = 'Responses without text or tool calls are not permitted.'
+    retry_part = _messages.RetryPromptPart(retry_prompt)
     retry_request = _messages.ModelRequest(parts=[retry_part])

     # Create new ModelRequestNode for retry (it will add the request to message history)
     return ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)


-async def _process_response_parts(
-    parts: list[_messages.ModelResponsePart], texts: list[str], tool_calls: list[_messages.ToolCallPart]
-) -> AsyncIterator[_messages.HandleResponseEvent]:
-    for part in parts:
-        if isinstance(part, _messages.TextPart):
-            # ignore empty content for text parts, see #437
-            if part.content:
-                texts.append(part.content)
-        elif isinstance(part, _messages.ToolCallPart):
-            tool_calls.append(part)
-        elif isinstance(part, _messages.BuiltinToolCallPart):
-            yield _messages.BuiltinToolCallEvent(part)
-        elif isinstance(part, _messages.BuiltinToolReturnPart):
-            yield _messages.BuiltinToolResultEvent(part)
-        elif isinstance(part, _messages.ThinkingPart):
-            # We don't need to do anything with thinking parts in this tool-calling node.
-            # We need to handle text parts in case there are no tool calls and/or the desired output comes
-            # from the text, but thinking parts should not directly influence the execution of tools or
-            # determination of the next node of graph execution here.
-            pass
-        else:
-            assert_never(part)
-
-
 @dataclasses.dataclass
 class UserPromptNode(AgentNode[DepsT, NodeRunEndT]):
     """The node that handles the user prompt and instructions."""
@@ -488,67 +440,77 @@ async def stream(
         async for _event in stream:
             pass

-    async def _run_stream(
+    async def _run_stream(  # noqa: C901
         self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
     ) -> AsyncIterator[_messages.HandleResponseEvent]:
         if self._events_iterator is None:
             # Ensure that the stream is only run once
-            self._events_iterator = self._create_stream_iterator(ctx)

-        async for event in self._events_iterator:
-            yield event
-
-    async def _create_stream_iterator(
-        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
-    ) -> AsyncIterator[_messages.HandleResponseEvent]:
-        texts: list[str] = []
-        tool_calls: list[_messages.ToolCallPart] = []
+            async def _run_stream() -> AsyncIterator[_messages.HandleResponseEvent]:  # noqa: C901
+                texts: list[str] = []
+                tool_calls: list[_messages.ToolCallPart] = []
+
+                for part in self.model_response.parts:
+                    if isinstance(part, _messages.TextPart):
+                        # ignore empty content for text parts, see #437
+                        if part.content:
+                            texts.append(part.content)
+                    elif isinstance(part, _messages.ToolCallPart):
+                        tool_calls.append(part)
+                    elif isinstance(part, _messages.BuiltinToolCallPart):
+                        yield _messages.BuiltinToolCallEvent(part)
+                    elif isinstance(part, _messages.BuiltinToolReturnPart):
+                        yield _messages.BuiltinToolResultEvent(part)
+                    elif isinstance(part, _messages.ThinkingPart):
+                        # We don't need to do anything with thinking parts in this tool-calling node.
+                        # We need to handle text parts in case there are no tool calls and/or the desired output comes
+                        # from the text, but thinking parts should not directly influence the execution of tools or
+                        # determination of the next node of graph execution here.
+                        pass
+                    else:
+                        assert_never(part)
+
+                # At the moment, we prioritize at least executing tool calls if they are present.
+                # In the future, we'd consider making this configurable at the agent or run level.
+                # This accounts for cases like anthropic returns that might contain a text response
+                # and a tool call response, where the text response just indicates the tool call will happen.
+                if tool_calls:
+                    async for event in self._handle_tool_calls(ctx, tool_calls):
+                        yield event
+                elif texts:
+                    # No events are emitted during the handling of text responses, so we don't need to yield anything
+                    self._next_node = await self._handle_text_response(ctx, texts)
+                else:
+                    # we've got an empty response
+
+                    thinking_parts = [p for p in self.model_response.parts if isinstance(p, _messages.ThinkingPart)]
+
+                    if thinking_parts:
+                        # handle thinking-only responses (responses that contain only ThinkingPart instances)
+                        # this can happen with models that support thinking mode when they don't provide
+                        # actionable output alongside their thinking content.
+                        self._next_node = await _create_thinking_retry(ctx)
+                    else:
+                        # handle empty response with no thinking
+                        # this sometimes happens with anthropic (and perhaps other models)
+                        # when the model has already returned text along side tool calls
+                        # in this scenario, if text responses are allowed, we return text from the most recent model
+                        # response, if any
+                        if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
+                            for message in reversed(ctx.state.message_history):
+                                if isinstance(message, _messages.ModelResponse):
+                                    last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
+                                    if last_texts:
+                                        self._next_node = await self._handle_text_response(ctx, last_texts)
+                                        return
+
+                        raise exceptions.UnexpectedModelBehavior('Received empty model response')
+
+            self._events_iterator = _run_stream()

-        # Process all parts in the model response
-        async for event in _process_response_parts(self.model_response.parts, texts, tool_calls):
+        async for event in self._events_iterator:
             yield event

-        # Handle the response based on what we found
-        if tool_calls:
-            async for event in self._handle_tool_calls(ctx, tool_calls):
-                yield event
-        elif texts:
-            # No events are emitted during the handling of text responses, so we don't need to yield anything
-            self._next_node = await self._handle_text_response(ctx, texts)
-        else:
-            self._next_node = await self._handle_empty_response(ctx)
-
-    async def _handle_empty_response(
-        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
-    ) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]]:
-        # Handle thinking-only responses (responses that contain only ThinkingPart instances)
-        # This can happen with models that support thinking mode when they don't provide
-        # actionable output alongside their thinking content.
-        thinking_parts = [p for p in self.model_response.parts if isinstance(p, _messages.ThinkingPart)]
-
-        if thinking_parts and not _is_retry_attempt(ctx):
-            return await _create_thinking_retry(ctx)
-
-        # Original recovery logic - this sometimes happens with anthropic (and perhaps other models)
-        # when the model has already returned text along side tool calls
-        # in this scenario, if text responses are allowed, we return text from the most recent model
-        # response, if any
-        if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
-            if next_node := await self._try_recover_from_history(ctx):
-                return next_node
-
-        raise exceptions.UnexpectedModelBehavior('Received empty model response')
-
-    async def _try_recover_from_history(
-        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
-    ) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]] | None:
-        for message in reversed(ctx.state.message_history):
-            if isinstance(message, _messages.ModelResponse):
-                last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
-                if last_texts:
-                    return await self._handle_text_response(ctx, last_texts)
-        return None
-
     async def _handle_tool_calls(
         self,
         ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
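Aside (editor's note, not part of the commit): folding the helpers back into `_run_stream` keeps the contract that the event stream is built once and only drained on later calls, because the nested generator is instantiated solely when `self._events_iterator` is still `None`. A runnable sketch of that create-once/drain pattern; `_DemoNode`, `_gen`, and the event strings are hypothetical stand-ins, not names from this codebase:

    import asyncio
    from collections.abc import AsyncIterator

    class _DemoNode:
        def __init__(self) -> None:
            self._events_iterator: AsyncIterator[str] | None = None

        async def _run_stream(self) -> AsyncIterator[str]:
            if self._events_iterator is None:
                # Created exactly once; the generator body runs lazily as events are pulled.
                async def _gen() -> AsyncIterator[str]:
                    yield 'tool-call-event'
                    yield 'tool-result-event'

                self._events_iterator = _gen()

            async for event in self._events_iterator:
                yield event

    async def main() -> None:
        node = _DemoNode()
        print([e async for e in node._run_stream()])  # ['tool-call-event', 'tool-result-event']
        # A second pass finds the iterator already exhausted, so nothing is replayed.
        print([e async for e in node._run_stream()])  # []

    asyncio.run(main())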