Skip to content

Commit b6a9f6e

Browse files
committed
tie operator liveness more closely to main work queue
1 parent 5f862ef commit b6a9f6e

File tree

6 files changed

+31
-42
lines changed

6 files changed

+31
-42
lines changed

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,14 @@ public Thread newThread(Runnable r) {
7979
}
8080

8181
private static final ThreadFactory threadFactory = new WrappedThreadFactory();
82+
private static final ScheduledExecutorService wrappedExecutorService =
83+
Engine.wrappedExecutorService("operator", container);
8284

8385
static final TuningParameters tuningAndConfig;
8486

8587
static {
8688
try {
87-
TuningParameters.initializeInstance(threadFactory, "/operator/config");
89+
TuningParameters.initializeInstance(wrappedExecutorService, "/operator/config");
8890
tuningAndConfig = TuningParameters.getInstance();
8991
} catch (IOException e) {
9092
LOGGER.warning(MessageKeys.EXCEPTION, e);
@@ -94,9 +96,6 @@ public Thread newThread(Runnable r) {
9496

9597
private static final CallBuilderFactory callBuilderFactory = new CallBuilderFactory();
9698

97-
private static final ScheduledExecutorService wrappedExecutorService =
98-
Engine.wrappedExecutorService("operator", container);
99-
10099
static {
101100
container
102101
.getComponents()
@@ -435,9 +434,8 @@ private static void stopRestServer() {
435434

436435
private static void startLivenessThread() {
437436
LOGGER.info(MessageKeys.STARTING_LIVENESS_THREAD);
438-
livenessThread = new OperatorLiveness();
439-
livenessThread.setDaemon(true);
440-
livenessThread.start();
437+
// every five seconds we need to update the last modified time on the liveness file
438+
wrappedExecutorService.scheduleWithFixedDelay(new OperatorLiveness(), 5, 5, TimeUnit.SECONDS);
441439
}
442440

443441
private static void waitForDeath() {

operator/src/main/java/oracle/kubernetes/operator/OperatorLiveness.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,23 @@
1212
import oracle.kubernetes.operator.logging.MessageKeys;
1313

1414
/**
15-
* This thread maintains the "liveness" indicator so that Kubernetes knows the Operator is still
15+
* This task maintains the "liveness" indicator so that Kubernetes knows the Operator is still
1616
* alive.
1717
*/
18-
public class OperatorLiveness extends Thread {
18+
public class OperatorLiveness implements Runnable {
1919

2020
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
2121
private final File livenessFile = new File("/operator/.alive");
2222

2323
@Override
2424
public void run() {
25-
// every five seconds we need to update the last modified time on the liveness file
26-
27-
while (true) {
28-
29-
if (!livenessFile.exists()) {
30-
try {
31-
livenessFile.createNewFile();
32-
} catch (IOException ioe) {
33-
LOGGER.info(MessageKeys.COULD_NOT_CREATE_LIVENESS_FILE);
34-
}
35-
}
36-
livenessFile.setLastModified(new Date().getTime());
37-
25+
if (!livenessFile.exists()) {
3826
try {
39-
Thread.sleep(5000);
40-
} catch (InterruptedException ignore) {
27+
livenessFile.createNewFile();
28+
} catch (IOException ioe) {
29+
LOGGER.info(MessageKeys.COULD_NOT_CREATE_LIVENESS_FILE);
4130
}
4231
}
32+
livenessFile.setLastModified(new Date().getTime());
4333
}
4434
}

operator/src/main/java/oracle/kubernetes/operator/TuningParameters.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66

77
import java.io.IOException;
88
import java.util.Map;
9-
import java.util.concurrent.ThreadFactory;
9+
import java.util.concurrent.ScheduledExecutorService;
1010

1111
public interface TuningParameters extends Map<String, String> {
1212

13-
static TuningParameters initializeInstance(ThreadFactory factory, String mountPoint)
14-
throws IOException {
15-
return TuningParametersImpl.initializeInstance(factory, mountPoint);
13+
static TuningParameters initializeInstance(
14+
ScheduledExecutorService executorService, String mountPoint) throws IOException {
15+
return TuningParametersImpl.initializeInstance(executorService, mountPoint);
1616
}
1717

1818
public static TuningParameters getInstance() {

operator/src/main/java/oracle/kubernetes/operator/TuningParametersImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package oracle.kubernetes.operator;
66

77
import java.io.IOException;
8-
import java.util.concurrent.ThreadFactory;
8+
import java.util.concurrent.ScheduledExecutorService;
99
import java.util.concurrent.locks.ReadWriteLock;
1010
import java.util.concurrent.locks.ReentrantReadWriteLock;
1111
import oracle.kubernetes.operator.helpers.ConfigMapConsumer;
@@ -23,10 +23,10 @@ public class TuningParametersImpl extends ConfigMapConsumer implements TuningPar
2323
private WatchTuning watch = null;
2424
private PodTuning pod = null;
2525

26-
static synchronized TuningParameters initializeInstance(ThreadFactory factory, String mountPoint)
27-
throws IOException {
26+
static synchronized TuningParameters initializeInstance(
27+
ScheduledExecutorService executorService, String mountPoint) throws IOException {
2828
if (INSTANCE == null) {
29-
INSTANCE = new TuningParametersImpl(factory, mountPoint);
29+
INSTANCE = new TuningParametersImpl(executorService, mountPoint);
3030
return INSTANCE;
3131
}
3232
throw new IllegalStateException();
@@ -36,8 +36,9 @@ public static synchronized TuningParameters getInstance() {
3636
return INSTANCE;
3737
}
3838

39-
private TuningParametersImpl(ThreadFactory factory, String mountPoint) throws IOException {
40-
super(factory, mountPoint, TuningParametersImpl::updateTuningParameters);
39+
private TuningParametersImpl(ScheduledExecutorService executorService, String mountPoint)
40+
throws IOException {
41+
super(executorService, mountPoint, TuningParametersImpl::updateTuningParameters);
4142
update();
4243
}
4344

operator/src/main/java/oracle/kubernetes/operator/helpers/ConfigMapConsumer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import java.util.Set;
21-
import java.util.concurrent.Executors;
2221
import java.util.concurrent.ScheduledExecutorService;
2322
import java.util.concurrent.ScheduledFuture;
24-
import java.util.concurrent.ThreadFactory;
2523
import java.util.concurrent.TimeUnit;
2624
import java.util.concurrent.atomic.AtomicReference;
2725
import oracle.kubernetes.operator.logging.LoggingFacade;
@@ -42,9 +40,10 @@ public class ConfigMapConsumer implements Map<String, String> {
4240
private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);
4341
private final Runnable onUpdate;
4442

45-
public ConfigMapConsumer(ThreadFactory factory, String mountPoint, Runnable onUpdate)
43+
public ConfigMapConsumer(
44+
ScheduledExecutorService executorService, String mountPoint, Runnable onUpdate)
4645
throws IOException {
47-
this.threadPool = Executors.newScheduledThreadPool(2, factory);
46+
this.threadPool = executorService;
4847
this.mountPointDir = new File(mountPoint);
4948
this.watcher = FileSystems.getDefault().newWatchService();
5049
this.onUpdate = onUpdate;

operator/src/main/java/oracle/kubernetes/operator/work/Engine.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
package oracle.kubernetes.operator.work;
66

77
import java.util.concurrent.Executor;
8-
import java.util.concurrent.Executors;
98
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledThreadPoolExecutor;
1010
import java.util.concurrent.ThreadFactory;
1111
import java.util.concurrent.atomic.AtomicInteger;
1212

@@ -15,9 +15,10 @@ public class Engine {
1515
private static final int DEFAULT_THREAD_COUNT = 10;
1616

1717
public static ScheduledExecutorService wrappedExecutorService(String id, Container container) {
18-
return wrap(
19-
container,
20-
Executors.newScheduledThreadPool(DEFAULT_THREAD_COUNT, new DaemonThreadFactory(id)));
18+
ScheduledThreadPoolExecutor threadPool =
19+
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, new DaemonThreadFactory(id));
20+
threadPool.setRemoveOnCancelPolicy(true);
21+
return wrap(container, threadPool);
2122
}
2223

2324
private volatile ScheduledExecutorService threadPool;

0 commit comments

Comments
 (0)