Skip to content

Commit 3c12e3a

Browse files
authored
Refine delay jitter for exponential backoff (#7206)
1 parent 9e0efd4 commit 3c12e3a

File tree

3 files changed

+84
-67
lines changed

3 files changed

+84
-67
lines changed

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,11 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
213213
do {
214214
if (attempt > 0) {
215215
// Compute and sleep for backoff
216-
long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
217-
long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos);
218-
nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier());
216+
long currentBackoffNanos =
217+
Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
218+
long backoffNanos =
219+
(long) (ThreadLocalRandom.current().nextDouble(0.8d, 1.2d) * currentBackoffNanos);
220+
nextBackoffNanos = (long) (currentBackoffNanos * retryPolicy.getBackoffMultiplier());
219221
try {
220222
TimeUnit.NANOSECONDS.sleep(backoffNanos);
221223
} catch (InterruptedException e) {
@@ -227,16 +229,11 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
227229
break;
228230
}
229231
}
230-
231-
attempt++;
232+
httpResponse = null;
233+
exception = null;
232234
requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos)));
233235
try {
234236
httpResponse = sendRequest(requestBuilder, byteBufferPool);
235-
} catch (IOException e) {
236-
exception = e;
237-
}
238-
239-
if (httpResponse != null) {
240237
boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode());
241238
if (logger.isLoggable(Level.FINER)) {
242239
logger.log(
@@ -251,8 +248,8 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
251248
if (!retryable) {
252249
return httpResponse;
253250
}
254-
}
255-
if (exception != null) {
251+
} catch (IOException e) {
252+
exception = e;
256253
boolean retryable = retryExceptionPredicate.test(exception);
257254
if (logger.isLoggable(Level.FINER)) {
258255
logger.log(
@@ -268,7 +265,7 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
268265
throw exception;
269266
}
270267
}
271-
} while (attempt < retryPolicy.getMaxAttempts());
268+
} while (++attempt < retryPolicy.getMaxAttempts());
272269

273270
if (httpResponse != null) {
274271
return httpResponse;

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.concurrent.TimeUnit;
1919
import java.util.function.Function;
2020
import java.util.function.Predicate;
21+
import java.util.function.Supplier;
2122
import java.util.logging.Level;
2223
import java.util.logging.Logger;
2324
import okhttp3.Interceptor;
@@ -37,7 +38,7 @@ public final class RetryInterceptor implements Interceptor {
3738
private final Function<Response, Boolean> isRetryable;
3839
private final Predicate<IOException> retryExceptionPredicate;
3940
private final Sleeper sleeper;
40-
private final BoundedLongGenerator randomLong;
41+
private final Supplier<Double> randomJitter;
4142

4243
/** Constructs a new retrier. */
4344
public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isRetryable) {
@@ -48,7 +49,7 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
4849
? RetryInterceptor::isRetryableException
4950
: retryPolicy.getRetryExceptionPredicate(),
5051
TimeUnit.NANOSECONDS::sleep,
51-
bound -> ThreadLocalRandom.current().nextLong(bound));
52+
() -> ThreadLocalRandom.current().nextDouble(0.8d, 1.2d));
5253
}
5354

5455
// Visible for testing
@@ -57,12 +58,12 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
5758
Function<Response, Boolean> isRetryable,
5859
Predicate<IOException> retryExceptionPredicate,
5960
Sleeper sleeper,
60-
BoundedLongGenerator randomLong) {
61+
Supplier<Double> randomJitter) {
6162
this.retryPolicy = retryPolicy;
6263
this.isRetryable = isRetryable;
6364
this.retryExceptionPredicate = retryExceptionPredicate;
6465
this.sleeper = sleeper;
65-
this.randomLong = randomLong;
66+
this.randomJitter = randomJitter;
6667
}
6768

6869
@Override
@@ -75,9 +76,10 @@ public Response intercept(Chain chain) throws IOException {
7576
if (attempt > 0) {
7677
// Compute and sleep for backoff
7778
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#exponential-backoff
78-
long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
79-
long backoffNanos = randomLong.get(upperBoundNanos);
80-
nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier());
79+
long currentBackoffNanos =
80+
Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
81+
long backoffNanos = (long) (randomJitter.get() * currentBackoffNanos);
82+
nextBackoffNanos = (long) (currentBackoffNanos * retryPolicy.getBackoffMultiplier());
8183
try {
8284
sleeper.sleep(backoffNanos);
8385
} catch (InterruptedException e) {
@@ -88,31 +90,31 @@ public Response intercept(Chain chain) throws IOException {
8890
if (response != null) {
8991
response.close();
9092
}
93+
exception = null;
9194
}
92-
93-
attempt++;
9495
try {
9596
response = chain.proceed(chain.request());
97+
if (response != null) {
98+
boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response));
99+
if (logger.isLoggable(Level.FINER)) {
100+
logger.log(
101+
Level.FINER,
102+
"Attempt "
103+
+ attempt
104+
+ " returned "
105+
+ (retryable ? "retryable" : "non-retryable")
106+
+ " response: "
107+
+ responseStringRepresentation(response));
108+
}
109+
if (!retryable) {
110+
return response;
111+
}
112+
} else {
113+
throw new NullPointerException("response cannot be null.");
114+
}
96115
} catch (IOException e) {
97116
exception = e;
98-
}
99-
if (response != null) {
100-
boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response));
101-
if (logger.isLoggable(Level.FINER)) {
102-
logger.log(
103-
Level.FINER,
104-
"Attempt "
105-
+ attempt
106-
+ " returned "
107-
+ (retryable ? "retryable" : "non-retryable")
108-
+ " response: "
109-
+ responseStringRepresentation(response));
110-
}
111-
if (!retryable) {
112-
return response;
113-
}
114-
}
115-
if (exception != null) {
117+
response = null;
116118
boolean retryable = retryExceptionPredicate.test(exception);
117119
if (logger.isLoggable(Level.FINER)) {
118120
logger.log(
@@ -128,8 +130,7 @@ public Response intercept(Chain chain) throws IOException {
128130
throw exception;
129131
}
130132
}
131-
132-
} while (attempt < retryPolicy.getMaxAttempts());
133+
} while (++attempt < retryPolicy.getMaxAttempts());
133134

134135
if (response != null) {
135136
return response;
@@ -172,11 +173,6 @@ static boolean isRetryableException(IOException e) {
172173
return false;
173174
}
174175

175-
// Visible for testing
176-
interface BoundedLongGenerator {
177-
long get(long bound);
178-
}
179-
180176
// Visible for testing
181177
interface Sleeper {
182178
void sleep(long delayNanos) throws InterruptedException;

exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1010
import static org.mockito.ArgumentMatchers.any;
1111
import static org.mockito.ArgumentMatchers.anyLong;
12+
import static org.mockito.Mockito.doAnswer;
1213
import static org.mockito.Mockito.doNothing;
13-
import static org.mockito.Mockito.doThrow;
14+
import static org.mockito.Mockito.mock;
1415
import static org.mockito.Mockito.never;
1516
import static org.mockito.Mockito.spy;
1617
import static org.mockito.Mockito.verify;
@@ -32,9 +33,11 @@
3233
import java.time.Duration;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.function.Predicate;
36+
import java.util.function.Supplier;
3537
import java.util.logging.Level;
3638
import java.util.logging.Logger;
3739
import java.util.stream.Stream;
40+
import okhttp3.Interceptor;
3841
import okhttp3.OkHttpClient;
3942
import okhttp3.Request;
4043
import okhttp3.Response;
@@ -47,15 +50,17 @@
4750
import org.junit.jupiter.params.provider.MethodSource;
4851
import org.junit.jupiter.params.provider.ValueSource;
4952
import org.mockito.Mock;
53+
import org.mockito.invocation.InvocationOnMock;
5054
import org.mockito.junit.jupiter.MockitoExtension;
55+
import org.mockito.stubbing.Answer;
5156

5257
@ExtendWith(MockitoExtension.class)
5358
class RetryInterceptorTest {
5459

5560
@RegisterExtension static final MockWebServerExtension server = new MockWebServerExtension();
5661

5762
@Mock private RetryInterceptor.Sleeper sleeper;
58-
@Mock private RetryInterceptor.BoundedLongGenerator random;
63+
@Mock private Supplier<Double> random;
5964
private Predicate<IOException> retryExceptionPredicate;
6065

6166
private RetryInterceptor retrier;
@@ -91,6 +96,24 @@ public boolean test(IOException e) {
9196
client = new OkHttpClient.Builder().addInterceptor(retrier).build();
9297
}
9398

99+
@Test
100+
void noRetryOnNullResponse() throws IOException {
101+
Interceptor.Chain chain = mock(Interceptor.Chain.class);
102+
when(chain.proceed(any())).thenReturn(null);
103+
when(chain.request())
104+
.thenReturn(new Request.Builder().url(server.httpUri().toString()).build());
105+
assertThatThrownBy(
106+
() -> {
107+
retrier.intercept(chain);
108+
})
109+
.isInstanceOf(NullPointerException.class)
110+
.hasMessage("response cannot be null.");
111+
112+
verifyNoInteractions(retryExceptionPredicate);
113+
verifyNoInteractions(random);
114+
verifyNoInteractions(sleeper);
115+
}
116+
94117
@Test
95118
void noRetry() throws Exception {
96119
server.enqueue(HttpResponse.of(HttpStatus.OK));
@@ -109,17 +132,8 @@ void noRetry() throws Exception {
109132
@ValueSource(ints = {5, 6})
110133
void backsOff(int attempts) throws Exception {
111134
succeedOnAttempt(attempts);
112-
113-
// Will backoff 4 times
114-
when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 0)))).thenReturn(100L);
115-
when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 1)))).thenReturn(50L);
116-
// Capped
117-
when(random.get(TimeUnit.SECONDS.toNanos(2))).thenReturn(500L).thenReturn(510L);
118-
119-
doNothing().when(sleeper).sleep(100);
120-
doNothing().when(sleeper).sleep(50);
121-
doNothing().when(sleeper).sleep(500);
122-
doNothing().when(sleeper).sleep(510);
135+
when(random.get()).thenReturn(1.0d);
136+
doNothing().when(sleeper).sleep(anyLong());
123137

124138
try (Response response = sendRequest()) {
125139
if (attempts <= 5) {
@@ -139,16 +153,26 @@ void interrupted() throws Exception {
139153
succeedOnAttempt(5);
140154

141155
// Backs off twice, second is interrupted
142-
when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 0)))).thenReturn(100L);
143-
when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 1)))).thenReturn(50L);
156+
when(random.get()).thenReturn(1.0d).thenReturn(1.0d);
157+
doAnswer(
158+
new Answer<Void>() {
159+
int counter = 0;
144160

145-
doNothing().when(sleeper).sleep(100);
146-
doThrow(new InterruptedException()).when(sleeper).sleep(50);
161+
@Override
162+
public Void answer(InvocationOnMock invocation) throws Throwable {
163+
if (counter++ == 1) {
164+
throw new InterruptedException();
165+
}
166+
return null;
167+
}
168+
})
169+
.when(sleeper)
170+
.sleep(anyLong());
147171

148172
try (Response response = sendRequest()) {
149173
assertThat(response.isSuccessful()).isFalse();
150174
}
151-
175+
verify(sleeper, times(2)).sleep(anyLong());
152176
for (int i = 0; i < 2; i++) {
153177
server.takeRequest(0, TimeUnit.NANOSECONDS);
154178
}
@@ -157,7 +181,7 @@ void interrupted() throws Exception {
157181
@Test
158182
void connectTimeout() throws Exception {
159183
client = connectTimeoutClient();
160-
when(random.get(anyLong())).thenReturn(1L);
184+
when(random.get()).thenReturn(1.0d);
161185
doNothing().when(sleeper).sleep(anyLong());
162186

163187
// Connecting to a non-routable IP address to trigger connection error
@@ -174,7 +198,7 @@ void connectTimeout() throws Exception {
174198
@Test
175199
void connectException() throws Exception {
176200
client = connectTimeoutClient();
177-
when(random.get(anyLong())).thenReturn(1L);
201+
when(random.get()).thenReturn(1.0d);
178202
doNothing().when(sleeper).sleep(anyLong());
179203

180204
// Connecting to localhost on an unused port address to trigger java.net.ConnectException

0 commit comments

Comments
 (0)