Skip to content

Commit 50e837a

Browse files
committed
Expand unit test coverage for watchers
1 parent 7bdd3ce commit 50e837a

17 files changed

+620
-369
lines changed

src/main/java/oracle/kubernetes/operator/ConfigMapWatcher.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import oracle.kubernetes.operator.builders.WatchI;
1111
import oracle.kubernetes.operator.helpers.ClientHelper;
1212
import oracle.kubernetes.operator.helpers.ClientHolder;
13-
import oracle.kubernetes.operator.watcher.Watcher;
13+
import oracle.kubernetes.operator.watcher.ThreadedWatcher;
1414
import oracle.kubernetes.operator.watcher.Watching;
1515
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
1616

@@ -20,18 +20,20 @@
2020
* This class handles ConfigMap watching. It receives config map change events and sends
2121
* them into the operator for processing.
2222
*/
23-
public class ConfigMapWatcher implements Runnable {
23+
public class ConfigMapWatcher implements Runnable, ThreadedWatcher {
2424
private final String ns;
2525
private final String initialResourceVersion;
2626
private final WatchingEventDestination<V1ConfigMap> destination;
2727
private final AtomicBoolean isStopping;
28-
28+
private Thread thread;
29+
2930
public static ConfigMapWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<V1ConfigMap> destination, AtomicBoolean isStopping) {
3031
ConfigMapWatcher dlw = new ConfigMapWatcher(ns, initialResourceVersion, destination, isStopping);
3132
Thread thread = new Thread(dlw);
3233
thread.setName("Thread-ConfigMapWatcher-" + ns);
3334
thread.setDaemon(true);
3435
thread.start();
36+
dlw.thread = thread;
3537
return dlw;
3638
}
3739

@@ -42,6 +44,11 @@ private ConfigMapWatcher(String ns, String initialResourceVersion, WatchingEvent
4244
this.isStopping = isStopping;
4345
}
4446

47+
@Override
48+
public Thread getThread() {
49+
return thread;
50+
}
51+
4552
/**
4653
* Polling loop. Get the next ConfigMap object event and process it.
4754
*/
@@ -73,10 +80,13 @@ protected Watching<V1ConfigMap> createWatching(ClientHolder client) {
7380
*/
7481
@Override
7582
public WatchI<V1ConfigMap> initiateWatch(String resourceVersion) throws ApiException {
76-
return new WatchBuilder(client)
77-
.withResourceVersion(resourceVersion)
78-
.withLabelSelector(LabelConstants.CREATEDBYOPERATOR_LABEL)
79-
.createConfigMapWatch(ns);
83+
return initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion));
84+
}
85+
86+
@Override
87+
public WatchI<V1ConfigMap> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
88+
return watchBuilder.withLabelSelector(LabelConstants.CREATEDBYOPERATOR_LABEL)
89+
.createConfigMapWatch(ns);
8090
}
8191

8292
@Override

src/main/java/oracle/kubernetes/operator/DomainWatcher.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import oracle.kubernetes.weblogic.domain.v1.Domain;
1111
import oracle.kubernetes.operator.helpers.ClientHelper;
1212
import oracle.kubernetes.operator.helpers.ClientHolder;
13-
import oracle.kubernetes.operator.watcher.Watcher;
1413
import oracle.kubernetes.operator.watcher.Watching;
1514
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
1615

@@ -20,18 +19,20 @@
2019
* This class handles Domain watching. It receives domain events and sends
2120
* them into the operator for processing.
2221
*/
23-
public class DomainWatcher implements Runnable {
22+
public class DomainWatcher implements Runnable, oracle.kubernetes.operator.watcher.ThreadedWatcher {
2423
private final String ns;
2524
private final String initialResourceVersion;
2625
private final WatchingEventDestination<Domain> destination;
2726
private final AtomicBoolean isStopping;
27+
private Thread thread;
2828

2929
public static DomainWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<Domain> destination, AtomicBoolean isStopping) {
3030
DomainWatcher dlw = new DomainWatcher(ns, initialResourceVersion, destination, isStopping);
3131
Thread thread = new Thread(dlw);
3232
thread.setName("Thread-DomainWatcher-" + ns);
3333
thread.setDaemon(true);
3434
thread.start();
35+
dlw.thread = thread;
3536
return dlw;
3637
}
3738

@@ -42,6 +43,11 @@ private DomainWatcher(String ns, String initialResourceVersion, WatchingEventDes
4243
this.isStopping = isStopping;
4344
}
4445

46+
@Override
47+
public Thread getThread() {
48+
return thread;
49+
}
50+
4551
/**
4652
* Polling loop. Get the next Domain object event and process it.
4753
*/
@@ -73,9 +79,11 @@ protected Watching<Domain> createWatching(ClientHolder client) {
7379
*/
7480
@Override
7581
public WatchI<Domain> initiateWatch(String resourceVersion) throws ApiException {
76-
return new WatchBuilder(client)
77-
.withResourceVersion(resourceVersion)
78-
.createDomainWatch(ns);
82+
return initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion));
83+
}
84+
85+
public WatchI<Domain> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
86+
return watchBuilder.createDomainWatch(ns);
7987
}
8088

8189
@Override

src/main/java/oracle/kubernetes/operator/IngressWatcher.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import oracle.kubernetes.operator.helpers.ClientHelper;
1212
import oracle.kubernetes.operator.helpers.ClientHolder;
1313
import oracle.kubernetes.operator.builders.WatchBuilder;
14-
import oracle.kubernetes.operator.watcher.Watcher;
14+
import oracle.kubernetes.operator.watcher.ThreadedWatcher;
1515
import oracle.kubernetes.operator.watcher.Watching;
1616
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
1717

@@ -22,18 +22,20 @@
2222
* This class handles Ingress watching. It receives Ingress change events and sends
2323
* them into the operator for processing.
2424
*/
25-
public class IngressWatcher implements Runnable {
25+
public class IngressWatcher implements Runnable, ThreadedWatcher {
2626
private final String ns;
2727
private final String initialResourceVersion;
2828
private final WatchingEventDestination<V1beta1Ingress> destination;
2929
private final AtomicBoolean isStopping;
30-
30+
private Thread thread;
31+
3132
public static IngressWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<V1beta1Ingress> destination, AtomicBoolean isStopping) {
3233
IngressWatcher dlw = new IngressWatcher(ns, initialResourceVersion, destination, isStopping);
3334
Thread thread = new Thread(dlw);
3435
thread.setName("Thread-IngressWatcher-" + ns);
3536
thread.setDaemon(true);
3637
thread.start();
38+
dlw.thread = thread;
3739
return dlw;
3840
}
3941

@@ -44,6 +46,10 @@ private IngressWatcher(String ns, String initialResourceVersion, WatchingEventDe
4446
this.isStopping = isStopping;
4547
}
4648

49+
public Thread getThread() {
50+
return thread;
51+
}
52+
4753
/**
4854
* Polling loop. Get the next Ingress object event and process it.
4955
*/
@@ -63,7 +69,7 @@ public void run() {
6369
}
6470
}
6571

66-
protected Watching<V1beta1Ingress> createWatching(ClientHolder client) {
72+
private Watching<V1beta1Ingress> createWatching(ClientHolder client) {
6773
return new Watching<V1beta1Ingress>() {
6874

6975
/**
@@ -75,11 +81,13 @@ protected Watching<V1beta1Ingress> createWatching(ClientHolder client) {
7581
*/
7682
@Override
7783
public WatchI<V1beta1Ingress> initiateWatch(String resourceVersion) throws ApiException {
78-
return new WatchBuilder(client)
79-
.withResourceVersion(resourceVersion)
80-
.withLabelSelector(LabelConstants.DOMAINUID_LABEL
81-
+ "," + LabelConstants.CREATEDBYOPERATOR_LABEL)
82-
.createIngressWatch(ns);
84+
return initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion));
85+
}
86+
87+
public WatchI<V1beta1Ingress> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
88+
return watchBuilder
89+
.withLabelSelectors(LabelConstants.DOMAINUID_LABEL, LabelConstants.CREATEDBYOPERATOR_LABEL)
90+
.createIngressWatch(ns);
8391
}
8492

8593
@Override

src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import oracle.kubernetes.operator.logging.LoggingFacade;
1818
import oracle.kubernetes.operator.logging.LoggingFactory;
1919
import oracle.kubernetes.operator.logging.MessageKeys;
20-
import oracle.kubernetes.operator.watcher.Watcher;
20+
import oracle.kubernetes.operator.watcher.ThreadedWatcher;
2121
import oracle.kubernetes.operator.watcher.Watching;
2222
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
2323
import oracle.kubernetes.operator.work.NextAction;
@@ -34,13 +34,14 @@
3434
* Watches for Pods to become Ready or leave Ready state
3535
*
3636
*/
37-
public class PodWatcher implements Runnable {
37+
public class PodWatcher implements Runnable, ThreadedWatcher {
3838
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
3939

4040
private final String ns;
4141
private final String initialResourceVersion;
4242
private final WatchingEventDestination<V1Pod> destination;
4343
private final AtomicBoolean isStopping;
44+
private Thread thread;
4445

4546
// Map of Pod name to OnReady
4647
private final ConcurrentMap<String, OnReady> readyCallbackRegistrations = new ConcurrentHashMap<>();
@@ -59,6 +60,7 @@ public static PodWatcher create(String ns, String initialResourceVersion, Watchi
5960
thread.setName("Thread-PodWatcher-" + ns);
6061
thread.setDaemon(true);
6162
thread.start();
63+
prw.thread = thread;
6264
return prw;
6365
}
6466

@@ -69,6 +71,10 @@ private PodWatcher(String ns, String initialResourceVersion, WatchingEventDestin
6971
this.isStopping = isStopping;
7072
}
7173

74+
public Thread getThread() {
75+
return thread;
76+
}
77+
7278
/**
7379
* Polling loop. Get the next Pod object event and process it.
7480
*/
@@ -100,11 +106,14 @@ private Watching<V1Pod> createWatching(ClientHolder client) {
100106
*/
101107
@Override
102108
public WatchI<V1Pod> initiateWatch(String resourceVersion) throws ApiException {
103-
return new WatchBuilder(client)
104-
.withResourceVersion(resourceVersion)
105-
.withLabelSelector(LabelConstants.DOMAINUID_LABEL
106-
+ "," + LabelConstants.CREATEDBYOPERATOR_LABEL)
107-
.createPodWatch(ns);
109+
return initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion));
110+
}
111+
112+
@Override
113+
public WatchI<V1Pod> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
114+
return watchBuilder
115+
.withLabelSelectors(LabelConstants.DOMAINUID_LABEL, LabelConstants.CREATEDBYOPERATOR_LABEL)
116+
.createPodWatch(ns);
108117
}
109118

110119
@Override

src/main/java/oracle/kubernetes/operator/ServiceWatcher.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import oracle.kubernetes.operator.builders.WatchI;
1212
import oracle.kubernetes.operator.helpers.ClientHelper;
1313
import oracle.kubernetes.operator.helpers.ClientHolder;
14-
import oracle.kubernetes.operator.watcher.Watcher;
14+
import oracle.kubernetes.operator.watcher.ThreadedWatcher;
1515
import oracle.kubernetes.operator.watcher.Watching;
1616
import oracle.kubernetes.operator.watcher.WatchingEventDestination;
1717

@@ -22,18 +22,20 @@
2222
* This class handles Service watching. It receives service change events and sends
2323
* them into the operator for processing.
2424
*/
25-
public class ServiceWatcher implements Runnable {
25+
public class ServiceWatcher implements Runnable, ThreadedWatcher {
2626
private final String ns;
2727
private final String initialResourceVersion;
2828
private final WatchingEventDestination<V1Service> destination;
2929
private final AtomicBoolean isStopping;
30-
30+
private Thread thread;
31+
3132
public static ServiceWatcher create(String ns, String initialResourceVersion, WatchingEventDestination<V1Service> destination, AtomicBoolean isStopping) {
3233
ServiceWatcher dlw = new ServiceWatcher(ns, initialResourceVersion, destination, isStopping);
3334
Thread thread = new Thread(dlw);
3435
thread.setName("Thread-ServiceWatcher-" + ns);
3536
thread.setDaemon(true);
3637
thread.start();
38+
dlw.thread = thread;
3739
return dlw;
3840
}
3941

@@ -44,6 +46,10 @@ private ServiceWatcher(String ns, String initialResourceVersion, WatchingEventDe
4446
this.isStopping = isStopping;
4547
}
4648

49+
public Thread getThread() {
50+
return thread;
51+
}
52+
4753
/**
4854
* Polling loop. Get the next Service object event and process it.
4955
*/
@@ -63,7 +69,7 @@ public void run() {
6369
}
6470
}
6571

66-
protected Watching<V1Service> createWatching(ClientHolder client) {
72+
private Watching<V1Service> createWatching(ClientHolder client) {
6773
return new Watching<V1Service>() {
6874

6975
/**
@@ -75,11 +81,14 @@ protected Watching<V1Service> createWatching(ClientHolder client) {
7581
*/
7682
@Override
7783
public WatchI<V1Service> initiateWatch(String resourceVersion) throws ApiException {
78-
return new WatchBuilder(client)
79-
.withResourceVersion(resourceVersion)
80-
.withLabelSelector(LabelConstants.DOMAINUID_LABEL
81-
+ "," + LabelConstants.CREATEDBYOPERATOR_LABEL)
82-
.createServiceWatch(ns);
84+
return initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion));
85+
}
86+
87+
@Override
88+
public WatchI<V1Service> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
89+
return watchBuilder
90+
.withLabelSelectors(LabelConstants.DOMAINUID_LABEL, LabelConstants.CREATEDBYOPERATOR_LABEL)
91+
.createServiceWatch(ns);
8392
}
8493

8594
@Override

src/main/java/oracle/kubernetes/operator/watcher/Watcher.java renamed to src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2017, 2018 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
33

4-
package oracle.kubernetes.operator.watcher;
4+
package oracle.kubernetes.operator;
55

66
import io.kubernetes.client.ApiException;
77
import io.kubernetes.client.models.V1ObjectMeta;
@@ -11,6 +11,7 @@
1111
import oracle.kubernetes.operator.logging.LoggingFacade;
1212
import oracle.kubernetes.operator.logging.LoggingFactory;
1313
import oracle.kubernetes.operator.logging.MessageKeys;
14+
import oracle.kubernetes.operator.watcher.Watching;
1415

1516
import java.io.IOException;
1617
import java.lang.reflect.Field;
@@ -42,9 +43,9 @@ public Watcher(Watching<T> watching, String resourceVersion) {
4243
* Kick off the watcher processing that runs in a separate thread.
4344
* @return Started thread
4445
*/
45-
public Thread start() {
46+
public Thread start(String threadName) {
4647
Thread thread = new Thread(this::doWatch);
47-
thread.setName("Watcher");
48+
thread.setName(threadName);
4849
thread.setDaemon(true);
4950
thread.start();
5051
return thread;

src/main/java/oracle/kubernetes/operator/builders/WatchBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
package oracle.kubernetes.operator.builders;
55

6-
import com.squareup.okhttp.Call;
76
import io.kubernetes.client.ApiException;
87
import io.kubernetes.client.ProgressRequestBody;
98
import io.kubernetes.client.ProgressResponseBody;
@@ -12,8 +11,10 @@
1211
import io.kubernetes.client.models.V1Service;
1312
import io.kubernetes.client.models.V1beta1Ingress;
1413
import io.kubernetes.client.util.Watch;
15-
import oracle.kubernetes.weblogic.domain.v1.Domain;
1614
import oracle.kubernetes.operator.helpers.ClientHolder;
15+
import oracle.kubernetes.weblogic.domain.v1.Domain;
16+
17+
import com.squareup.okhttp.Call;
1718

1819
import java.lang.reflect.ParameterizedType;
1920
import java.lang.reflect.Type;
@@ -230,6 +231,15 @@ public WatchBuilder withLabelSelector(String labelSelector) {
230231
return this;
231232
}
232233

234+
public WatchBuilder withLabelSelectors(String... labelSelectors) {
235+
callParams.setLabelSelector(String.join(",", labelSelectors));
236+
return this;
237+
}
238+
239+
private String asList(String... selectors) {
240+
return String.join(",", selectors);
241+
}
242+
233243
public WatchBuilder withLimit(Integer limit) {
234244
callParams.setLimit(limit);
235245
return this;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package oracle.kubernetes.operator.watcher;
2+
3+
public interface ThreadedWatcher {
4+
Thread getThread();
5+
}

0 commit comments

Comments
 (0)