11use futures:: { FutureExt , Stream } ;
2- use std:: { pin:: Pin , task:: Poll , time:: Duration } ;
2+ use std:: {
3+ pin:: Pin ,
4+ task:: { Context , Poll } ,
5+ time:: Duration ,
6+ } ;
37use tokio:: time:: sleep;
48
59/// Helper trait alias for backoff streams.
610/// We define any stream that yields `Duration`s as a backoff
711pub trait Backoff : Stream < Item = Duration > + Unpin { }
812
9- /// Blanket implementation of `Backoff` for any stream that yields `Duration`s.
13+ // Blanket implementation of `Backoff` for any stream that yields `Duration`s.
1014impl < T > Backoff for T where T : Stream < Item = Duration > + Unpin { }
1115
1216/// A stream that yields exponentially increasing backoff durations.
@@ -23,6 +27,7 @@ pub struct ExponentialBackoff {
2327}
2428
2529impl ExponentialBackoff {
30+ /// Creates a new exponential backoff stream with the given initial duration and max retries.
2631 pub fn new ( initial : Duration , max_retries : usize ) -> Self {
2732 Self { retry_count : 0 , max_retries, backoff : initial, timeout : None }
2833 }
@@ -38,38 +43,36 @@ impl Stream for ExponentialBackoff {
3843
3944 /// Polls the exponential backoff stream. Returns `Poll::Ready` with the current backoff
4045 /// duration if the backoff timeout has elapsed, otherwise returns `Poll::Pending`.
41- fn poll_next (
42- self : Pin < & mut Self > ,
43- cx : & mut std:: task:: Context < ' _ > ,
44- ) -> Poll < Option < Self :: Item > > {
46+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
4547 let this = self . get_mut ( ) ;
4648
4749 loop {
48- if let Some ( ref mut timeout) = this. timeout {
49- if timeout. poll_unpin ( cx) . is_ready ( ) {
50- // Timeout has elapsed, so reset the timeout and double the backoff
51- this. backoff *= 2 ;
52- this. retry_count += 1 ;
50+ let Some ( ref mut timeout) = this. timeout else {
51+ // Set the initial timeout
52+ this. reset_timeout ( ) ;
53+ continue ;
54+ } ;
55+
56+ if timeout. poll_unpin ( cx) . is_ready ( ) {
57+ // Timeout has elapsed, so reset the timeout and double the backoff
58+ this. backoff *= 2 ;
59+ this. retry_count += 1 ;
5360
54- // Close the stream
55- if this. retry_count >= this. max_retries {
56- return Poll :: Ready ( None ) ;
57- }
61+ // Close the stream
62+ if this. retry_count >= this. max_retries {
63+ return Poll :: Ready ( None ) ;
64+ }
5865
59- this. reset_timeout ( ) ;
66+ this. reset_timeout ( ) ;
6067
61- // Wake up the task to poll the timeout again
62- cx. waker ( ) . wake_by_ref ( ) ;
68+ // Wake up the task to poll the timeout again
69+ cx. waker ( ) . wake_by_ref ( ) ;
6370
64- // Return the current backoff duration
65- return Poll :: Ready ( Some ( this. backoff ) ) ;
66- } else {
67- // Timeout has not elapsed, so return pending
68- return Poll :: Pending ;
69- }
71+ // Return the current backoff duration
72+ return Poll :: Ready ( Some ( this. backoff ) ) ;
7073 } else {
71- // Set initial timeout
72- this . reset_timeout ( ) ;
74+ // Timeout has not elapsed, so return pending
75+ return Poll :: Pending ;
7376 }
7477 }
7578 }
0 commit comments