Skip to content

Commit abf3181

Browse files
committed
Use Instant for token expiration time
1 parent a72270e commit abf3181

File tree

7 files changed

+73
-33
lines changed

7 files changed

+73
-33
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/TokenCredentials.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicLong;
3333
import java.util.concurrent.locks.Lock;
3434
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.function.Function;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

@@ -46,11 +47,13 @@ final class TokenCredentials implements Credentials {
4647
private final Map<Long, RegistrationImpl> registrations = new ConcurrentHashMap<>();
4748
private final AtomicLong registrationSequence = new AtomicLong(0);
4849
private final AtomicBoolean schedulingRenewal = new AtomicBoolean(false);
50+
private final Function<Instant, Duration> refreshDelayStrategy;
4951
private volatile ScheduledFuture<?> renewalTask;
5052

5153
TokenCredentials(TokenRequester requester, ScheduledExecutorService scheduledExecutorService) {
5254
this.requester = requester;
5355
this.scheduledExecutorService = scheduledExecutorService;
56+
this.refreshDelayStrategy = new RatioRefreshDelayStrategy(0.8f);
5457
}
5558

5659
private void lock() {
@@ -61,16 +64,8 @@ private void unlock() {
6164
this.lock.unlock();
6265
}
6366

64-
private boolean expiresSoon(Token t) {
65-
// TODO use strategy to tell if the token expires soon
66-
return t.expirationTime() < System.currentTimeMillis() - 20_000;
67-
}
68-
69-
private Duration delayBeforeTokenRenewal(Token token) {
70-
long expiresIn = token.expirationTime() - System.currentTimeMillis();
71-
// TODO use strategy to decide when to renew token
72-
long delay = (long) (expiresIn * 0.8);
73-
return Duration.ofMillis(delay);
67+
private boolean expiresSoon(Token ignores) {
68+
return false;
7469
}
7570

7671
private Token getToken() {
@@ -138,10 +133,7 @@ private void scheduleRenewal(Token t) {
138133
if (this.renewalTask != null) {
139134
this.renewalTask.cancel(false);
140135
}
141-
Duration delay = delayBeforeTokenRenewal(t);
142-
if (delay.isZero() || delay.isNegative()) {
143-
delay = Duration.ofSeconds(1);
144-
}
136+
Duration delay = this.refreshDelayStrategy.apply(t.expirationTime());
145137
if (!this.registrations.isEmpty()) {
146138
LOGGER.debug("Scheduling token retrieval in {}", delay);
147139
this.renewalTask =
@@ -168,8 +160,8 @@ private void scheduleRenewal(Token t) {
168160
}
169161
}
170162

171-
private static String format(long timestampMs) {
172-
return DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(timestampMs));
163+
private static String format(Instant instant) {
164+
return DateTimeFormatter.ISO_INSTANT.format(instant);
173165
}
174166

175167
private final class RegistrationImpl implements Registration {
@@ -259,4 +251,32 @@ public int hashCode() {
259251
return Objects.hashCode(id);
260252
}
261253
}
254+
255+
static Function<Instant, Duration> ratioRefreshDelayStrategy(float ratio) {
256+
return new RatioRefreshDelayStrategy(ratio);
257+
}
258+
259+
private static class RatioRefreshDelayStrategy implements Function<Instant, Duration> {
260+
261+
private final float ratio;
262+
263+
private RatioRefreshDelayStrategy(float ratio) {
264+
if (ratio < 0 || ratio > 1) {
265+
throw new IllegalArgumentException("Ratio should be > 0 and <= 1: " + ratio);
266+
}
267+
this.ratio = ratio;
268+
}
269+
270+
@Override
271+
public Duration apply(Instant expirationTime) {
272+
Duration expiresIn = Duration.between(Instant.now(), expirationTime);
273+
Duration delay;
274+
if (expiresIn.isZero() || expiresIn.isNegative()) {
275+
delay = Duration.ofSeconds(1);
276+
} else {
277+
delay = Duration.ofMillis((long) (expiresIn.toMillis() * ratio));
278+
}
279+
return delay;
280+
}
281+
}
262282
}

src/main/java/com/rabbitmq/client/amqp/oauth/GsonTokenParser.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.gson.Gson;
2121
import com.google.gson.reflect.TypeToken;
2222
import java.time.Duration;
23+
import java.time.Instant;
2324
import java.util.Map;
2425

2526
public class GsonTokenParser implements TokenParser {
@@ -33,16 +34,17 @@ public Token parse(String tokenAsString) {
3334
String accessToken = (String) tokenAsMap.get("access_token");
3435
// in seconds, see https://www.rfc-editor.org/rfc/rfc6749#section-5.1
3536
Duration expiresIn = Duration.ofSeconds(((Number) tokenAsMap.get("expires_in")).longValue());
36-
long expirationTime = System.currentTimeMillis() + expiresIn.toMillis();
37+
Instant expirationTime =
38+
Instant.ofEpochMilli(System.currentTimeMillis() + expiresIn.toMillis());
3739
return new DefaultTokenInfo(accessToken, expirationTime);
3840
}
3941

4042
private static final class DefaultTokenInfo implements Token {
4143

4244
private final String value;
43-
private final long expirationTime;
45+
private final Instant expirationTime;
4446

45-
private DefaultTokenInfo(String value, long expirationTime) {
47+
private DefaultTokenInfo(String value, Instant expirationTime) {
4648
this.value = value;
4749
this.expirationTime = expirationTime;
4850
}
@@ -53,7 +55,7 @@ public String value() {
5355
}
5456

5557
@Override
56-
public long expirationTime() {
58+
public Instant expirationTime() {
5759
return this.expirationTime;
5860
}
5961
}

src/main/java/com/rabbitmq/client/amqp/oauth/Token.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
1818
package com.rabbitmq.client.amqp.oauth;
1919

20+
import java.time.Instant;
21+
2022
public interface Token {
2123

2224
String value();
2325

24-
long expirationTime();
26+
Instant expirationTime();
2527
}

src/test/java/com/rabbitmq/client/amqp/impl/JwtTestUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.gson.Gson;
2121
import com.google.gson.reflect.TypeToken;
2222
import com.rabbitmq.client.amqp.oauth.Token;
23+
import java.time.Instant;
2324
import java.util.Base64;
2425
import java.util.List;
2526
import java.util.Map;
@@ -87,8 +88,8 @@ public String value() {
8788
}
8889

8990
@Override
90-
public long expirationTime() {
91-
return expirationTime;
91+
public Instant expirationTime() {
92+
return Instant.ofEpochMilli(expirationTime);
9293
}
9394
};
9495
}

src/test/java/com/rabbitmq/client/amqp/impl/TokenCredentialsTest.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2121
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2222
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
23+
import static java.time.Duration.ofMillis;
24+
import static java.time.Duration.ofSeconds;
2325
import static java.util.stream.Collectors.toList;
2426
import static java.util.stream.IntStream.range;
2527
import static org.assertj.core.api.Assertions.assertThat;
@@ -29,10 +31,12 @@
2931
import com.rabbitmq.client.amqp.oauth.Token;
3032
import com.rabbitmq.client.amqp.oauth.TokenRequester;
3133
import java.time.Duration;
34+
import java.time.Instant;
3235
import java.util.List;
3336
import java.util.concurrent.Executors;
3437
import java.util.concurrent.ScheduledExecutorService;
3538
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.function.Function;
3640
import org.junit.jupiter.api.AfterEach;
3741
import org.junit.jupiter.api.BeforeEach;
3842
import org.junit.jupiter.api.Test;
@@ -59,13 +63,13 @@ void tearDown() throws Exception {
5963

6064
@Test
6165
void refreshShouldStopOnceUnregistered() throws InterruptedException {
62-
Duration tokenExpiry = Duration.ofMillis(50);
66+
Duration tokenExpiry = ofMillis(50);
6367
AtomicInteger requestCount = new AtomicInteger(0);
6468
when(this.requester.request())
6569
.thenAnswer(
6670
ignored -> {
6771
requestCount.incrementAndGet();
68-
return token("ok", System.currentTimeMillis() + tokenExpiry.toMillis());
72+
return token("ok", Instant.now().plus(tokenExpiry));
6973
});
7074
TokenCredentials credentials =
7175
new TokenCredentials(this.requester, this.scheduledExecutorService);
@@ -93,11 +97,11 @@ void refreshShouldStopOnceUnregistered() throws InterruptedException {
9397

9498
@Test
9599
void severalRegistrationsShouldBeRefreshed() throws InterruptedException {
96-
Duration tokenExpiry = Duration.ofMillis(50);
100+
Duration tokenExpiry = ofMillis(50);
97101
Duration waitTime = tokenExpiry.dividedBy(4);
98102
Duration timeout = tokenExpiry.multipliedBy(20);
99103
when(this.requester.request())
100-
.thenAnswer(ignored -> token("ok", System.currentTimeMillis() + tokenExpiry.toMillis()));
104+
.thenAnswer(ignored -> token("ok", Instant.now().plus(tokenExpiry)));
101105
TokenCredentials credentials =
102106
new TokenCredentials(this.requester, this.scheduledExecutorService);
103107
int expectedRefreshCountPerConnection = 3;
@@ -142,15 +146,23 @@ void severalRegistrationsShouldBeRefreshed() throws InterruptedException {
142146
assertThat(totalRefreshCount).hasValue(refreshCountSnapshot + splitCount * 2);
143147
}
144148

145-
private static Token token(String value, long expirationTime) {
149+
@Test
150+
void refreshDelayStrategy() {
151+
Duration diff = ofMillis(100);
152+
Function<Instant, Duration> strategy = TokenCredentials.ratioRefreshDelayStrategy(0.8f);
153+
assertThat(strategy.apply(Instant.now().plusSeconds(10))).isCloseTo(ofSeconds(8), diff);
154+
assertThat(strategy.apply(Instant.now().minusSeconds(10))).isEqualTo(ofSeconds(1));
155+
}
156+
157+
private static Token token(String value, Instant expirationTime) {
146158
return new Token() {
147159
@Override
148160
public String value() {
149161
return value;
150162
}
151163

152164
@Override
153-
public long expirationTime() {
165+
public Instant expirationTime() {
154166
return expirationTime;
155167
}
156168
};

src/test/java/com/rabbitmq/client/amqp/oauth/GsonTokenParserTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import static com.rabbitmq.client.amqp.oauth.OAuthTestUtils.sampleJsonToken;
2121
import static java.time.Duration.ofSeconds;
2222
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.assertj.core.api.Assertions.within;
2324

2425
import java.time.Duration;
26+
import java.time.Instant;
27+
import java.time.temporal.ChronoUnit;
2528
import java.util.UUID;
26-
import org.assertj.core.data.Offset;
2729
import org.junit.jupiter.api.Test;
2830

2931
public class GsonTokenParserTest {
@@ -38,6 +40,6 @@ void parse() {
3840
Token token = parser.parse(jsonToken);
3941
assertThat(token.value()).isEqualTo(accessToken);
4042
assertThat(token.expirationTime())
41-
.isCloseTo(System.currentTimeMillis() + expireIn.toMillis(), Offset.offset(1000L));
43+
.isCloseTo(Instant.now().plus(expireIn), within(1, ChronoUnit.SECONDS));
4244
}
4345
}

src/test/java/com/rabbitmq/client/amqp/oauth/HttpTokenRequesterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.io.OutputStream;
3131
import java.time.Duration;
32+
import java.time.Instant;
3233
import java.util.Arrays;
3334
import java.util.Map;
3435
import java.util.UUID;
@@ -144,8 +145,8 @@ public String value() {
144145
}
145146

146147
@Override
147-
public long expirationTime() {
148-
return 0;
148+
public Instant expirationTime() {
149+
return Instant.EPOCH;
149150
}
150151
}
151152
}

0 commit comments

Comments
 (0)