11use adk_core:: { Agent , Event , AdkError , InvocationContext } ;
22use async_trait:: async_trait;
33use std:: sync:: Arc ;
4+ use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
45use std:: pin:: Pin ;
56use std:: task:: { Context as TaskContext , Poll } ;
67use futures:: { Stream , Future } ;
78use dialoguer:: { Select , Input , theme:: ColorfulTheme } ;
89
10+ type AgentOutput = Pin < Box < dyn Stream < Item = Result < Event , AdkError > > + Send > > ;
11+
912pub struct ResilientAgent {
1013 inner : Arc < dyn Agent > ,
1114 subs : Vec < Arc < dyn Agent > > ,
15+ retry_count : Arc < AtomicU32 > ,
1216}
1317
1418impl ResilientAgent {
19+ const MAX_RETRY_ATTEMPTS : u32 = 3 ;
20+
1521 pub fn new ( inner : Arc < dyn Agent > ) -> Self {
1622 Self {
1723 inner : inner. clone ( ) ,
1824 subs : vec ! [ inner] ,
25+ retry_count : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
1926 }
2027 }
28+
29+ // Helper for immediate errors (recursion in async fn)
30+ async fn handle_error ( & self , context : Arc < dyn InvocationContext > , e : AdkError ) -> Result < AgentOutput , AdkError > {
31+ let current_retry = self . retry_count . fetch_add ( 1 , Ordering :: SeqCst ) ;
32+
33+ // Check if max retry attempts reached
34+ if current_retry >= Self :: MAX_RETRY_ATTEMPTS {
35+ println ! ( "\n ❌ Maximum retry attempts ({}) reached." , Self :: MAX_RETRY_ATTEMPTS ) ;
36+ self . retry_count . store ( 0 , Ordering :: SeqCst ) ; // Reset counter
37+ return Err ( AdkError :: Tool ( format ! (
38+ "Agent '{}' failed after {} retry attempts" ,
39+ self . name( ) ,
40+ Self :: MAX_RETRY_ATTEMPTS
41+ ) ) ) ;
42+ }
43+
44+ println ! ( "\n ⚠️ Agent '{}' encountered error: {}" , self . name( ) , e) ;
45+ println ! ( "The agent loop limit has been exceeded." ) ;
46+ println ! ( "Retry attempt {}/{}" , current_retry + 1 , Self :: MAX_RETRY_ATTEMPTS ) ;
47+
48+ let selections = & [ "Retry (reset counter)" , "Provide Guidance & Retry" , "Abort" ] ;
49+ let selection = Select :: with_theme ( & ColorfulTheme :: default ( ) )
50+ . with_prompt ( "How would you like to proceed?" )
51+ . default ( 0 )
52+ . items ( & selections[ ..] )
53+ . interact ( )
54+ . unwrap_or ( 2 ) ;
55+
56+ match selection {
57+ 0 => {
58+ println ! ( "🔄 Retrying agent execution..." ) ;
59+ return self . run ( context) . await ;
60+ } ,
61+ 1 => {
62+ let input: String = Input :: with_theme ( & ColorfulTheme :: default ( ) )
63+ . with_prompt ( "Please provide guidance for the agent" )
64+ . interact_text ( )
65+ . unwrap_or_default ( ) ;
66+
67+ if !input. is_empty ( ) {
68+ println ! ( "(Note: User guidance provided: '{}' - but context injection is not implemented. Retrying anyway.)" , input) ;
69+ }
70+ println ! ( "🔄 Retrying with new guidance..." ) ;
71+ return self . run ( context) . await ;
72+ } ,
73+ _ => {
74+ self . retry_count . store ( 0 , Ordering :: SeqCst ) ; // Reset counter on abort
75+ return Err ( e) ;
76+ }
77+ }
78+ }
2179}
2280
23- type AgentOutput = Pin < Box < dyn Stream < Item = Result < Event , AdkError > > + Send > > ;
24-
2581#[ async_trait]
2682impl Agent for ResilientAgent {
2783 fn name ( & self ) -> & str {
@@ -40,11 +96,14 @@ impl Agent for ResilientAgent {
4096 // Initial run
4197 match self . inner . run ( context. clone ( ) ) . await {
4298 Ok ( stream) => {
99+ // Success - reset retry counter
100+ self . retry_count . store ( 0 , Ordering :: SeqCst ) ;
43101 // Wrap the stream to handle errors during iteration
44102 Ok ( Box :: pin ( ResilientStream :: new (
45103 self . inner . clone ( ) ,
46104 context,
47- stream
105+ stream,
106+ self . retry_count . clone ( ) ,
48107 ) ) )
49108 } ,
50109 Err ( e) => {
@@ -61,42 +120,6 @@ impl Agent for ResilientAgent {
61120 }
62121}
63122
64- impl ResilientAgent {
65- // Helper for immediate errors (recursion in async fn)
66- async fn handle_error ( & self , context : Arc < dyn InvocationContext > , e : AdkError ) -> Result < AgentOutput , AdkError > {
67- println ! ( "\n ⚠️ Agent '{}' encountered error: {}" , self . name( ) , e) ;
68- println ! ( "The agent loop limit has been exceeded." ) ;
69-
70- let selections = & [ "Retry (reset counter)" , "Provide Guidance & Retry" , "Abort" ] ;
71- let selection = Select :: with_theme ( & ColorfulTheme :: default ( ) )
72- . with_prompt ( "How would you like to proceed?" )
73- . default ( 0 )
74- . items ( & selections[ ..] )
75- . interact ( )
76- . unwrap_or ( 2 ) ;
77-
78- match selection {
79- 0 => {
80- println ! ( "🔄 Retrying agent execution..." ) ;
81- return self . run ( context) . await ;
82- } ,
83- 1 => {
84- let input: String = Input :: with_theme ( & ColorfulTheme :: default ( ) )
85- . with_prompt ( "Please provide guidance for the agent" )
86- . interact_text ( )
87- . unwrap_or_default ( ) ;
88-
89- if !input. is_empty ( ) {
90- println ! ( "(Note: User guidance provided: '{}' - but context injection is not implemented. Retrying anyway.)" , input) ;
91- }
92- println ! ( "🔄 Retrying with new guidance..." ) ;
93- return self . run ( context) . await ;
94- } ,
95- _ => return Err ( e) ,
96- }
97- }
98- }
99-
100123// ============================================================================
101124// ResilientStream Implementation
102125// ============================================================================
@@ -111,20 +134,23 @@ struct ResilientStream {
111134 context : Arc < dyn InvocationContext > ,
112135 state : StreamState ,
113136 agent_name : String , // Cached for logging
137+ retry_count : Arc < AtomicU32 > ,
114138}
115139
116140impl ResilientStream {
117141 fn new (
118142 inner_agent : Arc < dyn Agent > ,
119143 context : Arc < dyn InvocationContext > ,
120144 stream : AgentOutput ,
145+ retry_count : Arc < AtomicU32 > ,
121146 ) -> Self {
122147 let agent_name = inner_agent. name ( ) . to_string ( ) ;
123148 Self {
124149 inner_agent,
125150 context,
126151 state : StreamState :: Streaming ( stream) ,
127152 agent_name,
153+ retry_count,
128154 }
129155 }
130156
0 commit comments