@@ -34,6 +34,9 @@ import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
)

const finalizerName = "datasciencepipelinesapplications.opendatahub.io/finalizer"
@@ -118,29 +121,14 @@ func (r *DSPAReconciler) buildCondition(conditionType string, dspa *dspav1alpha1
	return condition
}

-// isDeploymentInCondition evaluates if condition with "name" is in condition of type "conditionType".
-// this procedure is valid only for conditions with bool status type, for conditions of non bool type
-// results are undefined.
-func (r *DSPAReconciler) isDeploymentInCondition(ctx context.Context,
-	dspa *dspav1alpha1.DataSciencePipelinesApplication, name string,
-	conditionType appsv1.DeploymentConditionType) (bool, appsv1.DeploymentCondition) {
-	found := &appsv1.Deployment{}
-
-	// Every Deployment in DSPA is the name followed by the DSPA CR name
-	component := name + "-" + dspa.Name
-
-	err := r.Get(ctx, types.NamespacedName{Name: component, Namespace: dspa.Namespace}, found)
-	if err == nil {
-		if found.Spec.Replicas != nil && *found.Spec.Replicas == 0 {
-			return false, appsv1.DeploymentCondition{}
-		}
-		for _, s := range found.Status.Conditions {
-			if s.Type == conditionType && s.Status == corev1.ConditionTrue {
-				return true, s
-			}
+func GetDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
+	for i := range status.Conditions {
+		c := status.Conditions[i]
+		if c.Type == condType {
+			return &c
		}
	}
-	return false, appsv1.DeploymentCondition{}
+	return nil
}

//+kubebuilder:rbac:groups=datasciencepipelinesapplications.opendatahub.io,resources=datasciencepipelinesapplications,verbs=get;list;watch;create;update;patch;delete
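For clarity, here is a minimal usage sketch of the new helper (not part of the diff); the wrapper name isMinimallyAvailable is illustrative, and it assumes a Deployment already fetched through the controller's client.

	// Sketch only: report whether a Deployment is minimally available using the new helper.
	func isMinimallyAvailable(d *appsv1.Deployment) bool {
		cond := GetDeploymentCondition(d.Status, appsv1.DeploymentAvailable)
		return cond != nil && cond.Status == corev1.ConditionTrue
	}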
@@ -274,7 +262,11 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
		return ctrl.Result{}, err
	}

-	conditions := r.GenerateStatus(ctx, dspa)
+	err, conditions := r.GenerateStatus(ctx, dspa)
+	if err != nil {
+		log.Info(err.Error())
+		return ctrl.Result{}, err
+	}
	dspa.Status.Conditions = conditions

	// Update Status
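The hunk ends at the `// Update Status` marker, and the write itself is outside this diff. For orientation only, a typical controller-runtime status update at that point looks roughly like the sketch below; the operator's actual call may differ.

	// Sketch only (not part of the diff): persist the freshly computed conditions
	// through the status subresource client promoted from the embedded client.Client.
	if err := r.Status().Update(ctx, dspa); err != nil {
		log.Info(err.Error())
		return ctrl.Result{}, err
	}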
@@ -306,40 +298,162 @@ func GetConditionByType(t string, conditions []metav1.Condition) metav1.Conditio
	return metav1.Condition{}
}

-func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication) []metav1.Condition {
-	var conditions []metav1.Condition
+// handleReadyCondition evaluates the readiness of the DSPA component named "name" by inspecting its
+// Deployment, that Deployment's conditions, and its Pods, and returns the resulting metav1.Condition
+// of type "condition" for the DSPA status.
+func (r *DSPAReconciler) handleReadyCondition(
+	ctx context.Context,
+	dspa *dspav1alpha1.DataSciencePipelinesApplication,
+	name string,
+	condition string,
+) (error, metav1.Condition) {
+	readyCondition := r.buildCondition(condition, dspa, config.MinimumReplicasAvailable)
+	deployment := &appsv1.Deployment{}

-	apiServerReady := r.buildCondition(config.APIServerReady, dspa, config.MinimumReplicasAvailable)
-	deploymentAvailable, _ := r.isDeploymentInCondition(ctx, dspa, "ds-pipeline", appsv1.DeploymentAvailable)
-	if deploymentAvailable {
-		apiServerReady.Status = metav1.ConditionTrue
+	// Every Deployment in DSPA is the name followed by the DSPA CR name
+	component := name + "-" + dspa.Name
+
+	err := r.Get(ctx, types.NamespacedName{Name: component, Namespace: dspa.Namespace}, deployment)
+	if err != nil {
+		return err, metav1.Condition{}
+	}
+
+	// First check if the deployment is scaled down; if it is, the component is deemed not ready
+	if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas == 0 {
+		readyCondition.Reason = config.MinimumReplicasAvailable
+		readyCondition.Status = metav1.ConditionFalse
+		readyCondition.Message = fmt.Sprintf("Deployment for component \"%s\" is scaled down.", component)
+		return nil, readyCondition
+	}
+
+	// If the component is not minimally available, there are two possible scenarios:
+	// 1. Component deployment has encountered errors
+	// 2. Component is still deploying
+	// We check for (1), and if no errors are found we presume (2)
+
+	progressingCond := GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
+	availableCond := GetDeploymentCondition(deployment.Status, appsv1.DeploymentAvailable)
+	replicaFailureCond := GetDeploymentCondition(deployment.Status, appsv1.DeploymentReplicaFailure)
+
+	if availableCond != nil && availableCond.Status == corev1.ConditionTrue {
+		// If this DSPA component is minimally available, we are done.
+		readyCondition.Reason = config.MinimumReplicasAvailable
+		readyCondition.Status = metav1.ConditionTrue
+		readyCondition.Message = fmt.Sprintf("Component [%s] is minimally available.", component)
+		return nil, readyCondition
+	}
+
+	// There are two possible reasons for progress failing, deadline exceeded and replica create error:
+	// https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/controller/deployment/util/deployment_util.go#L69
+	// We check for both to investigate potential issues during deployment
+	if progressingCond != nil && progressingCond.Status == corev1.ConditionFalse &&
+		(progressingCond.Reason == "ProgressDeadlineExceeded" || progressingCond.Reason == "ReplicaSetCreateError") {
+		readyCondition.Reason = config.FailingToDeploy
+		readyCondition.Status = metav1.ConditionFalse
+		readyCondition.Message = fmt.Sprintf("Component [%s] has failed to progress. Reason: [%s]. "+
+			"Message: [%s]", component, progressingCond.Reason, progressingCond.Message)
+		return nil, readyCondition
+	}
+
+	if replicaFailureCond != nil && replicaFailureCond.Status == corev1.ConditionTrue {
+		readyCondition.Reason = config.FailingToDeploy
+		readyCondition.Status = metav1.ConditionFalse
+		readyCondition.Message = fmt.Sprintf("Component's replica [%s] has failed to create. Reason: [%s]. "+
+			"Message: [%s]", component, replicaFailureCond.Reason, replicaFailureCond.Message)
+		return nil, readyCondition
+	}
+
+	// Search through the pods associated with this deployment;
+	// if a failed pod is encountered, report Ready=false with a failure
+	// message
+	podList := &corev1.PodList{}
+	opts := []client.ListOption{
+		client.MatchingLabels(deployment.Spec.Selector.MatchLabels),
+	}
+	err = r.Client.List(ctx, podList, opts...)
+	if err != nil {
+		return err, metav1.Condition{}
+	}
+
+	hasPodFailures := false
+	podFailureMessage := ""
+	// We loop through all pods within this deployment and inspect their statuses for failures.
+	// Any failure detected in any pod results in FailingToDeploy status.
+	for _, p := range podList.Items {
+		if p.Status.Phase == corev1.PodFailed {
+			hasPodFailures = true
+			podFailureMessage += fmt.Sprintf("Pod named [%s] that is associated with this component [%s] "+
+				"is in failed phase.", p.Name, component)
+		}
+		// We loop through the containers in each pod, as in some cases the Pod can be in pending state
+		// but an individual container may be failing due to runtime errors.
+		for _, c := range p.Status.ContainerStatuses {
+			if c.State.Waiting != nil && c.State.Waiting.Reason == "CrashLoopBackOff" {
+				readyCondition.Reason = config.FailingToDeploy
+				readyCondition.Status = metav1.ConditionFalse
+				// We concatenate messages from all failing containers.
+				readyCondition.Message = fmt.Sprintf("Component [%s] is in CrashLoopBackOff. "+
+					"Message from pod: [%s]", component, c.State.Waiting.Message)
+				return nil, readyCondition
+			}
+		}
	}
-	conditions = append(conditions, apiServerReady)

-	persistenceAgentReady := r.buildCondition(config.PersistenceAgentReady, dspa, config.MinimumReplicasAvailable)
-	deploymentAvailable, _ = r.isDeploymentInCondition(ctx, dspa, "ds-pipeline-persistenceagent", appsv1.DeploymentAvailable)
-	if deploymentAvailable {
-		persistenceAgentReady.Status = metav1.ConditionTrue
+	if hasPodFailures {
+		readyCondition.Status = metav1.ConditionFalse
+		readyCondition.Reason = config.FailingToDeploy
+		readyCondition.Message = podFailureMessage
+		return nil, readyCondition
	}
-	conditions = append(conditions, persistenceAgentReady)

-	scheduledWorkflowReady := r.buildCondition(config.ScheduledWorkflowReady, dspa, config.MinimumReplicasAvailable)
-	deploymentAvailable, _ = r.isDeploymentInCondition(ctx, dspa, "ds-pipeline-scheduledworkflow", appsv1.DeploymentAvailable)
-	if deploymentAvailable {
-		scheduledWorkflowReady.Status = metav1.ConditionTrue
+	// No errors were encountered, so assume the deployment is still progressing
+	// and report the component as still deploying.
+	readyCondition.Reason = config.Deploying
+	readyCondition.Status = metav1.ConditionFalse
+	readyCondition.Message = fmt.Sprintf("Component [%s] is deploying.", component)
+	return nil, readyCondition
+
+}
+
+func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication) (error, []metav1.Condition) {
+
+	err, apiServerReady := r.handleReadyCondition(ctx, dspa, "ds-pipeline", config.APIServerReady)
+	if err != nil {
+		return err, []metav1.Condition{}
+	}
+	err, persistenceAgentReady := r.handleReadyCondition(ctx, dspa, "ds-pipeline-persistenceagent", config.PersistenceAgentReady)
+	if err != nil {
+		return err, []metav1.Condition{}
+	}
+	err, scheduledWorkflowReady := r.handleReadyCondition(ctx, dspa, "ds-pipeline-scheduledworkflow", config.ScheduledWorkflowReady)
+	if err != nil {
+		return err, []metav1.Condition{}
	}
+	var conditions []metav1.Condition
+	conditions = append(conditions, apiServerReady)
+	conditions = append(conditions, persistenceAgentReady)
	conditions = append(conditions, scheduledWorkflowReady)

+	// Compute Ready Logic for the CR
	crReady := r.buildCondition(config.CrReady, dspa, config.MinimumReplicasAvailable)
	crReady.Type = config.CrReady

-	// Compute Ready Logic for the CR
-	if (apiServerReady.Status == metav1.ConditionTrue) &&
-		(persistenceAgentReady.Status == metav1.ConditionTrue) &&
-		(scheduledWorkflowReady.Status == metav1.ConditionTrue) {
+	componentConditions := []metav1.Condition{apiServerReady, persistenceAgentReady, scheduledWorkflowReady}
+	allReady := true
+	failureMessages := ""
+	for _, c := range componentConditions {
+		if c.Status == metav1.ConditionFalse {
+			allReady = false
+			failureMessages += fmt.Sprintf("%s \n", c.Message)
+		}
+	}
+
+	if allReady {
		crReady.Status = metav1.ConditionTrue
+		crReady.Message = "All components are ready."
	} else {
		crReady.Status = metav1.ConditionFalse
+		crReady.Message = failureMessages
	}
	conditions = append(conditions, crReady)
@@ -349,7 +463,7 @@ func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.
		}
	}

-	return conditions
+	return nil, conditions
}

func (r *DSPAReconciler) PublishMetrics(dspa *dspav1alpha1.DataSciencePipelinesApplication,
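Because GenerateStatus now emits standard metav1.Condition entries, including the aggregate config.CrReady condition, callers can query them with the apimachinery condition helpers. A minimal sketch, assuming k8s.io/apimachinery/pkg/api/meta is imported as meta; the helper name dspaIsReady is illustrative.

	// Sketch only: check the aggregate Ready condition published on a DSPA.
	func dspaIsReady(dspa *dspav1alpha1.DataSciencePipelinesApplication) bool {
		return meta.IsStatusConditionTrue(dspa.Status.Conditions, config.CrReady)
	}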
@@ -403,6 +517,33 @@ func (r *DSPAReconciler) SetupWithManager(mgr ctrl.Manager) error {
		Owns(&rbacv1.Role{}).
		Owns(&rbacv1.RoleBinding{}).
		Owns(&routev1.Route{}).
+		// Watch for Pods belonging to DSPA
+		Watches(&source.Kind{Type: &corev1.Pod{}},
+			handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
+				log := r.Log.WithValues("namespace", o.GetNamespace())
+
+				component, hasComponentLabel := o.GetLabels()["component"]
+
+				if !hasComponentLabel || (component != "data-science-pipelines") {
+					return []reconcile.Request{}
+				}
+
+				dspaName, hasDSPALabel := o.GetLabels()["dspa"]
+				if !hasDSPALabel {
+					msg := fmt.Sprintf("Pod with data-science-pipelines label encountered, but is missing dspa "+
+						"label, could not reconcile on [Pod: %s] ", o.GetName())
+					log.V(1).Info(msg)
+					return []reconcile.Request{}
+				}
+
+				log.V(1).Info(fmt.Sprintf("Reconcile event triggered by [Pod: %s] ", o.GetName()))
+				namespacedName := types.NamespacedName{
+					Name:      dspaName,
+					Namespace: o.GetNamespace(),
+				}
+				reconcileRequests := append([]reconcile.Request{}, reconcile.Request{NamespacedName: namespacedName})
+				return reconcileRequests
+			})).
		// TODO: Add watcher for ui cluster rbac since it has no owner
		Complete(r)
}
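The new Watches clause only enqueues a reconcile for Pods that carry both labels checked in the map function; the &source.Kind{} form and the single-argument map function match the controller-runtime release this operator builds against (later releases change both signatures). Below is a small sketch of the labels a Pod needs for the watch to map it back to its DSPA; "example-dspa" is an illustrative CR name.

	// Sketch only: labels required for the watch above to enqueue a reconcile.
	func examplePodLabels() map[string]string {
		return map[string]string{
			"component": "data-science-pipelines", // must match exactly
			"dspa":      "example-dspa",           // becomes the reconcile.Request name
		}
	}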