Skip to content

Commit acdfb61

Browse files
authored
Fix Flaky Core Tests (Azure#23600)
Fix Flaky Core Tests
1 parent 14c3a07 commit acdfb61

File tree

5 files changed

+121
-180
lines changed

5 files changed

+121
-180
lines changed

sdk/core/azure-core/src/main/java/com/azure/core/http/policy/RetryPolicy.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.temporal.ChronoUnit;
2222
import java.util.Objects;
2323
import java.util.function.Function;
24+
import java.util.function.Supplier;
2425

2526
import static com.azure.core.util.CoreUtils.isNullOrEmpty;
2627

@@ -155,7 +156,7 @@ static Duration determineDelayDuration(HttpResponse response, int tryCount, Retr
155156
String retryAfterHeader, ChronoUnit retryAfterTimeUnit) {
156157
// If the retry after header hasn't been configured, attempt to look up the well-known headers.
157158
if (isNullOrEmpty(retryAfterHeader)) {
158-
return getWellKnownRetryDelay(response.getHeaders(), tryCount, retryStrategy);
159+
return getWellKnownRetryDelay(response.getHeaders(), tryCount, retryStrategy, OffsetDateTime::now);
159160
}
160161

161162
String retryHeaderValue = response.getHeaderValue(retryAfterHeader);
@@ -172,7 +173,8 @@ static Duration determineDelayDuration(HttpResponse response, int tryCount, Retr
172173
/*
173174
* Determines the delay duration that should be waited before retrying using the well-known retry headers.
174175
*/
175-
static Duration getWellKnownRetryDelay(HttpHeaders responseHeaders, int tryCount, RetryStrategy retryStrategy) {
176+
static Duration getWellKnownRetryDelay(HttpHeaders responseHeaders, int tryCount, RetryStrategy retryStrategy,
177+
Supplier<OffsetDateTime> nowSupplier) {
176178
// Found 'x-ms-retry-after-ms' header, use a Duration of milliseconds based on the value.
177179
Duration retryDelay = tryGetRetryDelay(responseHeaders, X_MS_RETRY_AFTER_MS_HEADER,
178180
RetryPolicy::tryGetDelayMillis);
@@ -188,7 +190,8 @@ static Duration getWellKnownRetryDelay(HttpHeaders responseHeaders, int tryCount
188190

189191
// Found 'Retry-After' header. First, attempt to resolve it as a Duration of seconds. If that fails, then
190192
// attempt to resolve it as an HTTP date (RFC1123).
191-
retryDelay = tryGetRetryDelay(responseHeaders, RETRY_AFTER_HEADER, RetryPolicy::tryParseLongOrDateTime);
193+
retryDelay = tryGetRetryDelay(responseHeaders, RETRY_AFTER_HEADER,
194+
headerValue -> tryParseLongOrDateTime(headerValue, nowSupplier));
192195
if (retryDelay != null) {
193196
return retryDelay;
194197
}
@@ -209,12 +212,12 @@ private static Duration tryGetDelayMillis(String value) {
209212
return (delayMillis >= 0) ? Duration.ofMillis(delayMillis) : null;
210213
}
211214

212-
private static Duration tryParseLongOrDateTime(String value) {
215+
private static Duration tryParseLongOrDateTime(String value, Supplier<OffsetDateTime> nowSupplier) {
213216
long delaySeconds;
214217
try {
215218
OffsetDateTime retryAfter = new DateTimeRfc1123(value).getDateTime();
216219

217-
delaySeconds = OffsetDateTime.now().until(retryAfter, ChronoUnit.SECONDS);
220+
delaySeconds = nowSupplier.get().until(retryAfter, ChronoUnit.SECONDS);
218221
} catch (DateTimeException ex) {
219222
delaySeconds = tryParseLong(value);
220223
}

sdk/core/azure-core/src/test/java/com/azure/core/credential/TokenCacheTests.java

Lines changed: 20 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,35 @@
1717
import java.util.concurrent.atomic.AtomicInteger;
1818
import java.util.concurrent.atomic.AtomicLong;
1919

20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
2022
public class TokenCacheTests {
2123
private static final Random RANDOM = new Random();
2224

2325
@Test
2426
public void testOnlyOneThreadRefreshesToken() throws Exception {
27+
AtomicLong refreshes = new AtomicLong(0);
28+
2529
// Token acquisition time grows in 1 sec, 2 sec... To make sure only one token acquisition is run
26-
SimpleTokenCache cache = new SimpleTokenCache(() -> incrementalRemoteGetTokenAsync(new AtomicInteger(1)));
30+
SimpleTokenCache cache = new SimpleTokenCache(() -> {
31+
refreshes.incrementAndGet();
32+
return incrementalRemoteGetTokenAsync(new AtomicInteger(1));
33+
});
2734

2835
CountDownLatch latch = new CountDownLatch(1);
29-
AtomicLong maxMillis = new AtomicLong(0);
30-
31-
Flux.range(1, 10)
32-
.flatMap(i -> Mono.just(OffsetDateTime.now())
33-
// Runs cache.getToken() on 10 different threads
34-
.publishOn(Schedulers.parallel())
35-
.flatMap(start -> cache.getToken()
36-
.map(t -> Duration.between(start, OffsetDateTime.now()).toMillis())
37-
.doOnNext(millis -> {
38-
if (millis > maxMillis.get()) {
39-
maxMillis.set(millis);
40-
}
41-
// System.out.format("Thread: %s\tDuration: %smillis%n",
42-
// Thread.currentThread().getName(), Duration.between(start, OffsetDateTime.now()).toMillis());
43-
})))
36+
37+
Flux.range(1, 10).flatMap(ignored -> Mono.just(OffsetDateTime.now()))
38+
.parallel(10)
39+
// Runs cache.getToken() on 10 different threads
40+
.runOn(Schedulers.boundedElastic())
41+
.flatMap(start -> cache.getToken())
4442
.doOnComplete(latch::countDown)
4543
.subscribe();
4644

4745
latch.await();
48-
long maxMs = maxMillis.get();
49-
Assertions.assertTrue(maxMs > 1000, () -> String.format("maxMillis was less than 1000ms. Was %d.", maxMs));
5046

51-
// Big enough for any latency, small enough to make sure no get token is called twice
52-
Assertions.assertTrue(maxMs < 2000, () -> String.format("maxMillis was greater than 2000ms. Was %d.", maxMs));
47+
// Ensure that only one refresh attempt is made.
48+
assertEquals(1, refreshes.get());
5349
}
5450

5551
@Test
@@ -59,23 +55,15 @@ public void testLongRunningWontOverflow() throws Exception {
5955
// token expires on creation. Run this 100 times to simulate running the application a long time
6056
SimpleTokenCache cache = new SimpleTokenCache(() -> {
6157
refreshes.incrementAndGet();
62-
return remoteGetTokenThatExpiresSoonAsync(1000, 0);
58+
return remoteGetTokenThatExpiresSoonAsync();
6359
});
6460

6561
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
6662
CountDownLatch latch = new CountDownLatch(1);
6763

6864
Flux.interval(Duration.ofMillis(100), virtualTimeScheduler)
6965
.take(100)
70-
.flatMap(i -> Mono.just(OffsetDateTime.now())
71-
// Runs cache.getToken() on 10 different threads
72-
.subscribeOn(Schedulers.parallel())
73-
.flatMap(start -> cache.getToken()
74-
.map(t -> Duration.between(start, OffsetDateTime.now()).toMillis())
75-
.doOnNext(millis -> {
76-
// System.out.format("Thread: %s\tDuration: %smillis%n",
77-
// Thread.currentThread().getName(), Duration.between(start, OffsetDateTime.now()).toMillis());
78-
})))
66+
.flatMap(i -> cache.getToken())
7967
.doOnComplete(latch::countDown)
8068
.subscribe();
8169

@@ -86,14 +74,8 @@ public void testLongRunningWontOverflow() throws Exception {
8674
Assertions.assertTrue(refreshes.get() <= 11);
8775
}
8876

89-
private Mono<AccessToken> remoteGetTokenAsync(long delayInMillis) {
90-
return Mono.delay(Duration.ofMillis(delayInMillis))
91-
.map(l -> new Token(Integer.toString(RANDOM.nextInt(100))));
92-
}
93-
94-
private Mono<AccessToken> remoteGetTokenThatExpiresSoonAsync(long delayInMillis, long validityInMillis) {
95-
return Mono.delay(Duration.ofMillis(delayInMillis))
96-
.map(l -> new Token(Integer.toString(RANDOM.nextInt(100)), validityInMillis));
77+
private Mono<AccessToken> remoteGetTokenThatExpiresSoonAsync() {
78+
return Mono.delay(Duration.ofMillis(1000)).map(l -> new Token(Integer.toString(RANDOM.nextInt(100)), 0));
9779
}
9880

9981
// First token takes latency seconds, and adds 1 sec every subsequent call
@@ -103,32 +85,12 @@ private Mono<AccessToken> incrementalRemoteGetTokenAsync(AtomicInteger latency)
10385
}
10486

10587
private static class Token extends AccessToken {
106-
private String token;
107-
private OffsetDateTime expiry;
108-
109-
@Override
110-
public String getToken() {
111-
return token;
112-
}
113-
11488
Token(String token) {
11589
this(token, 5000);
11690
}
11791

11892
Token(String token, long validityInMillis) {
11993
super(token, OffsetDateTime.now().plus(Duration.ofMillis(validityInMillis)));
120-
this.token = token;
121-
this.expiry = OffsetDateTime.now().plus(Duration.ofMillis(validityInMillis));
122-
}
123-
124-
@Override
125-
public OffsetDateTime getExpiresAt() {
126-
return expiry;
127-
}
128-
129-
@Override
130-
public boolean isExpired() {
131-
return OffsetDateTime.now().isAfter(expiry);
13294
}
13395
}
13496
}

sdk/core/azure-core/src/test/java/com/azure/core/http/policy/HttpLoggingPolicyTests.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import com.azure.core.util.logging.LogLevel;
1919
import org.junit.jupiter.api.AfterEach;
2020
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.parallel.Execution;
22+
import org.junit.jupiter.api.parallel.ExecutionMode;
23+
import org.junit.jupiter.api.parallel.Isolated;
2124
import org.junit.jupiter.api.parallel.ResourceLock;
25+
import org.junit.jupiter.api.parallel.Resources;
2226
import org.junit.jupiter.params.ParameterizedTest;
2327
import org.junit.jupiter.params.provider.Arguments;
2428
import org.junit.jupiter.params.provider.EnumSource;
@@ -28,8 +32,8 @@
2832
import reactor.test.StepVerifier;
2933

3034
import java.io.ByteArrayOutputStream;
31-
import java.io.IOException;
3235
import java.io.PrintStream;
36+
import java.io.UncheckedIOException;
3337
import java.io.UnsupportedEncodingException;
3438
import java.net.MalformedURLException;
3539
import java.net.URL;
@@ -50,6 +54,9 @@
5054
/**
5155
* This class contains tests for {@link HttpLoggingPolicy}.
5256
*/
57+
@Execution(ExecutionMode.SAME_THREAD)
58+
@Isolated
59+
@ResourceLock(Resources.SYSTEM_OUT)
5360
public class HttpLoggingPolicyTests {
5461
private static final String REDACTED = "REDACTED";
5562
private static final Context CONTEXT = new Context("caller-method", HttpLoggingPolicyTests.class.getName());
@@ -61,9 +68,7 @@ public class HttpLoggingPolicyTests {
6168
@BeforeEach
6269
public void prepareForTest() {
6370
// Set the log level to information for the test.
64-
originalLogLevel = Configuration.getGlobalConfiguration().get(PROPERTY_AZURE_LOG_LEVEL);
65-
Configuration.getGlobalConfiguration().put(PROPERTY_AZURE_LOG_LEVEL,
66-
String.valueOf(LogLevel.INFORMATIONAL.getLogLevel()));
71+
setupLogLevel(LogLevel.INFORMATIONAL.getLogLevel());
6772

6873
/*
6974
* DefaultLogger uses System.out to log. Inject a custom PrintStream to log into for the duration of the test to
@@ -75,25 +80,19 @@ public void prepareForTest() {
7580
}
7681

7782
@AfterEach
78-
public void cleanupAfterTest() throws IOException {
83+
public void cleanupAfterTest() {
7984
// Reset or clear the log level after the test completes.
80-
if (CoreUtils.isNullOrEmpty(originalLogLevel)) {
81-
Configuration.getGlobalConfiguration().remove(PROPERTY_AZURE_LOG_LEVEL);
82-
} else {
83-
Configuration.getGlobalConfiguration().put(PROPERTY_AZURE_LOG_LEVEL, originalLogLevel);
84-
}
85+
setPropertyToOriginalOrClear(originalLogLevel);
8586

8687
// Reset System.err to the original PrintStream.
8788
System.setOut(originalSystemOut);
88-
logCaptureStream.close();
8989
}
9090

9191
/**
9292
* Tests that a query string will be properly redacted before it is logged.
9393
*/
9494
@ParameterizedTest
9595
@MethodSource("redactQueryParametersSupplier")
96-
@ResourceLock("SYSTEM_OUT")
9796
public void redactQueryParameters(String requestUrl, String expectedQueryString,
9897
Set<String> allowedQueryParameters) {
9998
HttpPipeline pipeline = new HttpPipelineBuilder()
@@ -138,7 +137,6 @@ private static Stream<Arguments> redactQueryParametersSupplier() {
138137
*/
139138
@ParameterizedTest(name = "[{index}] {displayName}")
140139
@MethodSource("validateLoggingDoesNotConsumeSupplier")
141-
@ResourceLock("SYSTEM_OUT")
142140
public void validateLoggingDoesNotConsumeRequest(Flux<ByteBuffer> stream, byte[] data, int contentLength)
143141
throws MalformedURLException {
144142
URL requestUrl = new URL("https://test.com");
@@ -154,7 +152,7 @@ public void validateLoggingDoesNotConsumeRequest(Flux<ByteBuffer> stream, byte[]
154152
.build();
155153

156154
StepVerifier.create(pipeline.send(new HttpRequest(HttpMethod.POST, requestUrl, requestHeaders, stream),
157-
CONTEXT))
155+
CONTEXT))
158156
.verifyComplete();
159157

160158
String logString = convertOutputStreamToString(logCaptureStream);
@@ -166,7 +164,6 @@ public void validateLoggingDoesNotConsumeRequest(Flux<ByteBuffer> stream, byte[]
166164
*/
167165
@ParameterizedTest(name = "[{index}] {displayName}")
168166
@MethodSource("validateLoggingDoesNotConsumeSupplier")
169-
@ResourceLock("SYSTEM_OUT")
170167
public void validateLoggingDoesNotConsumeResponse(Flux<ByteBuffer> stream, byte[] data, int contentLength) {
171168
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://test.com");
172169
HttpHeaders responseHeaders = new HttpHeaders()
@@ -276,8 +273,7 @@ public Mono<String> getBodyAsString(Charset charset) {
276273

277274
@ParameterizedTest(name = "[{index}] {displayName}")
278275
@EnumSource(value = HttpLogDetailLevel.class, mode = EnumSource.Mode.INCLUDE,
279-
names = { "BASIC", "HEADERS", "BODY", "BODY_AND_HEADERS" })
280-
@ResourceLock("SYSTEM_OUT")
276+
names = {"BASIC", "HEADERS", "BODY", "BODY_AND_HEADERS"})
281277
public void loggingIncludesRetryCount(HttpLogDetailLevel logLevel) {
282278
AtomicInteger requestCount = new AtomicInteger();
283279
HttpRequest request = new HttpRequest(HttpMethod.GET, "https://test.com");
@@ -298,11 +294,24 @@ public void loggingIncludesRetryCount(HttpLogDetailLevel logLevel) {
298294
assertTrue(logString.contains("Try count: 2"));
299295
}
300296

297+
private void setupLogLevel(int logLevelToSet) {
298+
originalLogLevel = Configuration.getGlobalConfiguration().get(PROPERTY_AZURE_LOG_LEVEL);
299+
Configuration.getGlobalConfiguration().put(PROPERTY_AZURE_LOG_LEVEL, String.valueOf(logLevelToSet));
300+
}
301+
302+
private void setPropertyToOriginalOrClear(String originalValue) {
303+
if (CoreUtils.isNullOrEmpty(originalValue)) {
304+
Configuration.getGlobalConfiguration().remove(PROPERTY_AZURE_LOG_LEVEL);
305+
} else {
306+
Configuration.getGlobalConfiguration().put(PROPERTY_AZURE_LOG_LEVEL, originalValue);
307+
}
308+
}
309+
301310
private static String convertOutputStreamToString(ByteArrayOutputStream stream) {
302311
try {
303-
return stream.toString("UTF-8");
312+
return stream.toString(StandardCharsets.UTF_8.name());
304313
} catch (UnsupportedEncodingException e) {
305-
throw new RuntimeException(e);
314+
throw new UncheckedIOException(e);
306315
}
307316
}
308317
}

sdk/core/azure-core/src/test/java/com/azure/core/http/policy/RetryPolicyTests.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.stream.Stream;
4040

4141
import static org.junit.jupiter.api.Assertions.assertEquals;
42-
import static org.junit.jupiter.api.Assertions.assertTrue;
4342
import static org.mockito.ArgumentMatchers.anyInt;
4443
import static org.mockito.Mockito.mock;
4544
import static org.mockito.Mockito.when;
@@ -282,12 +281,13 @@ public Mono<HttpResponse> send(HttpRequest request) {
282281
.verifyComplete();
283282
}
284283

284+
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
285285
@Test
286286
public void retryConsumesBody() {
287287
final AtomicInteger bodyConsumptionCount = new AtomicInteger();
288288
Flux<ByteBuffer> errorBody = Flux.generate(sink -> {
289289
bodyConsumptionCount.incrementAndGet();
290-
sink.next(ByteBuffer.wrap("Should be consumed" .getBytes(StandardCharsets.UTF_8)));
290+
sink.next(ByteBuffer.wrap("Should be consumed".getBytes(StandardCharsets.UTF_8)));
291291
sink.complete();
292292
});
293293

@@ -341,7 +341,8 @@ public Mono<String> getBodyAsString(Charset charset) {
341341
@ParameterizedTest
342342
@MethodSource("getWellKnownRetryDelaySupplier")
343343
public void getWellKnownRetryDelay(HttpHeaders responseHeaders, RetryStrategy retryStrategy, Duration expected) {
344-
assertEquals(expected, RetryPolicy.getWellKnownRetryDelay(responseHeaders, 1, retryStrategy));
344+
assertEquals(expected, RetryPolicy.getWellKnownRetryDelay(responseHeaders, 1, retryStrategy,
345+
OffsetDateTime::now));
345346
}
346347

347348
private static Stream<Arguments> getWellKnownRetryDelaySupplier() {
@@ -381,14 +382,11 @@ private static Stream<Arguments> getWellKnownRetryDelaySupplier() {
381382

382383
@Test
383384
public void retryAfterDateTime() {
384-
HttpHeaders headers = new HttpHeaders().set("Retry-After",
385-
new DateTimeRfc1123(OffsetDateTime.now().plusSeconds(30)).toString());
386-
Duration actual = RetryPolicy.getWellKnownRetryDelay(headers, 1, null);
387-
388-
// Since DateTime based Retry-After uses OffsetDateTime.now internally make sure this result skew isn't larger
389-
// than an allowable bound.
390-
Duration skew = Duration.ofSeconds(30).minus(actual);
391-
assertTrue(skew.getSeconds() < 2);
385+
OffsetDateTime now = OffsetDateTime.now().withNano(0);
386+
HttpHeaders headers = new HttpHeaders().set("Retry-After", new DateTimeRfc1123(now.plusSeconds(30)).toString());
387+
Duration actual = RetryPolicy.getWellKnownRetryDelay(headers, 1, null, () -> now);
388+
389+
assertEquals(Duration.ofSeconds(30), actual);
392390
}
393391

394392
private static RetryStrategy createStatusCodeRetryStrategy(int... retriableErrorCodes) {

0 commit comments

Comments
 (0)