24
24
import java .util .StringTokenizer ;
25
25
import java .util .TreeSet ;
26
26
import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .concurrent .ConcurrentMap ;
27
28
import java .util .concurrent .ScheduledExecutorService ;
28
29
import java .util .concurrent .ThreadFactory ;
29
30
import java .util .concurrent .TimeUnit ;
@@ -66,6 +67,8 @@ private static ThreadFactory getThreadFactory() {
66
67
67
68
private static final LoggingFacade LOGGER = LoggingFactory .getLogger ("Operator" , "Operator" );
68
69
70
+ private static final String NS_STARTING_NOW = "NS_STARTING_NOW" ;
71
+
69
72
static final TuningParameters tuningAndConfig ;
70
73
71
74
static {
@@ -101,7 +104,8 @@ private static ThreadFactory getThreadFactory() {
101
104
102
105
static final Engine engine = new Engine (wrappedExecutorService );
103
106
104
- static final Map <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
107
+ static final ConcurrentMap <String , AtomicBoolean > isNamespaceStarted = new ConcurrentHashMap <>();
108
+ static final ConcurrentMap <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
105
109
106
110
private static final Map <String , ConfigMapWatcher > configMapWatchers = new ConcurrentHashMap <>();
107
111
private static final Map <String , DomainWatcher > domainWatchers = new ConcurrentHashMap <>();
@@ -121,12 +125,12 @@ private static ThreadFactory getThreadFactory() {
121
125
static final String READINESS_PROBE_FAILURE_EVENT_FILTER =
122
126
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}" ;
123
127
124
- static Map <String , DomainPresenceInfo > getDomainPresenceInfos () {
125
- return DomainPresenceInfoManager .getDomainPresenceInfos ();
128
+ static Map <String , DomainPresenceInfo > getDomainPresenceInfos (String ns ) {
129
+ return DomainPresenceInfoManager .getDomainPresenceInfos (ns );
126
130
}
127
131
128
- static ServerKubernetesObjects getKubernetesObjects (String serverLegalName ) {
129
- return ServerKubernetesObjectsManager .lookup ( serverLegalName );
132
+ static Map < String , ServerKubernetesObjects > getKubernetesObjects (String ns ) {
133
+ return ServerKubernetesObjectsManager .getServerKubernetesObjects ( ns );
130
134
}
131
135
132
136
/**
@@ -261,7 +265,11 @@ private static class StartNamespaceBeforeStep extends Step {
261
265
262
266
@ Override
263
267
public NextAction apply (Packet packet ) {
264
- if (isNamespaceStopping .putIfAbsent (ns , new AtomicBoolean (false )) == null ) {
268
+ AtomicBoolean a = isNamespaceStarted .computeIfAbsent (ns , (key ) -> new AtomicBoolean (false ));
269
+ boolean startingNow = !a .getAndSet (true );
270
+ packet .put (NS_STARTING_NOW , (Boolean ) startingNow );
271
+
272
+ if (startingNow ) {
265
273
try {
266
274
HealthCheckHelper .performSecurityChecks (version , operatorNamespace , ns );
267
275
} catch (Throwable e ) {
@@ -280,13 +288,12 @@ private static void stopNamespaces(Collection<String> namespacesToStop) {
280
288
if (stopping != null ) {
281
289
stopping .set (true );
282
290
}
291
+ isNamespaceStarted .remove (ns );
283
292
}
284
293
}
285
294
286
- private static final AtomicBoolean UNINITIALIZED_NS_STOPPING = new AtomicBoolean (true );
287
-
288
295
static AtomicBoolean isNamespaceStopping (String ns ) {
289
- return isNamespaceStopping .getOrDefault (ns , UNINITIALIZED_NS_STOPPING );
296
+ return isNamespaceStopping .computeIfAbsent (ns , ( key ) -> new AtomicBoolean ( false ) );
290
297
}
291
298
292
299
static void runSteps (Step firstStep ) {
@@ -527,21 +534,26 @@ public NextAction onFailure(Packet packet, CallResponse<DomainList> callResponse
527
534
528
535
@ Override
529
536
public NextAction onSuccess (Packet packet , CallResponse <DomainList > callResponse ) {
537
+ Boolean startingNow = (Boolean ) packet .get (NS_STARTING_NOW );
538
+ if (startingNow == null ) {
539
+ startingNow = Boolean .TRUE ;
540
+ }
541
+
530
542
Set <String > domainUIDs = new HashSet <>();
531
543
if (callResponse .getResult () != null ) {
532
544
for (Domain dom : callResponse .getResult ().getItems ()) {
533
545
String domainUID = dom .getSpec ().getDomainUID ();
534
546
domainUIDs .add (domainUID );
535
547
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (dom );
536
- if (isNamespaceStopping ( dom . getMetadata (). getNamespace ()). get () ) {
548
+ if (startingNow ) {
537
549
// Update domain here if namespace is not yet running
538
550
info .setDomain (dom );
539
551
}
540
552
DomainProcessor .makeRightDomainPresence (info , domainUID , dom , true , false , false );
541
553
}
542
554
}
543
555
544
- getDomainPresenceInfos ()
556
+ getDomainPresenceInfos (ns )
545
557
.forEach (
546
558
(key , value ) -> {
547
559
if (!domainUIDs .contains (key )) {
@@ -590,23 +602,66 @@ public NextAction onFailure(Packet packet, CallResponse<V1ServiceList> callRespo
590
602
@ Override
591
603
public NextAction onSuccess (Packet packet , CallResponse <V1ServiceList > callResponse ) {
592
604
V1ServiceList result = callResponse .getResult ();
605
+
606
+ Set <String > serviceNames = new HashSet <>();
607
+ Set <String > channelNames = new HashSet <>();
608
+ Set <String > clusterNames = new HashSet <>();
593
609
if (result != null ) {
594
610
for (V1Service service : result .getItems ()) {
595
611
String domainUID = ServiceWatcher .getServiceDomainUID (service );
596
612
String serverName = ServiceWatcher .getServiceServerName (service );
597
613
String channelName = ServiceWatcher .getServiceChannelName (service );
598
- if (domainUID != null && serverName != null ) {
614
+ String clusterName = ServiceWatcher .getServiceClusterName (service );
615
+ if (domainUID != null ) {
599
616
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
600
- ServerKubernetesObjects sko =
601
- ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
602
- if (channelName != null ) {
603
- sko .getChannels ().put (channelName , service );
604
- } else {
605
- sko .getService ().set (service );
617
+ if (clusterName != null ) {
618
+ clusterNames .add (clusterName );
619
+ info .getClusters ().put (clusterName , service );
620
+ } else if (serverName != null ) {
621
+ ServerKubernetesObjects sko =
622
+ ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
623
+ if (channelName != null ) {
624
+ channelNames .add (channelName );
625
+ sko .getChannels ().put (channelName , service );
626
+ } else {
627
+ serviceNames .add (service .getMetadata ().getName ());
628
+ sko .getService ().set (service );
629
+ }
606
630
}
607
631
}
608
632
}
609
633
}
634
+
635
+ getDomainPresenceInfos (ns )
636
+ .forEach (
637
+ (key , value ) -> {
638
+ ConcurrentMap <String , V1Service > map = value .getClusters ();
639
+ map .forEach (
640
+ (ckey , cvalue ) -> {
641
+ map .compute (
642
+ ckey ,
643
+ (k , current ) -> {
644
+ return clusterNames .contains (ckey ) ? current : null ;
645
+ });
646
+ });
647
+ });
648
+ getKubernetesObjects (ns )
649
+ .forEach (
650
+ (key , value ) -> {
651
+ if (!serviceNames .contains (key )) {
652
+ value .getService ().set (null );
653
+ }
654
+ ConcurrentMap <String , V1Service > map = value .getChannels ();
655
+ map .forEach (
656
+ (ckey , cvalue ) -> {
657
+ map .compute (
658
+ ckey ,
659
+ (k , current ) -> {
660
+ return channelNames .contains (ckey ) ? current : null ;
661
+ });
662
+ });
663
+ });
664
+
610
665
if (!serviceWatchers .containsKey (ns )) {
611
666
serviceWatchers .put (ns , createServiceWatcher (ns , getInitialResourceVersion (result )));
612
667
}
@@ -665,18 +720,30 @@ public NextAction onFailure(Packet packet, CallResponse<V1PodList> callResponse)
665
720
@ Override
666
721
public NextAction onSuccess (Packet packet , CallResponse <V1PodList > callResponse ) {
667
722
V1PodList result = callResponse .getResult ();
723
+
724
+ Set <String > podNames = new HashSet <>();
668
725
if (result != null ) {
669
726
for (V1Pod pod : result .getItems ()) {
670
727
String domainUID = PodWatcher .getPodDomainUID (pod );
671
728
String serverName = PodWatcher .getPodServerName (pod );
672
729
if (domainUID != null && serverName != null ) {
730
+ podNames .add (pod .getMetadata ().getName ());
673
731
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
674
732
ServerKubernetesObjects sko =
675
733
ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
676
734
sko .getPod ().set (pod );
677
735
}
678
736
}
679
737
}
738
+
739
+ getKubernetesObjects (ns )
740
+ .forEach (
741
+ (key , value ) -> {
742
+ if (!podNames .contains (key )) {
743
+ value .getPod ().set (null );
744
+ }
745
+ });
746
+
680
747
if (!podWatchers .containsKey (ns )) {
681
748
podWatchers .put (ns , createPodWatcher (ns , getInitialResourceVersion (result )));
682
749
}
0 commit comments