Skip to content

Commit 6e35257

Browse files
authored
Merge pull request #2026 from cbuschka/release-14
Cherry pick for #1802 and #1629 to release 14
2 parents 134aacc + 4e609b4 commit 6e35257

File tree

10 files changed

+576
-352
lines changed

10 files changed

+576
-352
lines changed

util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.google.gson.reflect.TypeToken;
1616
import io.kubernetes.client.common.KubernetesListObject;
1717
import io.kubernetes.client.common.KubernetesObject;
18+
import io.kubernetes.client.informer.cache.Cache;
1819
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
1920
import io.kubernetes.client.openapi.ApiClient;
2021
import io.kubernetes.client.openapi.ApiException;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.Executors;
3435
import java.util.concurrent.Future;
36+
import java.util.function.BiConsumer;
3537
import okhttp3.Call;
3638
import org.apache.commons.collections4.MapUtils;
3739

@@ -140,8 +142,19 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
140142
ListerWatcher<ApiType, ApiListType> listerWatcher,
141143
Class<ApiType> apiTypeClass,
142144
long resyncPeriodInMillis) {
145+
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis, null);
146+
}
147+
148+
public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
149+
SharedIndexInformer<ApiType> sharedIndexInformerFor(
150+
ListerWatcher<ApiType, ApiListType> listerWatcher,
151+
Class<ApiType> apiTypeClass,
152+
long resyncPeriodInMillis,
153+
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
154+
143155
SharedIndexInformer<ApiType> informer =
144-
new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis);
156+
new DefaultSharedIndexInformer<>(
157+
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
145158
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);
146159
return informer;
147160
}

util/src/main/java/io/kubernetes/client/informer/cache/Controller.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.ScheduledFuture;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.function.BiConsumer;
2627
import java.util.function.Consumer;
2728
import java.util.function.Supplier;
2829
import org.apache.commons.lang3.tuple.MutablePair;
@@ -65,19 +66,34 @@ public class Controller<
6566

6667
private ScheduledFuture reflectorFuture;
6768

69+
/* visible for testing */ BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
70+
6871
public Controller(
6972
Class<ApiType> apiTypeClass,
7073
DeltaFIFO queue,
7174
ListerWatcher<ApiType, ApiListType> listerWatcher,
7275
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
7376
Supplier<Boolean> resyncFunc,
7477
long fullResyncPeriod) {
78+
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, null);
79+
}
80+
81+
public Controller(
82+
Class<ApiType> apiTypeClass,
83+
DeltaFIFO queue,
84+
ListerWatcher<ApiType, ApiListType> listerWatcher,
85+
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
86+
Supplier<Boolean> resyncFunc,
87+
long fullResyncPeriod,
88+
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
89+
7590
this.queue = queue;
7691
this.listerWatcher = listerWatcher;
7792
this.apiTypeClass = apiTypeClass;
7893
this.processFunc = processFunc;
7994
this.resyncFunc = resyncFunc;
8095
this.fullResyncPeriod = fullResyncPeriod;
96+
this.exceptionHandler = exceptionHandler;
8197

8298
// starts one daemon thread for reflector
8399
this.reflectExecutor =
@@ -113,7 +129,7 @@ public void run() {
113129

114130
synchronized (this) {
115131
// TODO(yue9944882): proper naming for reflector
116-
reflector = new ReflectorRunnable<ApiType, ApiListType>(apiTypeClass, listerWatcher, queue);
132+
reflector = newReflector();
117133
try {
118134
reflectorFuture =
119135
reflectExecutor.scheduleWithFixedDelay(
@@ -130,6 +146,10 @@ public void run() {
130146
this.processLoop();
131147
}
132148

149+
/* visible for testing */ ReflectorRunnable<ApiType, ApiListType> newReflector() {
150+
return new ReflectorRunnable<>(apiTypeClass, listerWatcher, queue, exceptionHandler);
151+
}
152+
133153
/** stops the resync thread pool firstly, then stop the reflector */
134154
public void stop() {
135155
synchronized (this) {

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,25 @@ public class ReflectorRunnable<
5858

5959
private AtomicBoolean isActive = new AtomicBoolean(true);
6060

61-
private final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
61+
/* visible for testing */ final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
6262

6363
public ReflectorRunnable(
64-
Class<ApiType> apiTypeClass, ListerWatcher listerWatcher, DeltaFIFO store) {
65-
this(apiTypeClass, listerWatcher, store, ReflectorRunnable::defaultWatchErrorHandler);
64+
Class<ApiType> apiTypeClass,
65+
ListerWatcher<ApiType, ApiListType> listerWatcher,
66+
DeltaFIFO store) {
67+
this(apiTypeClass, listerWatcher, store, null);
6668
}
6769

6870
public ReflectorRunnable(
6971
Class<ApiType> apiTypeClass,
70-
ListerWatcher listerWatcher,
72+
ListerWatcher<ApiType, ApiListType> listerWatcher,
7173
DeltaFIFO store,
7274
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
7375
this.listerWatcher = listerWatcher;
7476
this.store = store;
7577
this.apiTypeClass = apiTypeClass;
76-
this.exceptionHandler = exceptionHandler;
78+
this.exceptionHandler =
79+
exceptionHandler == null ? ReflectorRunnable::defaultWatchErrorHandler : exceptionHandler;
7780
}
7881

7982
/**
@@ -277,7 +280,7 @@ private void watchHandler(Watchable<ApiType> watch) {
277280
}
278281
}
279282

280-
private static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
283+
static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
281284
Class<ApiType> watchingApiTypeClass, Throwable t) {
282285
log.error(String.format("%s#Reflector loop failed unexpectedly", watchingApiTypeClass), t);
283286
}

util/src/main/java/io/kubernetes/client/informer/impl/DefaultSharedIndexInformer.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Deque;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.function.BiConsumer;
2930
import java.util.function.Function;
3031
import org.apache.commons.collections4.CollectionUtils;
3132
import org.apache.commons.lang3.tuple.MutablePair;
@@ -82,7 +83,28 @@ public DefaultSharedIndexInformer(
8283
new DeltaFIFO(
8384
(Function<KubernetesObject, String>) cache.getKeyFunc(),
8485
(Cache<KubernetesObject>) cache),
85-
cache);
86+
cache,
87+
null);
88+
}
89+
90+
public DefaultSharedIndexInformer(
91+
Class<ApiType> apiTypeClass,
92+
ListerWatcher<ApiType, ApiListType> listerWatcher,
93+
long resyncPeriod,
94+
Cache<ApiType> cache,
95+
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
96+
this(
97+
apiTypeClass,
98+
listerWatcher,
99+
resyncPeriod,
100+
// down-casting should be safe here because one delta FIFO instance only serves one
101+
// resource
102+
// type
103+
new DeltaFIFO(
104+
(Function<KubernetesObject, String>) cache.getKeyFunc(),
105+
(Cache<KubernetesObject>) cache),
106+
cache,
107+
exceptionHandler);
86108
}
87109

88110
public DefaultSharedIndexInformer(
@@ -91,19 +113,31 @@ public DefaultSharedIndexInformer(
91113
long resyncPeriod,
92114
DeltaFIFO deltaFIFO,
93115
Indexer<ApiType> indexer) {
116+
this(apiTypeClass, listerWatcher, resyncPeriod, deltaFIFO, indexer, null);
117+
}
118+
119+
public DefaultSharedIndexInformer(
120+
Class<ApiType> apiTypeClass,
121+
ListerWatcher<ApiType, ApiListType> listerWatcher,
122+
long resyncPeriod,
123+
DeltaFIFO deltaFIFO,
124+
Indexer<ApiType> indexer,
125+
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
126+
94127
this.resyncCheckPeriodMillis = resyncPeriod;
95128
this.defaultEventHandlerResyncPeriod = resyncPeriod;
96129

97130
this.processor = new SharedProcessor<>();
98131
this.indexer = indexer;
99132
this.controller =
100-
new Controller<ApiType, ApiListType>(
133+
new Controller<>(
101134
apiTypeClass,
102135
deltaFIFO,
103136
listerWatcher,
104137
this::handleDeltas,
105138
processor::shouldResync,
106-
resyncCheckPeriodMillis);
139+
resyncCheckPeriodMillis,
140+
exceptionHandler);
107141

108142
controllerThread =
109143
new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());

util/src/main/java/io/kubernetes/client/util/ClientBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ public static ClientBuilder kubeconfig(KubeConfig config) throws IOException {
277277
final ClientBuilder builder = new ClientBuilder();
278278

279279
String server = config.getServer();
280+
if (server == null) {
281+
throw new IllegalArgumentException("No server in kubeconfig");
282+
}
280283
if (!server.contains("://")) {
281284
if (server.contains(":443")) {
282285
server = "https://" + server;

util/src/test/java/io/kubernetes/client/informer/cache/ControllerTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import static org.junit.Assert.*;
1616

17+
import io.kubernetes.client.common.KubernetesObject;
1718
import io.kubernetes.client.informer.EventType;
1819
import io.kubernetes.client.informer.ListerWatcher;
1920
import io.kubernetes.client.openapi.models.V1ListMeta;
@@ -23,13 +24,36 @@
2324
import io.kubernetes.client.util.Watch;
2425
import java.time.Duration;
2526
import java.util.Arrays;
27+
import java.util.Deque;
2628
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.Consumer;
31+
import java.util.function.Supplier;
32+
import org.apache.commons.lang3.tuple.MutablePair;
2733
import org.awaitility.Awaitility;
2834
import org.hamcrest.core.IsEqual;
35+
import org.junit.Rule;
2936
import org.junit.Test;
37+
import org.mockito.Mock;
38+
import org.mockito.junit.MockitoJUnit;
39+
import org.mockito.junit.MockitoRule;
3040

3141
public class ControllerTest {
3242

43+
@Rule public MockitoRule mockitoRule = MockitoJUnit.rule();
44+
45+
private static final Class<V1Pod> anyApiTypeClass = V1Pod.class;
46+
private static final long anyFullResyncPeriod = 1000L;
47+
48+
@Mock private DeltaFIFO deltaFIFOMock;
49+
@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcherMock;
50+
51+
@Mock
52+
private Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> popProcessFuncMock;
53+
54+
@Mock private Supplier<Boolean> resyncFuncMock;
55+
@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandlerMock;
56+
3357
@Test
3458
public void testControllerProcessDeltas() {
3559

@@ -75,4 +99,37 @@ public void testControllerProcessDeltas() {
7599
controller.stop();
76100
}
77101
}
102+
103+
@Test
104+
public void testReflectorIsConstructedWithExeptionHandler() {
105+
Controller<V1Pod, V1PodList> controller =
106+
new Controller<>(
107+
anyApiTypeClass,
108+
deltaFIFOMock,
109+
listerWatcherMock,
110+
popProcessFuncMock,
111+
resyncFuncMock,
112+
anyFullResyncPeriod,
113+
exceptionHandlerMock);
114+
assertSame(exceptionHandlerMock, controller.exceptionHandler);
115+
116+
ReflectorRunnable<V1Pod, V1PodList> reflector = controller.newReflector();
117+
118+
assertSame(exceptionHandlerMock, reflector.exceptionHandler);
119+
}
120+
121+
@Test
122+
public void testControllerHasNoExceptionHandlerPerDefault() {
123+
124+
Controller<V1Pod, V1PodList> controller =
125+
new Controller<>(
126+
anyApiTypeClass,
127+
deltaFIFOMock,
128+
listerWatcherMock,
129+
popProcessFuncMock,
130+
resyncFuncMock,
131+
anyFullResyncPeriod);
132+
133+
assertNull(controller.exceptionHandler);
134+
}
78135
}

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
package io.kubernetes.client.informer.cache;
1414

1515
import static org.junit.Assert.assertFalse;
16+
import static org.junit.Assert.assertNotNull;
17+
import static org.junit.Assert.assertSame;
1618
import static org.junit.Assert.assertTrue;
1719
import static org.mockito.Mockito.any;
1820
import static org.mockito.Mockito.mock;
@@ -35,6 +37,7 @@
3537
import java.time.Duration;
3638
import java.util.concurrent.CompletableFuture;
3739
import java.util.concurrent.atomic.AtomicReference;
40+
import java.util.function.BiConsumer;
3841
import org.awaitility.Awaitility;
3942
import org.hamcrest.core.IsEqual;
4043
import org.junit.Test;
@@ -45,10 +48,14 @@
4548
@RunWith(MockitoJUnitRunner.class)
4649
public class ReflectorRunnableTest {
4750

51+
private static final Class<V1Pod> anyApiType = V1Pod.class;
52+
4853
@Mock private DeltaFIFO deltaFIFO;
4954

5055
@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcher;
5156

57+
@Mock private BiConsumer<Class<V1Pod>, Throwable> exceptionHandler;
58+
5259
@Test
5360
public void testReflectorRunOnce() throws ApiException {
5461
String mockResourceVersion = "1000";
@@ -343,4 +350,20 @@ public void testReflectorListShouldHandleExpiredResourceVersionFromWatchHandler(
343350
reflectorRunnable.stop();
344351
}
345352
}
353+
354+
@Test
355+
public void testDefaultExceptionHandlerSetPerDefault() {
356+
ReflectorRunnable<V1Pod, V1PodList> reflector =
357+
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO);
358+
359+
assertNotNull(reflector.exceptionHandler);
360+
}
361+
362+
@Test
363+
public void testGivemExceptionHandlerSet() {
364+
ReflectorRunnable<V1Pod, V1PodList> reflector =
365+
new ReflectorRunnable<>(anyApiType, listerWatcher, deltaFIFO, exceptionHandler);
366+
367+
assertSame(exceptionHandler, reflector.exceptionHandler);
368+
}
346369
}

0 commit comments

Comments
 (0)