Skip to content

Commit b844b00

Browse files
authored
Merge pull request #527 from oracle/fix-take3
Reduce code paths through delete, watches, and recheck
2 parents d6b4d1b + 0214ea5 commit b844b00

File tree

11 files changed

+162
-110
lines changed

11 files changed

+162
-110
lines changed

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import io.kubernetes.client.models.V1Service;
1313
import io.kubernetes.client.models.V1beta1Ingress;
1414
import io.kubernetes.client.util.Watch;
15-
import java.util.Map;
1615
import java.util.concurrent.ScheduledFuture;
1716
import java.util.concurrent.TimeUnit;
1817
import java.util.concurrent.atomic.AtomicInteger;
@@ -82,11 +81,11 @@ static void dispatchPodWatch(Watch.Response<V1Pod> item) {
8281
case "DELETED":
8382
sko.getLastKnownStatus().set(WebLogicConstants.SHUTDOWN_STATE);
8483
V1Pod oldPod = sko.getPod().getAndSet(null);
85-
if (oldPod != null) {
84+
if (oldPod != null && !info.isDeleting()) {
8685
// Pod was deleted, but sko still contained a non-null entry
8786
LOGGER.info(
8887
MessageKeys.POD_DELETED, domainUID, metadata.getNamespace(), serverName);
89-
makeRightDomainPresence(info, info.getDomain(), false, false, true);
88+
makeRightDomainPresence(info, domainUID, info.getDomain(), false, false, true);
9089
}
9190
break;
9291

@@ -150,14 +149,14 @@ static void dispatchServiceWatch(Watch.Response<V1Service> item) {
150149
if (sko != null) {
151150
if (channelName != null) {
152151
V1Service oldService = sko.getChannels().remove(channelName);
153-
if (oldService != null) {
152+
if (oldService != null && !info.isDeleting()) {
154153
// Service was deleted, but sko still contained a non-null entry
155154
LOGGER.info(
156155
MessageKeys.SERVER_SERVICE_DELETED,
157156
domainUID,
158157
metadata.getNamespace(),
159158
serverName);
160-
makeRightDomainPresence(info, info.getDomain(), false, false, true);
159+
makeRightDomainPresence(info, domainUID, info.getDomain(), false, false, true);
161160
}
162161
} else {
163162
V1Service oldService = sko.getService().getAndSet(null);
@@ -168,7 +167,7 @@ static void dispatchServiceWatch(Watch.Response<V1Service> item) {
168167
domainUID,
169168
metadata.getNamespace(),
170169
serverName);
171-
makeRightDomainPresence(info, info.getDomain(), false, false, true);
170+
makeRightDomainPresence(info, domainUID, info.getDomain(), false, false, true);
172171
}
173172
}
174173
} else if (clusterName != null) {
@@ -180,7 +179,7 @@ static void dispatchServiceWatch(Watch.Response<V1Service> item) {
180179
domainUID,
181180
metadata.getNamespace(),
182181
clusterName);
183-
makeRightDomainPresence(info, info.getDomain(), false, false, true);
182+
makeRightDomainPresence(info, domainUID, info.getDomain(), false, false, true);
184183
}
185184
}
186185
break;
@@ -214,11 +213,11 @@ static void dispatchIngressWatch(Watch.Response<V1beta1Ingress> item) {
214213
break;
215214
case "DELETED":
216215
V1beta1Ingress oldIngress = info.getIngresses().remove(clusterName);
217-
if (oldIngress != null) {
216+
if (oldIngress != null && !info.isDeleting()) {
218217
// Ingress was deleted, but sko still contained a non-null entry
219218
LOGGER.info(
220219
MessageKeys.INGRESS_DELETED, domainUID, metadata.getNamespace(), clusterName);
221-
makeRightDomainPresence(info, info.getDomain(), false, false, true);
220+
makeRightDomainPresence(info, domainUID, info.getDomain(), false, false, true);
222221
}
223222
break;
224223

@@ -288,22 +287,30 @@ static void dispatchDomainWatch(Watch.Response<Domain> item) {
288287
Domain d;
289288
String domainUID;
290289
DomainPresenceInfo existing;
290+
boolean added = false;
291291
switch (item.type) {
292292
case "ADDED":
293+
added = true;
293294
case "MODIFIED":
294295
d = item.object;
295296
domainUID = d.getSpec().getDomainUID();
296297
LOGGER.info(MessageKeys.WATCH_DOMAIN, domainUID);
297298
existing = DomainPresenceInfoManager.lookup(domainUID);
298-
makeRightDomainPresence(existing, d, false, false, true);
299+
if (existing != null && added) {
300+
existing.setDeleting(false);
301+
}
302+
makeRightDomainPresence(existing, domainUID, d, added, false, true);
299303
break;
300304

301305
case "DELETED":
302306
d = item.object;
303307
domainUID = d.getSpec().getDomainUID();
304308
LOGGER.info(MessageKeys.WATCH_DOMAIN_DELETED, domainUID);
305309
existing = DomainPresenceInfoManager.lookup(domainUID);
306-
makeRightDomainPresence(existing, d, false, true, true);
310+
if (existing != null) {
311+
existing.setDeleting(true);
312+
}
313+
makeRightDomainPresence(existing, domainUID, d, true, true, true);
307314
break;
308315

309316
case "ERROR":
@@ -427,15 +434,22 @@ public void onThrowable(Packet packet, Throwable throwable) {
427434

428435
static void makeRightDomainPresence(
429436
DomainPresenceInfo existing,
437+
String domainUID,
430438
Domain dom,
431439
boolean explicitRecheck,
432440
boolean isDeleting,
433441
boolean isWillInterrupt) {
434442
LOGGER.entering();
435443

436-
DomainSpec spec = dom.getSpec();
437-
DomainPresenceControl.normalizeDomainSpec(spec);
438-
String domainUID = spec.getDomainUID();
444+
DomainSpec spec = null;
445+
String ns;
446+
if (dom != null) {
447+
spec = dom.getSpec();
448+
DomainPresenceControl.normalizeDomainSpec(spec);
449+
ns = dom.getMetadata().getNamespace();
450+
} else {
451+
ns = existing.getNamespace();
452+
}
439453

440454
if (existing != null) {
441455
Domain current = existing.getDomain();
@@ -446,7 +460,7 @@ static void makeRightDomainPresence(
446460
return;
447461
}
448462
// Has the spec actually changed? We will get watch events for status updates
449-
if (!explicitRecheck && spec.equals(current.getSpec())) {
463+
if (!explicitRecheck && spec != null && spec.equals(current.getSpec())) {
450464
// nothing in the spec has changed, but status likely did; update current
451465
existing.setDomain(dom);
452466
LOGGER.fine(MessageKeys.NOT_STARTING_DOMAINUID_THREAD, domainUID);
@@ -455,8 +469,7 @@ static void makeRightDomainPresence(
455469
}
456470
}
457471

458-
internalMakeRightDomainPresence(
459-
existing, dom, domainUID, dom.getMetadata().getNamespace(), isDeleting, isWillInterrupt);
472+
internalMakeRightDomainPresence(existing, dom, domainUID, ns, isDeleting, isWillInterrupt);
460473
}
461474

462475
private static void internalMakeRightDomainPresence(
@@ -469,7 +482,9 @@ private static void internalMakeRightDomainPresence(
469482
if (isDeleting || !Main.isNamespaceStopping(ns).get()) {
470483
LOGGER.info(MessageKeys.PROCESSING_DOMAIN, domainUID);
471484
Step.StepAndPacket plan =
472-
isDeleting ? createDomainDownPlan(existing, ns, domainUID) : createDomainUpPlan(dom, ns);
485+
isDeleting || (existing != null && existing.isDeleting())
486+
? createDomainDownPlan(existing, ns, domainUID)
487+
: createDomainUpPlan(dom, ns);
473488

474489
runDomainPlan(dom, domainUID, plan, isDeleting, isWillInterrupt);
475490
}
@@ -580,7 +595,7 @@ public NextAction apply(Packet packet) {
580595
.getComponents()
581596
.put(
582597
ProcessingConstants.DOMAIN_COMPONENT_NAME,
583-
Component.createFor(info, Main.getVersion(), pw));
598+
Component.createFor(info, Main.getVersion(), PodAwaiterStepFactory.class, pw));
584599
packet.put(ProcessingConstants.PRINCIPAL, Main.getPrincipal());
585600
scheduleDomainStatusUpdating(info);
586601
return doNext(packet);
@@ -604,7 +619,7 @@ public NextAction apply(Packet packet) {
604619
.getComponents()
605620
.put(
606621
ProcessingConstants.DOMAIN_COMPONENT_NAME,
607-
Component.createFor(info, Main.getVersion(), pw));
622+
Component.createFor(info, Main.getVersion(), PodAwaiterStepFactory.class, pw));
608623
packet.put(ProcessingConstants.PRINCIPAL, Main.getPrincipal());
609624
return doNext(packet);
610625
}
@@ -637,31 +652,4 @@ private static Step bringAdminServerUp(Domain dom, Step next) {
637652
private static Step bringManagedServersUp(Step next) {
638653
return new ManagedServersUpStep(next);
639654
}
640-
641-
static void deleteStrandedResources() {
642-
for (Map.Entry<String, DomainPresenceInfo> entry :
643-
DomainPresenceInfoManager.getDomainPresenceInfos().entrySet()) {
644-
String domainUID = entry.getKey();
645-
DomainPresenceInfo existing = entry.getValue();
646-
if (existing != null) {
647-
Domain current = existing.getDomain();
648-
internalMakeRightDomainPresence(
649-
existing, current, domainUID, existing.getNamespace(), true, false);
650-
}
651-
}
652-
}
653-
654-
static void deleteDomainPresence(Domain dom) {
655-
String domainUID = dom.getSpec().getDomainUID();
656-
DomainPresenceInfo existing = DomainPresenceInfoManager.lookup(domainUID);
657-
Domain current = null;
658-
if (existing != null) {
659-
current = existing.getDomain();
660-
if (current != null && isOutdatedWatchEvent(current.getMetadata(), dom.getMetadata())) {
661-
return;
662-
}
663-
}
664-
internalMakeRightDomainPresence(
665-
existing, current, domainUID, dom.getMetadata().getNamespace(), true, true);
666-
}
667655
}

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,6 @@ private static void begin() {
205205
}
206206

207207
private static void completeBegin() {
208-
DomainProcessor.deleteStrandedResources();
209-
210208
try {
211209
// start the REST server
212210
startRestServer(principal, isNamespaceStopping.keySet());
@@ -312,7 +310,7 @@ private static Runnable recheckDomains() {
312310
namespacesToStop.removeAll(targetNamespaces);
313311
stopNamespaces(namespacesToStop);
314312

315-
runSteps(new StartNamespacesStep(targetNamespaces), DomainProcessor::deleteStrandedResources);
313+
runSteps(new StartNamespacesStep(targetNamespaces));
316314
};
317315
}
318316

@@ -539,20 +537,18 @@ public NextAction onSuccess(Packet packet, CallResponse<DomainList> callResponse
539537
// Update domain here if namespace is not yet running
540538
info.setDomain(dom);
541539
}
542-
DomainProcessor.makeRightDomainPresence(info, dom, true, false, false);
540+
DomainProcessor.makeRightDomainPresence(info, domainUID, dom, true, false, false);
543541
}
544542
}
545543

546544
getDomainPresenceInfos()
547545
.forEach(
548546
(key, value) -> {
549-
Domain d = value.getDomain();
550-
if (d != null && ns.equals(d.getMetadata().getNamespace())) {
551-
if (!domainUIDs.contains(d.getSpec().getDomainUID())) {
552-
// This is a stranded DomainPresenceInfo. Clear the Domain reference
553-
// so that stranded resources are marked for clean-up.
554-
value.setDomain(null);
555-
}
547+
if (!domainUIDs.contains(key)) {
548+
// This is a stranded DomainPresenceInfo.
549+
value.setDeleting(true);
550+
Domain dom = value.getDomain();
551+
DomainProcessor.makeRightDomainPresence(value, key, dom, true, true, false);
556552
}
557553
});
558554

operator/src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
import oracle.kubernetes.operator.work.Step;
3535

3636
/** Watches for Pods to become Ready or leave Ready state */
37-
public class PodWatcher extends Watcher<V1Pod> implements WatchListener<V1Pod> {
37+
public class PodWatcher extends Watcher<V1Pod>
38+
implements WatchListener<V1Pod>, PodAwaiterStepFactory {
3839
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
3940

4041
private final String ns;

operator/src/main/java/oracle/kubernetes/operator/helpers/DomainPresenceInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.ConcurrentHashMap;
1717
import java.util.concurrent.ConcurrentMap;
1818
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.concurrent.atomic.AtomicReference;
2021
import oracle.kubernetes.operator.wlsconfig.WlsDomainConfig;
2122
import oracle.kubernetes.operator.wlsconfig.WlsServerConfig;
@@ -31,6 +32,7 @@
3132
public class DomainPresenceInfo {
3233
private final String namespace;
3334
private final AtomicReference<Domain> domain;
35+
private final AtomicBoolean isDeleting = new AtomicBoolean(false);
3436
private final AtomicReference<ScheduledFuture<?>> statusUpdater;
3537
private final AtomicReference<Collection<ServerStartupInfo>> serverStartupInfo;
3638

@@ -68,6 +70,14 @@ public DomainPresenceInfo(Domain domain) {
6870
this.statusUpdater = new AtomicReference<>(null);
6971
}
7072

73+
public boolean isDeleting() {
74+
return isDeleting.get();
75+
}
76+
77+
public void setDeleting(boolean deleting) {
78+
isDeleting.set(deleting);
79+
}
80+
7181
/**
7282
* Claims associated with the domain
7383
*

operator/src/main/java/oracle/kubernetes/operator/helpers/ServiceHelper.java

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import io.kubernetes.client.models.V1ServicePort;
2121
import io.kubernetes.client.models.V1ServiceSpec;
2222
import io.kubernetes.client.models.V1Status;
23+
import java.util.ArrayList;
24+
import java.util.Collection;
2325
import java.util.Collections;
2426
import java.util.List;
2527
import java.util.Map;
28+
import java.util.concurrent.ConcurrentMap;
2629
import javax.annotation.Nonnull;
2730
import oracle.kubernetes.operator.LabelConstants;
2831
import oracle.kubernetes.operator.ProcessingConstants;
@@ -407,14 +410,41 @@ public NextAction onSuccess(Packet packet, CallResponse<V1Service> callResponse)
407410
}
408411

409412
/**
410-
* Factory for {@link Step} that deletes per-managed server service
413+
* Factory for {@link Step} that deletes per-managed server and channel services
411414
*
412415
* @param sko Server Kubernetes Objects
413416
* @param next Next processing step
414-
* @return Step for deleting per-managed server service
417+
* @return Step for deleting per-managed server and channel services
415418
*/
416-
public static Step deleteServiceStep(ServerKubernetesObjects sko, Step next) {
417-
return new DeleteServiceStep(sko, next);
419+
public static Step deleteServicesStep(ServerKubernetesObjects sko, Step next) {
420+
return new DeleteServicesIteratorStep(sko, next);
421+
}
422+
423+
private static class DeleteServicesIteratorStep extends Step {
424+
private final ServerKubernetesObjects sko;
425+
426+
DeleteServicesIteratorStep(ServerKubernetesObjects sko, Step next) {
427+
super(next);
428+
this.sko = sko;
429+
}
430+
431+
@Override
432+
public NextAction apply(Packet packet) {
433+
Collection<StepAndPacket> startDetails = new ArrayList<>();
434+
435+
startDetails.add(new StepAndPacket(new DeleteServiceStep(sko, null), packet.clone()));
436+
ConcurrentMap<String, V1Service> channels = sko.getChannels();
437+
for (Map.Entry<String, V1Service> entry : channels.entrySet()) {
438+
startDetails.add(
439+
new StepAndPacket(
440+
new DeleteChannelServiceStep(channels, entry.getKey(), null), packet.clone()));
441+
}
442+
443+
if (startDetails.isEmpty()) {
444+
return doNext(packet);
445+
}
446+
return doForkJoin(getNext(), packet, startDetails);
447+
}
418448
}
419449

420450
private static class DeleteServiceStep extends Step {
@@ -449,6 +479,41 @@ private V1Service removeServiceFromRecord() {
449479
}
450480
}
451481

482+
private static class DeleteChannelServiceStep extends Step {
483+
private final ConcurrentMap<String, V1Service> channels;
484+
private final String channelName;
485+
486+
DeleteChannelServiceStep(
487+
ConcurrentMap<String, V1Service> channels, String channelName, Step next) {
488+
super(next);
489+
this.channels = channels;
490+
this.channelName = channelName;
491+
}
492+
493+
@Override
494+
public NextAction apply(Packet packet) {
495+
DomainPresenceInfo info = packet.getSPI(DomainPresenceInfo.class);
496+
V1Service oldService = removeServiceFromRecord();
497+
498+
if (oldService != null) {
499+
return doNext(
500+
deleteService(oldService.getMetadata().getName(), info.getNamespace()), packet);
501+
}
502+
return doNext(packet);
503+
}
504+
505+
Step deleteService(String name, String namespace) {
506+
V1DeleteOptions deleteOptions = new V1DeleteOptions();
507+
return new CallBuilder()
508+
.deleteServiceAsync(name, namespace, deleteOptions, new DefaultResponseStep<>(getNext()));
509+
}
510+
511+
// Set service to null so that watcher doesn't try to recreate service
512+
private V1Service removeServiceFromRecord() {
513+
return channels.remove(channelName);
514+
}
515+
}
516+
452517
/**
453518
* Create asynchronous step for internal cluster service
454519
*

0 commit comments

Comments
 (0)