Skip to content

Commit e6995e5

Browse files
authored
Merge pull request #532 from oracle/fix-take5
Resolve many intermittent issues by re-reading resources
2 parents 098327d + 44b4072 commit e6995e5

22 files changed

+1198
-1354
lines changed

integration-tests/src/test/java/oracle/kubernetes/operator/ITOperator.java

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -296,12 +296,8 @@ private Domain testAllUseCasesForADomain(Operator operator, String domainYamlFil
296296
logger.info("Creating Domain & verifing the domain creation");
297297
// create domain1
298298
Domain domain = testDomainCreation(domainYamlFile);
299-
if (System.getenv("QUICKTEST") == null
300-
|| (System.getenv("QUICKTEST") != null
301-
&& !System.getenv("QUICKTEST").equalsIgnoreCase("true"))) {
302-
testDomainLifecyle(operator, domain);
303-
testOperatorLifecycle(operator, domain);
304-
}
299+
testDomainLifecyle(operator, domain);
300+
testOperatorLifecycle(operator, domain);
305301
testClusterScaling(operator, domain);
306302
return domain;
307303
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 12 additions & 802 deletions
Large diffs are not rendered by default.

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 1008 additions & 0 deletions
Large diffs are not rendered by default.

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 62 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -35,12 +35,10 @@
3535
import oracle.kubernetes.operator.helpers.CallBuilderFactory;
3636
import oracle.kubernetes.operator.helpers.ConfigMapHelper;
3737
import oracle.kubernetes.operator.helpers.DomainPresenceInfo;
38-
import oracle.kubernetes.operator.helpers.DomainPresenceInfoManager;
3938
import oracle.kubernetes.operator.helpers.HealthCheckHelper;
4039
import oracle.kubernetes.operator.helpers.HealthCheckHelper.KubernetesVersion;
4140
import oracle.kubernetes.operator.helpers.ResponseStep;
4241
import oracle.kubernetes.operator.helpers.ServerKubernetesObjects;
43-
import oracle.kubernetes.operator.helpers.ServerKubernetesObjectsManager;
4442
import oracle.kubernetes.operator.logging.LoggingFacade;
4543
import oracle.kubernetes.operator.logging.LoggingFactory;
4644
import oracle.kubernetes.operator.logging.MessageKeys;
@@ -67,7 +65,7 @@ private static ThreadFactory getThreadFactory() {
6765

6866
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
6967

70-
private static final String NS_STARTING_NOW = "NS_STARTING_NOW";
68+
private static final String DPI_MAP = "DPI_MAP";
7169

7270
static final TuningParameters tuningAndConfig;
7371

@@ -103,6 +101,7 @@ private static ThreadFactory getThreadFactory() {
103101
}
104102

105103
static final Engine engine = new Engine(wrappedExecutorService);
104+
private static final DomainProcessor processor = DomainProcessor.getInstance();
106105

107106
static final ConcurrentMap<String, AtomicBoolean> isNamespaceStarted = new ConcurrentHashMap<>();
108107
static final ConcurrentMap<String, AtomicBoolean> isNamespaceStopping = new ConcurrentHashMap<>();
@@ -125,14 +124,6 @@ private static ThreadFactory getThreadFactory() {
125124
static final String READINESS_PROBE_FAILURE_EVENT_FILTER =
126125
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}";
127126

128-
static Map<String, DomainPresenceInfo> getDomainPresenceInfos(String ns) {
129-
return DomainPresenceInfoManager.getDomainPresenceInfos(ns);
130-
}
131-
132-
static Map<String, ServerKubernetesObjects> getKubernetesObjects(String ns) {
133-
return ServerKubernetesObjectsManager.getServerKubernetesObjects(ns);
134-
}
135-
136127
/**
137128
* Entry point
138129
*
@@ -266,10 +257,7 @@ private static class StartNamespaceBeforeStep extends Step {
266257
@Override
267258
public NextAction apply(Packet packet) {
268259
AtomicBoolean a = isNamespaceStarted.computeIfAbsent(ns, (key) -> new AtomicBoolean(false));
269-
boolean startingNow = !a.getAndSet(true);
270-
packet.put(NS_STARTING_NOW, (Boolean) startingNow);
271-
272-
if (startingNow) {
260+
if (!a.getAndSet(true)) {
273261
try {
274262
HealthCheckHelper.performSecurityChecks(version, operatorNamespace, ns);
275263
} catch (Throwable e) {
@@ -323,6 +311,7 @@ private static Runnable recheckDomains() {
323311

324312
static Step readExistingResources(String operatorNamespace, String ns) {
325313
return Step.chain(
314+
new ReadExistingResourcesBeforeStep(),
326315
ConfigMapHelper.createScriptConfigMapStep(operatorNamespace, ns),
327316
createConfigMapStep(ns),
328317
readExistingPods(ns),
@@ -332,6 +321,15 @@ static Step readExistingResources(String operatorNamespace, String ns) {
332321
readExistingDomains(ns));
333322
}
334323

324+
private static class ReadExistingResourcesBeforeStep extends Step {
325+
@SuppressWarnings("rawtypes")
326+
@Override
327+
public NextAction apply(Packet packet) {
328+
packet.put(DPI_MAP, new ConcurrentHashMap());
329+
return doNext(packet);
330+
}
331+
}
332+
335333
private static Step readExistingDomains(String ns) {
336334
LOGGER.info(MessageKeys.LISTING_DOMAINS);
337335
return callBuilderFactory.create().listDomainAsync(ns, new DomainListStep(ns));
@@ -363,7 +361,7 @@ private static Step readExistingPods(String ns) {
363361

364362
private static ConfigMapAfterStep createConfigMapStep(String ns) {
365363
return new ConfigMapAfterStep(
366-
ns, configMapWatchers, isNamespaceStopping(ns), DomainProcessor::dispatchConfigMapWatch);
364+
ns, configMapWatchers, isNamespaceStopping(ns), processor::dispatchConfigMapWatch);
367365
}
368366

369367
// -----------------------------------------------------------------------------
@@ -434,7 +432,7 @@ private static EventWatcher createEventWatcher(String ns, String initialResource
434432
ns,
435433
READINESS_PROBE_FAILURE_EVENT_FILTER,
436434
initialResourceVersion,
437-
DomainProcessor::dispatchEventWatch,
435+
processor::dispatchEventWatch,
438436
isNamespaceStopping(ns));
439437
}
440438

@@ -443,7 +441,7 @@ private static PodWatcher createPodWatcher(String ns, String initialResourceVers
443441
getThreadFactory(),
444442
ns,
445443
initialResourceVersion,
446-
DomainProcessor::dispatchPodWatch,
444+
processor::dispatchPodWatch,
447445
isNamespaceStopping(ns));
448446
}
449447

@@ -452,7 +450,7 @@ private static ServiceWatcher createServiceWatcher(String ns, String initialReso
452450
getThreadFactory(),
453451
ns,
454452
initialResourceVersion,
455-
DomainProcessor::dispatchServiceWatch,
453+
processor::dispatchServiceWatch,
456454
isNamespaceStopping(ns));
457455
}
458456

@@ -461,7 +459,7 @@ private static IngressWatcher createIngressWatcher(String ns, String initialReso
461459
getThreadFactory(),
462460
ns,
463461
initialResourceVersion,
464-
DomainProcessor::dispatchIngressWatch,
462+
processor::dispatchIngressWatch,
465463
isNamespaceStopping(ns));
466464
}
467465

@@ -495,18 +493,22 @@ public NextAction onFailure(Packet packet, CallResponse<V1beta1IngressList> call
495493

496494
@Override
497495
public NextAction onSuccess(Packet packet, CallResponse<V1beta1IngressList> callResponse) {
496+
@SuppressWarnings("unchecked")
497+
Map<String, DomainPresenceInfo> dpis = (Map<String, DomainPresenceInfo>) packet.get(DPI_MAP);
498+
498499
V1beta1IngressList result = callResponse.getResult();
499500
if (result != null) {
500501
for (V1beta1Ingress ingress : result.getItems()) {
501502
String domainUID = IngressWatcher.getIngressDomainUID(ingress);
502503
String clusterName = IngressWatcher.getIngressClusterName(ingress);
503504
if (domainUID != null && clusterName != null) {
504-
DomainPresenceInfoManager.getOrCreate(ns, domainUID)
505+
dpis.computeIfAbsent(domainUID, k -> new DomainPresenceInfo(ns, domainUID))
505506
.getIngresses()
506507
.put(clusterName, ingress);
507508
}
508509
}
509510
}
511+
510512
if (!ingressWatchers.containsKey(ns)) {
511513
ingressWatchers.put(ns, createIngressWatcher(ns, getInitialResourceVersion(result)));
512514
}
@@ -534,35 +536,41 @@ public NextAction onFailure(Packet packet, CallResponse<DomainList> callResponse
534536

535537
@Override
536538
public NextAction onSuccess(Packet packet, CallResponse<DomainList> callResponse) {
537-
Boolean startingNow = (Boolean) packet.get(NS_STARTING_NOW);
538-
if (startingNow == null) {
539-
startingNow = Boolean.TRUE;
540-
}
539+
@SuppressWarnings("unchecked")
540+
Map<String, DomainPresenceInfo> dpis = (Map<String, DomainPresenceInfo>) packet.get(DPI_MAP);
541+
542+
DomainProcessor x = packet.getSPI(DomainProcessor.class);
543+
DomainProcessor dp = x != null ? x : processor;
541544

542545
Set<String> domainUIDs = new HashSet<>();
543546
if (callResponse.getResult() != null) {
544547
for (Domain dom : callResponse.getResult().getItems()) {
545548
String domainUID = dom.getSpec().getDomainUID();
546549
domainUIDs.add(domainUID);
547-
DomainPresenceInfo info = DomainPresenceInfoManager.getOrCreate(dom);
548-
if (startingNow) {
549-
// Update domain here if namespace is not yet running
550-
info.setDomain(dom);
551-
}
552-
DomainProcessor.makeRightDomainPresence(info, domainUID, dom, true, false, false);
550+
DomainPresenceInfo info =
551+
dpis.compute(
552+
domainUID,
553+
(k, v) -> {
554+
if (v == null) {
555+
return new DomainPresenceInfo(dom);
556+
}
557+
v.setDomain(dom);
558+
return v;
559+
});
560+
info.setPopulated(true);
561+
dp.makeRightDomainPresence(info, true, false, false);
553562
}
554563
}
555564

556-
getDomainPresenceInfos(ns)
557-
.forEach(
558-
(key, value) -> {
559-
if (!domainUIDs.contains(key)) {
560-
// This is a stranded DomainPresenceInfo.
561-
value.setDeleting(true);
562-
Domain dom = value.getDomain();
563-
DomainProcessor.makeRightDomainPresence(value, key, dom, true, true, false);
564-
}
565-
});
565+
dpis.forEach(
566+
(key, value) -> {
567+
if (!domainUIDs.contains(key)) {
568+
// This is a stranded DomainPresenceInfo.
569+
value.setDeleting(true);
570+
value.setPopulated(true);
571+
dp.makeRightDomainPresence(value, true, true, false);
572+
}
573+
});
566574

567575
if (!domainWatchers.containsKey(ns)) {
568576
domainWatchers.put(
@@ -580,7 +588,7 @@ private static DomainWatcher createDomainWatcher(String ns, String initialResour
580588
getThreadFactory(),
581589
ns,
582590
initialResourceVersion,
583-
DomainProcessor::dispatchDomainWatch,
591+
processor::dispatchDomainWatch,
584592
isNamespaceStopping(ns));
585593
}
586594
}
@@ -603,65 +611,33 @@ public NextAction onFailure(Packet packet, CallResponse<V1ServiceList> callRespo
603611
public NextAction onSuccess(Packet packet, CallResponse<V1ServiceList> callResponse) {
604612
V1ServiceList result = callResponse.getResult();
605613

606-
Set<String> serviceNames = new HashSet<>();
607-
Set<String> channelNames = new HashSet<>();
608-
Set<String> clusterNames = new HashSet<>();
614+
@SuppressWarnings("unchecked")
615+
Map<String, DomainPresenceInfo> dpis = (Map<String, DomainPresenceInfo>) packet.get(DPI_MAP);
616+
609617
if (result != null) {
610618
for (V1Service service : result.getItems()) {
611619
String domainUID = ServiceWatcher.getServiceDomainUID(service);
612620
String serverName = ServiceWatcher.getServiceServerName(service);
613621
String channelName = ServiceWatcher.getServiceChannelName(service);
614622
String clusterName = ServiceWatcher.getServiceClusterName(service);
615623
if (domainUID != null) {
616-
DomainPresenceInfo info = DomainPresenceInfoManager.getOrCreate(ns, domainUID);
624+
DomainPresenceInfo info =
625+
dpis.computeIfAbsent(domainUID, k -> new DomainPresenceInfo(ns, domainUID));
617626
if (clusterName != null) {
618-
clusterNames.add(clusterName);
619627
info.getClusters().put(clusterName, service);
620628
} else if (serverName != null) {
621629
ServerKubernetesObjects sko =
622-
ServerKubernetesObjectsManager.getOrCreate(info, domainUID, serverName);
630+
info.getServers().computeIfAbsent(serverName, k -> new ServerKubernetesObjects());
623631
if (channelName != null) {
624-
channelNames.add(channelName);
625632
sko.getChannels().put(channelName, service);
626633
} else {
627-
serviceNames.add(service.getMetadata().getName());
628634
sko.getService().set(service);
629635
}
630636
}
631637
}
632638
}
633639
}
634640

635-
getDomainPresenceInfos(ns)
636-
.forEach(
637-
(key, value) -> {
638-
ConcurrentMap<String, V1Service> map = value.getClusters();
639-
map.forEach(
640-
(ckey, cvalue) -> {
641-
map.compute(
642-
ckey,
643-
(k, current) -> {
644-
return clusterNames.contains(ckey) ? current : null;
645-
});
646-
});
647-
});
648-
getKubernetesObjects(ns)
649-
.forEach(
650-
(key, value) -> {
651-
if (!serviceNames.contains(key)) {
652-
value.getService().set(null);
653-
}
654-
ConcurrentMap<String, V1Service> map = value.getChannels();
655-
map.forEach(
656-
(ckey, cvalue) -> {
657-
map.compute(
658-
ckey,
659-
(k, current) -> {
660-
return channelNames.contains(ckey) ? current : null;
661-
});
662-
});
663-
});
664-
665641
if (!serviceWatchers.containsKey(ns)) {
666642
serviceWatchers.put(ns, createServiceWatcher(ns, getInitialResourceVersion(result)));
667643
}
@@ -721,29 +697,23 @@ public NextAction onFailure(Packet packet, CallResponse<V1PodList> callResponse)
721697
public NextAction onSuccess(Packet packet, CallResponse<V1PodList> callResponse) {
722698
V1PodList result = callResponse.getResult();
723699

724-
Set<String> podNames = new HashSet<>();
700+
@SuppressWarnings("unchecked")
701+
Map<String, DomainPresenceInfo> dpis = (Map<String, DomainPresenceInfo>) packet.get(DPI_MAP);
702+
725703
if (result != null) {
726704
for (V1Pod pod : result.getItems()) {
727705
String domainUID = PodWatcher.getPodDomainUID(pod);
728706
String serverName = PodWatcher.getPodServerName(pod);
729707
if (domainUID != null && serverName != null) {
730-
podNames.add(pod.getMetadata().getName());
731-
DomainPresenceInfo info = DomainPresenceInfoManager.getOrCreate(ns, domainUID);
708+
DomainPresenceInfo info =
709+
dpis.computeIfAbsent(domainUID, k -> new DomainPresenceInfo(ns, domainUID));
732710
ServerKubernetesObjects sko =
733-
ServerKubernetesObjectsManager.getOrCreate(info, domainUID, serverName);
711+
info.getServers().computeIfAbsent(serverName, k -> new ServerKubernetesObjects());
734712
sko.getPod().set(pod);
735713
}
736714
}
737715
}
738716

739-
getKubernetesObjects(ns)
740-
.forEach(
741-
(key, value) -> {
742-
if (!podNames.contains(key)) {
743-
value.getPod().set(null);
744-
}
745-
});
746-
747717
if (!podWatchers.containsKey(ns)) {
748718
podWatchers.put(ns, createPodWatcher(ns, getInitialResourceVersion(result)));
749719
}

operator/src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 4 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -21,13 +21,10 @@
2121
import oracle.kubernetes.operator.helpers.CallBuilder;
2222
import oracle.kubernetes.operator.helpers.CallBuilderFactory;
2323
import oracle.kubernetes.operator.helpers.ResponseStep;
24-
import oracle.kubernetes.operator.helpers.ServerKubernetesObjects;
25-
import oracle.kubernetes.operator.helpers.ServerKubernetesObjectsManager;
2624
import oracle.kubernetes.operator.logging.LoggingFacade;
2725
import oracle.kubernetes.operator.logging.LoggingFactory;
2826
import oracle.kubernetes.operator.logging.MessageKeys;
2927
import oracle.kubernetes.operator.watcher.WatchListener;
30-
import oracle.kubernetes.operator.work.Container;
3128
import oracle.kubernetes.operator.work.ContainerResolver;
3229
import oracle.kubernetes.operator.work.NextAction;
3330
import oracle.kubernetes.operator.work.Packet;
@@ -93,24 +90,11 @@ public void receivedResponse(Watch.Response<V1Pod> item) {
9390
V1Pod pod = item.object;
9491
Boolean isReady = isReady(pod);
9592
String podName = pod.getMetadata().getName();
96-
Container c = ContainerResolver.getInstance().getContainer();
97-
ServerKubernetesObjects sko =
98-
ServerKubernetesObjectsManager.lookup(pod.getMetadata().getNamespace(), podName);
99-
if (sko != null) {
100-
sko.getLastKnownStatus().set(isReady ? WebLogicConstants.RUNNING_STATE : null);
101-
}
10293
if (isReady) {
103-
if (sko != null) {
104-
sko.getLastKnownStatus().set(WebLogicConstants.RUNNING_STATE);
105-
}
10694
OnReady ready = readyCallbackRegistrations.remove(podName);
10795
if (ready != null) {
10896
ready.onReady();
10997
}
110-
} else {
111-
if (sko != null) {
112-
sko.getLastKnownStatus().compareAndSet(WebLogicConstants.RUNNING_STATE, null);
113-
}
11498
}
11599
break;
116100
case "DELETED":
@@ -123,6 +107,10 @@ public void receivedResponse(Watch.Response<V1Pod> item) {
123107
LOGGER.exiting();
124108
}
125109

110+
static boolean isTerminating(V1Pod pod) {
111+
return pod.getMetadata().getDeletionTimestamp() != null;
112+
}
113+
126114
static boolean isReady(V1Pod pod) {
127115
return isReady(pod, false);
128116
}

0 commit comments

Comments
 (0)