Skip to content

Commit e21e044

Browse files
committed
shut down reflector daemon job gracefully
1 parent 6b73782 commit e21e044

File tree

2 files changed

+60
-14
lines changed

2 files changed

+60
-14
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/Controller.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,28 +96,34 @@ public void run() {
9696
log.info("informer#Controller: resync skipped due to 0 full resync period");
9797
}
9898

99-
// TODO(yue9944882): proper naming for reflector
100-
reflector = new ReflectorRunnable<ApiType, ApiListType>(apiTypeClass, listerWatcher, queue);
101-
102-
reflectorFuture =
103-
reflectExecutor.scheduleWithFixedDelay(
104-
reflector::run, 0L, DEFAULT_PERIOD, TimeUnit.MILLISECONDS);
99+
synchronized (this) {
100+
// TODO(yue9944882): proper naming for reflector
101+
reflector = new ReflectorRunnable<ApiType, ApiListType>(apiTypeClass, listerWatcher, queue);
102+
try {
103+
reflectorFuture =
104+
reflectExecutor.scheduleWithFixedDelay(
105+
reflector::run, 0L, DEFAULT_PERIOD, TimeUnit.MILLISECONDS);
106+
} catch (RejectedExecutionException e) {
107+
// submitting reflector list-watching job can fail due to concurrent invocation of
108+
// `shutdown`. handling exception with a warning then return.
109+
log.warn("reflector list-watching job exiting because the thread-pool is shutting down");
110+
return;
111+
}
112+
}
105113

106114
// start the process loop
107115
this.processLoop();
108116
}
109117

110118
/** stops the resync thread pool firstly, then stop the reflector */
111119
public void stop() {
112-
113-
reflector.stop();
114-
reflectorFuture.cancel(true);
115-
reflectExecutor.shutdown();
116-
117-
if (resyncFuture != null) {
118-
resyncFuture.cancel(true);
119-
resyncExecutor.shutdown();
120+
synchronized (this) {
121+
if (reflectorFuture != null) {
122+
reflector.stop();
123+
reflectorFuture.cancel(true);
124+
}
120125
}
126+
reflectExecutor.shutdown();
121127
}
122128

123129
/** returns true if the queue has been resycned */
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.kubernetes.client.informer;
2+
3+
import io.kubernetes.client.ApiException;
4+
import io.kubernetes.client.apis.CoreV1Api;
5+
import io.kubernetes.client.models.V1Namespace;
6+
import io.kubernetes.client.models.V1NamespaceList;
7+
import io.kubernetes.client.util.CallGeneratorParams;
8+
import org.junit.Test;
9+
10+
public class SharedInformerFactoryTest {
11+
@Test
12+
public void shutdownInformerFactoryInstantlyAfterStarting() {
13+
CoreV1Api api = new CoreV1Api();
14+
SharedInformerFactory factory = new SharedInformerFactory();
15+
factory.sharedIndexInformerFor(
16+
(CallGeneratorParams params) -> {
17+
try {
18+
return api.listNamespaceCall(
19+
null,
20+
null,
21+
null,
22+
null,
23+
null,
24+
null,
25+
params.resourceVersion,
26+
params.timeoutSeconds,
27+
params.watch,
28+
null,
29+
null);
30+
} catch (ApiException e) {
31+
throw new RuntimeException(e);
32+
}
33+
},
34+
V1Namespace.class,
35+
V1NamespaceList.class);
36+
37+
factory.startAllRegisteredInformers();
38+
factory.stopAllRegisteredInformers();
39+
}
40+
}

0 commit comments

Comments
 (0)