11use std:: collections:: { HashMap , HashSet } ;
22use std:: sync:: Arc ;
3+ use std:: time:: Duration ;
34
45use futures:: StreamExt as _;
56use futures:: stream:: FuturesUnordered ;
@@ -8,14 +9,69 @@ use rostra_core::event::VerifiedEvent;
89use rostra_core:: id:: { RostraId , ToShort as _} ;
910use rostra_p2p:: Connection ;
1011use rostra_util_error:: FmtCompact as _;
11- use tokio:: sync:: watch;
12+ use tokio:: sync:: { RwLock , watch} ;
13+ use tokio:: time:: Instant ;
1214use tracing:: { debug, info, instrument, trace, warn} ;
1315
14- use crate :: client:: Client ;
16+ use crate :: client:: { Client , INITIAL_BACKOFF_DURATION , MAX_BACKOFF_DURATION } ;
1517use crate :: connection_cache:: ConnectionCache ;
1618
/// Tracing target used by all log/trace macros in this module.
const LOG_TARGET: &str = "rostra::poll_follower_heads";
1820
/// Per-peer backoff state for polling.
///
/// Tracked per [`RostraId`] in the shared map; drives exponential backoff
/// after repeated connection/poll failures.
#[derive(Debug, Clone, Default)]
struct PeerBackoffState {
    /// Number of consecutive failures since the last successful poll.
    consecutive_failures: u32,
    /// Deadline before which polling this peer should be skipped;
    /// `None` means no backoff is currently active.
    backoff_until: Option<Instant>,
}
29+
30+ impl PeerBackoffState {
31+ /// Calculate the backoff duration based on consecutive failures.
32+ fn calculate_backoff_duration ( & self ) -> Duration {
33+ if self . consecutive_failures == 0 {
34+ return Duration :: ZERO ;
35+ }
36+ let shift = self . consecutive_failures . saturating_sub ( 1 ) . min ( 63 ) ;
37+ let multiplier = 1u64 << shift;
38+ let backoff_secs = INITIAL_BACKOFF_DURATION
39+ . as_secs ( )
40+ . saturating_mul ( multiplier) ;
41+ Duration :: from_secs ( backoff_secs) . min ( MAX_BACKOFF_DURATION )
42+ }
43+
44+ /// Check if we should skip polling due to backoff.
45+ fn is_in_backoff ( & self ) -> bool {
46+ self . backoff_until
47+ . map ( |until| Instant :: now ( ) < until)
48+ . unwrap_or ( false )
49+ }
50+
51+ /// Get remaining backoff duration, if any.
52+ fn backoff_remaining ( & self ) -> Option < Duration > {
53+ let until = self . backoff_until ?;
54+ let now = Instant :: now ( ) ;
55+ if now < until { Some ( until - now) } else { None }
56+ }
57+
58+ /// Record a successful poll, resetting backoff state.
59+ fn record_success ( & mut self ) {
60+ self . consecutive_failures = 0 ;
61+ self . backoff_until = None ;
62+ }
63+
64+ /// Record a failed poll, updating backoff state.
65+ fn record_failure ( & mut self ) {
66+ self . consecutive_failures = self . consecutive_failures . saturating_add ( 1 ) ;
67+ let backoff_duration = self . calculate_backoff_duration ( ) ;
68+ self . backoff_until = Some ( Instant :: now ( ) + backoff_duration) ;
69+ }
70+ }
71+
/// Shared backoff state for all peers, keyed by peer [`RostraId`].
///
/// Cloned into each per-peer polling future; guarded by an async `RwLock`
/// so reads (backoff checks) don't contend with writes (recording results).
type SharedBackoffState = Arc<RwLock<HashMap<RostraId, PeerBackoffState>>>;
74+
1975/// Polls followers for new head updates using the WAIT_FOLLOWERS_NEW_HEADS RPC.
2076///
2177/// This task maintains connections to self and direct followers, polling each
@@ -49,6 +105,8 @@ impl PollFollowerHeadUpdates {
49105 let mut active_peers: HashSet < RostraId > = HashSet :: new ( ) ;
50106 // FuturesUnordered to manage concurrent polling tasks
51107 let mut poll_futures: FuturesUnordered < _ > = FuturesUnordered :: new ( ) ;
108+ // Shared backoff state for all peers
109+ let backoff_state: SharedBackoffState = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
52110
53111 // Start with self
54112 active_peers. insert ( self . self_id ) ;
@@ -81,10 +139,19 @@ impl PollFollowerHeadUpdates {
81139 let db = self . db . clone ( ) ;
82140 let self_id = self . self_id ;
83141 let wot_rx = self . self_wot_rx . clone ( ) ;
142+ let backoff = backoff_state. clone ( ) ;
84143
85144 poll_futures. push ( async move {
86- Self :: poll_peer_for_heads ( client, connections, db, self_id, peer_id, wot_rx)
87- . await ;
145+ Self :: poll_peer_for_heads (
146+ client,
147+ connections,
148+ db,
149+ self_id,
150+ peer_id,
151+ wot_rx,
152+ backoff,
153+ )
154+ . await ;
88155 peer_id
89156 } ) ;
90157 }
@@ -125,8 +192,30 @@ impl PollFollowerHeadUpdates {
125192 self_id : RostraId ,
126193 peer_id : RostraId ,
127194 wot_rx : watch:: Receiver < Arc < WotData > > ,
195+ backoff_state : SharedBackoffState ,
128196 ) {
129197 loop {
198+ // Check if we're in backoff for this peer
199+ {
200+ let state = backoff_state. read ( ) . await ;
201+ if let Some ( peer_state) = state. get ( & peer_id) {
202+ if peer_state. is_in_backoff ( ) {
203+ if let Some ( remaining) = peer_state. backoff_remaining ( ) {
204+ trace ! (
205+ target: LOG_TARGET ,
206+ peer_id = %peer_id. to_short( ) ,
207+ remaining_secs = remaining. as_secs( ) ,
208+ "Peer is in backoff, waiting"
209+ ) ;
210+ // Sleep for the remaining backoff duration
211+ drop ( state) ; // Release lock before sleeping
212+ tokio:: time:: sleep ( remaining) . await ;
213+ continue ;
214+ }
215+ }
216+ }
217+ }
218+
130219 let Ok ( client_ref) = client. client_ref ( ) else {
131220 break ;
132221 } ;
@@ -140,15 +229,33 @@ impl PollFollowerHeadUpdates {
140229 err = %err. fmt_compact( ) ,
141230 "Could not connect to peer for polling"
142231 ) ;
143- // Wait a bit before retrying
144- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 30 ) ) . await ;
232+ // Record failure and apply backoff
233+ {
234+ let mut state = backoff_state. write ( ) . await ;
235+ let peer_state = state. entry ( peer_id) . or_default ( ) ;
236+ peer_state. record_failure ( ) ;
237+ debug ! (
238+ target: LOG_TARGET ,
239+ peer_id = %peer_id. to_short( ) ,
240+ consecutive_failures = peer_state. consecutive_failures,
241+ backoff_secs = peer_state. calculate_backoff_duration( ) . as_secs( ) ,
242+ "Connection failed, applying backoff"
243+ ) ;
244+ }
145245 continue ;
146246 }
147247 } ;
148248
149249 match Self :: poll_once ( & conn, & db, self_id, & wot_rx) . await {
150250 Ok ( ( ) ) => {
151251 trace ! ( target: LOG_TARGET , %peer_id, "Successfully polled peer" ) ;
252+ // Reset backoff on success
253+ {
254+ let mut state = backoff_state. write ( ) . await ;
255+ if let Some ( peer_state) = state. get_mut ( & peer_id) {
256+ peer_state. record_success ( ) ;
257+ }
258+ }
152259 }
153260 Err ( err) => {
154261 debug ! (
@@ -157,6 +264,19 @@ impl PollFollowerHeadUpdates {
157264 err = %err,
158265 "Error polling peer"
159266 ) ;
267+ // Record failure and apply backoff
268+ {
269+ let mut state = backoff_state. write ( ) . await ;
270+ let peer_state = state. entry ( peer_id) . or_default ( ) ;
271+ peer_state. record_failure ( ) ;
272+ debug ! (
273+ target: LOG_TARGET ,
274+ peer_id = %peer_id. to_short( ) ,
275+ consecutive_failures = peer_state. consecutive_failures,
276+ backoff_secs = peer_state. calculate_backoff_duration( ) . as_secs( ) ,
277+ "Poll failed, applying backoff"
278+ ) ;
279+ }
160280 // On error, break and let the outer loop restart
161281 break ;
162282 }
0 commit comments