Skip to content

Commit 4847759

Browse files
committed
Use ThreadFactory
1 parent d2bf3e7 commit 4847759

15 files changed

+56
-44
lines changed

src/main/java/oracle/kubernetes/operator/ConfigMapWatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import oracle.kubernetes.operator.builders.WatchI;
1010
import oracle.kubernetes.operator.watcher.WatchListener;
1111

12-
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.ThreadFactory;
1313
import java.util.concurrent.atomic.AtomicBoolean;
1414

1515
/**
@@ -19,9 +19,9 @@
1919
public class ConfigMapWatcher extends Watcher<V1ConfigMap> {
2020
private final String ns;
2121

22-
public static ConfigMapWatcher create(ScheduledExecutorService threadPool, String ns, String initialResourceVersion, WatchListener<V1ConfigMap> listener, AtomicBoolean isStopping) {
22+
public static ConfigMapWatcher create(ThreadFactory factory, String ns, String initialResourceVersion, WatchListener<V1ConfigMap> listener, AtomicBoolean isStopping) {
2323
ConfigMapWatcher watcher = new ConfigMapWatcher(ns, initialResourceVersion, listener, isStopping);
24-
watcher.start(threadPool);
24+
watcher.start(factory);
2525
return watcher;
2626
}
2727

src/main/java/oracle/kubernetes/operator/DomainWatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import oracle.kubernetes.operator.watcher.WatchListener;
1010
import oracle.kubernetes.weblogic.domain.v1.Domain;
1111

12-
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.ThreadFactory;
1313
import java.util.concurrent.atomic.AtomicBoolean;
1414

1515
/**
@@ -19,9 +19,9 @@
1919
public class DomainWatcher extends Watcher<Domain> {
2020
private final String ns;
2121

22-
public static DomainWatcher create(ScheduledExecutorService threadPool, String ns, String initialResourceVersion, WatchListener<Domain> listener, AtomicBoolean isStopping) {
22+
public static DomainWatcher create(ThreadFactory factory, String ns, String initialResourceVersion, WatchListener<Domain> listener, AtomicBoolean isStopping) {
2323
DomainWatcher watcher = new DomainWatcher(ns, initialResourceVersion, listener, isStopping);
24-
watcher.start(threadPool);
24+
watcher.start(factory);
2525
return watcher;
2626
}
2727

src/main/java/oracle/kubernetes/operator/IngressWatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import oracle.kubernetes.operator.watcher.WatchListener;
1212

1313
import java.util.Map;
14-
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.ThreadFactory;
1515
import java.util.concurrent.atomic.AtomicBoolean;
1616

1717
/**
@@ -21,9 +21,9 @@
2121
public class IngressWatcher extends Watcher<V1beta1Ingress> {
2222
private final String ns;
2323

24-
public static IngressWatcher create(ScheduledExecutorService threadPool, String ns, String initialResourceVersion, WatchListener<V1beta1Ingress> listener, AtomicBoolean isStopping) {
24+
public static IngressWatcher create(ThreadFactory factory, String ns, String initialResourceVersion, WatchListener<V1beta1Ingress> listener, AtomicBoolean isStopping) {
2525
IngressWatcher watcher = new IngressWatcher(ns, initialResourceVersion, listener, isStopping);
26-
watcher.start(threadPool);
26+
watcher.start(factory);
2727
return watcher;
2828
}
2929

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import java.util.StringTokenizer;
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.ConcurrentMap;
17+
import java.util.concurrent.Executors;
1718
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.ThreadFactory;
1820
import java.util.concurrent.TimeUnit;
1921
import java.util.concurrent.atomic.AtomicBoolean;
2022
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +88,14 @@ public class Main {
8688

8789
private static final Container container = new Container();
8890
private static final Engine engine = new Engine("operator", container);
91+
private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
92+
private static final ThreadFactory factory = (r) -> {
93+
Thread t = defaultFactory.newThread(r);
94+
if (!t.isDaemon()) {
95+
t.setDaemon(true);
96+
}
97+
return t;
98+
};
8999

90100
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
91101
private static final FiberGate domainUpdaters = new FiberGate(engine);
@@ -94,7 +104,7 @@ public class Main {
94104
private static final TuningParameters tuningAndConfig;
95105
static {
96106
try {
97-
tuningAndConfig = TuningParameters.initializeInstance(engine.getExecutor(), "/operator/config");
107+
tuningAndConfig = TuningParameters.initializeInstance(factory, "/operator/config");
98108
} catch (IOException e) {
99109
LOGGER.warning(MessageKeys.EXCEPTION, e);
100110
throw new RuntimeException(e);
@@ -1531,12 +1541,12 @@ public static boolean getStopping() {
15311541
}
15321542

15331543
private static DomainWatcher createDomainWatcher(String namespace, String initialResourceVersion) {
1534-
return DomainWatcher.create(engine.getExecutor(), namespace,
1544+
return DomainWatcher.create(factory, namespace,
15351545
initialResourceVersion, Main::dispatchDomainWatch, stopping);
15361546
}
15371547

15381548
private static PodWatcher createPodWatcher(String namespace, String initialResourceVersion) {
1539-
return PodWatcher.create(engine.getExecutor(), namespace,
1549+
return PodWatcher.create(factory, namespace,
15401550
initialResourceVersion, Main::dispatchPodWatch, stopping);
15411551
}
15421552

@@ -1585,7 +1595,7 @@ private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
15851595

15861596

15871597
private static ServiceWatcher createServiceWatcher(String namespace, String initialResourceVersion) {
1588-
return ServiceWatcher.create(engine.getExecutor(), namespace,
1598+
return ServiceWatcher.create(factory, namespace,
15891599
initialResourceVersion, Main::dispatchServiceWatch, stopping);
15901600
}
15911601

@@ -1674,7 +1684,7 @@ private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
16741684
}
16751685

16761686
private static IngressWatcher createIngressWatcher(String namespace, String initialResourceVersion) {
1677-
return IngressWatcher.create(engine.getExecutor(), namespace,
1687+
return IngressWatcher.create(factory, namespace,
16781688
initialResourceVersion, Main::dispatchIngressWatch, stopping);
16791689
}
16801690

@@ -1731,7 +1741,7 @@ public NextAction apply(Packet packet) {
17311741
}
17321742

17331743
private static ConfigMapWatcher createConfigMapWatcher(String namespace, String initialResourceVersion) {
1734-
return ConfigMapWatcher.create(engine.getExecutor(), namespace,
1744+
return ConfigMapWatcher.create(factory, namespace,
17351745
initialResourceVersion, Main::dispatchConfigMapWatch, stopping);
17361746
}
17371747

src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Map;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ConcurrentMap;
30-
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.ThreadFactory;
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232

3333
/**
@@ -52,9 +52,9 @@ public class PodWatcher extends Watcher<V1Pod> implements WatchListener<V1Pod> {
5252
* @param isStopping Stop signal
5353
* @return Pod watcher for the namespace
5454
*/
55-
public static PodWatcher create(ScheduledExecutorService threadPool, String ns, String initialResourceVersion, WatchListener<V1Pod> listener, AtomicBoolean isStopping) {
55+
public static PodWatcher create(ThreadFactory factory, String ns, String initialResourceVersion, WatchListener<V1Pod> listener, AtomicBoolean isStopping) {
5656
PodWatcher watcher = new PodWatcher(ns, initialResourceVersion, listener, isStopping);
57-
watcher.start(threadPool);
57+
watcher.start(factory);
5858
return watcher;
5959
}
6060

src/main/java/oracle/kubernetes/operator/ServiceWatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import oracle.kubernetes.operator.watcher.WatchListener;
1212

1313
import java.util.Map;
14-
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.ThreadFactory;
1515
import java.util.concurrent.atomic.AtomicBoolean;
1616

1717
/**
@@ -21,9 +21,9 @@
2121
public class ServiceWatcher extends Watcher<V1Service> {
2222
private final String ns;
2323

24-
public static ServiceWatcher create(ScheduledExecutorService threadPool, String ns, String initialResourceVersion, WatchListener<V1Service> listener, AtomicBoolean isStopping) {
24+
public static ServiceWatcher create(ThreadFactory factory, String ns, String initialResourceVersion, WatchListener<V1Service> listener, AtomicBoolean isStopping) {
2525
ServiceWatcher watcher = new ServiceWatcher(ns, initialResourceVersion, listener, isStopping);
26-
watcher.start(threadPool);
26+
watcher.start(factory);
2727
return watcher;
2828
}
2929

src/main/java/oracle/kubernetes/operator/TuningParameters.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
package oracle.kubernetes.operator;
55

66
import java.io.IOException;
7-
import java.util.concurrent.ScheduledExecutorService;
7+
import java.util.concurrent.ThreadFactory;
88
import java.util.concurrent.locks.ReadWriteLock;
99
import java.util.concurrent.locks.ReentrantReadWriteLock;
1010

@@ -79,16 +79,16 @@ public PodTuning(int readinessProbeInitialDelaySeconds, int readinessProbeTimeou
7979
}
8080

8181
public synchronized static TuningParameters initializeInstance(
82-
ScheduledExecutorService threadPool, String mountPoint) throws IOException {
82+
ThreadFactory factory, String mountPoint) throws IOException {
8383
if (INSTANCE == null) {
84-
INSTANCE = new TuningParameters(threadPool, mountPoint);
84+
INSTANCE = new TuningParameters(factory, mountPoint);
8585
return INSTANCE;
8686
}
8787
throw new IllegalStateException();
8888
}
8989

90-
private TuningParameters(ScheduledExecutorService threadPool, String mountPoint) throws IOException {
91-
super(threadPool, mountPoint, () -> {
90+
private TuningParameters(ThreadFactory factory, String mountPoint) throws IOException {
91+
super(factory, mountPoint, () -> {
9292
updateTuningParameters();
9393
});
9494
update();

src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.lang.reflect.Method;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.Future;
21+
import java.util.concurrent.ThreadFactory;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523

2624
import static java.net.HttpURLConnection.HTTP_GONE;
@@ -41,7 +39,7 @@ abstract class Watcher<T> {
4139
private String resourceVersion;
4240
private AtomicBoolean stopping;
4341
private WatchListener<T> listener;
44-
private Future<?> future;
42+
private Thread thread = null;
4543

4644
/**
4745
* Constructs a watcher without specifying a listener. Needed when the listener is the watch subclass itself.
@@ -69,9 +67,10 @@ abstract class Watcher<T> {
6967
*/
7068
void waitForExit() {
7169
try {
72-
future.get();
70+
if (thread != null) {
71+
thread.join();
72+
}
7373
} catch (InterruptedException ignored) {
74-
} catch (ExecutionException e) {
7574
}
7675
}
7776

@@ -86,8 +85,9 @@ void setListener(WatchListener<T> listener) {
8685
/**
8786
* Kick off the watcher processing that runs in a separate thread.
8887
*/
89-
void start(ExecutorService threadPool) {
90-
future = threadPool.submit(this::doWatch);
88+
void start(ThreadFactory factory) {
89+
thread = factory.newThread(this::doWatch);
90+
thread.run();
9191
}
9292

9393
private void doWatch() {

src/main/java/oracle/kubernetes/operator/helpers/ConfigMapConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import java.util.concurrent.Executors;
2425
import java.util.concurrent.ScheduledExecutorService;
2526
import java.util.concurrent.ScheduledFuture;
27+
import java.util.concurrent.ThreadFactory;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicReference;
2830

@@ -40,8 +42,8 @@ public class ConfigMapConsumer implements Map<String, String> {
4042
private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);
4143
private final Runnable onUpdate;
4244

43-
public ConfigMapConsumer(ScheduledExecutorService threadPool, String mountPoint, Runnable onUpdate) throws IOException {
44-
this.threadPool = threadPool;
45+
public ConfigMapConsumer(ThreadFactory factory, String mountPoint, Runnable onUpdate) throws IOException {
46+
this.threadPool = Executors.newScheduledThreadPool(2, factory);
4547
this.mountPointDir = new File(mountPoint);
4648
this.watcher = FileSystems.getDefault().newWatchService();
4749
this.onUpdate = onUpdate;

src/main/java/oracle/kubernetes/operator/work/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* Collection of {@link Fiber}s. Owns an {@link Executor} to run them.
1414
*/
1515
public class Engine {
16-
private final int DEFAULT_THREAD_COUNT = 5;
16+
private final int DEFAULT_THREAD_COUNT = 10;
1717

1818
private volatile ScheduledExecutorService threadPool;
1919
public final String id;

0 commit comments

Comments
 (0)