Skip to content

Commit 8f5aabe

Browse files
authored
Merge pull request #1026 from yue9944882/feat/exception-handler
Reflector and leader-elector exception handlers
2 parents b2b6ac0 + 6a7f2d8 commit 8f5aabe

File tree

4 files changed

+147
-20
lines changed

4 files changed

+147
-20
lines changed

extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010
import java.util.concurrent.*;
1111
import java.util.concurrent.atomic.AtomicBoolean;
12+
import java.util.function.Consumer;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415

@@ -23,6 +24,7 @@ public class LeaderElector {
2324
// internal bookkeeping
2425
private LeaderElectionRecord observedRecord;
2526
private long observedTimeMilliSeconds;
27+
private final Consumer<Throwable> exceptionHandler;
2628

2729
private ScheduledExecutorService scheduledWorkers =
2830
Executors.newSingleThreadScheduledExecutor(
@@ -33,6 +35,14 @@ public class LeaderElector {
3335
Executors.newSingleThreadExecutor(makeThreadFactory("leader-elector-hook-worker-%d"));
3436

3537
public LeaderElector(LeaderElectionConfig config) {
38+
this(
39+
config,
40+
(t) -> {
41+
log.error("Unexpected error on acquiring or renewing the lease", t);
42+
});
43+
}
44+
45+
public LeaderElector(LeaderElectionConfig config, Consumer<Throwable> exceptionHandler) {
3646
if (config == null) {
3747
throw new IllegalArgumentException("Config must be provided.");
3848
}
@@ -68,6 +78,7 @@ public LeaderElector(LeaderElectionConfig config) {
6878
throw new IllegalArgumentException(String.join(",", errors));
6979
}
7080
this.config = config;
81+
this.exceptionHandler = exceptionHandler;
7182
}
7283

7384
public void run(Runnable startLeadingHook, Runnable stopLeadingHook) {
@@ -110,7 +121,7 @@ private boolean acquire() {
110121
} catch (CancellationException e) {
111122
log.info("Processing tryAcquireOrRenew successfully canceled");
112123
} catch (Throwable t) {
113-
log.error("Error processing tryAcquireOrRenew as {}", t.getMessage());
124+
this.exceptionHandler.accept(t);
114125
future.cancel(true);
115126
}
116127
},

extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package io.kubernetes.client.extended.leaderelection;
22

33
import static java.util.concurrent.TimeUnit.SECONDS;
4+
import static org.junit.Assert.*;
5+
import static org.mockito.Mockito.*;
46

7+
import io.kubernetes.client.openapi.ApiException;
58
import java.time.Duration;
69
import java.util.ArrayList;
710
import java.util.List;
811
import java.util.concurrent.CountDownLatch;
912
import java.util.concurrent.ExecutorService;
1013
import java.util.concurrent.Executors;
14+
import java.util.concurrent.atomic.AtomicReference;
1115
import java.util.concurrent.locks.ReentrantLock;
1216
import java.util.function.Consumer;
1317
import org.junit.Assert;
1418
import org.junit.Before;
1519
import org.junit.Test;
20+
import org.junit.runner.RunWith;
21+
import org.mockito.Mock;
22+
import org.mockito.junit.MockitoJUnitRunner;
1623

24+
@RunWith(MockitoJUnitRunner.class)
1725
public class LeaderElectionTest {
1826

1927
@Before
@@ -22,6 +30,8 @@ public void setUp() {
2230
MockResourceLock.leaderRecord = null;
2331
}
2432

33+
@Mock private Lock lock;
34+
2535
@Test
2636
public void testSimpleLeaderElection() throws InterruptedException {
2737
List<String> electionHistory = new ArrayList<>();
@@ -250,6 +260,28 @@ private void assertHistory(List<String> history, String... expected) {
250260
}
251261
}
252262

263+
@Test
264+
public void testLeaderElectionCaptureException() throws ApiException, InterruptedException {
265+
RuntimeException expectedException = new RuntimeException("noxu");
266+
AtomicReference<Throwable> actualException = new AtomicReference<>();
267+
when(lock.get()).thenThrow(expectedException);
268+
LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig();
269+
leaderElectionConfig.setLock(lock);
270+
leaderElectionConfig.setLeaseDuration(Duration.ofMillis(1000));
271+
leaderElectionConfig.setRetryPeriod(Duration.ofMillis(200));
272+
leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700));
273+
LeaderElector leaderElector =
274+
new LeaderElector(leaderElectionConfig, (t) -> actualException.set(t));
275+
276+
ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1);
277+
leaderElectionWorker.submit(
278+
() -> {
279+
leaderElector.run(() -> {}, () -> {});
280+
});
281+
Thread.sleep(Duration.ofSeconds(2).toMillis());
282+
assertEquals(expectedException, actualException.get().getCause());
283+
}
284+
253285
public static class MockResourceLock implements Lock {
254286

255287
public static ReentrantLock lock;

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import io.kubernetes.client.informer.ListerWatcher;
77
import io.kubernetes.client.openapi.models.V1ListMeta;
88
import io.kubernetes.client.openapi.models.V1ObjectMeta;
9-
import io.kubernetes.client.util.*;
9+
import io.kubernetes.client.util.CallGeneratorParams;
10+
import io.kubernetes.client.util.Watchable;
1011
import java.net.ConnectException;
1112
import java.time.Duration;
12-
import java.util.*;
13+
import java.util.List;
1314
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.function.BiConsumer;
1416
import org.slf4j.Logger;
1517
import org.slf4j.LoggerFactory;
1618

@@ -31,21 +33,32 @@ public class ReflectorRunnable<
3133

3234
private AtomicBoolean isActive = new AtomicBoolean(true);
3335

36+
private final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;
37+
3438
public ReflectorRunnable(
3539
Class<ApiType> apiTypeClass, ListerWatcher listerWatcher, DeltaFIFO store) {
40+
this(apiTypeClass, listerWatcher, store, ReflectorRunnable::defaultWatchErrorHandler);
41+
}
42+
43+
public ReflectorRunnable(
44+
Class<ApiType> apiTypeClass,
45+
ListerWatcher listerWatcher,
46+
DeltaFIFO store,
47+
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
3648
this.listerWatcher = listerWatcher;
3749
this.store = store;
3850
this.apiTypeClass = apiTypeClass;
51+
this.exceptionHandler = exceptionHandler;
3952
}
4053

4154
/**
4255
* run first lists all items and get the resource version at the moment of call, and then use the
4356
* resource version to watch.
4457
*/
4558
public void run() {
46-
try {
47-
log.info("{}#Start listing and watching...", apiTypeClass);
59+
log.info("{}#Start listing and watching...", apiTypeClass);
4860

61+
try {
4962
ApiListType list = listerWatcher.list(new CallGeneratorParams(Boolean.FALSE, null, null));
5063

5164
V1ListMeta listMeta = list.getMetadata();
@@ -82,25 +95,27 @@ public void run() {
8295
Long.valueOf(Duration.ofMinutes(5).toMillis()).intValue()));
8396
watchHandler(watch);
8497
} catch (Throwable t) {
85-
log.info("{}#Watch connection get exception {}", apiTypeClass, t.getMessage());
86-
Throwable cause = t.getCause();
87-
// If this is "connection refused" error, it means that most likely apiserver is not
88-
// responsive.
89-
// It doesn't make sense to re-list all objects because most likely we will be able to
90-
// restart
91-
// watch where we ended.
92-
// If that's the case wait and resend watch request.
93-
if (cause != null && (cause instanceof ConnectException)) {
94-
log.info("{}#Watch get connect exception, retry watch", apiTypeClass);
95-
Thread.sleep(1000L);
98+
if (isConnectException(t)) {
99+
// If this is "connection refused" error, it means that most likely apiserver is not
100+
// responsive.
101+
// It doesn't make sense to re-list all objects because most likely we will be able to
102+
// restart
103+
// watch where we ended.
104+
// If that's the case wait and resend watch request.
105+
log.info("{}#Watch get connect exception, retry watch", this.apiTypeClass);
106+
try {
107+
Thread.sleep(1000L);
108+
} catch (InterruptedException e) {
109+
// no-op
110+
}
96111
continue;
97112
}
98113
if ((t instanceof RuntimeException)
99114
&& t.getMessage().contains("IO Exception during hasNext")) {
100-
log.info("{}#Read timeout retry list and watch", apiTypeClass);
115+
log.info("{}#Read timeout retry list and watch", this.apiTypeClass);
101116
return;
102117
}
103-
log.error("{}#Watch failed as {} unexpected", apiTypeClass, t.getMessage(), t);
118+
this.exceptionHandler.accept(apiTypeClass, t);
104119
return;
105120
} finally {
106121
if (watch != null) {
@@ -110,7 +125,7 @@ public void run() {
110125
}
111126
}
112127
} catch (Throwable t) {
113-
log.error("Failed to list-watch", t);
128+
this.exceptionHandler.accept(apiTypeClass, t);
114129
}
115130
}
116131

@@ -168,4 +183,18 @@ private void watchHandler(Watchable<ApiType> watch) {
168183
}
169184
}
170185
}
186+
187+
private static <ApiType extends KubernetesObject> void defaultWatchErrorHandler(
188+
Class<ApiType> watchingApiTypeClass, Throwable t) {
189+
log.error(String.format("%s#Reflector loop failed unexpectedly", watchingApiTypeClass), t);
190+
}
191+
192+
private boolean isConnectException(Throwable t) {
193+
if (t instanceof ConnectException) {
194+
return true;
195+
}
196+
// ApiException can nest a ConnectException
197+
Throwable cause = t.getCause();
198+
return cause instanceof ConnectException;
199+
}
171200
}

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
import io.kubernetes.client.informer.EventType;
77
import io.kubernetes.client.informer.ListerWatcher;
88
import io.kubernetes.client.openapi.ApiException;
9-
import io.kubernetes.client.openapi.models.*;
9+
import io.kubernetes.client.openapi.models.V1ListMeta;
10+
import io.kubernetes.client.openapi.models.V1Pod;
11+
import io.kubernetes.client.openapi.models.V1PodList;
12+
import io.kubernetes.client.openapi.models.V1Status;
1013
import io.kubernetes.client.util.CallGeneratorParams;
1114
import io.kubernetes.client.util.Watch;
1215
import io.kubernetes.client.util.Watchable;
16+
import java.util.concurrent.atomic.AtomicReference;
1317
import org.junit.Rule;
1418
import org.junit.Test;
1519
import org.junit.contrib.java.lang.system.EnvironmentVariables;
@@ -90,4 +94,55 @@ public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
9094
reflectorRunnable.stop();
9195
}
9296
}
97+
98+
@Test
99+
public void testReflectorRunnableCaptureListException()
100+
throws ApiException, InterruptedException {
101+
RuntimeException expectedException = new RuntimeException("noxu");
102+
AtomicReference<Throwable> actualException = new AtomicReference<>();
103+
when(listerWatcher.list(any())).thenThrow(expectedException);
104+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
105+
new ReflectorRunnable<>(
106+
V1Pod.class,
107+
listerWatcher,
108+
deltaFIFO,
109+
(apiType, t) -> {
110+
actualException.set(t);
111+
});
112+
try {
113+
Thread thread = new Thread(reflectorRunnable::run);
114+
thread.setDaemon(true);
115+
thread.start();
116+
Thread.sleep(1000);
117+
} finally {
118+
reflectorRunnable.stop();
119+
}
120+
assertEquals(expectedException, actualException.get());
121+
}
122+
123+
@Test
124+
public void testReflectorRunnableCaptureWatchException()
125+
throws ApiException, InterruptedException {
126+
RuntimeException expectedException = new RuntimeException("noxu");
127+
AtomicReference<Throwable> actualException = new AtomicReference<>();
128+
when(listerWatcher.list(any())).thenReturn(new V1PodList().metadata(new V1ListMeta()));
129+
when(listerWatcher.watch(any())).thenThrow(expectedException);
130+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
131+
new ReflectorRunnable<>(
132+
V1Pod.class,
133+
listerWatcher,
134+
deltaFIFO,
135+
(apiType, t) -> {
136+
actualException.set(t);
137+
});
138+
try {
139+
Thread thread = new Thread(reflectorRunnable::run);
140+
thread.setDaemon(true);
141+
thread.start();
142+
Thread.sleep(1000);
143+
} finally {
144+
reflectorRunnable.stop();
145+
}
146+
assertEquals(expectedException, actualException.get());
147+
}
93148
}

0 commit comments

Comments
 (0)