7
7
"sync"
8
8
"time"
9
9
10
+ "k8s.io/utils/pointer"
11
+
10
12
log "github.com/sirupsen/logrus"
11
13
corev1 "k8s.io/api/core/v1"
12
14
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,20 +45,31 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
43
45
44
46
if run .Status .MetricResults == nil {
45
47
run .Status .MetricResults = make ([]v1alpha1.MetricResult , 0 )
46
- err := analysisutil .ValidateMetrics (run .Spec .Metrics )
47
- if err != nil {
48
- message := fmt .Sprintf ("analysis spec invalid: %v" , err )
49
- log .Warn (message )
50
- run .Status .Phase = v1alpha1 .AnalysisPhaseError
51
- run .Status .Message = message
52
- c .recordAnalysisRunCompletionEvent (run )
53
- return run
54
- }
55
48
}
56
49
57
- tasks := generateMetricTasks (run )
50
+ resolvedMetrics , err := getResolvedMetricsWithoutSecrets (run .Spec .Metrics , run .Spec .Args )
51
+ if err != nil {
52
+ message := fmt .Sprintf ("unable to resolve metric arguments: %v" , err )
53
+ log .Warn (message )
54
+ run .Status .Phase = v1alpha1 .AnalysisPhaseError
55
+ run .Status .Message = message
56
+ c .recordAnalysisRunCompletionEvent (run )
57
+ return run
58
+ }
59
+
60
+ err = analysisutil .ValidateMetrics (resolvedMetrics )
61
+ if err != nil {
62
+ message := fmt .Sprintf ("analysis spec invalid: %v" , err )
63
+ log .Warn (message )
64
+ run .Status .Phase = v1alpha1 .AnalysisPhaseError
65
+ run .Status .Message = message
66
+ c .recordAnalysisRunCompletionEvent (run )
67
+ return run
68
+ }
69
+
70
+ tasks := generateMetricTasks (run , resolvedMetrics )
58
71
log .Infof ("taking %d measurements" , len (tasks ))
59
- err : = c .runMeasurements (run , tasks )
72
+ err = c .runMeasurements (run , tasks )
60
73
if err != nil {
61
74
message := fmt .Sprintf ("unable to resolve metric arguments: %v" , err )
62
75
log .Warn (message )
@@ -66,7 +79,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
66
79
return run
67
80
}
68
81
69
- newStatus , newMessage := c .assessRunStatus (run )
82
+ newStatus , newMessage := c .assessRunStatus (run , resolvedMetrics )
70
83
if newStatus != run .Status .Phase {
71
84
run .Status .Phase = newStatus
72
85
run .Status .Message = newMessage
@@ -81,7 +94,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
81
94
log .Warnf ("Failed to garbage collect measurements: %v" , err )
82
95
}
83
96
84
- nextReconcileTime := calculateNextReconcileTime (run )
97
+ nextReconcileTime := calculateNextReconcileTime (run , resolvedMetrics )
85
98
if nextReconcileTime != nil {
86
99
enqueueSeconds := nextReconcileTime .Sub (time .Now ())
87
100
if enqueueSeconds < 0 {
@@ -93,6 +106,27 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
93
106
return run
94
107
}
95
108
109
+ func getResolvedMetricsWithoutSecrets (metrics []v1alpha1.Metric , args []v1alpha1.Argument ) ([]v1alpha1.Metric , error ) {
110
+ newArgs := make ([]v1alpha1.Argument , 0 )
111
+ for _ , arg := range args {
112
+ newArg := arg .DeepCopy ()
113
+ if newArg .ValueFrom != nil && newArg .ValueFrom .SecretKeyRef != nil {
114
+ newArg .ValueFrom = nil
115
+ newArg .Value = pointer .StringPtr ("temp-for-secret" )
116
+ }
117
+ newArgs = append (newArgs , * newArg )
118
+ }
119
+ resolvedMetrics := make ([]v1alpha1.Metric , 0 )
120
+ for _ , metric := range metrics {
121
+ resolvedMetric , err := analysisutil .ResolveMetricArgs (metric , newArgs )
122
+ if err != nil {
123
+ return nil , err
124
+ }
125
+ resolvedMetrics = append (resolvedMetrics , * resolvedMetric )
126
+ }
127
+ return resolvedMetrics , nil
128
+ }
129
+
96
130
func (c * Controller ) recordAnalysisRunCompletionEvent (run * v1alpha1.AnalysisRun ) {
97
131
eventType := corev1 .EventTypeNormal
98
132
switch run .Status .Phase {
@@ -106,11 +140,12 @@ func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun)
106
140
// sync, based on the last completion times that metric was measured (if ever). If the run is
107
141
// terminating (e.g. due to manual termination or failing metric), will not schedule further
108
142
// measurements other than to resume any in-flight measurements.
109
- func generateMetricTasks (run * v1alpha1.AnalysisRun ) []metricTask {
143
+ func generateMetricTasks (run * v1alpha1.AnalysisRun , metrics []v1alpha1. Metric ) []metricTask {
110
144
log := logutil .WithAnalysisRun (run )
111
145
var tasks []metricTask
112
146
terminating := analysisutil .IsTerminating (run )
113
- for _ , metric := range run .Spec .Metrics {
147
+
148
+ for i , metric := range metrics {
114
149
if analysisutil .MetricCompleted (run , metric .Name ) {
115
150
continue
116
151
}
@@ -124,7 +159,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
124
159
// last measurement is still in-progress. need to complete it
125
160
logCtx .Infof ("resuming in-progress measurement" )
126
161
tasks = append (tasks , metricTask {
127
- metric : metric ,
162
+ metric : run . Spec . Metrics [ i ] ,
128
163
incompleteMeasurement : lastMeasurement ,
129
164
})
130
165
continue
@@ -149,7 +184,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
149
184
}
150
185
}
151
186
// measurement never taken
152
- tasks = append (tasks , metricTask {metric : metric })
187
+ tasks = append (tasks , metricTask {metric : run . Spec . Metrics [ i ] })
153
188
logCtx .Infof ("running initial measurement" )
154
189
continue
155
190
}
@@ -174,7 +209,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
174
209
interval = metricInterval
175
210
}
176
211
if time .Now ().After (lastMeasurement .FinishedAt .Add (interval )) {
177
- tasks = append (tasks , metricTask {metric : metric })
212
+ tasks = append (tasks , metricTask {metric : run . Spec . Metrics [ i ] })
178
213
logCtx .Infof ("running overdue measurement" )
179
214
continue
180
215
}
@@ -238,7 +273,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
238
273
var resultsLock sync.Mutex
239
274
terminating := analysisutil .IsTerminating (run )
240
275
241
- // resolve args for metricTasks
276
+ // resolve args for metric tasks
242
277
// get list of secret values for log redaction
243
278
tasks , secrets , err := c .resolveArgs (tasks , run .Spec .Args , run .Namespace )
244
279
if err != nil {
@@ -345,7 +380,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
345
380
// assessRunStatus assesses the overall status of this AnalysisRun
346
381
// If any metric is not yet completed, the AnalysisRun is still considered Running
347
382
// Once all metrics are complete, the worst status is used as the overall AnalysisRun status
348
- func (c * Controller ) assessRunStatus (run * v1alpha1.AnalysisRun ) (v1alpha1.AnalysisPhase , string ) {
383
+ func (c * Controller ) assessRunStatus (run * v1alpha1.AnalysisRun , metrics []v1alpha1. Metric ) (v1alpha1.AnalysisPhase , string ) {
349
384
var worstStatus v1alpha1.AnalysisPhase
350
385
var worstMessage string
351
386
terminating := analysisutil .IsTerminating (run )
@@ -360,7 +395,7 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
360
395
}
361
396
362
397
// Iterate all metrics and update MetricResult.Phase fields based on latest measurement(s)
363
- for _ , metric := range run . Spec . Metrics {
398
+ for _ , metric := range metrics {
364
399
if result := analysisutil .GetResult (run , metric .Name ); result != nil {
365
400
log := logutil .WithAnalysisRun (run ).WithField ("metric" , metric .Name )
366
401
metricStatus := assessMetricStatus (metric , * result , terminating )
@@ -396,6 +431,14 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
396
431
}
397
432
}
398
433
}
434
+ } else {
435
+ // metric hasn't started running. possible cases where some of the metrics starts with delay
436
+ everythingCompleted = false
437
+ if terminating {
438
+ // we have yet to take a single measurement, but have already been instructed to stop
439
+ log .Infof ("metric assessed %s: run terminated" , v1alpha1 .AnalysisPhaseSuccessful )
440
+ return v1alpha1 .AnalysisPhaseSuccessful , worstMessage
441
+ }
399
442
}
400
443
}
401
444
if ! everythingCompleted {
@@ -489,9 +532,9 @@ func assessMetricFailureInconclusiveOrError(metric v1alpha1.Metric, result v1alp
489
532
490
533
// calculateNextReconcileTime calculates the next time that this AnalysisRun should be reconciled,
491
534
// based on the earliest time of all metrics intervals, counts, and their finishedAt timestamps
492
- func calculateNextReconcileTime (run * v1alpha1.AnalysisRun ) * time.Time {
535
+ func calculateNextReconcileTime (run * v1alpha1.AnalysisRun , metrics []v1alpha1. Metric ) * time.Time {
493
536
var reconcileTime * time.Time
494
- for _ , metric := range run . Spec . Metrics {
537
+ for _ , metric := range metrics {
495
538
if analysisutil .MetricCompleted (run , metric .Name ) {
496
539
// NOTE: this also covers the case where metric.Count is reached
497
540
continue
0 commit comments