24
24
import java .util .StringTokenizer ;
25
25
import java .util .TreeSet ;
26
26
import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .concurrent .ConcurrentMap ;
27
28
import java .util .concurrent .ScheduledExecutorService ;
28
29
import java .util .concurrent .ThreadFactory ;
29
30
import java .util .concurrent .TimeUnit ;
@@ -101,7 +102,7 @@ private static ThreadFactory getThreadFactory() {
101
102
102
103
static final Engine engine = new Engine (wrappedExecutorService );
103
104
104
- static final Map <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
105
+ static final ConcurrentMap <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
105
106
106
107
private static final Map <String , ConfigMapWatcher > configMapWatchers = new ConcurrentHashMap <>();
107
108
private static final Map <String , DomainWatcher > domainWatchers = new ConcurrentHashMap <>();
@@ -121,12 +122,12 @@ private static ThreadFactory getThreadFactory() {
121
122
static final String READINESS_PROBE_FAILURE_EVENT_FILTER =
122
123
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}" ;
123
124
124
- static Map <String , DomainPresenceInfo > getDomainPresenceInfos () {
125
- return DomainPresenceInfoManager .getDomainPresenceInfos ();
125
+ static Map <String , DomainPresenceInfo > getDomainPresenceInfos (String ns ) {
126
+ return DomainPresenceInfoManager .getDomainPresenceInfos (ns );
126
127
}
127
128
128
- static ServerKubernetesObjects getKubernetesObjects (String serverLegalName ) {
129
- return ServerKubernetesObjectsManager .lookup ( serverLegalName );
129
+ static Map < String , ServerKubernetesObjects > getKubernetesObjects (String ns ) {
130
+ return ServerKubernetesObjectsManager .getServerKubernetesObjects ( ns );
130
131
}
131
132
132
133
/**
@@ -261,7 +262,7 @@ private static class StartNamespaceBeforeStep extends Step {
261
262
262
263
@ Override
263
264
public NextAction apply (Packet packet ) {
264
- if (isNamespaceStopping . putIfAbsent (ns , new AtomicBoolean ( false )) == null ) {
265
+ if (isNamespaceStopping (ns ). get () ) {
265
266
try {
266
267
HealthCheckHelper .performSecurityChecks (version , operatorNamespace , ns );
267
268
} catch (Throwable e ) {
@@ -283,10 +284,8 @@ private static void stopNamespaces(Collection<String> namespacesToStop) {
283
284
}
284
285
}
285
286
286
- private static final AtomicBoolean UNINITIALIZED_NS_STOPPING = new AtomicBoolean (true );
287
-
288
287
static AtomicBoolean isNamespaceStopping (String ns ) {
289
- return isNamespaceStopping .getOrDefault (ns , UNINITIALIZED_NS_STOPPING );
288
+ return isNamespaceStopping .computeIfAbsent (ns , ( key ) -> new AtomicBoolean ( true ) );
290
289
}
291
290
292
291
static void runSteps (Step firstStep ) {
@@ -527,30 +526,30 @@ public NextAction onFailure(Packet packet, CallResponse<DomainList> callResponse
527
526
528
527
@ Override
529
528
public NextAction onSuccess (Packet packet , CallResponse <DomainList > callResponse ) {
529
+ boolean starting = isNamespaceStopping .putIfAbsent (ns , new AtomicBoolean (false )) == null ;
530
+
530
531
Set <String > domainUIDs = new HashSet <>();
531
532
if (callResponse .getResult () != null ) {
532
533
for (Domain dom : callResponse .getResult ().getItems ()) {
533
534
String domainUID = dom .getSpec ().getDomainUID ();
534
535
domainUIDs .add (domainUID );
535
536
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (dom );
536
- if (isNamespaceStopping ( dom . getMetadata (). getNamespace ()). get () ) {
537
+ if (starting ) {
537
538
// Update domain here if namespace is not yet running
538
539
info .setDomain (dom );
539
540
}
540
541
DomainProcessor .makeRightDomainPresence (info , domainUID , dom , true , false , false );
541
542
}
542
543
}
543
544
544
- getDomainPresenceInfos ()
545
+ getDomainPresenceInfos (ns )
545
546
.forEach (
546
547
(key , value ) -> {
547
548
if (!domainUIDs .contains (key )) {
548
- if (ns .equals (value .getNamespace ())) {
549
- // This is a stranded DomainPresenceInfo.
550
- value .setDeleting (true );
551
- Domain dom = value .getDomain ();
552
- DomainProcessor .makeRightDomainPresence (value , key , dom , true , true , false );
553
- }
549
+ // This is a stranded DomainPresenceInfo.
550
+ value .setDeleting (true );
551
+ Domain dom = value .getDomain ();
552
+ DomainProcessor .makeRightDomainPresence (value , key , dom , true , true , false );
554
553
}
555
554
});
556
555
@@ -592,23 +591,66 @@ public NextAction onFailure(Packet packet, CallResponse<V1ServiceList> callRespo
592
591
@ Override
593
592
public NextAction onSuccess (Packet packet , CallResponse <V1ServiceList > callResponse ) {
594
593
V1ServiceList result = callResponse .getResult ();
594
+
595
+ Set <String > serviceNames = new HashSet <>();
596
+ Set <String > channelNames = new HashSet <>();
597
+ Set <String > clusterNames = new HashSet <>();
595
598
if (result != null ) {
596
599
for (V1Service service : result .getItems ()) {
597
600
String domainUID = ServiceWatcher .getServiceDomainUID (service );
598
601
String serverName = ServiceWatcher .getServiceServerName (service );
599
602
String channelName = ServiceWatcher .getServiceChannelName (service );
600
- if (domainUID != null && serverName != null ) {
603
+ String clusterName = ServiceWatcher .getServiceClusterName (service );
604
+ if (domainUID != null ) {
601
605
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
602
- ServerKubernetesObjects sko =
603
- ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
604
- if (channelName != null ) {
605
- sko .getChannels ().put (channelName , service );
606
- } else {
607
- sko .getService ().set (service );
606
+ if (clusterName != null ) {
607
+ clusterNames .add (clusterName );
608
+ info .getClusters ().put (clusterName , service );
609
+ } else if (serverName != null ) {
610
+ ServerKubernetesObjects sko =
611
+ ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
612
+ if (channelName != null ) {
613
+ channelNames .add (channelName );
614
+ sko .getChannels ().put (channelName , service );
615
+ } else {
616
+ serviceNames .add (service .getMetadata ().getName ());
617
+ sko .getService ().set (service );
618
+ }
608
619
}
609
620
}
610
621
}
611
622
}
623
+
624
+ getDomainPresenceInfos (ns )
625
+ .forEach (
626
+ (key , value ) -> {
627
+ ConcurrentMap <String , V1Service > map = value .getClusters ();
628
+ map .forEach (
629
+ (ckey , cvalue ) -> {
630
+ map .compute (
631
+ ckey ,
632
+ (k , current ) -> {
633
+ return clusterNames .contains (ckey ) ? current : null ;
634
+ });
635
+ });
636
+ });
637
+ getKubernetesObjects (ns )
638
+ .forEach (
639
+ (key , value ) -> {
640
+ if (!serviceNames .contains (key )) {
641
+ value .getService ().set (null );
642
+ }
643
+ ConcurrentMap <String , V1Service > map = value .getChannels ();
644
+ map .forEach (
645
+ (ckey , cvalue ) -> {
646
+ map .compute (
647
+ ckey ,
648
+ (k , current ) -> {
649
+ return channelNames .contains (ckey ) ? current : null ;
650
+ });
651
+ });
652
+ });
653
+
612
654
if (!serviceWatchers .containsKey (ns )) {
613
655
serviceWatchers .put (ns , createServiceWatcher (ns , getInitialResourceVersion (result )));
614
656
}
@@ -667,18 +709,30 @@ public NextAction onFailure(Packet packet, CallResponse<V1PodList> callResponse)
667
709
@ Override
668
710
public NextAction onSuccess (Packet packet , CallResponse <V1PodList > callResponse ) {
669
711
V1PodList result = callResponse .getResult ();
712
+
713
+ Set <String > podNames = new HashSet <>();
670
714
if (result != null ) {
671
715
for (V1Pod pod : result .getItems ()) {
672
716
String domainUID = PodWatcher .getPodDomainUID (pod );
673
717
String serverName = PodWatcher .getPodServerName (pod );
674
718
if (domainUID != null && serverName != null ) {
719
+ podNames .add (pod .getMetadata ().getName ());
675
720
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
676
721
ServerKubernetesObjects sko =
677
722
ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
678
723
sko .getPod ().set (pod );
679
724
}
680
725
}
681
726
}
727
+
728
+ getKubernetesObjects (ns )
729
+ .forEach (
730
+ (key , value ) -> {
731
+ if (!podNames .contains (key )) {
732
+ value .getPod ().set (null );
733
+ }
734
+ });
735
+
682
736
if (!podWatchers .containsKey (ns )) {
683
737
podWatchers .put (ns , createPodWatcher (ns , getInitialResourceVersion (result )));
684
738
}
0 commit comments