24
24
import java .util .StringTokenizer ;
25
25
import java .util .TreeSet ;
26
26
import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .concurrent .ConcurrentMap ;
27
28
import java .util .concurrent .ScheduledExecutorService ;
28
29
import java .util .concurrent .ThreadFactory ;
29
30
import java .util .concurrent .TimeUnit ;
@@ -66,6 +67,8 @@ private static ThreadFactory getThreadFactory() {
66
67
67
68
private static final LoggingFacade LOGGER = LoggingFactory .getLogger ("Operator" , "Operator" );
68
69
70
+ private static final String NS_STARTING_NOW = "NS_STARTING_NOW" ;
71
+
69
72
static final TuningParameters tuningAndConfig ;
70
73
71
74
static {
@@ -101,7 +104,8 @@ private static ThreadFactory getThreadFactory() {
101
104
102
105
static final Engine engine = new Engine (wrappedExecutorService );
103
106
104
- static final Map <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
107
+ static final ConcurrentMap <String , AtomicBoolean > isNamespaceStarted = new ConcurrentHashMap <>();
108
+ static final ConcurrentMap <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
105
109
106
110
private static final Map <String , ConfigMapWatcher > configMapWatchers = new ConcurrentHashMap <>();
107
111
private static final Map <String , DomainWatcher > domainWatchers = new ConcurrentHashMap <>();
@@ -121,12 +125,12 @@ private static ThreadFactory getThreadFactory() {
121
125
static final String READINESS_PROBE_FAILURE_EVENT_FILTER =
122
126
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}" ;
123
127
124
- static Map <String , DomainPresenceInfo > getDomainPresenceInfos () {
125
- return DomainPresenceInfoManager .getDomainPresenceInfos ();
128
+ static Map <String , DomainPresenceInfo > getDomainPresenceInfos (String ns ) {
129
+ return DomainPresenceInfoManager .getDomainPresenceInfos (ns );
126
130
}
127
131
128
- static ServerKubernetesObjects getKubernetesObjects (String serverLegalName ) {
129
- return ServerKubernetesObjectsManager .lookup ( serverLegalName );
132
+ static Map < String , ServerKubernetesObjects > getKubernetesObjects (String ns ) {
133
+ return ServerKubernetesObjectsManager .getServerKubernetesObjects ( ns );
130
134
}
131
135
132
136
/**
@@ -205,8 +209,6 @@ private static void begin() {
205
209
}
206
210
207
211
private static void completeBegin () {
208
- DomainProcessor .deleteStrandedResources ();
209
-
210
212
try {
211
213
// start the REST server
212
214
startRestServer (principal , isNamespaceStopping .keySet ());
@@ -263,7 +265,11 @@ private static class StartNamespaceBeforeStep extends Step {
263
265
264
266
@ Override
265
267
public NextAction apply (Packet packet ) {
266
- if (isNamespaceStopping .putIfAbsent (ns , new AtomicBoolean (false )) == null ) {
268
+ AtomicBoolean a = isNamespaceStarted .computeIfAbsent (ns , (key ) -> new AtomicBoolean (false ));
269
+ boolean startingNow = !a .getAndSet (true );
270
+ packet .put (NS_STARTING_NOW , (Boolean ) startingNow );
271
+
272
+ if (startingNow ) {
267
273
try {
268
274
HealthCheckHelper .performSecurityChecks (version , operatorNamespace , ns );
269
275
} catch (Throwable e ) {
@@ -282,13 +288,12 @@ private static void stopNamespaces(Collection<String> namespacesToStop) {
282
288
if (stopping != null ) {
283
289
stopping .set (true );
284
290
}
291
+ isNamespaceStarted .remove (ns );
285
292
}
286
293
}
287
294
288
- private static final AtomicBoolean UNINITIALIZED_NS_STOPPING = new AtomicBoolean (true );
289
-
290
295
static AtomicBoolean isNamespaceStopping (String ns ) {
291
- return isNamespaceStopping .getOrDefault (ns , UNINITIALIZED_NS_STOPPING );
296
+ return isNamespaceStopping .computeIfAbsent (ns , ( key ) -> new AtomicBoolean ( false ) );
292
297
}
293
298
294
299
static void runSteps (Step firstStep ) {
@@ -312,7 +317,7 @@ private static Runnable recheckDomains() {
312
317
namespacesToStop .removeAll (targetNamespaces );
313
318
stopNamespaces (namespacesToStop );
314
319
315
- runSteps (new StartNamespacesStep (targetNamespaces ), DomainProcessor :: deleteStrandedResources );
320
+ runSteps (new StartNamespacesStep (targetNamespaces ));
316
321
};
317
322
}
318
323
@@ -468,7 +473,7 @@ static String getOperatorNamespace() {
468
473
return namespace ;
469
474
}
470
475
471
- private static Collection <String > getTargetNamespaces () {
476
+ public static Collection <String > getTargetNamespaces () {
472
477
String namespace = getOperatorNamespace ();
473
478
474
479
return getTargetNamespaces (tuningAndConfig .get ("targetNamespaces" ), namespace );
@@ -529,30 +534,33 @@ public NextAction onFailure(Packet packet, CallResponse<DomainList> callResponse
529
534
530
535
@ Override
531
536
public NextAction onSuccess (Packet packet , CallResponse <DomainList > callResponse ) {
537
+ Boolean startingNow = (Boolean ) packet .get (NS_STARTING_NOW );
538
+ if (startingNow == null ) {
539
+ startingNow = Boolean .TRUE ;
540
+ }
541
+
532
542
Set <String > domainUIDs = new HashSet <>();
533
543
if (callResponse .getResult () != null ) {
534
544
for (Domain dom : callResponse .getResult ().getItems ()) {
535
545
String domainUID = dom .getSpec ().getDomainUID ();
536
546
domainUIDs .add (domainUID );
537
547
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (dom );
538
- if (isNamespaceStopping ( dom . getMetadata (). getNamespace ()). get () ) {
548
+ if (startingNow ) {
539
549
// Update domain here if namespace is not yet running
540
550
info .setDomain (dom );
541
551
}
542
- DomainProcessor .makeRightDomainPresence (info , dom , true , false , false );
552
+ DomainProcessor .makeRightDomainPresence (info , domainUID , dom , true , false , false );
543
553
}
544
554
}
545
555
546
- getDomainPresenceInfos ()
556
+ getDomainPresenceInfos (ns )
547
557
.forEach (
548
558
(key , value ) -> {
549
- Domain d = value .getDomain ();
550
- if (d != null && ns .equals (d .getMetadata ().getNamespace ())) {
551
- if (!domainUIDs .contains (d .getSpec ().getDomainUID ())) {
552
- // This is a stranded DomainPresenceInfo. Clear the Domain reference
553
- // so that stranded resources are marked for clean-up.
554
- value .setDomain (null );
555
- }
559
+ if (!domainUIDs .contains (key )) {
560
+ // This is a stranded DomainPresenceInfo.
561
+ value .setDeleting (true );
562
+ Domain dom = value .getDomain ();
563
+ DomainProcessor .makeRightDomainPresence (value , key , dom , true , true , false );
556
564
}
557
565
});
558
566
@@ -594,23 +602,66 @@ public NextAction onFailure(Packet packet, CallResponse<V1ServiceList> callRespo
594
602
@ Override
595
603
public NextAction onSuccess (Packet packet , CallResponse <V1ServiceList > callResponse ) {
596
604
V1ServiceList result = callResponse .getResult ();
605
+
606
+ Set <String > serviceNames = new HashSet <>();
607
+ Set <String > channelNames = new HashSet <>();
608
+ Set <String > clusterNames = new HashSet <>();
597
609
if (result != null ) {
598
610
for (V1Service service : result .getItems ()) {
599
611
String domainUID = ServiceWatcher .getServiceDomainUID (service );
600
612
String serverName = ServiceWatcher .getServiceServerName (service );
601
613
String channelName = ServiceWatcher .getServiceChannelName (service );
602
- if (domainUID != null && serverName != null ) {
614
+ String clusterName = ServiceWatcher .getServiceClusterName (service );
615
+ if (domainUID != null ) {
603
616
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
604
- ServerKubernetesObjects sko =
605
- ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
606
- if (channelName != null ) {
607
- sko .getChannels ().put (channelName , service );
608
- } else {
609
- sko .getService ().set (service );
617
+ if (clusterName != null ) {
618
+ clusterNames .add (clusterName );
619
+ info .getClusters ().put (clusterName , service );
620
+ } else if (serverName != null ) {
621
+ ServerKubernetesObjects sko =
622
+ ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
623
+ if (channelName != null ) {
624
+ channelNames .add (channelName );
625
+ sko .getChannels ().put (channelName , service );
626
+ } else {
627
+ serviceNames .add (service .getMetadata ().getName ());
628
+ sko .getService ().set (service );
629
+ }
610
630
}
611
631
}
612
632
}
613
633
}
634
+
635
+ getDomainPresenceInfos (ns )
636
+ .forEach (
637
+ (key , value ) -> {
638
+ ConcurrentMap <String , V1Service > map = value .getClusters ();
639
+ map .forEach (
640
+ (ckey , cvalue ) -> {
641
+ map .compute (
642
+ ckey ,
643
+ (k , current ) -> {
644
+ return clusterNames .contains (ckey ) ? current : null ;
645
+ });
646
+ });
647
+ });
648
+ getKubernetesObjects (ns )
649
+ .forEach (
650
+ (key , value ) -> {
651
+ if (!serviceNames .contains (key )) {
652
+ value .getService ().set (null );
653
+ }
654
+ ConcurrentMap <String , V1Service > map = value .getChannels ();
655
+ map .forEach (
656
+ (ckey , cvalue ) -> {
657
+ map .compute (
658
+ ckey ,
659
+ (k , current ) -> {
660
+ return channelNames .contains (ckey ) ? current : null ;
661
+ });
662
+ });
663
+ });
664
+
614
665
if (!serviceWatchers .containsKey (ns )) {
615
666
serviceWatchers .put (ns , createServiceWatcher (ns , getInitialResourceVersion (result )));
616
667
}
@@ -669,18 +720,30 @@ public NextAction onFailure(Packet packet, CallResponse<V1PodList> callResponse)
669
720
@ Override
670
721
public NextAction onSuccess (Packet packet , CallResponse <V1PodList > callResponse ) {
671
722
V1PodList result = callResponse .getResult ();
723
+
724
+ Set <String > podNames = new HashSet <>();
672
725
if (result != null ) {
673
726
for (V1Pod pod : result .getItems ()) {
674
727
String domainUID = PodWatcher .getPodDomainUID (pod );
675
728
String serverName = PodWatcher .getPodServerName (pod );
676
729
if (domainUID != null && serverName != null ) {
730
+ podNames .add (pod .getMetadata ().getName ());
677
731
DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
678
732
ServerKubernetesObjects sko =
679
733
ServerKubernetesObjectsManager .getOrCreate (info , domainUID , serverName );
680
734
sko .getPod ().set (pod );
681
735
}
682
736
}
683
737
}
738
+
739
+ getKubernetesObjects (ns )
740
+ .forEach (
741
+ (key , value ) -> {
742
+ if (!podNames .contains (key )) {
743
+ value .getPod ().set (null );
744
+ }
745
+ });
746
+
684
747
if (!podWatchers .containsKey (ns )) {
685
748
podWatchers .put (ns , createPodWatcher (ns , getInitialResourceVersion (result )));
686
749
}
0 commit comments