Skip to content

Commit 1be31bd

Browse files
authored
fix: update DefaultRetryContext to trap and forward RejectedExceptionException to onFailure (googleapis#3327)
1 parent a5808ea commit 1be31bd

File tree

3 files changed

+59
-11
lines changed

3 files changed

+59
-11
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultRetryContext.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.LinkedList;
2828
import java.util.List;
2929
import java.util.Locale;
30+
import java.util.concurrent.RejectedExecutionException;
3031
import java.util.concurrent.ScheduledExecutorService;
3132
import java.util.concurrent.ScheduledFuture;
3233
import java.util.concurrent.TimeUnit;
@@ -142,17 +143,26 @@ public <T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailur
142143
BackoffDuration backoffDuration = (BackoffDuration) nextBackoff;
143144

144145
lastBackoffResult = nextBackoff;
145-
pendingBackoff =
146-
scheduledExecutorService.schedule(
147-
() -> {
148-
try {
149-
onSuccess.onSuccess();
150-
} finally {
151-
clearPendingBackoff();
152-
}
153-
},
154-
backoffDuration.getDuration().toNanos(),
155-
TimeUnit.NANOSECONDS);
146+
try {
147+
pendingBackoff =
148+
scheduledExecutorService.schedule(
149+
() -> {
150+
try {
151+
onSuccess.onSuccess();
152+
} finally {
153+
clearPendingBackoff();
154+
}
155+
},
156+
backoffDuration.getDuration().toNanos(),
157+
TimeUnit.NANOSECONDS);
158+
} catch (RejectedExecutionException e) {
159+
InterruptedBackoffComment comment =
160+
new InterruptedBackoffComment(
161+
"Interrupted backoff -- unretryable error due to executor service shutdown");
162+
comment.addSuppressed(e);
163+
t.addSuppressed(comment);
164+
onFailure.onFailure(t);
165+
}
156166
} else {
157167
String msg =
158168
String.format(

google-cloud-storage/src/main/java/com/google/cloud/storage/RetryContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static java.util.Objects.requireNonNull;
20+
1921
import com.google.api.client.util.Sleeper;
2022
import com.google.api.core.ApiClock;
2123
import com.google.api.core.ApiFuture;
@@ -40,6 +42,7 @@
4042
import java.util.concurrent.ScheduledFuture;
4143
import java.util.concurrent.TimeUnit;
4244
import java.util.concurrent.TimeoutException;
45+
import org.checkerframework.checker.nullness.qual.NonNull;
4346

4447
@InternalApi
4548
@InternalExtensionOnly
@@ -131,6 +134,16 @@ static BackoffComment of(String message) {
131134
}
132135
}
133136

137+
final class InterruptedBackoffComment extends Throwable {
138+
InterruptedBackoffComment(@NonNull String message) {
139+
super(
140+
requireNonNull(message, "message must be non null"),
141+
/* cause= */ null,
142+
/* enableSuppression= */ true,
143+
/* writableStackTrace= */ false);
144+
}
145+
}
146+
134147
final class DirectScheduledExecutorService implements ScheduledExecutorService {
135148
private static final DirectScheduledExecutorService INSTANCE =
136149
new DirectScheduledExecutorService(Sleeper.DEFAULT, NanoClock.getDefaultClock());

google-cloud-storage/src/test/java/com/google/cloud/storage/RetryContextTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
import static com.google.cloud.storage.TestUtils.assertAll;
2020
import static com.google.common.truth.Truth.assertThat;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyLong;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
2125

2226
import com.google.api.core.ApiClock;
2327
import com.google.api.core.NanoClock;
@@ -31,6 +35,8 @@
3135
import com.google.cloud.RetryHelper;
3236
import com.google.cloud.RetryHelper.RetryHelperException;
3337
import com.google.cloud.storage.Backoff.Jitterer;
38+
import com.google.cloud.storage.RetryContext.BackoffComment;
39+
import com.google.cloud.storage.RetryContext.InterruptedBackoffComment;
3440
import com.google.cloud.storage.RetryContext.OnFailure;
3541
import com.google.cloud.storage.RetryContext.OnSuccess;
3642
import com.google.cloud.storage.Retrying.RetryingDependencies;
@@ -42,6 +48,7 @@
4248
import java.util.List;
4349
import java.util.concurrent.CountDownLatch;
4450
import java.util.concurrent.Executors;
51+
import java.util.concurrent.RejectedExecutionException;
4552
import java.util.concurrent.ScheduledExecutorService;
4653
import java.util.concurrent.TimeUnit;
4754
import java.util.concurrent.atomic.AtomicBoolean;
@@ -423,6 +430,24 @@ public void resetAlsoResetsBackoffState() throws Exception {
423430
});
424431
}
425432

433+
@Test
434+
public void rejectedExecutionException_funneledToOnFailureHandlerAsSuppressedException() {
435+
ScheduledExecutorService exec = mock(ScheduledExecutorService.class);
436+
RejectedExecutionException alreadyShutdown = new RejectedExecutionException("already shutdown");
437+
when(exec.schedule(any(Runnable.class), anyLong(), any())).thenThrow(alreadyShutdown);
438+
Throwable t1 = new RuntimeException("{err1}", new Throwable("{err1Cause}"));
439+
RetryContext ctx =
440+
RetryContext.of(exec, maxAttempts(2), Retrying.alwaysRetry(), Jitterer.noJitter());
441+
442+
AtomicReference<Throwable> err1 = new AtomicReference<>();
443+
ctx.recordError(t1, failOnSuccess(), err1::set);
444+
Throwable t = err1.get();
445+
assertThat(t).isNotNull();
446+
assertThat(t.getSuppressed()[0]).isInstanceOf(BackoffComment.class);
447+
assertThat(t.getSuppressed()[1]).isInstanceOf(InterruptedBackoffComment.class);
448+
assertThat(t.getSuppressed()[1].getSuppressed()[0]).isSameInstanceAs(alreadyShutdown);
449+
}
450+
426451
private static ApiException apiException(Code code, String message) {
427452
return ApiExceptionFactory.createException(message, null, GrpcStatusCode.of(code), false);
428453
}

0 commit comments

Comments
 (0)