1+ use std:: sync:: Arc ;
12use futures_util:: { future, FutureExt } ;
23use http:: header:: AsHeaderName ;
3- use http:: { HeaderMap , HeaderValue , Request , Response } ;
4+ use http:: { HeaderMap , HeaderValue , Request , Response , Uri } ;
45use hyper_util:: client:: legacy:: Error ;
56use std:: time:: Duration ;
67use tower:: retry:: Policy ;
@@ -14,6 +15,19 @@ fn header_as_i64(headers: &HeaderMap<HeaderValue>, header: impl AsHeaderName) ->
1415 headers. get ( header) ?. to_str ( ) . ok ( ) ?. parse ( ) . ok ( )
1516}
1617
18+ pub trait RateLimitMetrics : Send + Sync {
19+ fn retry_error ( & self , url : & Uri , status_code : http:: StatusCode ) ;
20+ fn final_retry ( & self , url : & Uri , status_code : http:: StatusCode ) ;
21+ fn rate_limited ( & self , url : & Uri , status_code : http:: StatusCode , waiting_seconds : u64 ) ;
22+ }
23+
24+ pub struct NullRateLimitMetrics ;
25+ impl RateLimitMetrics for NullRateLimitMetrics {
26+ fn retry_error ( & self , _url : & Uri , _status_code : http:: StatusCode ) { }
27+ fn final_retry ( & self , _url : & Uri , _status_code : http:: StatusCode ) { }
28+ fn rate_limited ( & self , _url : & Uri , _status_code : http:: StatusCode , _waiting_seconds : u64 ) { }
29+ }
30+
1731#[ derive( Clone ) ]
1832pub enum RetryConfig {
1933 None ,
@@ -27,15 +41,15 @@ pub enum RetryConfig {
2741 /// It's not clear whether it's actually forbidden, or if it's a rate limit.
2842 /// For server errors (5xx), retry immediately
2943 /// For any other errors do not retry.
30- HandleRateLimits ( usize ) ,
44+ HandleRateLimits ( Arc < dyn RateLimitMetrics > , usize ) ,
3145}
3246
3347impl < B > Policy < Request < OctoBody > , Response < B > , Error > for RetryConfig {
3448 type Future = future:: BoxFuture < ' static , ( ) > ;
3549
3650 fn retry (
3751 & mut self ,
38- _req : & mut Request < OctoBody > ,
52+ req : & mut Request < OctoBody > ,
3953 result : & mut Result < Response < B > , Error > ,
4054 ) -> Option < Self :: Future > {
4155 match self {
@@ -86,7 +100,7 @@ impl<B> Policy<Request<OctoBody>, Response<B>, Error> for RetryConfig {
86100 }
87101 }
88102 } ,
89- RetryConfig :: HandleRateLimits ( max_retries) => {
103+ RetryConfig :: HandleRateLimits ( metrics , max_retries) => {
90104 if * max_retries > 0 {
91105 let response = result. as_ref ( ) . ok ( ) ?;
92106
@@ -110,10 +124,17 @@ impl<B> Policy<Request<OctoBody>, Response<B>, Error> for RetryConfig {
110124 {
111125 Some ( 60 )
112126 }
113- _ => None ,
127+ _ => {
128+ metrics. retry_error ( req. uri ( ) , response. status ( ) ) ;
129+ None
130+ }
114131 } ?;
115132
116133 * max_retries -= 1 ;
134+ metrics. rate_limited ( req. uri ( ) , response. status ( ) , wait_secs) ;
135+ if * max_retries == 0 {
136+ metrics. final_retry ( req. uri ( ) , response. status ( ) ) ;
137+ }
117138 Some (
118139 tokio:: time:: sleep ( Duration :: from_secs ( wait_secs) )
119140 . then ( move |_| {
@@ -123,6 +144,10 @@ impl<B> Policy<Request<OctoBody>, Response<B>, Error> for RetryConfig {
123144 )
124145 } else if response. status ( ) . is_server_error ( ) {
125146 * max_retries -= 1 ;
147+ metrics. retry_error ( req. uri ( ) , response. status ( ) ) ;
148+ if * max_retries == 0 {
149+ metrics. final_retry ( req. uri ( ) , response. status ( ) ) ;
150+ }
126151 Some ( future:: ready ( ( ) ) . boxed ( ) )
127152 } else {
128153 None
0 commit comments