|
14 | 14 |
|
15 | 15 | import static org.junit.Assert.assertFalse;
|
16 | 16 | import static org.mockito.Mockito.any;
|
| 17 | +import static org.mockito.Mockito.mock; |
17 | 18 | import static org.mockito.Mockito.never;
|
18 | 19 | import static org.mockito.Mockito.times;
|
19 | 20 | import static org.mockito.Mockito.verify;
|
|
31 | 32 | import io.kubernetes.client.util.Watchable;
|
32 | 33 | import java.net.HttpURLConnection;
|
33 | 34 | import java.time.Duration;
|
| 35 | +import java.util.concurrent.CompletableFuture; |
34 | 36 | import java.util.concurrent.atomic.AtomicReference;
|
35 | 37 | import org.awaitility.Awaitility;
|
36 | 38 | import org.hamcrest.core.IsEqual;
|
@@ -279,31 +281,35 @@ public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiExce
|
279 | 281 | when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
|
280 | 282 | ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
|
281 | 283 | new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
|
282 |
| - try { |
283 |
| - Thread thread = new Thread(reflectorRunnable::run); |
284 |
| - thread.setDaemon(true); |
285 |
| - thread.start(); |
286 |
| - Awaitility.await() |
287 |
| - .atMost(Duration.ofSeconds(1)) |
288 |
| - .pollInterval(Duration.ofMillis(100)) |
289 |
| - .until( |
290 |
| - () -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion())); |
291 |
| - assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable()); |
292 |
| - } finally { |
293 |
| - reflectorRunnable.stop(); |
294 |
| - } |
| 284 | + CompletableFuture<Void> future = CompletableFuture.runAsync(reflectorRunnable::run); |
| 285 | + Awaitility.await() |
| 286 | + .atMost(Duration.ofSeconds(2)) |
| 287 | + .pollInterval(Duration.ofMillis(100)) |
| 288 | + .until(() -> future.isDone()); |
| 289 | + assertFalse(future.isCompletedExceptionally()); |
| 290 | + } |
295 | 291 |
|
296 |
| - try { |
297 |
| - when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, "")); |
298 |
| - Thread thread = new Thread(reflectorRunnable::run); |
299 |
| - thread.setDaemon(true); |
300 |
| - thread.start(); |
301 |
| - Awaitility.await() |
302 |
| - .atMost(Duration.ofSeconds(5)) |
303 |
| - .pollInterval(Duration.ofMillis(100)) |
304 |
| - .until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable()); |
305 |
| - } finally { |
306 |
| - reflectorRunnable.stop(); |
307 |
| - } |
| 292 | + @Test |
| 293 | + public void testReflectorWatchShouldRelistUponExpiredWatchItem() throws ApiException { |
| 294 | + String expectedResourceVersion = "100"; |
| 295 | + Watchable<V1Pod> expiredWatchable = mock(Watchable.class); |
| 296 | + when(expiredWatchable.hasNext()).thenReturn(true); |
| 297 | + when(expiredWatchable.next()) |
| 298 | + .thenReturn( |
| 299 | + new Watch.Response<>( |
| 300 | + EventType.ERROR.name(), new V1Status().code(HttpURLConnection.HTTP_GONE))); |
| 301 | + when(listerWatcher.list(any())) |
| 302 | + .thenReturn( |
| 303 | + new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion))); |
| 304 | + // constantly failing watches will make the reflector run only one time |
| 305 | + when(listerWatcher.watch(any())).thenReturn(expiredWatchable); |
| 306 | + ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable = |
| 307 | + new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO); |
| 308 | + CompletableFuture<Void> future = CompletableFuture.runAsync(reflectorRunnable::run); |
| 309 | + Awaitility.await() |
| 310 | + .atMost(Duration.ofSeconds(2)) |
| 311 | + .pollInterval(Duration.ofMillis(100)) |
| 312 | + .until(() -> future.isDone()); |
| 313 | + assertFalse(future.isCompletedExceptionally()); |
308 | 314 | }
|
309 | 315 | }
|
0 commit comments