Skip to content

Commit c5eafdc

Browse files
committed
Handle thread interruption in credentials refresh service
(cherry picked from commit 443a589)
1 parent 5471dad commit c5eafdc

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

src/main/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshService.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,38 @@ void maybeSetRefreshTask(Supplier<ScheduledFuture<?>> scheduledFutureSupplier) {
275275
}
276276

277277
void refresh() {
278-
// FIXME check whether thread has been cancelled or not before refresh() and refreshAction.call()
278+
if (Thread.currentThread().isInterrupted()) {
279+
return;
280+
}
279281

280-
// FIXME protect this call, or at least log some error
281-
this.credentialsProvider.refresh();
282+
int attemptCount = 0;
283+
boolean refreshSucceeded = false;
284+
while (attemptCount < 3) {
285+
LOGGER.debug("Refreshing token for credentials provider {}", credentialsProvider);
286+
try {
287+
this.credentialsProvider.refresh();
288+
LOGGER.debug("Token refreshed for credentials provider {}", credentialsProvider);
289+
refreshSucceeded = true;
290+
break;
291+
} catch (Exception e) {
292+
LOGGER.warn("Error while trying to refresh token: {}", e.getMessage());
293+
}
294+
attemptCount++;
295+
try {
296+
Thread.sleep(1000L);
297+
} catch (InterruptedException e) {
298+
Thread.currentThread().interrupt();
299+
return;
300+
}
301+
}
302+
303+
if (!refreshSucceeded) {
304+
LOGGER.warn("Token refresh failed after retry, aborting callbacks");
305+
return;
306+
}
282307

283308
Iterator<Registration> iterator = registrations.values().iterator();
284-
while (iterator.hasNext()) {
309+
while (iterator.hasNext() && !Thread.currentThread().isInterrupted()) {
285310
Registration registration = iterator.next();
286311
// FIXME set a timeout on the call? (needs a separate thread)
287312
try {
@@ -291,6 +316,8 @@ void refresh() {
291316
iterator.remove();
292317
}
293318
registration.errorHistory.set(0);
319+
} catch (InterruptedException e) {
320+
Thread.currentThread().interrupt();
294321
} catch (Exception e) {
295322
LOGGER.warn("Error while trying to refresh a connection token", e);
296323
registration.errorHistory.incrementAndGet();

src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,63 @@ public void refreshActionIsRemovedIfItErrorsTooMuch() throws Exception {
167167
verify(refreshAction, times(callsCountBeforeCancellation)).call();
168168
}
169169

170+
@Test
171+
public void errorInRefreshShouldBeRetried() throws Exception {
172+
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
173+
credentialsProvider
174+
);
175+
doThrow(RuntimeException.class).doThrow(RuntimeException.class)
176+
.doNothing().when(credentialsProvider).refresh();
177+
178+
when(refreshAction.call()).thenReturn(true);
179+
180+
state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction));
181+
182+
state.refresh();
183+
184+
verify(credentialsProvider, times(3)).refresh();
185+
verify(refreshAction, times(1)).call();
186+
}
187+
188+
@Test
189+
public void callbacksAreNotCalledWhenRetryOnRefreshIsExhausted() throws Exception {
190+
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
191+
credentialsProvider
192+
);
193+
doThrow(RuntimeException.class).when(credentialsProvider).refresh();
194+
195+
state.add(new DefaultCredentialsRefreshService.Registration("1", refreshAction));
196+
197+
state.refresh();
198+
199+
verify(credentialsProvider, times(3)).refresh();
200+
verify(refreshAction, times(0)).call();
201+
}
202+
203+
@Test
204+
public void refreshCanBeInterrupted() throws Exception {
205+
DefaultCredentialsRefreshService.CredentialsProviderState state = new DefaultCredentialsRefreshService.CredentialsProviderState(
206+
credentialsProvider
207+
);
208+
209+
AtomicInteger callbackCount = new AtomicInteger(10);
210+
when(refreshAction.call()).thenAnswer(invocation -> {
211+
callbackCount.decrementAndGet();
212+
Thread.sleep(1000L);
213+
return true;
214+
});
215+
216+
IntStream.range(0, callbackCount.get()).forEach(i -> state.add(new DefaultCredentialsRefreshService.Registration(i + "", refreshAction)));
217+
218+
Thread refreshThread = new Thread(() -> state.refresh());
219+
refreshThread.start();
220+
Thread.sleep(1000L);
221+
refreshThread.interrupt();
222+
refreshThread.join(5000);
223+
assertThat(refreshThread.isAlive()).isFalse();
224+
assertThat(callbackCount).hasValueGreaterThan(1); // not all the callbacks were called, because thread has been cancelled
225+
}
226+
170227
@Test
171228
public void fixedDelayBeforeExpirationRefreshDelayStrategyTest() {
172229
Function<Duration, Duration> delayStrategy = fixedDelayBeforeExpirationRefreshDelayStrategy(ofSeconds(20));

0 commit comments

Comments
 (0)