Skip to content

Commit a28f9f9

Browse files
authored
Merge pull request #1140 from yue9944882/fix/graceful-shutdown-sharedprocessor
Graceful shutdown sharedprocessor
2 parents c8051a6 + d194926 commit a28f9f9

File tree

2 files changed

+10
-22
lines changed

2 files changed

+10
-22
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
import java.util.concurrent.locks.ReentrantReadWriteLock;
2424
import org.apache.commons.collections4.CollectionUtils;
2525
import org.joda.time.DateTime;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
/*
2830
* SharedProcessor class manages all the registered ProcessorListener and distributes notifications.
2931
*/
3032
public class SharedProcessor<ApiType extends KubernetesObject> {
3133

34+
private static final Logger log = LoggerFactory.getLogger(SharedProcessor.class);
35+
3236
private ReadWriteLock lock = new ReentrantReadWriteLock();
3337

3438
private List<ProcessorListener<ApiType>> listeners;
@@ -155,16 +159,16 @@ public void stop() {
155159
} finally {
156160
lock.writeLock().unlock();
157161
}
158-
// Disable new tasks from being submitted
159-
executorService.shutdown();
162+
// Interrupts running listeners by signalling InterruptedException
163+
executorService.shutdownNow();
160164
try {
161-
// Wait a while for existing tasks to terminate
165+
// Hold until all the listeners exits
162166
if (!executorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
163-
// Cancel currently executing tasks
164-
executorService.shutdownNow();
167+
log.warn(
168+
"SharedProcessors wasn't gracefully terminated, there can be listener thread leakage");
165169
}
166170
} catch (InterruptedException e) {
167-
executorService.shutdownNow();
171+
log.error("Graceful shutdown process of SharedProcessors was interrupted");
168172
}
169173
}
170174
}

util/src/test/java/io/kubernetes/client/informer/cache/SharedProcessorTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,6 @@ public void testListenerAddition() throws InterruptedException {
6767
public void testShutdownGracefully() throws InterruptedException {
6868
SharedProcessor<V1Pod> sharedProcessor =
6969
new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
70-
TestWorker<V1Pod> quickWorker = new TestWorker<>(null, 0);
71-
quickWorker.setTask(
72-
() -> {
73-
try {
74-
// sleep 2s so that it could terminate within timeout(5s)
75-
Thread.sleep(2000);
76-
} catch (InterruptedException e) {
77-
}
78-
});
79-
long before = System.currentTimeMillis();
80-
sharedProcessor.addAndStartListener(quickWorker);
81-
sharedProcessor.stop();
82-
// the stopping worker properly blocks the processor's stop call
83-
assertTrue(System.currentTimeMillis() - before >= 2000);
84-
85-
sharedProcessor = new SharedProcessor<>(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
8670
TestWorker<V1Pod> slowWorker = new TestWorker<>(null, 0);
8771
final boolean[] interrupted = {false};
8872
slowWorker.setTask(

0 commit comments

Comments
 (0)