@@ -143,6 +143,66 @@ def is_agent_node(
     return isinstance(node, AgentNode)


+def _is_retry_attempt(ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]) -> bool:
+    # Check if we've already attempted a thinking-only retry, to prevent infinite retry loops
+    recent_messages = (
+        ctx.state.message_history[-3:] if len(ctx.state.message_history) >= 3 else ctx.state.message_history
+    )
+    for msg in recent_messages:
+        if isinstance(msg, _messages.ModelRequest):
+            for part in msg.parts:
+                if (
+                    isinstance(part, _messages.UserPromptPart)
+                    and isinstance(part.content, str)
+                    and part.content.startswith('[THINKING_RETRY]')
+                ):
+                    return True
+    return False
+
+
+async def _create_thinking_retry(
+    ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
+) -> ModelRequestNode[DepsT, NodeRunEndT]:
+    # Create the retry prompt
+    retry_prompt = (
+        'Based on your thinking above, you MUST now provide '
+        'a specific answer or use the available tools to complete the task. '
+        'Do not respond with only thinking content. Provide actionable output.'
+    )
+
+    # Create the retry request using UserPromptPart for API compatibility.
+    # We use a special content marker so we can detect that this request is a thinking retry.
+    retry_part = _messages.UserPromptPart(f'[THINKING_RETRY] {retry_prompt}')
+    retry_request = _messages.ModelRequest(parts=[retry_part])
+
+    # Create a new ModelRequestNode for the retry (it will add the request to the message history)
+    return ModelRequestNode[DepsT, NodeRunEndT](request=retry_request)
+
+
+async def _process_response_parts(
+    parts: list[_messages.ModelResponsePart], texts: list[str], tool_calls: list[_messages.ToolCallPart]
+) -> AsyncIterator[_messages.HandleResponseEvent]:
+    for part in parts:
+        if isinstance(part, _messages.TextPart):
+            # ignore empty content for text parts, see #437
+            if part.content:
+                texts.append(part.content)
+        elif isinstance(part, _messages.ToolCallPart):
+            tool_calls.append(part)
+        elif isinstance(part, _messages.BuiltinToolCallPart):
+            yield _messages.BuiltinToolCallEvent(part)
+        elif isinstance(part, _messages.BuiltinToolReturnPart):
+            yield _messages.BuiltinToolResultEvent(part)
+        elif isinstance(part, _messages.ThinkingPart):
+            # We don't need to do anything with thinking parts in this tool-calling node.
+            # We need to handle text parts in case there are no tool calls and/or the desired output comes
+            # from the text, but thinking parts should not directly influence the execution of tools or
+            # the determination of the next node of graph execution here.
+            pass
+        else:
+            assert_never(part)
+
+
 @dataclasses.dataclass
 class UserPromptNode(AgentNode[DepsT, NodeRunEndT]):
     """The node that handles the user prompt and instructions."""
@@ -430,65 +490,67 @@ async def stream(
         async for _event in stream:
             pass

-    async def _run_stream(  # noqa: C901
+    async def _run_stream(
         self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
     ) -> AsyncIterator[_messages.HandleResponseEvent]:
         if self._events_iterator is None:
             # Ensure that the stream is only run once
-
-            async def _run_stream() -> AsyncIterator[_messages.HandleResponseEvent]:
-                texts: list[str] = []
-                tool_calls: list[_messages.ToolCallPart] = []
-                for part in self.model_response.parts:
-                    if isinstance(part, _messages.TextPart):
-                        # ignore empty content for text parts, see #437
-                        if part.content:
-                            texts.append(part.content)
-                    elif isinstance(part, _messages.ToolCallPart):
-                        tool_calls.append(part)
-                    elif isinstance(part, _messages.BuiltinToolCallPart):
-                        yield _messages.BuiltinToolCallEvent(part)
-                    elif isinstance(part, _messages.BuiltinToolReturnPart):
-                        yield _messages.BuiltinToolResultEvent(part)
-                    elif isinstance(part, _messages.ThinkingPart):
-                        # We don't need to do anything with thinking parts in this tool-calling node.
-                        # We need to handle text parts in case there are no tool calls and/or the desired output comes
-                        # from the text, but thinking parts should not directly influence the execution of tools or
-                        # determination of the next node of graph execution here.
-                        pass
-                    else:
-                        assert_never(part)
-
-                # At the moment, we prioritize at least executing tool calls if they are present.
-                # In the future, we'd consider making this configurable at the agent or run level.
-                # This accounts for cases like anthropic returns that might contain a text response
-                # and a tool call response, where the text response just indicates the tool call will happen.
-                if tool_calls:
-                    async for event in self._handle_tool_calls(ctx, tool_calls):
-                        yield event
-                elif texts:
-                    # No events are emitted during the handling of text responses, so we don't need to yield anything
-                    self._next_node = await self._handle_text_response(ctx, texts)
-                else:
-                    # we've got an empty response, this sometimes happens with anthropic (and perhaps other models)
-                    # when the model has already returned text along side tool calls
-                    # in this scenario, if text responses are allowed, we return text from the most recent model
-                    # response, if any
-                    if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
-                        for message in reversed(ctx.state.message_history):
-                            if isinstance(message, _messages.ModelResponse):
-                                last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
-                                if last_texts:
-                                    self._next_node = await self._handle_text_response(ctx, last_texts)
-                                    return
-
-                    raise exceptions.UnexpectedModelBehavior('Received empty model response')
-
-            self._events_iterator = _run_stream()
+            self._events_iterator = self._create_stream_iterator(ctx)

         async for event in self._events_iterator:
             yield event

+    async def _create_stream_iterator(
+        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
+    ) -> AsyncIterator[_messages.HandleResponseEvent]:
+        texts: list[str] = []
+        tool_calls: list[_messages.ToolCallPart] = []
+
+        # Process all parts in the model response
+        async for event in _process_response_parts(self.model_response.parts, texts, tool_calls):
+            yield event
+
+        # Handle the response based on what we found
+        if tool_calls:
+            async for event in self._handle_tool_calls(ctx, tool_calls):
+                yield event
+        elif texts:
+            # No events are emitted during the handling of text responses, so we don't need to yield anything
+            self._next_node = await self._handle_text_response(ctx, texts)
+        else:
+            self._next_node = await self._handle_empty_response(ctx)
+
+    async def _handle_empty_response(
+        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
+    ) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]]:
+        # Handle thinking-only responses (responses that contain only ThinkingPart instances).
+        # This can happen with models that support thinking mode when they don't provide
+        # actionable output alongside their thinking content.
+        thinking_parts = [p for p in self.model_response.parts if isinstance(p, _messages.ThinkingPart)]
+
+        if thinking_parts and not _is_retry_attempt(ctx):
+            return await _create_thinking_retry(ctx)
+
+        # Original recovery logic: an empty response sometimes happens with Anthropic (and perhaps
+        # other models) when the model has already returned text alongside tool calls.
+        # In this scenario, if text responses are allowed, we return text from the most recent
+        # model response, if any.
+        if isinstance(ctx.deps.output_schema, _output.TextOutputSchema):
+            if next_node := await self._try_recover_from_history(ctx):
+                return next_node
+
+        raise exceptions.UnexpectedModelBehavior('Received empty model response')
+
+    async def _try_recover_from_history(
+        self, ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]]
+    ) -> ModelRequestNode[DepsT, NodeRunEndT] | End[result.FinalResult[NodeRunEndT]] | None:
+        for message in reversed(ctx.state.message_history):
+            if isinstance(message, _messages.ModelResponse):
+                last_texts = [p.content for p in message.parts if isinstance(p, _messages.TextPart)]
+                if last_texts:
+                    return await self._handle_text_response(ctx, last_texts)
+        return None
+
     async def _handle_tool_calls(
         self,
         ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT, NodeRunEndT]],
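
A note on the `[-3:]` window in `_is_retry_attempt`: when the guard runs for the follow-up response, the message history should end with the thinking-only ModelResponse, the [THINKING_RETRY] ModelRequest, and the new ModelResponse, so the marker falls within the last three messages. The standalone sketch below (not part of the diff) shows the marker round-trip; message shapes are assumed from pydantic_ai.messages, imported as _messages in the module above.

# Standalone sketch of the marker round-trip that _is_retry_attempt relies on.
from pydantic_ai.messages import ModelRequest, UserPromptPart

retry_request = ModelRequest(parts=[UserPromptPart('[THINKING_RETRY] Provide actionable output.')])


def contains_retry_marker(request: ModelRequest) -> bool:
    # Mirrors the check in _is_retry_attempt: a string UserPromptPart
    # starting with the special marker identifies a thinking retry.
    return any(
        isinstance(part, UserPromptPart)
        and isinstance(part.content, str)
        and part.content.startswith('[THINKING_RETRY]')
        for part in request.parts
    )


assert contains_retry_marker(retry_request)
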