3131import java .net .URI ;
3232import java .nio .ByteBuffer ;
3333import java .time .Duration ;
34+ import java .time .ZonedDateTime ;
35+ import java .time .format .DateTimeFormatter ;
36+ import java .time .format .DateTimeParseException ;
3437import java .util .List ;
3538import java .util .Optional ;
3639import java .util .concurrent .CompletableFuture ;
3740import java .util .concurrent .CompletionException ;
3841import java .util .concurrent .TimeUnit ;
3942import java .util .function .BiConsumer ;
43+ import java .util .function .Function ;
4044import java .util .function .Supplier ;
41- import java .util .function .ToIntFunction ;
4245
4346public abstract class StandardHttpClient <C extends HttpClient , F extends HttpClient .Factory , T extends StandardHttpClientBuilder <C , F , ?>>
4447 implements HttpClient , RequestTags {
@@ -85,7 +88,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
8588 standardHttpRequest ,
8689 () -> consumeBytesOnce (standardHttpRequest , consumer ),
8790 r -> r .body ().cancel (),
88- HttpResponse :: code );
91+ r -> r );
8992 }
9093
9194 private CompletableFuture <HttpResponse <AsyncBody >> consumeBytesOnce (StandardHttpRequest standardHttpRequest ,
@@ -146,7 +149,7 @@ private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(StandardHttp
146149 */
147150 private <V > CompletableFuture <V > retryWithExponentialBackoff (
148151 StandardHttpRequest request , Supplier <CompletableFuture <V >> action , java .util .function .Consumer <V > onCancel ,
149- ToIntFunction < V > codeExtractor ) {
152+ Function < V , HttpResponse <?>> responseExtractor ) {
150153 final URI uri = request .uri ();
151154 final RequestConfig requestConfig = getTag (RequestConfig .class );
152155 final ExponentialBackoffIntervalCalculator retryIntervalCalculator = ExponentialBackoffIntervalCalculator
@@ -160,18 +163,23 @@ private <V> CompletableFuture<V> retryWithExponentialBackoff(
160163 return AsyncUtils .retryWithExponentialBackoff (action , onCancel , timeout , retryIntervalCalculator ,
161164 (response , throwable , retryInterval ) -> {
162165 if (response != null ) {
163- final int code = codeExtractor .applyAsInt (response );
164- if (code >= 500 ) {
165- LOG .debug (
166- "HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis" ,
167- uri , code , retryInterval );
168- return true ;
166+ HttpResponse <?> httpResponse = responseExtractor .apply (response );
167+ if (httpResponse != null ) {
168+ final int code = httpResponse .code ();
169+ if (code == 429 || code >= 500 ) {
170+ retryInterval = Math .max (retryAfterMillis (httpResponse ), retryInterval );
171+ LOG .debug (
172+ "HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis" ,
173+ uri , code , retryInterval );
174+ return true ;
175+ }
169176 }
170177 } else {
171178 if (throwable instanceof CompletionException ) {
172179 throwable = throwable .getCause ();
173180 }
174181 if (throwable instanceof IOException ) {
182+ // TODO: may not be specific enough - incorrect ssl settings for example will get caught here
175183 LOG .debug (
176184 String .format ("HTTP operation on url: %s should be retried after %d millis because of IOException" ,
177185 uri , retryInterval ),
@@ -183,6 +191,25 @@ private <V> CompletableFuture<V> retryWithExponentialBackoff(
183191 });
184192 }
185193
194+ private long retryAfterMillis (HttpResponse <?> httpResponse ) {
195+ String retryAfter = httpResponse .header ("Retry-After" );
196+ if (retryAfter != null ) {
197+ try {
198+ return Integer .parseInt (retryAfter ) * 1000L ;
199+ } catch (NumberFormatException e ) {
200+ // not a simple number
201+ }
202+ // Kubernetes does not seem to currently use this, but just in case
203+ try {
204+ ZonedDateTime after = ZonedDateTime .parse (retryAfter , DateTimeFormatter .RFC_1123_DATE_TIME );
205+ return after .toEpochSecond () * 1000 - System .currentTimeMillis ();
206+ } catch (DateTimeParseException e1 ) {
207+ // not a recognized http date
208+ }
209+ }
210+ return 0 ; // we'll just use the default
211+ }
212+
186213 @ Override
187214 public io .fabric8 .kubernetes .client .http .WebSocket .Builder newWebSocketBuilder () {
188215 return new StandardWebSocketBuilder (this );
@@ -201,7 +228,7 @@ final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder stand
201228 standardWebSocketBuilder .asHttpRequest (),
202229 () -> buildWebSocketOnce (standardWebSocketBuilder , listener ),
203230 r -> Optional .ofNullable (r .webSocket ).ifPresent (w -> w .sendClose (1000 , null )),
204- r -> Optional . of ( r .webSocketUpgradeResponse ). map ( HttpResponse :: code ). orElse ( null ) );
231+ r -> r .webSocketUpgradeResponse );
205232
206233 CompletableFuture <WebSocket > result = new CompletableFuture <>();
207234
0 commit comments