4
4
5
5
package oracle .kubernetes .operator ;
6
6
7
- import java .util .ArrayList ;
8
- import java .util .List ;
9
- import java .util .Map ;
10
- import java .util .Optional ;
11
- import java .util .concurrent .ConcurrentHashMap ;
12
- import java .util .concurrent .ConcurrentMap ;
13
- import java .util .concurrent .ScheduledFuture ;
14
- import java .util .concurrent .TimeUnit ;
15
- import java .util .concurrent .atomic .AtomicInteger ;
16
- import javax .annotation .Nullable ;
7
+ import static oracle .kubernetes .operator .helpers .LegalNames .toJobIntrospectorName ;
17
8
18
9
import io .kubernetes .client .models .V1ConfigMap ;
10
+ import io .kubernetes .client .models .V1ContainerState ;
11
+ import io .kubernetes .client .models .V1ContainerStatus ;
19
12
import io .kubernetes .client .models .V1Event ;
20
13
import io .kubernetes .client .models .V1ObjectMeta ;
21
14
import io .kubernetes .client .models .V1ObjectReference ;
24
17
import io .kubernetes .client .models .V1Service ;
25
18
import io .kubernetes .client .models .V1ServiceList ;
26
19
import io .kubernetes .client .util .Watch ;
20
+ import java .util .ArrayList ;
21
+ import java .util .List ;
22
+ import java .util .Map ;
23
+ import java .util .Optional ;
24
+ import java .util .concurrent .ConcurrentHashMap ;
25
+ import java .util .concurrent .ConcurrentMap ;
26
+ import java .util .concurrent .ScheduledFuture ;
27
+ import java .util .concurrent .TimeUnit ;
28
+ import java .util .concurrent .atomic .AtomicInteger ;
29
+ import javax .annotation .Nullable ;
27
30
import oracle .kubernetes .operator .TuningParameters .MainTuning ;
28
31
import oracle .kubernetes .operator .calls .CallResponse ;
29
32
import oracle .kubernetes .operator .helpers .CallBuilder ;
@@ -148,12 +151,9 @@ static Step bringAdminServerUp(
148
151
return Step .chain (bringAdminServerUpSteps (info , podAwaiterStepFactory , next ));
149
152
}
150
153
151
- private static Step [] domainIntrospectionSteps (DomainPresenceInfo info , Step next ) {
152
- Domain dom = info .getDomain ();
154
+ private static Step [] domainIntrospectionSteps (Step next ) {
153
155
List <Step > resources = new ArrayList <>();
154
- resources .add (
155
- JobHelper .deleteDomainIntrospectorJobStep (
156
- dom .getDomainUid (), dom .getMetadata ().getNamespace (), null ));
156
+ resources .add (JobHelper .deleteDomainIntrospectorJobStep (null ));
157
157
resources .add (JobHelper .createDomainIntrospectorJobStep (next ));
158
158
return resources .toArray (new Step [0 ]);
159
159
}
@@ -206,40 +206,67 @@ public void stopNamespace(String ns) {
206
206
}
207
207
208
208
public void dispatchPodWatch (Watch .Response <V1Pod > item ) {
209
- V1Pod pod = item .object ;
210
- if (pod != null ) {
211
- V1ObjectMeta metadata = pod .getMetadata ();
212
- String domainUid = metadata .getLabels ().get (LabelConstants .DOMAINUID_LABEL );
213
- String serverName = metadata .getLabels ().get (LabelConstants .SERVERNAME_LABEL );
214
- if (domainUid != null && serverName != null ) {
215
- DomainPresenceInfo info = getExistingDomainPresenceInfo (metadata .getNamespace (), domainUid );
216
- if (info != null ) {
217
- switch (item .type ) {
218
- case "ADDED" :
219
- info .setServerPodBeingDeleted (serverName , Boolean .FALSE );
220
- // fall through
221
- case "MODIFIED" :
222
- info .setServerPodFromEvent (serverName , pod );
223
- break ;
224
- case "DELETED" :
225
- boolean removed = info .deleteServerPodFromEvent (serverName , pod );
226
- if (removed && info .isNotDeleting () && !info .isServerPodBeingDeleted (serverName )) {
227
- LOGGER .info (
228
- MessageKeys .POD_DELETED , domainUid , metadata .getNamespace (), serverName );
229
- makeRightDomainPresence (info , true , false , true );
230
- }
231
- break ;
209
+ if (getPodLabel (item .object , LabelConstants .DOMAINUID_LABEL ) == null ) return ;
232
210
233
- case "ERROR" :
234
- default :
235
- }
211
+ if (getPodLabel (item .object , LabelConstants .SERVERNAME_LABEL ) != null )
212
+ processServerPodWatch (item .object , item .type );
213
+ else if (getPodLabel (item .object , LabelConstants .JOBNAME_LABEL ) != null )
214
+ processIntrospectorJobPodWatch (item .object , item .type );
215
+ }
216
+
217
+ private void processServerPodWatch (V1Pod pod , String watchType ) {
218
+ String domainUid = getPodLabel (pod , LabelConstants .DOMAINUID_LABEL );
219
+ DomainPresenceInfo info = getExistingDomainPresenceInfo (getNamespace (pod ), domainUid );
220
+ if (info == null ) return ;
221
+
222
+ String serverName = getPodLabel (pod , LabelConstants .SERVERNAME_LABEL );
223
+ switch (watchType ) {
224
+ case "ADDED" :
225
+ info .setServerPodBeingDeleted (serverName , Boolean .FALSE );
226
+ // fall through
227
+ case "MODIFIED" :
228
+ info .setServerPodFromEvent (serverName , pod );
229
+ break ;
230
+ case "DELETED" :
231
+ boolean removed = info .deleteServerPodFromEvent (serverName , pod );
232
+ if (removed && info .isNotDeleting () && !info .isServerPodBeingDeleted (serverName )) {
233
+ LOGGER .info (MessageKeys .POD_DELETED , domainUid , getNamespace (pod ), serverName );
234
+ makeRightDomainPresence (info , true , false , true );
236
235
}
237
- }
236
+ break ;
237
+
238
+ case "ERROR" :
239
+ default :
238
240
}
239
241
}
240
242
241
- private V1Pod getNewerPod (V1Pod first , V1Pod second ) {
242
- return KubernetesUtils .isFirstNewer (getMetadata (first ), getMetadata (second )) ? first : second ;
243
+ private String getNamespace (V1Pod pod ) {
244
+ return Optional .ofNullable (pod )
245
+ .map (V1Pod ::getMetadata )
246
+ .map (V1ObjectMeta ::getNamespace )
247
+ .orElse (null );
248
+ }
249
+
250
+ private String getPodLabel (V1Pod pod , String labelName ) {
251
+ return Optional .ofNullable (pod )
252
+ .map (V1Pod ::getMetadata )
253
+ .map (V1ObjectMeta ::getLabels )
254
+ .map (m -> m .get (labelName ))
255
+ .orElse (null );
256
+ }
257
+
258
+ private void processIntrospectorJobPodWatch (V1Pod pod , String watchType ) {
259
+ String domainUid = getPodLabel (pod , LabelConstants .DOMAINUID_LABEL );
260
+ DomainPresenceInfo info = getExistingDomainPresenceInfo (getNamespace (pod ), domainUid );
261
+ if (info == null ) return ;
262
+
263
+ switch (watchType ) {
264
+ case "ADDED" :
265
+ case "MODIFIED" :
266
+ new DomainStatusUpdate (info .getDomain (), pod , domainUid ).invoke ();
267
+ break ;
268
+ default :
269
+ }
243
270
}
244
271
245
272
/* Recently, we've seen a number of intermittent bugs where K8s reports
@@ -248,10 +275,6 @@ private V1Pod getNewerPod(V1Pod first, V1Pod second) {
248
275
* a MODIFIED event for an object that has already had subsequent modifications.
249
276
*/
250
277
251
- private V1ObjectMeta getMetadata (V1Pod pod ) {
252
- return pod == null ? null : pod .getMetadata ();
253
- }
254
-
255
278
public void dispatchServiceWatch (Watch .Response <V1Service > item ) {
256
279
V1Service service = item .object ;
257
280
String domainUid = ServiceHelper .getServiceDomainUid (service );
@@ -360,32 +383,37 @@ private void scheduleDomainStatusUpdating(DomainPresenceInfo info) {
360
383
DomainStatusUpdater .createStatusStep (main .statusUpdateTimeoutSeconds , null );
361
384
FiberGate gate = getStatusFiberGate (info .getNamespace ());
362
385
363
- Fiber f = gate .startFiberIfNoCurrentFiber (
364
- info .getDomainUid (),
365
- strategy ,
366
- packet ,
367
- new CompletionCallback () {
368
- @ Override
369
- public void onCompletion (Packet packet ) {
370
- AtomicInteger serverHealthRead =
371
- packet .getValue (ProcessingConstants .REMAINING_SERVERS_HEALTH_TO_READ );
372
- if (serverHealthRead == null || serverHealthRead .get () == 0 ) {
373
- loggingFilter .setFiltering (false ).resetLogHistory ();
374
- } else {
375
- loggingFilter .setFiltering (true );
376
- }
377
- }
378
-
379
- @ Override
380
- public void onThrowable (Packet packet , Throwable throwable ) {
381
- LOGGER .severe (MessageKeys .EXCEPTION , throwable );
382
- loggingFilter .setFiltering (true );
383
- }
384
- });
386
+ Fiber f =
387
+ gate .startFiberIfNoCurrentFiber (
388
+ info .getDomainUid (),
389
+ strategy ,
390
+ packet ,
391
+ new CompletionCallback () {
392
+ @ Override
393
+ public void onCompletion (Packet packet ) {
394
+ AtomicInteger serverHealthRead =
395
+ packet .getValue (
396
+ ProcessingConstants .REMAINING_SERVERS_HEALTH_TO_READ );
397
+ if (serverHealthRead == null || serverHealthRead .get () == 0 ) {
398
+ loggingFilter .setFiltering (false ).resetLogHistory ();
399
+ } else {
400
+ loggingFilter .setFiltering (true );
401
+ }
402
+ }
403
+
404
+ @ Override
405
+ public void onThrowable (Packet packet , Throwable throwable ) {
406
+ LOGGER .severe (MessageKeys .EXCEPTION , throwable );
407
+ loggingFilter .setFiltering (true );
408
+ }
409
+ });
385
410
} catch (Throwable t ) {
386
411
LOGGER .severe (MessageKeys .EXCEPTION , t );
387
412
}
388
- }, main .initialShortDelay , main .initialShortDelay , TimeUnit .SECONDS ));
413
+ },
414
+ main .initialShortDelay ,
415
+ main .initialShortDelay ,
416
+ TimeUnit .SECONDS ));
389
417
}
390
418
391
419
public void makeRightDomainPresence (
@@ -429,6 +457,8 @@ public void makeRightDomainPresence(
429
457
430
458
private void internalMakeRightDomainPresence (
431
459
@ Nullable DomainPresenceInfo info , boolean isDeleting , boolean isWillInterrupt ) {
460
+ if (info == null ) return ;
461
+
432
462
String ns = info .getNamespace ();
433
463
String domainUid = info .getDomainUid ();
434
464
Domain dom = info .getDomain ();
@@ -456,6 +486,7 @@ private Step readExistingServices(DomainPresenceInfo info) {
456
486
.listServiceAsync (info .getNamespace (), new ServiceListStep (info ));
457
487
}
458
488
489
+ @ SuppressWarnings ("unused" )
459
490
private void runDomainPlan (
460
491
Domain dom ,
461
492
String domainUid ,
@@ -537,7 +568,6 @@ Step createDomainUpPlan(DomainPresenceInfo info) {
537
568
Step strategy =
538
569
Step .chain (
539
570
domainIntrospectionSteps (
540
- info ,
541
571
new DomainStatusStep (
542
572
info ,
543
573
bringAdminServerUp (
@@ -737,4 +767,33 @@ public NextAction apply(Packet packet) {
737
767
return doNext (packet );
738
768
}
739
769
}
770
+
771
+ private class DomainStatusUpdate {
772
+ private Domain domain ;
773
+ private V1Pod pod ;
774
+ private String domainUid ;
775
+
776
+ DomainStatusUpdate (Domain domain , V1Pod pod , String domainUid ) {
777
+ this .domain = domain ;
778
+ this .pod = pod ;
779
+ this .domainUid = domainUid ;
780
+ }
781
+
782
+ public void invoke () {
783
+ Optional .ofNullable (getMatchingContainerStatus (pod , domainUid ).getState ())
784
+ .map (V1ContainerState ::getWaiting )
785
+ .ifPresent (waiting -> updateStatus (waiting .getReason (), waiting .getMessage ()));
786
+ }
787
+
788
+ private void updateStatus (String reason , String message ) {
789
+ KubernetesUtils .updateStatus (domain , reason , message );
790
+ }
791
+
792
+ private V1ContainerStatus getMatchingContainerStatus (V1Pod pod , String domainUid ) {
793
+ return pod .getStatus ().getContainerStatuses ().stream ()
794
+ .filter (s -> toJobIntrospectorName (domainUid ).equals (s .getName ()))
795
+ .findFirst ()
796
+ .orElse (null );
797
+ }
798
+ }
740
799
}
0 commit comments