Skip to content

Commit 6479b9f

Browse files
authored
Merge pull request #1905 from yue9944882/cherrypick-13-handle-stale-watch-event
Cherrypick-13: Bugfix: stale watch event should be handled properly
2 parents e45408a + c6cebf7 commit 6479b9f

File tree

3 files changed

+57
-26
lines changed

3 files changed

+57
-26
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.kubernetes.client.common.KubernetesObject;
1717
import io.kubernetes.client.informer.EventType;
1818
import io.kubernetes.client.informer.ListerWatcher;
19+
import io.kubernetes.client.informer.exception.WatchExpiredException;
1920
import io.kubernetes.client.openapi.ApiException;
2021
import io.kubernetes.client.openapi.models.V1ListMeta;
2122
import io.kubernetes.client.openapi.models.V1ObjectMeta;
@@ -131,6 +132,12 @@ public void run() {
131132
watch = newWatch;
132133
}
133134
watchHandler(newWatch);
135+
} catch (WatchExpiredException e) {
136+
// Watch calls failed due to an expired resource-version. Returning
137+
// to unwind the list-watch loops so that we can respawn a new round
138+
// of list-watching.
139+
log.info("{}#Watch expired, will re-list-watch soon", this.apiTypeClass);
140+
return;
134141
} catch (Throwable t) {
135142
if (isConnectException(t)) {
136143
// If this is "connection refused" error, it means that most likely
@@ -150,8 +157,10 @@ public void run() {
150157
continue;
151158
}
152159
if ((t instanceof RuntimeException)
160+
&& t.getMessage() != null
153161
&& t.getMessage().contains("IO Exception during hasNext")) {
154162
log.info("{}#Read timeout retry list and watch", this.apiTypeClass);
163+
// IO timeout should be taken as a normal case
155164
return;
156165
}
157166
this.exceptionHandler.accept(apiTypeClass, t);
@@ -233,7 +242,7 @@ private void watchHandler(Watchable<ApiType> watch) {
233242
if (eventType.get() == EventType.ERROR) {
234243
if (item.status != null && item.status.getCode() == HttpURLConnection.HTTP_GONE) {
235244
log.info("Watch connection expired: {}", item.status.getMessage());
236-
return;
245+
throw new WatchExpiredException();
237246
} else {
238247
String errorMessage =
239248
String.format("got ERROR event and its status: %s", item.status.toString());
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.informer.exception;
14+
15+
/** An ERROR watch item was returned due to a stale resource version. */
16+
public class WatchExpiredException extends RuntimeException {}

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import static org.junit.Assert.assertFalse;
1616
import static org.mockito.Mockito.any;
17+
import static org.mockito.Mockito.mock;
1718
import static org.mockito.Mockito.never;
1819
import static org.mockito.Mockito.times;
1920
import static org.mockito.Mockito.verify;
@@ -31,6 +32,7 @@
3132
import io.kubernetes.client.util.Watchable;
3233
import java.net.HttpURLConnection;
3334
import java.time.Duration;
35+
import java.util.concurrent.CompletableFuture;
3436
import java.util.concurrent.atomic.AtomicReference;
3537
import org.awaitility.Awaitility;
3638
import org.hamcrest.core.IsEqual;
@@ -279,31 +281,35 @@ public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiExce
279281
when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
280282
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
281283
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
282-
try {
283-
Thread thread = new Thread(reflectorRunnable::run);
284-
thread.setDaemon(true);
285-
thread.start();
286-
Awaitility.await()
287-
.atMost(Duration.ofSeconds(1))
288-
.pollInterval(Duration.ofMillis(100))
289-
.until(
290-
() -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
291-
assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable());
292-
} finally {
293-
reflectorRunnable.stop();
294-
}
284+
CompletableFuture<Void> future = CompletableFuture.runAsync(reflectorRunnable::run);
285+
Awaitility.await()
286+
.atMost(Duration.ofSeconds(2))
287+
.pollInterval(Duration.ofMillis(100))
288+
.until(() -> future.isDone());
289+
assertFalse(future.isCompletedExceptionally());
290+
}
295291

296-
try {
297-
when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
298-
Thread thread = new Thread(reflectorRunnable::run);
299-
thread.setDaemon(true);
300-
thread.start();
301-
Awaitility.await()
302-
.atMost(Duration.ofSeconds(5))
303-
.pollInterval(Duration.ofMillis(100))
304-
.until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable());
305-
} finally {
306-
reflectorRunnable.stop();
307-
}
292+
@Test
293+
public void testReflectorWatchShouldRelistUponExpiredWatchItem() throws ApiException {
294+
String expectedResourceVersion = "100";
295+
Watchable<V1Pod> expiredWatchable = mock(Watchable.class);
296+
when(expiredWatchable.hasNext()).thenReturn(true);
297+
when(expiredWatchable.next())
298+
.thenReturn(
299+
new Watch.Response<>(
300+
EventType.ERROR.name(), new V1Status().code(HttpURLConnection.HTTP_GONE)));
301+
when(listerWatcher.list(any()))
302+
.thenReturn(
303+
new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion)));
304+
// constantly failing watches will make the reflector run only one time
305+
when(listerWatcher.watch(any())).thenReturn(expiredWatchable);
306+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
307+
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
308+
CompletableFuture<Void> future = CompletableFuture.runAsync(reflectorRunnable::run);
309+
Awaitility.await()
310+
.atMost(Duration.ofSeconds(2))
311+
.pollInterval(Duration.ofMillis(100))
312+
.until(() -> future.isDone());
313+
assertFalse(future.isCompletedExceptionally());
308314
}
309315
}

0 commit comments

Comments
 (0)