Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 89e55e4

Browse files
authored
fix: Watchdog controls lifecycle of the future, not executor (#1890) (#1930)
1 parent f7caf21 commit 89e55e4

File tree

3 files changed

+87
-11
lines changed

3 files changed

+87
-11
lines changed

gax/src/main/java/com/google/api/gax/rpc/Watchdog.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import java.util.Map.Entry;
3737
import java.util.concurrent.CancellationException;
3838
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.ExecutionException;
3940
import java.util.concurrent.ScheduledExecutorService;
4041
import java.util.concurrent.ScheduledFuture;
4142
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.TimeoutException;
4244
import java.util.logging.Level;
4345
import java.util.logging.Logger;
4446
import javax.annotation.Nonnull;
@@ -61,6 +63,7 @@
6163
* </ul>
6264
*/
6365
public final class Watchdog implements Runnable, BackgroundResource {
66+
6467
private static final Logger LOG = Logger.getLogger(Watchdog.class.getName());
6568

6669
// Dummy value to convert the ConcurrentHashMap into a Set
@@ -138,12 +141,12 @@ public void shutdown() {
138141

139142
@Override
140143
public boolean isShutdown() {
141-
return executor.isShutdown();
144+
return future.isCancelled();
142145
}
143146

144147
@Override
145148
public boolean isTerminated() {
146-
return executor.isTerminated();
149+
return future.isDone();
147150
}
148151

149152
@Override
@@ -153,7 +156,14 @@ public void shutdownNow() {
153156

154157
@Override
155158
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
156-
return executor.awaitTermination(duration, unit);
159+
try {
160+
future.get(duration, unit);
161+
return true;
162+
} catch (ExecutionException | CancellationException e) {
163+
return true;
164+
} catch (TimeoutException e) {
165+
return false;
166+
}
157167
}
158168

159169
@Override

gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,6 @@ public interface WatchdogProvider {
4949

5050
Watchdog getWatchdog();
5151

52+
/** Return true if the watchdog should be automatically unscheduled. */
5253
boolean shouldAutoClose();
5354
}

gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.concurrent.ScheduledExecutorService;
4545
import java.util.concurrent.ScheduledFuture;
4646
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.TimeoutException;
4748
import org.junit.Before;
4849
import org.junit.Test;
4950
import org.junit.runner.RunWith;
@@ -195,14 +196,7 @@ public void testMultiple() throws Exception {
195196
@SuppressWarnings("unchecked")
196197
public void testWatchdogBeingClosed() {
197198
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
198-
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
199-
Mockito.when(
200-
mockExecutor.scheduleAtFixedRate(
201-
Mockito.any(Watchdog.class),
202-
Mockito.anyLong(),
203-
Mockito.anyLong(),
204-
Mockito.any(TimeUnit.class)))
205-
.thenReturn(future);
199+
ScheduledExecutorService mockExecutor = getMockExecutorService(future);
206200
Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor);
207201
assertThat(underTest).isInstanceOf(BackgroundResource.class);
208202

@@ -219,6 +213,77 @@ public void testWatchdogBeingClosed() {
219213
Mockito.verifyNoMoreInteractions(mockExecutor);
220214
}
221215

216+
@Test
217+
public void awaitTermination_shouldReturnTrueIfFutureIsDone() throws Exception {
218+
int duration = 1000;
219+
TimeUnit timeUnit = TimeUnit.MILLISECONDS;
220+
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
221+
ScheduledExecutorService mockExecutor = getMockExecutorService(future);
222+
Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor);
223+
watchdog.shutdown();
224+
225+
boolean actual = watchdog.awaitTermination(duration, timeUnit);
226+
227+
assertThat(actual).isTrue();
228+
}
229+
230+
@Test
231+
public void awaitTermination_shouldReturnFalseIfGettingFutureTimedOut() throws Exception {
232+
int duration = 1000;
233+
TimeUnit timeUnit = TimeUnit.MILLISECONDS;
234+
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
235+
Mockito.doThrow(new TimeoutException()).when(future).get(duration, timeUnit);
236+
ScheduledExecutorService mockExecutor = getMockExecutorService(future);
237+
Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor);
238+
239+
boolean actual = watchdog.awaitTermination(duration, timeUnit);
240+
241+
assertThat(actual).isFalse();
242+
}
243+
244+
@Test
245+
public void awaitTermination_shouldReturnTrueIfFutureIsAlreadyCancelled() throws Exception {
246+
int duration = 1000;
247+
TimeUnit timeUnit = TimeUnit.MILLISECONDS;
248+
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
249+
Mockito.doThrow(new CancellationException()).when(future).get(duration, timeUnit);
250+
ScheduledExecutorService mockExecutor = getMockExecutorService(future);
251+
Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor);
252+
253+
boolean actual = watchdog.awaitTermination(duration, timeUnit);
254+
255+
assertThat(actual).isTrue();
256+
}
257+
258+
@Test
259+
public void awaitTermination_shouldReturnFalseIfGettingFutureThrowsExecutionException()
260+
throws Exception {
261+
int duration = 1000;
262+
TimeUnit timeUnit = TimeUnit.MILLISECONDS;
263+
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
264+
Mockito.doThrow(new ExecutionException(new RuntimeException()))
265+
.when(future)
266+
.get(duration, timeUnit);
267+
ScheduledExecutorService mockExecutor = getMockExecutorService(future);
268+
Watchdog watchdog = Watchdog.create(clock, checkInterval, mockExecutor);
269+
270+
boolean actual = watchdog.awaitTermination(duration, timeUnit);
271+
272+
assertThat(actual).isTrue();
273+
}
274+
275+
private ScheduledExecutorService getMockExecutorService(ScheduledFuture future) {
276+
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
277+
Mockito.when(
278+
mockExecutor.scheduleAtFixedRate(
279+
Mockito.any(Watchdog.class),
280+
Mockito.anyLong(),
281+
Mockito.anyLong(),
282+
Mockito.any(TimeUnit.class)))
283+
.thenReturn(future);
284+
return mockExecutor;
285+
}
286+
222287
static class AccumulatingObserver<T> implements ResponseObserver<T> {
223288
SettableApiFuture<StreamController> controller = SettableApiFuture.create();
224289
Queue<T> responses = Queues.newLinkedBlockingDeque();

0 commit comments

Comments
 (0)