Skip to content

Commit 0fec7ba

Browse files
committed
Update sko objects and improve watching
1 parent 254448e commit 0fec7ba

File tree

9 files changed

+101
-79
lines changed

9 files changed

+101
-79
lines changed

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ public WatchPodReadyAdminStep(Step next) {
602602
@Override
603603
public NextAction apply(Packet packet) {
604604
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
605-
V1Pod adminPod = info.getAdmin().getPod();
605+
V1Pod adminPod = info.getAdmin().getPod().get();
606606

607607
PodWatcher pw = podWatchers.get(adminPod.getMetadata().getNamespace());
608608
packet.getComponents().put(PODWATCHER_COMPONENT_NAME, Component.createFor(pw));
@@ -945,10 +945,10 @@ public ManagedServerDownStep(String serverName, ServerKubernetesObjects sko, Ste
945945
@Override
946946
public NextAction apply(Packet packet) {
947947
List<V1Service> services = new ArrayList<V1Service>();
948-
services.add(sko.getService());
948+
services.add(sko.getService().get());
949949
services.addAll(sko.getChannels().values());
950950

951-
return doNext(IngressHelper.createRemoveServerStep(serverName, sko.getService(),
951+
return doNext(IngressHelper.createRemoveServerStep(serverName, sko.getService().get(),
952952
new DeleteServiceListStep(services,
953953
PodHelper.deletePodStep(sko,
954954
new ManagedServerDownFinalizeStep(serverName, next)))), packet);
@@ -1268,6 +1268,8 @@ private static PodWatcher createPodWatcher(String namespace) {
12681268

12691269
private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
12701270
switch (item.type) {
1271+
case "ADDED":
1272+
case "MODIFIED":
12711273
case "DELETED":
12721274
V1Pod p = item.object;
12731275
V1ObjectMeta metadata = p.getMetadata();
@@ -1278,10 +1280,15 @@ private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
12781280
if (info != null && serverName != null) {
12791281
ServerKubernetesObjects sko = info.getServers().get(serverName);
12801282
if (sko != null) {
1281-
if (sko.getPod() != null) {
1282-
// Pod was deleted, but sko still contains a non-null entry
1283-
LOGGER.info(MessageKeys.POD_DELETED, domainUID, metadata.getNamespace(), serverName);
1284-
doCheckAndCreateDomainPresence(info.getDomain(), true);
1283+
if ("DELETED".equals(item.type)) {
1284+
V1Pod oldPod = sko.getPod().getAndSet(null);
1285+
if (oldPod != null) {
1286+
// Pod was deleted, but sko still contained a non-null entry
1287+
LOGGER.info(MessageKeys.POD_DELETED, domainUID, metadata.getNamespace(), serverName);
1288+
doCheckAndCreateDomainPresence(info.getDomain(), true);
1289+
}
1290+
} else { // ADDED, MODIFIED
1291+
sko.getPod().set(p);
12851292
}
12861293
}
12871294
}
@@ -1300,6 +1307,8 @@ private static ServiceWatcher createServiceWatcher(String namespace) {
13001307

13011308
private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
13021309
switch (item.type) {
1310+
case "ADDED":
1311+
case "MODIFIED":
13031312
case "DELETED":
13041313
V1Service s = item.object;
13051314
V1ObjectMeta metadata = s.getMetadata();
@@ -1311,10 +1320,24 @@ private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
13111320
if (info != null && serverName != null) {
13121321
ServerKubernetesObjects sko = info.getServers().get(serverName);
13131322
if (sko != null) {
1314-
if ((channelName != null ? sko.getChannels().get(channelName) : sko.getService()) != null) {
1315-
// Service was deleted, but sko still contains a non-null entry
1316-
LOGGER.info(MessageKeys.SERVICE_DELETED, domainUID, metadata.getNamespace(), serverName);
1317-
doCheckAndCreateDomainPresence(info.getDomain(), true);
1323+
if ("DELETED".equals(item.type)) {
1324+
if (channelName != null) {
1325+
V1Service oldService = sko.getChannels().put(channelName, null);
1326+
if (oldService != null) {
1327+
// Service was deleted, but sko still contained a non-null entry
1328+
LOGGER.info(MessageKeys.SERVICE_DELETED, domainUID, metadata.getNamespace(), serverName);
1329+
doCheckAndCreateDomainPresence(info.getDomain(), true);
1330+
}
1331+
} else {
1332+
V1Service oldService = sko.getService().getAndSet(null);
1333+
if (oldService != null) {
1334+
// Service was deleted, but sko still contained a non-null entry
1335+
LOGGER.info(MessageKeys.SERVICE_DELETED, domainUID, metadata.getNamespace(), serverName);
1336+
doCheckAndCreateDomainPresence(info.getDomain(), true);
1337+
}
1338+
}
1339+
} else { // ADDED, MODIFIED
1340+
sko.getService().set(s);
13181341
}
13191342
}
13201343
}
@@ -1332,6 +1355,8 @@ private static IngressWatcher createIngressWatcher(String namespace) {
13321355

13331356
private static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
13341357
switch (item.type) {
1358+
case "ADDED":
1359+
case "MODIFIED":
13351360
case "DELETED":
13361361
V1beta1Ingress i = item.object;
13371362
V1ObjectMeta metadata = i.getMetadata();
@@ -1340,10 +1365,15 @@ private static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
13401365
if (domainUID != null) {
13411366
DomainPresenceInfo info = domains.get(domainUID);
13421367
if (info != null && clusterName != null) {
1343-
if (clusterName != null && info.getIngresses().get(clusterName) != null) {
1344-
// Ingress was deleted, but sko still contains a non-null entry
1345-
LOGGER.info(MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
1346-
doCheckAndCreateDomainPresence(info.getDomain(), true);
1368+
if ("DELETED".equals(item.type)) {
1369+
V1beta1Ingress oldIngress = info.getIngresses().remove(clusterName);
1370+
if (oldIngress != null) {
1371+
// Ingress was deleted, but sko still contained a non-null entry
1372+
LOGGER.info(MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
1373+
doCheckAndCreateDomainPresence(info.getDomain(), true);
1374+
}
1375+
} else { // ADDED, MODIFIED
1376+
info.getIngresses().put(clusterName, i);
13471377
}
13481378
}
13491379
}

src/main/java/oracle/kubernetes/operator/helpers/DomainPresenceInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
package oracle.kubernetes.operator.helpers;
55

66
import java.util.Collection;
7-
import java.util.HashMap;
87
import java.util.List;
98
import java.util.Map;
9+
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.atomic.AtomicReference;
1111

1212
import org.joda.time.DateTime;
@@ -28,8 +28,8 @@
2828
*/
2929
public class DomainPresenceInfo {
3030
private final AtomicReference<Domain> domain;
31-
private final Map<String, ServerKubernetesObjects> servers = new HashMap<>();
32-
private final Map<String, V1beta1Ingress> ingresses = new HashMap<>();
31+
private final Map<String, ServerKubernetesObjects> servers = new ConcurrentHashMap<>();
32+
private final Map<String, V1beta1Ingress> ingresses = new ConcurrentHashMap<>();
3333
private final AtomicReference<Collection<ServerStartupInfo>> serverStartupInfo;
3434

3535
private V1PersistentVolumeClaimList claims = null;

src/main/java/oracle/kubernetes/operator/helpers/IngressHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public NextAction apply(Packet packet) {
6262
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
6363
ServerKubernetesObjects sko = info.getServers().get(serverName);
6464
if (sko != null) {
65-
V1Service service = sko.getService();
65+
V1Service service = sko.getService().get();
6666
if (service != null) {
6767
// If we have a cluster, create a cluster level ingress
6868
if (clusterName != null) {

src/main/java/oracle/kubernetes/operator/helpers/PodHelper.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ public NextAction apply(Packet packet) {
205205
// Create or replace, if necessary
206206
ServerKubernetesObjects sko = info.getServers().computeIfAbsent(spec.getAsName(), k -> new ServerKubernetesObjects());
207207

208-
if (isExplicitRestartThisServer || sko.getPod() == null || !validateCurrentPod(adminPod, sko.getPod())) {
208+
V1Pod skoPod = sko.getPod().get();
209+
if (isExplicitRestartThisServer || skoPod == null || !validateCurrentPod(adminPod, skoPod)) {
209210
// There is no Pod or Pod spec has changed
210211
// First, verify there is no existing Pod
211212
Step read = CallBuilder.create().readPodAsync(podName, namespace, new ResponseStep<V1Pod>(next) {
@@ -228,15 +229,15 @@ public NextAction onSuccess(Packet packet, V1Pod result, int statusCode,
228229
Map<String, List<String>> responseHeaders) {
229230

230231
LOGGER.info(MessageKeys.ADMIN_POD_CREATED, weblogicDomainUID, spec.getAsName());
231-
sko.setPod(result);
232+
sko.getPod().set(result);
232233
return doNext(packet);
233234
}
234235
});
235236
return doNext(create, packet);
236237
} else if (!isExplicitRestartThisServer && (AnnotationHelper.checkDomainAnnotation(result.getMetadata(), dom) || validateCurrentPod(adminPod, result))) {
237238
// existing Pod has correct spec
238239
LOGGER.info(MessageKeys.ADMIN_POD_EXISTS, weblogicDomainUID, spec.getAsName());
239-
sko.setPod(result);
240+
sko.getPod().set(result);
240241
return doNext(packet);
241242
} else {
242243
// we need to update the Pod
@@ -279,7 +280,7 @@ public CyclePodStep(String podName, String namespace, V1Pod newPod, String messa
279280
public NextAction apply(Packet packet) {
280281
V1DeleteOptions deleteOptions = new V1DeleteOptions();
281282
// Set to null so that watcher doesn't recreate pod with old spec
282-
sko.setPod(null);
283+
sko.getPod().set(null);
283284
Step delete = CallBuilder.create().deletePodAsync(podName, namespace, deleteOptions, new ResponseStep<V1Status>(next) {
284285
@Override
285286
public NextAction onFailure(Packet packet, ApiException e, int statusCode,
@@ -299,7 +300,7 @@ public NextAction onSuccess(Packet packet, V1Pod result, int statusCode,
299300
Map<String, List<String>> responseHeaders) {
300301

301302
LOGGER.info(messageKey, weblogicDomainUID, serverName);
302-
sko.setPod(result);
303+
sko.getPod().set(result);
303304

304305
PodWatcher pw = packet.getSPI(PodWatcher.class);
305306
return doNext(pw.waitForReady(result, next), packet);
@@ -546,7 +547,8 @@ public NextAction apply(Packet packet) {
546547
// Create or replace, if necessary
547548
ServerKubernetesObjects sko = info.getServers().computeIfAbsent(weblogicServerName, k -> new ServerKubernetesObjects());
548549

549-
if (isExplicitRestartThisServer || sko.getPod() == null || !validateCurrentPod(pod, sko.getPod())) {
550+
V1Pod skoPod = sko.getPod().get();
551+
if (isExplicitRestartThisServer || skoPod == null || !validateCurrentPod(pod, skoPod)) {
550552
// There is no Pod or Pod spec has changed
551553
// First, verify there is no existing Pod
552554
Step read = CallBuilder.create().readPodAsync(podName, namespace, new ResponseStep<V1Pod>(next) {
@@ -569,15 +571,15 @@ public NextAction onSuccess(Packet packet, V1Pod result, int statusCode,
569571
Map<String, List<String>> responseHeaders) {
570572

571573
LOGGER.info(MessageKeys.MANAGED_POD_CREATED, weblogicDomainUID, weblogicServerName);
572-
sko.setPod(result);
574+
sko.getPod().set(result);
573575
return doNext(packet);
574576
}
575577
});
576578
return doNext(DomainStatusUpdater.createProgressingStep(create, false), packet);
577579
} else if (!isExplicitRestartThisServer && (AnnotationHelper.checkDomainAnnotation(result.getMetadata(), dom) || validateCurrentPod(pod, result))) {
578580
// existing Pod has correct spec
579581
LOGGER.info(MessageKeys.MANAGED_POD_EXISTS, weblogicDomainUID, weblogicServerName);
580-
sko.setPod(result);
582+
sko.getPod().set(result);
581583
return doNext(packet);
582584
} else {
583585
// we need to update the Pod
@@ -637,7 +639,7 @@ private static void addEnvVar(V1Container container, String name, String value)
637639
* @param next Next processing step
638640
* @return Step for deleting server pod
639641
*/
640-
public static Step deletePodStep(ServerKubernetesObjects sko,Step next) {
642+
public static Step deletePodStep(ServerKubernetesObjects sko, Step next) {
641643
return new DeletePodStep(sko, next);
642644
}
643645

@@ -651,26 +653,30 @@ public DeletePodStep(ServerKubernetesObjects sko, Step next) {
651653

652654
@Override
653655
public NextAction apply(Packet packet) {
654-
V1ObjectMeta meta = sko.getPod().getMetadata();
655-
V1DeleteOptions deleteOptions = new V1DeleteOptions();
656-
// Set pod to null so that watcher doesn't try to recreate pod
657-
sko.setPod(null);
658-
return doNext(CallBuilder.create().deletePodAsync(meta.getName(), meta.getNamespace(), deleteOptions, new ResponseStep<V1Status>(next) {
659-
@Override
660-
public NextAction onFailure(Packet packet, ApiException e, int statusCode,
661-
Map<String, List<String>> responseHeaders) {
662-
if (statusCode == CallBuilder.NOT_FOUND) {
663-
return onSuccess(packet, null, statusCode, responseHeaders);
656+
V1Pod pod = sko.getPod().get();
657+
if (pod != null) {
658+
V1ObjectMeta meta = pod.getMetadata();
659+
V1DeleteOptions deleteOptions = new V1DeleteOptions();
660+
// Set pod to null so that watcher doesn't try to recreate pod
661+
sko.getPod().set(null);
662+
return doNext(CallBuilder.create().deletePodAsync(meta.getName(), meta.getNamespace(), deleteOptions, new ResponseStep<V1Status>(next) {
663+
@Override
664+
public NextAction onFailure(Packet packet, ApiException e, int statusCode,
665+
Map<String, List<String>> responseHeaders) {
666+
if (statusCode == CallBuilder.NOT_FOUND) {
667+
return onSuccess(packet, null, statusCode, responseHeaders);
668+
}
669+
return super.onFailure(packet, e, statusCode, responseHeaders);
664670
}
665-
return super.onFailure(packet, e, statusCode, responseHeaders);
666-
}
667-
668-
@Override
669-
public NextAction onSuccess(Packet packet, V1Status result, int statusCode,
670-
Map<String, List<String>> responseHeaders) {
671-
return doNext(next, packet);
672-
}
673-
}), packet);
671+
672+
@Override
673+
public NextAction onSuccess(Packet packet, V1Status result, int statusCode,
674+
Map<String, List<String>> responseHeaders) {
675+
return doNext(next, packet);
676+
}
677+
}), packet);
678+
}
679+
return doNext(packet);
674680
}
675681
}
676682
}

src/main/java/oracle/kubernetes/operator/helpers/ServerKubernetesObjects.java

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.HashMap;
77
import java.util.Map;
8+
import java.util.concurrent.atomic.AtomicReference;
89

910
import io.kubernetes.client.models.V1Pod;
1011
import io.kubernetes.client.models.V1Service;
@@ -14,42 +15,26 @@
1415
*
1516
*/
1617
public class ServerKubernetesObjects {
17-
private V1Pod pod = null;
18-
private V1Service service = null;
18+
private final AtomicReference<V1Pod> pod = new AtomicReference<>(null);
19+
private final AtomicReference<V1Service> service = new AtomicReference<>(null);
1920
private Map<String, V1Service> channels = null;
2021

2122
/**
2223
* The Pod
2324
* @return Pod
2425
*/
25-
public V1Pod getPod() {
26+
public AtomicReference<V1Pod> getPod() {
2627
return pod;
2728
}
2829

29-
/**
30-
* Sets pod
31-
* @param pod Pod
32-
*/
33-
public void setPod(V1Pod pod) {
34-
this.pod = pod;
35-
}
36-
3730
/**
3831
* The Service
3932
* @return Service
4033
*/
41-
public V1Service getService() {
34+
public AtomicReference<V1Service> getService() {
4235
return service;
4336
}
4437

45-
/**
46-
* Sets service
47-
* @param service Service
48-
*/
49-
public void setService(V1Service service) {
50-
this.service = service;
51-
}
52-
5338
/**
5439
* Channel map
5540
* @return Map from channel name to Service

src/main/java/oracle/kubernetes/operator/helpers/ServiceHelper.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public NextAction apply(Packet packet) {
101101
// Create or replace, if necessary
102102
ServerKubernetesObjects sko = info.getServers().computeIfAbsent(serverName, k -> new ServerKubernetesObjects());
103103

104-
if (sko.getService() == null || !validateCurrentService(service, sko.getService())) {
104+
V1Service skoService = sko.getService().get();
105+
if (skoService == null || !validateCurrentService(service, skoService)) {
105106
// There is no Service or Service spec has changed
106107
// First, verify there is no existing Service
107108
Step read = CallBuilder.create().readServiceAsync(name, namespace, new ResponseStep<V1Service>(next) {
@@ -124,15 +125,15 @@ public NextAction onSuccess(Packet packet, V1Service result, int statusCode,
124125
Map<String, List<String>> responseHeaders) {
125126

126127
LOGGER.info(serverName.equals(spec.getAsName()) ? MessageKeys.ADMIN_SERVICE_CREATED : MessageKeys.MANAGED_SERVICE_CREATED, weblogicDomainUID, serverName);
127-
sko.setService(result);
128+
sko.getService().set(result);
128129
return doNext(packet);
129130
}
130131
});
131132
return doNext(create, packet);
132133
} else if (AnnotationHelper.checkDomainAnnotation(result.getMetadata(), dom) || validateCurrentService(service, result)) {
133134
// existing Service has correct spec
134135
LOGGER.info(serverName.equals(spec.getAsName()) ? MessageKeys.ADMIN_SERVICE_EXISTS : MessageKeys.MANAGED_SERVICE_EXISTS, weblogicDomainUID, serverName);
135-
sko.setService(result);
136+
sko.getService().set(result);
136137
return doNext(packet);
137138
} else {
138139
// we need to update the Service
@@ -217,7 +218,7 @@ public NextAction apply(Packet packet) {
217218
if (channelName != null) {
218219
sko.getChannels().remove(channelName);
219220
} else {
220-
sko.setService(null);
221+
sko.getService().set(null);
221222
}
222223
Step delete = CallBuilder.create().deleteServiceAsync(serviceName, namespace, new ResponseStep<V1Status>(next) {
223224
@Override
@@ -241,7 +242,7 @@ public NextAction onSuccess(Packet packet, V1Service result, int statusCode,
241242
if (channelName != null) {
242243
sko.getChannels().put(channelName, result);
243244
} else {
244-
sko.setService(result);
245+
sko.getService().set(result);
245246
}
246247
return doNext(packet);
247248
}

src/main/java/oracle/kubernetes/operator/wlsconfig/WlsConfigRetriever.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public NextAction apply(Packet packet) {
157157
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
158158
Domain dom = info.getDomain();
159159

160-
String serviceURL = HttpClient.getServiceURL(info.getAdmin().getService());
160+
String serviceURL = HttpClient.getServiceURL(info.getAdmin().getService().get());
161161

162162
WlsDomainConfig wlsDomainConfig = null;
163163
String jsonResult = httpClient.executePostUrlOnServiceClusterIP(WlsDomainConfig.getRetrieveServersSearchUrl(), serviceURL, adminServerServiceName, namespace, WlsDomainConfig.getRetrieveServersSearchPayload());

0 commit comments

Comments
 (0)