Skip to content

Commit 5471dad

Browse files
committed
Add ratio-based token refresh delay strategy
(cherry picked from commit 97c8700)
1 parent 0ce9d78 commit 5471dad

File tree

2 files changed

+98
-35
lines changed

2 files changed

+98
-35
lines changed

src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java

Lines changed: 64 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ public class DefaultCredentialsRefreshService implements CredentialsRefreshServi
5050

5151
private final boolean privateScheduler;
5252

53-
private final Function<Duration, Long> refreshDelayStrategy;
53+
private final Function<Duration, Duration> refreshDelayStrategy;
5454

5555
private final Function<Duration, Boolean> needRefreshStrategy;
5656

57-
public DefaultCredentialsRefreshService(ScheduledExecutorService scheduler, Function<Duration, Long> refreshDelayStrategy, Function<Duration, Boolean> needRefreshStrategy) {
57+
public DefaultCredentialsRefreshService(ScheduledExecutorService scheduler, Function<Duration, Duration> refreshDelayStrategy, Function<Duration, Boolean> needRefreshStrategy) {
5858
this.refreshDelayStrategy = refreshDelayStrategy;
5959
this.needRefreshStrategy = needRefreshStrategy;
6060
if (scheduler == null) {
@@ -67,43 +67,57 @@ public DefaultCredentialsRefreshService(ScheduledExecutorService scheduler, Func
6767
}
6868

6969
/**
70-
* Delay before refresh is <code>TTL - specified duration</code>.
70+
* Delay before refresh is a ratio of the time before expiration.
7171
* <p>
72-
* E.g. if TTL is 60 seconds and specified duration is 20 seconds, refresh will
72+
* E.g. if time before expiration is 60 seconds and specified ratio is 0.8, refresh will
73+
* be scheduled in 60 x 0.8 = 48 seconds.
74+
*
75+
* @param ratio
76+
* @return the delay before refreshing
77+
*/
78+
public static Function<Duration, Duration> ratioRefreshDelayStrategy(double ratio) {
79+
return new RatioRefreshDelayStrategy(ratio);
80+
}
81+
82+
/**
83+
* Delay before refresh is <code>time before expiration - specified duration</code>.
84+
* <p>
85+
* E.g. if time before expiration is 60 seconds and specified duration is 20 seconds, refresh will
7386
* be scheduled in 60 - 20 = 40 seconds.
7487
*
7588
* @param duration
76-
* @return
89+
* @return the delay before refreshing
7790
*/
78-
public static Function<Duration, Long> fixedDelayBeforeExpirationRefreshDelayStrategy(Duration duration) {
79-
return new FixedDelayBeforeExpirationRefreshDelayStrategy(duration.toMillis());
91+
public static Function<Duration, Duration> fixedDelayBeforeExpirationRefreshDelayStrategy(Duration duration) {
92+
return new FixedDelayBeforeExpirationRefreshDelayStrategy(duration);
8093
}
8194

8295
/**
8396
* Advise to refresh credentials if <code>TTL <= limit</code>.
8497
*
8598
* @param limitBeforeExpiration
86-
* @return
99+
* @return true if credentials should be refreshed, false otherwise
87100
*/
88101
public static Function<Duration, Boolean> fixedTimeNeedRefreshStrategy(Duration limitBeforeExpiration) {
89102
return new FixedTimeNeedRefreshStrategy(limitBeforeExpiration.toMillis());
90103
}
91104

92-
// TODO add a delay refresh strategy that bases the time on a percentage of the TTL, use it as default with 80% TTL
93-
94105
private static Runnable refresh(ScheduledExecutorService scheduler, CredentialsProviderState credentialsProviderState,
95-
Function<Duration, Long> refreshDelayStrategy) {
106+
Function<Duration, Duration> refreshDelayStrategy) {
96107
return () -> {
97108
LOGGER.debug("Refreshing token");
98109
credentialsProviderState.refresh();
99110

100111
Duration timeBeforeExpiration = credentialsProviderState.credentialsProvider.getTimeBeforeExpiration();
112+
Duration newDelay = refreshDelayStrategy.apply(timeBeforeExpiration);
101113

102-
long newDelay = refreshDelayStrategy.apply(timeBeforeExpiration);
114+
LOGGER.debug("Scheduling refresh in {} seconds", newDelay.getSeconds());
103115

104-
LOGGER.debug("Scheduling refresh in {} milliseconds", newDelay);
105-
106-
ScheduledFuture<?> scheduledFuture = scheduler.schedule(refresh(scheduler, credentialsProviderState, refreshDelayStrategy), newDelay, TimeUnit.MILLISECONDS);
116+
ScheduledFuture<?> scheduledFuture = scheduler.schedule(
117+
refresh(scheduler, credentialsProviderState, refreshDelayStrategy),
118+
newDelay.getSeconds(),
119+
TimeUnit.SECONDS
120+
);
107121
credentialsProviderState.refreshTask.set(scheduledFuture);
108122
};
109123
}
@@ -122,9 +136,13 @@ public String register(CredentialsProvider credentialsProvider, Callable<Boolean
122136
credentialsProviderState.add(registration);
123137

124138
credentialsProviderState.maybeSetRefreshTask(() -> {
125-
long delay = refreshDelayStrategy.apply(credentialsProvider.getTimeBeforeExpiration());
126-
LOGGER.debug("Scheduling refresh in {} milliseconds", delay);
127-
return scheduler.schedule(refresh(scheduler, credentialsProviderState, refreshDelayStrategy), delay, TimeUnit.MILLISECONDS);
139+
Duration delay = refreshDelayStrategy.apply(credentialsProvider.getTimeBeforeExpiration());
140+
LOGGER.debug("Scheduling refresh in {} seconds", delay.getSeconds());
141+
return scheduler.schedule(
142+
refresh(scheduler, credentialsProviderState, refreshDelayStrategy),
143+
delay.getSeconds(),
144+
TimeUnit.SECONDS
145+
);
128146
});
129147

130148
return registrationId;
@@ -163,25 +181,42 @@ public Boolean apply(Duration timeBeforeExpiration) {
163181
}
164182
}
165183

166-
private static class FixedDelayBeforeExpirationRefreshDelayStrategy implements Function<Duration, Long> {
184+
private static class FixedDelayBeforeExpirationRefreshDelayStrategy implements Function<Duration, Duration> {
167185

168-
private final long delay;
186+
private final Duration delay;
169187

170-
private FixedDelayBeforeExpirationRefreshDelayStrategy(long delay) {
188+
private FixedDelayBeforeExpirationRefreshDelayStrategy(Duration delay) {
171189
this.delay = delay;
172190
}
173191

174192
@Override
175-
public Long apply(Duration timeBeforeExpiration) {
176-
long refreshTimeBeforeExpiration = timeBeforeExpiration.toMillis() - delay;
177-
if (refreshTimeBeforeExpiration < 0) {
178-
return timeBeforeExpiration.toMillis();
193+
public Duration apply(Duration timeBeforeExpiration) {
194+
Duration refreshTimeBeforeExpiration = timeBeforeExpiration.minus(delay);
195+
if (refreshTimeBeforeExpiration.isNegative()) {
196+
return timeBeforeExpiration;
179197
} else {
180198
return refreshTimeBeforeExpiration;
181199
}
182200
}
183201
}
184202

203+
private static class RatioRefreshDelayStrategy implements Function<Duration, Duration> {
204+
205+
private final double ratio;
206+
207+
private RatioRefreshDelayStrategy(double ratio) {
208+
if (ratio < 0 || ratio > 1) {
209+
throw new IllegalArgumentException("Ratio should be > 0 and <= 1: " + ratio);
210+
}
211+
this.ratio = ratio;
212+
}
213+
214+
@Override
215+
public Duration apply(Duration duration) {
216+
return Duration.ofSeconds((long) ((double) duration.getSeconds() * ratio));
217+
}
218+
}
219+
185220
static class Registration {
186221

187222
private final Callable<Boolean> refreshAction;
@@ -240,7 +275,7 @@ void maybeSetRefreshTask(Supplier<ScheduledFuture<?>> scheduledFutureSupplier) {
240275
}
241276

242277
void refresh() {
243-
// FIXME check whether thread has been cancelled or not before refresh() and registratAction.call()
278+
// FIXME check whether thread has been cancelled or not before refresh() and refreshAction.call()
244279

245280
// FIXME protect this call, or at least log some error
246281
this.credentialsProvider.refresh();
@@ -276,16 +311,16 @@ public static class DefaultCredentialsRefreshServiceBuilder {
276311

277312
private ScheduledExecutorService scheduler;
278313

279-
private Function<Duration, Long> refreshDelayStrategy = fixedDelayBeforeExpirationRefreshDelayStrategy(Duration.ofSeconds(60));
314+
private Function<Duration, Duration> refreshDelayStrategy = ratioRefreshDelayStrategy(0.8);
280315

281-
private Function<Duration, Boolean> needRefreshStrategy = fixedTimeNeedRefreshStrategy(Duration.ofSeconds(60));
316+
private Function<Duration, Boolean> needRefreshStrategy = ttl -> false;
282317

283318
public DefaultCredentialsRefreshServiceBuilder scheduler(ScheduledThreadPoolExecutor scheduler) {
284319
this.scheduler = scheduler;
285320
return this;
286321
}
287322

288-
public DefaultCredentialsRefreshServiceBuilder refreshDelayStrategy(Function<Duration, Long> refreshDelayStrategy) {
323+
public DefaultCredentialsRefreshServiceBuilder refreshDelayStrategy(Function<Duration, Duration> refreshDelayStrategy) {
289324
this.refreshDelayStrategy = refreshDelayStrategy;
290325
return this;
291326
}

src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.function.Function;
3233
import java.util.stream.IntStream;
3334

35+
import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.fixedDelayBeforeExpirationRefreshDelayStrategy;
36+
import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.fixedTimeNeedRefreshStrategy;
37+
import static java.time.Duration.ofSeconds;
3438
import static org.assertj.core.api.Assertions.assertThat;
3539
import static org.mockito.Mockito.*;
3640

@@ -55,13 +59,13 @@ public void tearDown() {
5559
@Test
5660
public void scheduling() throws Exception {
5761
refreshService = new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder()
58-
.refreshDelayStrategy(DefaultCredentialsRefreshService.fixedDelayBeforeExpirationRefreshDelayStrategy(Duration.ofSeconds(2)))
62+
.refreshDelayStrategy(fixedDelayBeforeExpirationRefreshDelayStrategy(ofSeconds(2)))
5963
.build();
6064

6165
AtomicInteger passwordSequence = new AtomicInteger(0);
6266
when(credentialsProvider.getPassword()).thenAnswer(
6367
(Answer<String>) invocation -> "password-" + passwordSequence.get());
64-
when(credentialsProvider.getTimeBeforeExpiration()).thenAnswer((Answer<Duration>) invocation -> Duration.ofSeconds(5));
68+
when(credentialsProvider.getTimeBeforeExpiration()).thenAnswer((Answer<Duration>) invocation -> ofSeconds(5));
6569
doAnswer(invocation -> {
6670
passwordSequence.incrementAndGet();
6771
return null;
@@ -82,13 +86,12 @@ public void scheduling() throws Exception {
8286
AtomicInteger passwordSequence2 = new AtomicInteger(0);
8387
CredentialsProvider credentialsProvider2 = mock(CredentialsProvider.class);
8488
when(credentialsProvider2.getPassword()).thenAnswer((Answer<String>) invocation -> "password2-" + passwordSequence2.get());
85-
when(credentialsProvider2.getTimeBeforeExpiration()).thenAnswer((Answer<Duration>) invocation -> Duration.ofSeconds(4));
89+
when(credentialsProvider2.getTimeBeforeExpiration()).thenAnswer((Answer<Duration>) invocation -> ofSeconds(4));
8690
doAnswer(invocation -> {
8791
passwordSequence2.incrementAndGet();
8892
return null;
8993
}).when(credentialsProvider2).refresh();
9094

91-
9295
List<String> passwords2 = new CopyOnWriteArrayList<>();
9396
CountDownLatch latch2 = new CountDownLatch(2 * 1);
9497
refreshAction = () -> {
@@ -104,8 +107,6 @@ public void scheduling() throws Exception {
104107
"password2-1", "password2-2"
105108
);
106109
assertThat(passwords).hasSizeGreaterThan(4);
107-
108-
109110
}
110111

111112
@Test
@@ -166,4 +167,31 @@ public void refreshActionIsRemovedIfItErrorsTooMuch() throws Exception {
166167
verify(refreshAction, times(callsCountBeforeCancellation)).call();
167168
}
168169

170+
@Test
171+
public void fixedDelayBeforeExpirationRefreshDelayStrategyTest() {
172+
Function<Duration, Duration> delayStrategy = fixedDelayBeforeExpirationRefreshDelayStrategy(ofSeconds(20));
173+
assertThat(delayStrategy.apply(ofSeconds(60))).as("refresh delay is TTL - fixed delay").isEqualTo(ofSeconds(40));
174+
assertThat(delayStrategy.apply(ofSeconds(10))).as("refresh delay is TTL if TTL < fixed delay").isEqualTo(ofSeconds(10));
175+
}
176+
177+
@Test
178+
public void fixedTimeNeedRefreshStrategyTest() {
179+
Function<Duration, Boolean> refreshStrategy = fixedTimeNeedRefreshStrategy(ofSeconds(20));
180+
assertThat(refreshStrategy.apply(ofSeconds(60))).isFalse();
181+
assertThat(refreshStrategy.apply(ofSeconds(20))).isTrue();
182+
assertThat(refreshStrategy.apply(ofSeconds(19))).isTrue();
183+
assertThat(refreshStrategy.apply(ofSeconds(10))).isTrue();
184+
}
185+
186+
@Test
187+
public void ratioRefreshDelayStrategyTest() {
188+
Function<Duration, Duration> delayStrategy = DefaultCredentialsRefreshService.ratioRefreshDelayStrategy(0.8);
189+
assertThat(delayStrategy.apply(ofSeconds(60))).isEqualTo(ofSeconds(48));
190+
assertThat(delayStrategy.apply(ofSeconds(30))).isEqualTo(ofSeconds(24));
191+
assertThat(delayStrategy.apply(ofSeconds(10))).isEqualTo(ofSeconds(8));
192+
assertThat(delayStrategy.apply(ofSeconds(5))).isEqualTo(ofSeconds(4));
193+
assertThat(delayStrategy.apply(ofSeconds(2))).isEqualTo(ofSeconds(1));
194+
assertThat(delayStrategy.apply(ofSeconds(1))).isEqualTo(ofSeconds(0));
195+
}
196+
169197
}

0 commit comments

Comments
 (0)