Skip to content

Commit 4642024

Browse files
authored
Merge pull request #1096 from dddddai/master
Gracefully shutdown listener pool
2 parents 9dae95d + 858f427 commit 4642024

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
package io.kubernetes.client.informer.cache;
1414

1515
import io.kubernetes.client.common.KubernetesObject;
16+
import java.time.Duration;
1617
import java.util.ArrayList;
1718
import java.util.List;
1819
import java.util.concurrent.ExecutorService;
1920
import java.util.concurrent.Executors;
21+
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.locks.ReadWriteLock;
2123
import java.util.concurrent.locks.ReentrantReadWriteLock;
2224
import org.apache.commons.collections4.CollectionUtils;
@@ -34,15 +36,22 @@ public class SharedProcessor<ApiType extends KubernetesObject> {
3436

3537
private ExecutorService executorService;
3638

39+
private final Duration timeout;
40+
3741
public SharedProcessor() {
3842
this(Executors.newCachedThreadPool());
3943
}
4044

4145
public SharedProcessor(ExecutorService threadPool) {
46+
this(threadPool, Duration.ofMinutes(1));
47+
}
48+
49+
public SharedProcessor(ExecutorService threadPool, Duration timeout) {
4250
this.listeners = new ArrayList<>();
4351
this.syncingListeners = new ArrayList<>();
4452

4553
this.executorService = threadPool;
54+
this.timeout = timeout;
4655
}
4756

4857
/**
@@ -146,7 +155,16 @@ public void stop() {
146155
} finally {
147156
lock.writeLock().unlock();
148157
}
149-
// TODO(yue9944882): gracefully shutdown listener pool
150-
executorService.shutdownNow();
158+
// Disable new tasks from being submitted
159+
executorService.shutdown();
160+
try {
161+
// Wait a while for existing tasks to terminate
162+
if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
163+
// Cancel currently executing tasks
164+
executorService.shutdownNow();
165+
}
166+
} catch (InterruptedException e) {
167+
executorService.shutdownNow();
168+
}
151169
}
152170
}

util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.kubernetes.client.informer.ResourceEventHandler;
1919
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2020
import io.kubernetes.client.openapi.models.V1Pod;
21+
import java.time.Duration;
22+
import java.util.concurrent.Executors;
2123
import org.junit.Rule;
2224
import org.junit.Test;
2325
import org.junit.contrib.java.lang.system.EnvironmentVariables;
@@ -61,6 +63,45 @@ public void testListenerAddition() throws InterruptedException {
6163
assertTrue(expectDeleteHandler.isSatisfied());
6264
}
6365

66+
@Test
67+
public void testShutdownGracefully() throws InterruptedException {
68+
SharedProcessor<V1Pod> sharedProcessor =
69+
new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
70+
TestWorker<V1Pod> quickWorker = new TestWorker<>(null, 0);
71+
quickWorker.setTask(
72+
() -> {
73+
try {
74+
// sleep 2s so that it could terminate within timeout(5s)
75+
Thread.sleep(2000);
76+
} catch (InterruptedException e) {
77+
}
78+
});
79+
long before = System.currentTimeMillis();
80+
sharedProcessor.addAndStartListener(quickWorker);
81+
sharedProcessor.stop();
82+
// the stopping worker properly blocks the processor's stop call
83+
assertTrue(System.currentTimeMillis() - before >= 2000);
84+
85+
sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
86+
TestWorker<V1Pod> slowWorker = new TestWorker<>(null, 0);
87+
final boolean[] interrupted = {false};
88+
slowWorker.setTask(
89+
() -> {
90+
try {
91+
// sleep 10s so that it could be interrupted by shutdownNow()
92+
Thread.sleep(10 * 1000);
93+
} catch (InterruptedException e) {
94+
interrupted[0] = true;
95+
}
96+
});
97+
sharedProcessor.addAndStartListener(slowWorker);
98+
sharedProcessor.stop();
99+
// make sure the slow worker has set interrupted[0] = true
100+
Thread.sleep(1000);
101+
// the slow worker is interrupted upon timeout
102+
assertTrue(interrupted[0]);
103+
}
104+
64105
private static class ExpectingNoticationHandler<ApiType extends KubernetesObject>
65106
extends ProcessorListener<ApiType> {
66107

@@ -99,4 +140,23 @@ public boolean isSatisfied() {
99140
return satisfied;
100141
}
101142
}
143+
144+
private static class TestWorker<ApiType extends KubernetesObject>
145+
extends ProcessorListener<ApiType> {
146+
147+
private Runnable task;
148+
149+
public TestWorker(ResourceEventHandler<ApiType> handler, long resyncPeriod) {
150+
super(handler, resyncPeriod);
151+
}
152+
153+
public void setTask(Runnable task) {
154+
this.task = task;
155+
}
156+
157+
@Override
158+
public void run() {
159+
this.task.run();
160+
}
161+
}
102162
}

0 commit comments

Comments
 (0)