Skip to content

Commit 3edbfe6

Browse files
committed
Improve watch asynchronicity
1 parent 0fec7ba commit 3edbfe6

File tree

3 files changed

+100
-72
lines changed

3 files changed

+100
-72
lines changed

src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 90 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,36 +1267,43 @@ private static PodWatcher createPodWatcher(String namespace) {
12671267
}
12681268

12691269
private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
1270-
switch (item.type) {
1271-
case "ADDED":
1272-
case "MODIFIED":
1273-
case "DELETED":
1274-
V1Pod p = item.object;
1275-
V1ObjectMeta metadata = p.getMetadata();
1276-
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1277-
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1278-
if (domainUID != null) {
1279-
DomainPresenceInfo info = domains.get(domainUID);
1280-
if (info != null && serverName != null) {
1281-
ServerKubernetesObjects sko = info.getServers().get(serverName);
1282-
if (sko != null) {
1283-
if ("DELETED".equals(item.type)) {
1270+
V1Pod p = item.object;
1271+
if (p != null) {
1272+
V1ObjectMeta metadata = p.getMetadata();
1273+
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1274+
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1275+
if (domainUID != null) {
1276+
DomainPresenceInfo info = domains.get(domainUID);
1277+
if (info != null && serverName != null) {
1278+
ServerKubernetesObjects sko = info.getServers().get(serverName);
1279+
if (sko != null) {
1280+
switch (item.type) {
1281+
case "ADDED":
1282+
sko.getPod().set(p);
1283+
break;
1284+
case "MODIFIED":
1285+
V1Pod skoPod = sko.getPod().get();
1286+
if (skoPod != null) {
1287+
// If the skoPod is null then the operator deleted this pod
1288+
// and modifications are to the terminating pod
1289+
sko.getPod().compareAndSet(skoPod, p);
1290+
}
1291+
break;
1292+
case "DELETED":
12841293
V1Pod oldPod = sko.getPod().getAndSet(null);
12851294
if (oldPod != null) {
12861295
// Pod was deleted, but sko still contained a non-null entry
12871296
LOGGER.info(MessageKeys.POD_DELETED, domainUID, metadata.getNamespace(), serverName);
12881297
doCheckAndCreateDomainPresence(info.getDomain(), true);
12891298
}
1290-
} else { // ADDED, MODIFIED
1291-
sko.getPod().set(p);
1292-
}
1299+
break;
1300+
1301+
case "ERROR":
1302+
default:
12931303
}
12941304
}
12951305
}
1296-
break;
1297-
1298-
case "ERROR":
1299-
default:
1306+
}
13001307
}
13011308
}
13021309

@@ -1306,21 +1313,39 @@ private static ServiceWatcher createServiceWatcher(String namespace) {
13061313
}
13071314

13081315
private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
1309-
switch (item.type) {
1310-
case "ADDED":
1311-
case "MODIFIED":
1312-
case "DELETED":
1313-
V1Service s = item.object;
1314-
V1ObjectMeta metadata = s.getMetadata();
1315-
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1316-
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1317-
String channelName = metadata.getLabels().get(LabelConstants.CHANNELNAME_LABEL);
1318-
if (domainUID != null) {
1319-
DomainPresenceInfo info = domains.get(domainUID);
1320-
if (info != null && serverName != null) {
1321-
ServerKubernetesObjects sko = info.getServers().get(serverName);
1322-
if (sko != null) {
1323-
if ("DELETED".equals(item.type)) {
1316+
V1Service s = item.object;
1317+
if (s != null) {
1318+
V1ObjectMeta metadata = s.getMetadata();
1319+
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
1320+
String serverName = metadata.getLabels().get(LabelConstants.SERVERNAME_LABEL);
1321+
String channelName = metadata.getLabels().get(LabelConstants.CHANNELNAME_LABEL);
1322+
if (domainUID != null) {
1323+
DomainPresenceInfo info = domains.get(domainUID);
1324+
if (info != null && serverName != null) {
1325+
ServerKubernetesObjects sko = info.getServers().get(serverName);
1326+
if (sko != null) {
1327+
switch (item.type) {
1328+
case "ADDED":
1329+
if (channelName != null) {
1330+
sko.getChannels().put(channelName, s);
1331+
} else {
1332+
sko.getService().set(s);
1333+
}
1334+
break;
1335+
case "MODIFIED":
1336+
if (channelName != null) {
1337+
V1Service skoService = sko.getChannels().get(channelName);
1338+
if (skoService != null) {
1339+
sko.getChannels().replace(channelName, skoService, s);
1340+
}
1341+
} else {
1342+
V1Service skoService = sko.getService().get();
1343+
if (skoService != null) {
1344+
sko.getService().compareAndSet(skoService, s);
1345+
}
1346+
}
1347+
break;
1348+
case "DELETED":
13241349
if (channelName != null) {
13251350
V1Service oldService = sko.getChannels().put(channelName, null);
13261351
if (oldService != null) {
@@ -1336,16 +1361,14 @@ private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
13361361
doCheckAndCreateDomainPresence(info.getDomain(), true);
13371362
}
13381363
}
1339-
} else { // ADDED, MODIFIED
1340-
sko.getService().set(s);
1341-
}
1364+
break;
1365+
1366+
case "ERROR":
1367+
default:
13421368
}
13431369
}
13441370
}
1345-
break;
1346-
1347-
case "ERROR":
1348-
default:
1371+
}
13491372
}
13501373
}
13511374

@@ -1354,34 +1377,39 @@ private static IngressWatcher createIngressWatcher(String namespace) {
13541377
}
13551378

13561379
private static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
1357-
switch (item.type) {
1358-
case "ADDED":
1359-
case "MODIFIED":
1360-
case "DELETED":
1361-
V1beta1Ingress i = item.object;
1380+
V1beta1Ingress i = item.object;
1381+
if (i != null) {
13621382
V1ObjectMeta metadata = i.getMetadata();
13631383
String domainUID = metadata.getLabels().get(LabelConstants.DOMAINUID_LABEL);
13641384
String clusterName = metadata.getLabels().get(LabelConstants.CLUSTERNAME_LABEL);
13651385
if (domainUID != null) {
13661386
DomainPresenceInfo info = domains.get(domainUID);
13671387
if (info != null && clusterName != null) {
1368-
if ("DELETED".equals(item.type)) {
1369-
V1beta1Ingress oldIngress = info.getIngresses().remove(clusterName);
1370-
if (oldIngress != null) {
1371-
// Ingress was deleted, but sko still contained a non-null entry
1372-
LOGGER.info(MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
1373-
doCheckAndCreateDomainPresence(info.getDomain(), true);
1374-
}
1375-
} else { // ADDED, MODIFIED
1376-
info.getIngresses().put(clusterName, i);
1388+
switch (item.type) {
1389+
case "ADDED":
1390+
info.getIngresses().put(clusterName, i);
1391+
break;
1392+
case "MODIFIED":
1393+
V1beta1Ingress skoIngress = info.getIngresses().get(clusterName);
1394+
if (skoIngress != null) {
1395+
info.getIngresses().replace(clusterName, skoIngress, i);
1396+
}
1397+
break;
1398+
case "DELETED":
1399+
V1beta1Ingress oldIngress = info.getIngresses().remove(clusterName);
1400+
if (oldIngress != null) {
1401+
// Ingress was deleted, but sko still contained a non-null entry
1402+
LOGGER.info(MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
1403+
doCheckAndCreateDomainPresence(info.getDomain(), true);
1404+
}
1405+
break;
1406+
1407+
case "ERROR":
1408+
default:
13771409
}
13781410
}
13791411
}
1380-
break;
1381-
1382-
case "ERROR":
1383-
default:
1384-
}
1412+
}
13851413
}
13861414

13871415
/**

src/main/java/oracle/kubernetes/operator/helpers/DomainPresenceInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
import java.util.Collection;
77
import java.util.List;
8-
import java.util.Map;
98
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.ConcurrentMap;
1010
import java.util.concurrent.atomic.AtomicReference;
1111

1212
import org.joda.time.DateTime;
@@ -28,8 +28,8 @@
2828
*/
2929
public class DomainPresenceInfo {
3030
private final AtomicReference<Domain> domain;
31-
private final Map<String, ServerKubernetesObjects> servers = new ConcurrentHashMap<>();
32-
private final Map<String, V1beta1Ingress> ingresses = new ConcurrentHashMap<>();
31+
private final ConcurrentMap<String, ServerKubernetesObjects> servers = new ConcurrentHashMap<>();
32+
private final ConcurrentMap<String, V1beta1Ingress> ingresses = new ConcurrentHashMap<>();
3333
private final AtomicReference<Collection<ServerStartupInfo>> serverStartupInfo;
3434

3535
private V1PersistentVolumeClaimList claims = null;
@@ -114,15 +114,15 @@ public void setDomain(Domain domain) {
114114
* Map from server name to server objects (Pods and Services)
115115
* @return Server object map
116116
*/
117-
public Map<String, ServerKubernetesObjects> getServers() {
117+
public ConcurrentMap<String, ServerKubernetesObjects> getServers() {
118118
return servers;
119119
}
120120

121121
/**
122122
* Map from cluster name to Ingress
123123
* @return Cluster object map
124124
*/
125-
public Map<String, V1beta1Ingress> getIngresses() {
125+
public ConcurrentMap<String, V1beta1Ingress> getIngresses() {
126126
return ingresses;
127127
}
128128

src/main/java/oracle/kubernetes/operator/helpers/ServerKubernetesObjects.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
package oracle.kubernetes.operator.helpers;
55

6-
import java.util.HashMap;
7-
import java.util.Map;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.ConcurrentMap;
88
import java.util.concurrent.atomic.AtomicReference;
99

1010
import io.kubernetes.client.models.V1Pod;
@@ -17,7 +17,7 @@
1717
public class ServerKubernetesObjects {
1818
private final AtomicReference<V1Pod> pod = new AtomicReference<>(null);
1919
private final AtomicReference<V1Service> service = new AtomicReference<>(null);
20-
private Map<String, V1Service> channels = null;
20+
private ConcurrentMap<String, V1Service> channels = null;
2121

2222
/**
2323
* The Pod
@@ -39,9 +39,9 @@ public AtomicReference<V1Service> getService() {
3939
* Channel map
4040
* @return Map from channel name to Service
4141
*/
42-
public Map<String, V1Service> getChannels() {
42+
public ConcurrentMap<String, V1Service> getChannels() {
4343
if (channels == null) {
44-
channels = new HashMap<String, V1Service>();
44+
channels = new ConcurrentHashMap<String, V1Service>();
4545
}
4646
return channels;
4747
}

0 commit comments

Comments
 (0)