35
35
import oracle .kubernetes .operator .helpers .CallBuilderFactory ;
36
36
import oracle .kubernetes .operator .helpers .ConfigMapHelper ;
37
37
import oracle .kubernetes .operator .helpers .DomainPresenceInfo ;
38
- import oracle .kubernetes .operator .helpers .DomainPresenceInfoManager ;
39
38
import oracle .kubernetes .operator .helpers .HealthCheckHelper ;
40
39
import oracle .kubernetes .operator .helpers .HealthCheckHelper .KubernetesVersion ;
41
40
import oracle .kubernetes .operator .helpers .ResponseStep ;
42
41
import oracle .kubernetes .operator .helpers .ServerKubernetesObjects ;
43
- import oracle .kubernetes .operator .helpers .ServerKubernetesObjectsManager ;
44
42
import oracle .kubernetes .operator .logging .LoggingFacade ;
45
43
import oracle .kubernetes .operator .logging .LoggingFactory ;
46
44
import oracle .kubernetes .operator .logging .MessageKeys ;
@@ -67,7 +65,7 @@ private static ThreadFactory getThreadFactory() {
67
65
68
66
private static final LoggingFacade LOGGER = LoggingFactory .getLogger ("Operator" , "Operator" );
69
67
70
- private static final String NS_STARTING_NOW = "NS_STARTING_NOW " ;
68
+ private static final String DPI_MAP = "DPI_MAP " ;
71
69
72
70
static final TuningParameters tuningAndConfig ;
73
71
@@ -103,6 +101,7 @@ private static ThreadFactory getThreadFactory() {
103
101
}
104
102
105
103
static final Engine engine = new Engine (wrappedExecutorService );
104
+ private static final DomainProcessor processor = DomainProcessor .getInstance ();
106
105
107
106
static final ConcurrentMap <String , AtomicBoolean > isNamespaceStarted = new ConcurrentHashMap <>();
108
107
static final ConcurrentMap <String , AtomicBoolean > isNamespaceStopping = new ConcurrentHashMap <>();
@@ -125,14 +124,6 @@ private static ThreadFactory getThreadFactory() {
125
124
static final String READINESS_PROBE_FAILURE_EVENT_FILTER =
126
125
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}" ;
127
126
128
- static Map <String , DomainPresenceInfo > getDomainPresenceInfos (String ns ) {
129
- return DomainPresenceInfoManager .getDomainPresenceInfos (ns );
130
- }
131
-
132
- static Map <String , ServerKubernetesObjects > getKubernetesObjects (String ns ) {
133
- return ServerKubernetesObjectsManager .getServerKubernetesObjects (ns );
134
- }
135
-
136
127
/**
137
128
* Entry point
138
129
*
@@ -266,10 +257,7 @@ private static class StartNamespaceBeforeStep extends Step {
266
257
@ Override
267
258
public NextAction apply (Packet packet ) {
268
259
AtomicBoolean a = isNamespaceStarted .computeIfAbsent (ns , (key ) -> new AtomicBoolean (false ));
269
- boolean startingNow = !a .getAndSet (true );
270
- packet .put (NS_STARTING_NOW , (Boolean ) startingNow );
271
-
272
- if (startingNow ) {
260
+ if (!a .getAndSet (true )) {
273
261
try {
274
262
HealthCheckHelper .performSecurityChecks (version , operatorNamespace , ns );
275
263
} catch (Throwable e ) {
@@ -323,6 +311,7 @@ private static Runnable recheckDomains() {
323
311
324
312
static Step readExistingResources (String operatorNamespace , String ns ) {
325
313
return Step .chain (
314
+ new ReadExistingResourcesBeforeStep (),
326
315
ConfigMapHelper .createScriptConfigMapStep (operatorNamespace , ns ),
327
316
createConfigMapStep (ns ),
328
317
readExistingPods (ns ),
@@ -332,6 +321,15 @@ static Step readExistingResources(String operatorNamespace, String ns) {
332
321
readExistingDomains (ns ));
333
322
}
334
323
324
+ private static class ReadExistingResourcesBeforeStep extends Step {
325
+ @ SuppressWarnings ("rawtypes" )
326
+ @ Override
327
+ public NextAction apply (Packet packet ) {
328
+ packet .put (DPI_MAP , new ConcurrentHashMap ());
329
+ return doNext (packet );
330
+ }
331
+ }
332
+
335
333
private static Step readExistingDomains (String ns ) {
336
334
LOGGER .info (MessageKeys .LISTING_DOMAINS );
337
335
return callBuilderFactory .create ().listDomainAsync (ns , new DomainListStep (ns ));
@@ -363,7 +361,7 @@ private static Step readExistingPods(String ns) {
363
361
364
362
private static ConfigMapAfterStep createConfigMapStep (String ns ) {
365
363
return new ConfigMapAfterStep (
366
- ns , configMapWatchers , isNamespaceStopping (ns ), DomainProcessor ::dispatchConfigMapWatch );
364
+ ns , configMapWatchers , isNamespaceStopping (ns ), processor ::dispatchConfigMapWatch );
367
365
}
368
366
369
367
// -----------------------------------------------------------------------------
@@ -434,7 +432,7 @@ private static EventWatcher createEventWatcher(String ns, String initialResource
434
432
ns ,
435
433
READINESS_PROBE_FAILURE_EVENT_FILTER ,
436
434
initialResourceVersion ,
437
- DomainProcessor ::dispatchEventWatch ,
435
+ processor ::dispatchEventWatch ,
438
436
isNamespaceStopping (ns ));
439
437
}
440
438
@@ -443,7 +441,7 @@ private static PodWatcher createPodWatcher(String ns, String initialResourceVers
443
441
getThreadFactory (),
444
442
ns ,
445
443
initialResourceVersion ,
446
- DomainProcessor ::dispatchPodWatch ,
444
+ processor ::dispatchPodWatch ,
447
445
isNamespaceStopping (ns ));
448
446
}
449
447
@@ -452,7 +450,7 @@ private static ServiceWatcher createServiceWatcher(String ns, String initialReso
452
450
getThreadFactory (),
453
451
ns ,
454
452
initialResourceVersion ,
455
- DomainProcessor ::dispatchServiceWatch ,
453
+ processor ::dispatchServiceWatch ,
456
454
isNamespaceStopping (ns ));
457
455
}
458
456
@@ -461,7 +459,7 @@ private static IngressWatcher createIngressWatcher(String ns, String initialReso
461
459
getThreadFactory (),
462
460
ns ,
463
461
initialResourceVersion ,
464
- DomainProcessor ::dispatchIngressWatch ,
462
+ processor ::dispatchIngressWatch ,
465
463
isNamespaceStopping (ns ));
466
464
}
467
465
@@ -495,18 +493,22 @@ public NextAction onFailure(Packet packet, CallResponse<V1beta1IngressList> call
495
493
496
494
@ Override
497
495
public NextAction onSuccess (Packet packet , CallResponse <V1beta1IngressList > callResponse ) {
496
+ @ SuppressWarnings ("unchecked" )
497
+ Map <String , DomainPresenceInfo > dpis = (Map <String , DomainPresenceInfo >) packet .get (DPI_MAP );
498
+
498
499
V1beta1IngressList result = callResponse .getResult ();
499
500
if (result != null ) {
500
501
for (V1beta1Ingress ingress : result .getItems ()) {
501
502
String domainUID = IngressWatcher .getIngressDomainUID (ingress );
502
503
String clusterName = IngressWatcher .getIngressClusterName (ingress );
503
504
if (domainUID != null && clusterName != null ) {
504
- DomainPresenceInfoManager . getOrCreate ( ns , domainUID )
505
+ dpis . computeIfAbsent ( domainUID , k -> new DomainPresenceInfo ( ns , domainUID ) )
505
506
.getIngresses ()
506
507
.put (clusterName , ingress );
507
508
}
508
509
}
509
510
}
511
+
510
512
if (!ingressWatchers .containsKey (ns )) {
511
513
ingressWatchers .put (ns , createIngressWatcher (ns , getInitialResourceVersion (result )));
512
514
}
@@ -534,35 +536,41 @@ public NextAction onFailure(Packet packet, CallResponse<DomainList> callResponse
534
536
535
537
@ Override
536
538
public NextAction onSuccess (Packet packet , CallResponse <DomainList > callResponse ) {
537
- Boolean startingNow = (Boolean ) packet .get (NS_STARTING_NOW );
538
- if (startingNow == null ) {
539
- startingNow = Boolean .TRUE ;
540
- }
539
+ @ SuppressWarnings ("unchecked" )
540
+ Map <String , DomainPresenceInfo > dpis = (Map <String , DomainPresenceInfo >) packet .get (DPI_MAP );
541
+
542
+ DomainProcessor x = packet .getSPI (DomainProcessor .class );
543
+ DomainProcessor dp = x != null ? x : processor ;
541
544
542
545
Set <String > domainUIDs = new HashSet <>();
543
546
if (callResponse .getResult () != null ) {
544
547
for (Domain dom : callResponse .getResult ().getItems ()) {
545
548
String domainUID = dom .getSpec ().getDomainUID ();
546
549
domainUIDs .add (domainUID );
547
- DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (dom );
548
- if (startingNow ) {
549
- // Update domain here if namespace is not yet running
550
- info .setDomain (dom );
551
- }
552
- DomainProcessor .makeRightDomainPresence (info , domainUID , dom , true , false , false );
550
+ DomainPresenceInfo info =
551
+ dpis .compute (
552
+ domainUID ,
553
+ (k , v ) -> {
554
+ if (v == null ) {
555
+ return new DomainPresenceInfo (dom );
556
+ }
557
+ v .setDomain (dom );
558
+ return v ;
559
+ });
560
+ info .setPopulated (true );
561
+ dp .makeRightDomainPresence (info , true , false , false );
553
562
}
554
563
}
555
564
556
- getDomainPresenceInfos (ns )
557
- .forEach (
558
- (key , value ) -> {
559
- if (!domainUIDs .contains (key )) {
560
- // This is a stranded DomainPresenceInfo.
561
- value .setDeleting (true );
562
- Domain dom = value .getDomain ();
563
- DomainProcessor .makeRightDomainPresence (value , key , dom , true , true , false );
564
- }
565
- });
565
+ dpis .forEach (
566
+ (key , value ) -> {
567
+ if (!domainUIDs .contains (key )) {
568
+ // This is a stranded DomainPresenceInfo.
569
+ value .setDeleting (true );
570
+ value .setPopulated (true );
571
+ dp .makeRightDomainPresence (value , true , true , false );
572
+ }
573
+ });
566
574
567
575
if (!domainWatchers .containsKey (ns )) {
568
576
domainWatchers .put (
@@ -580,7 +588,7 @@ private static DomainWatcher createDomainWatcher(String ns, String initialResour
580
588
getThreadFactory (),
581
589
ns ,
582
590
initialResourceVersion ,
583
- DomainProcessor ::dispatchDomainWatch ,
591
+ processor ::dispatchDomainWatch ,
584
592
isNamespaceStopping (ns ));
585
593
}
586
594
}
@@ -603,65 +611,33 @@ public NextAction onFailure(Packet packet, CallResponse<V1ServiceList> callRespo
603
611
public NextAction onSuccess (Packet packet , CallResponse <V1ServiceList > callResponse ) {
604
612
V1ServiceList result = callResponse .getResult ();
605
613
606
- Set < String > serviceNames = new HashSet <>();
607
- Set <String > channelNames = new HashSet <>( );
608
- Set < String > clusterNames = new HashSet <>();
614
+ @ SuppressWarnings ( "unchecked" )
615
+ Map <String , DomainPresenceInfo > dpis = ( Map < String , DomainPresenceInfo >) packet . get ( DPI_MAP );
616
+
609
617
if (result != null ) {
610
618
for (V1Service service : result .getItems ()) {
611
619
String domainUID = ServiceWatcher .getServiceDomainUID (service );
612
620
String serverName = ServiceWatcher .getServiceServerName (service );
613
621
String channelName = ServiceWatcher .getServiceChannelName (service );
614
622
String clusterName = ServiceWatcher .getServiceClusterName (service );
615
623
if (domainUID != null ) {
616
- DomainPresenceInfo info = DomainPresenceInfoManager .getOrCreate (ns , domainUID );
624
+ DomainPresenceInfo info =
625
+ dpis .computeIfAbsent (domainUID , k -> new DomainPresenceInfo (ns , domainUID ));
617
626
if (clusterName != null ) {
618
- clusterNames .add (clusterName );
619
627
info .getClusters ().put (clusterName , service );
620
628
} else if (serverName != null ) {
621
629
ServerKubernetesObjects sko =
622
- ServerKubernetesObjectsManager . getOrCreate ( info , domainUID , serverName );
630
+ info . getServers (). computeIfAbsent ( serverName , k -> new ServerKubernetesObjects () );
623
631
if (channelName != null ) {
624
- channelNames .add (channelName );
625
632
sko .getChannels ().put (channelName , service );
626
633
} else {
627
- serviceNames .add (service .getMetadata ().getName ());
628
634
sko .getService ().set (service );
629
635
}
630
636
}
631
637
}
632
638
}
633
639
}
634
640
635
- getDomainPresenceInfos (ns )
636
- .forEach (
637
- (key , value ) -> {
638
- ConcurrentMap <String , V1Service > map = value .getClusters ();
639
- map .forEach (
640
- (ckey , cvalue ) -> {
641
- map .compute (
642
- ckey ,
643
- (k , current ) -> {
644
- return clusterNames .contains (ckey ) ? current : null ;
645
- });
646
- });
647
- });
648
- getKubernetesObjects (ns )
649
- .forEach (
650
- (key , value ) -> {
651
- if (!serviceNames .contains (key )) {
652
- value .getService ().set (null );
653
- }
654
- ConcurrentMap <String , V1Service > map = value .getChannels ();
655
- map .forEach (
656
- (ckey , cvalue ) -> {
657
- map .compute (
658
- ckey ,
659
- (k , current ) -> {
660
- return channelNames .contains (ckey ) ? current : null ;
661
- });
662
- });
663
- });
664
-
665
641
if (!serviceWatchers .containsKey (ns )) {
666
642
serviceWatchers .put (ns , createServiceWatcher (ns , getInitialResourceVersion (result )));
667
643
}
@@ -721,29 +697,23 @@ public NextAction onFailure(Packet packet, CallResponse<V1PodList> callResponse)
721
697
public NextAction onSuccess (Packet packet , CallResponse <V1PodList > callResponse ) {
722
698
V1PodList result = callResponse .getResult ();
723
699
724
- Set <String > podNames = new HashSet <>();
700
+ @ SuppressWarnings ("unchecked" )
701
+ Map <String , DomainPresenceInfo > dpis = (Map <String , DomainPresenceInfo >) packet .get (DPI_MAP );
702
+
725
703
if (result != null ) {
726
704
for (V1Pod pod : result .getItems ()) {
727
705
String domainUID = PodWatcher .getPodDomainUID (pod );
728
706
String serverName = PodWatcher .getPodServerName (pod );
729
707
if (domainUID != null && serverName != null ) {
730
- podNames . add ( pod . getMetadata (). getName ());
731
- DomainPresenceInfo info = DomainPresenceInfoManager . getOrCreate ( ns , domainUID );
708
+ DomainPresenceInfo info =
709
+ dpis . computeIfAbsent ( domainUID , k -> new DomainPresenceInfo ( ns , domainUID ) );
732
710
ServerKubernetesObjects sko =
733
- ServerKubernetesObjectsManager . getOrCreate ( info , domainUID , serverName );
711
+ info . getServers (). computeIfAbsent ( serverName , k -> new ServerKubernetesObjects () );
734
712
sko .getPod ().set (pod );
735
713
}
736
714
}
737
715
}
738
716
739
- getKubernetesObjects (ns )
740
- .forEach (
741
- (key , value ) -> {
742
- if (!podNames .contains (key )) {
743
- value .getPod ().set (null );
744
- }
745
- });
746
-
747
717
if (!podWatchers .containsKey (ns )) {
748
718
podWatchers .put (ns , createPodWatcher (ns , getInitialResourceVersion (result )));
749
719
}
0 commit comments