Skip to content
This repository was archived by the owner on Mar 5, 2025. It is now read-only.

Commit 783164c

Browse files
committed
Handle the Retry-After: header
1 parent 1a6f05d commit 783164c

File tree

8 files changed

+84
-9
lines changed

8 files changed

+84
-9
lines changed

CHANGES

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Platform 3.28
1717

1818
We added 429 to the list of HTTP status codes that allow retrying requests.
1919

20+
We now handle the Retry-After: header of 429 HTTP status codes.
21+
2022
Platform 3.27
2123

2224
* Java version support

http-client/src/main/java/com/proofpoint/http/client/balancing/BackoffPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ interface BackoffPolicy
2121
{
2222
BackoffPolicy nextAttempt();
2323

24-
Duration backoff(Duration previousBackoff);
24+
Duration backoff(Duration previousBackoff, Duration suggestedBackoff);
2525
}

http-client/src/main/java/com/proofpoint/http/client/balancing/BalancingHttpClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public <T, E extends Exception> T execute(Request request, ResponseHandler<T, E>
143143
}
144144
catch (RetryException e) {
145145
attempt.markBad(e.getFailureCategory());
146-
Duration backoff = attemptBackoffPolicy.backoff(previousBackoff);
146+
Duration backoff = attemptBackoffPolicy.backoff(previousBackoff, e.getSuggestedBackoff());
147147
long millis = backoff.roundTo(MILLISECONDS);
148148
try {
149149
Thread.sleep(millis);
@@ -316,7 +316,7 @@ else if (t instanceof RetryException retryException) {
316316
attempt.markBad(retryException.getFailureCategory());
317317
TraceToken traceToken = getCurrentTraceToken();
318318
synchronized (subFutureLock) {
319-
Duration backoff = attemptBackoffPolicy.backoff(previousBackoff);
319+
Duration backoff = attemptBackoffPolicy.backoff(previousBackoff, retryException.getSuggestedBackoff());
320320
ScheduledFuture<?> scheduledFuture = retryExecutor.schedule(() -> {
321321
try (TraceTokenScope scope = registerTraceToken(traceToken)){
322322
synchronized (subFutureLock) {

http-client/src/main/java/com/proofpoint/http/client/balancing/DecorrelatedJitteredBackoffPolicy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public BackoffPolicy nextAttempt()
4242
}
4343

4444
@Override
45-
public Duration backoff(Duration previousBackoff)
45+
public Duration backoff(Duration previousBackoff, Duration suggestedBackoff)
4646
{
4747
long prev = previousBackoff.roundTo(TimeUnit.NANOSECONDS);
4848
long range = Math.abs(prev * 3 - min);
@@ -52,7 +52,7 @@ public Duration backoff(Duration previousBackoff)
5252
} else {
5353
randBackoff = min + ThreadLocalRandom.current().nextLong(range);
5454
}
55-
long backoff = Math.min(max, randBackoff);
55+
long backoff = Math.min(max, Math.max(randBackoff, suggestedBackoff.roundTo(TimeUnit.NANOSECONDS)));
5656

5757
return new Duration(backoff, TimeUnit.NANOSECONDS);
5858
}

http-client/src/main/java/com/proofpoint/http/client/balancing/HttpServiceBalancerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.stream.Collectors;
4343

4444
import static com.google.common.base.Preconditions.checkState;
45+
import static com.proofpoint.http.client.balancing.RetryException.NO_SUGGESTED_BACKOFF;
4546
import static java.util.Objects.requireNonNull;
4647
import static java.util.concurrent.TimeUnit.NANOSECONDS;
4748
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -289,7 +290,7 @@ public void mark(boolean isFailure, InstanceState uriState, HttpServiceAttemptIm
289290
if (++uriState.numFailures >= balancer.consecutiveFailures) {
290291
uriState.liveness = DEAD;
291292
uriState.backoffPolicy = balancer.backoffPolicy;
292-
uriState.lastBackoff = uriState.backoffPolicy.backoff(ZERO_DURATION);
293+
uriState.lastBackoff = uriState.backoffPolicy.backoff(ZERO_DURATION, NO_SUGGESTED_BACKOFF);
293294
uriState.deadUntil = balancer.ticker.read() + uriState.lastBackoff.roundTo(NANOSECONDS);
294295
balancer.httpServiceBalancerStats.removal(attempt.uri).add(uriState.lastBackoff);
295296
}
@@ -321,7 +322,7 @@ public void mark(boolean isFailure, InstanceState uriState, HttpServiceAttemptIm
321322
if (isFailure) {
322323
uriState.liveness = DEAD;
323324
uriState.backoffPolicy = uriState.backoffPolicy.nextAttempt();
324-
uriState.lastBackoff = uriState.backoffPolicy.backoff(uriState.lastBackoff);
325+
uriState.lastBackoff = uriState.backoffPolicy.backoff(uriState.lastBackoff, NO_SUGGESTED_BACKOFF);
325326
uriState.deadUntil = balancer.ticker.read() + uriState.lastBackoff.roundTo(NANOSECONDS);
326327
balancer.httpServiceBalancerStats.removal(attempt.uri).add(uriState.lastBackoff);
327328
}

http-client/src/main/java/com/proofpoint/http/client/balancing/RetryException.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,52 @@
1515
*/
1616
package com.proofpoint.http.client.balancing;
1717

18+
import com.proofpoint.units.Duration;
19+
20+
import java.util.concurrent.TimeUnit;
21+
1822
class RetryException extends Exception
1923
{
24+
static final Duration NO_SUGGESTED_BACKOFF = new Duration(0, TimeUnit.MILLISECONDS);
2025
private final String failureCategory;
26+
private final Duration suggestedBackoff;
2127

2228
RetryException(String failureCategory)
29+
{
30+
this(failureCategory, NO_SUGGESTED_BACKOFF);
31+
}
32+
33+
RetryException(String failureCategory, Duration suggestedBackoff)
2334
{
2435
this.failureCategory = failureCategory;
36+
this.suggestedBackoff = suggestedBackoff;
2537
}
2638

2739
RetryException(Exception cause)
2840
{
2941
super(cause);
3042
failureCategory = cause.getClass().getSimpleName();
43+
suggestedBackoff = NO_SUGGESTED_BACKOFF;
3144
}
3245

3346
RetryException(Exception cause, String failureCategory)
3447
{
3548
super(cause);
3649
this.failureCategory = failureCategory;
50+
suggestedBackoff = NO_SUGGESTED_BACKOFF;
3751
}
3852

3953
RetryException(Exception cause, Exception failureException)
4054
{
4155
super(cause);
4256
failureCategory = failureException.getClass().getSimpleName();
57+
suggestedBackoff = NO_SUGGESTED_BACKOFF;
4358
}
4459

4560
String getFailureCategory()
4661
{
4762
return failureCategory;
4863
}
64+
65+
Duration getSuggestedBackoff() { return suggestedBackoff; }
4966
}

http-client/src/main/java/com/proofpoint/http/client/balancing/RetryingResponseHandler.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,17 @@
2222
import com.proofpoint.http.client.Response;
2323
import com.proofpoint.http.client.ResponseHandler;
2424
import com.proofpoint.log.Logger;
25+
import com.proofpoint.units.Duration;
2526

2627
import java.util.Set;
2728
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931

32+
import static com.proofpoint.http.client.balancing.RetryException.NO_SUGGESTED_BACKOFF;
33+
import static java.lang.Integer.parseInt;
34+
import static java.util.concurrent.TimeUnit.SECONDS;
35+
3036
final class RetryingResponseHandler<T, E extends Exception>
3137
implements ResponseHandler<T, RetryException>
3238
{
@@ -89,7 +95,19 @@ public T handle(Request request, Response response)
8995
log.warn("%d response querying %s",
9096
response.getStatusCode(), request.getUri().resolve("/"));
9197
if (!("no".equalsIgnoreCase(retryHeader)) && bodySourceRetryable(request) && retryBudget.canRetry()) {
92-
throw new RetryException(failureCategory);
98+
Duration suggestedBackoff = NO_SUGGESTED_BACKOFF;
99+
if (response.getStatusCode() == 429) {
100+
String retryAfterHeader = response.getHeader("Retry-After");
101+
if (retryAfterHeader != null) {
102+
try {
103+
suggestedBackoff = new Duration(parseInt(retryAfterHeader), SECONDS);
104+
}
105+
catch (NumberFormatException e) {
106+
// ignore
107+
}
108+
}
109+
}
110+
throw new RetryException(failureCategory, suggestedBackoff);
93111
}
94112

95113
Object result;

http-client/src/test/java/com/proofpoint/http/client/balancing/TestDecorrelatedJitteredBackoffPolicy.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import java.util.concurrent.TimeUnit;
2222

23+
import static com.proofpoint.http.client.balancing.RetryException.NO_SUGGESTED_BACKOFF;
2324
import static com.proofpoint.testing.Assertions.assertGreaterThanOrEqual;
2425
import static com.proofpoint.testing.Assertions.assertLessThanOrEqual;
26+
import static org.testng.Assert.assertEquals;
2527

2628
public class TestDecorrelatedJitteredBackoffPolicy
2729
{
@@ -35,7 +37,7 @@ public void testBackoff()
3537
Duration last = new Duration(0, TimeUnit.SECONDS);
3638
Duration prev = min; // Special-case constraint for the first backoff
3739
for (int j = 0; j < 1000; j++) {
38-
Duration next = policy.backoff(last);
40+
Duration next = policy.backoff(last, NO_SUGGESTED_BACKOFF);
3941
assertGreaterThanOrEqual(next, min);
4042
assertLessThanOrEqual(next, new Duration(prev.roundTo(TimeUnit.NANOSECONDS) * 3, TimeUnit.NANOSECONDS));
4143
assertLessThanOrEqual(next, max);
@@ -44,4 +46,39 @@ public void testBackoff()
4446
}
4547
}
4648
}
49+
50+
@Test
51+
public void testSuggestedBackoff()
52+
{
53+
Duration min = new Duration(1, TimeUnit.MILLISECONDS);
54+
Duration suggested = new Duration(1, TimeUnit.SECONDS);
55+
Duration max = new Duration(5, TimeUnit.SECONDS);
56+
BackoffPolicy policy = new DecorrelatedJitteredBackoffPolicy(min, max);
57+
for (int i = 0; i < 1000; i++) {
58+
Duration last = new Duration(0, TimeUnit.SECONDS);
59+
Duration prev = min; // Special-case constraint for the first backoff
60+
for (int j = 0; j < 1000; j++) {
61+
Duration next = policy.backoff(last, suggested);
62+
assertGreaterThanOrEqual(next, suggested);
63+
if (next.getValue(TimeUnit.NANOSECONDS) > suggested.getValue(TimeUnit.NANOSECONDS)) {
64+
assertLessThanOrEqual(next, new Duration(prev.roundTo(TimeUnit.NANOSECONDS) * 3, TimeUnit.NANOSECONDS));
65+
}
66+
assertLessThanOrEqual(next, max);
67+
prev = next;
68+
last = next;
69+
}
70+
}
71+
}
72+
73+
@Test
74+
public void testSuggestedBackoffGreaterThanMax()
75+
{
76+
Duration min = new Duration(1, TimeUnit.MILLISECONDS);
77+
Duration suggested = new Duration(10, TimeUnit.SECONDS);
78+
Duration max = new Duration(5, TimeUnit.SECONDS);
79+
BackoffPolicy policy = new DecorrelatedJitteredBackoffPolicy(min, max);
80+
Duration last = new Duration(0, TimeUnit.SECONDS);
81+
Duration next = policy.backoff(last, suggested);
82+
assertEquals(next, max);
83+
}
4784
}

0 commit comments

Comments
 (0)