Skip to content

Commit 58994c9

Browse files
khhiranialexmt
authored andcommitted
fix: Analysis argument validation (argoproj#1412)
* fix: Analysis argument validation Signed-off-by: khhirani <[email protected]>
1 parent 4022cb8 commit 58994c9

File tree

7 files changed

+315
-60
lines changed

7 files changed

+315
-60
lines changed

analysis/analysis.go

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"sync"
88
"time"
99

10+
"k8s.io/utils/pointer"
11+
1012
log "github.com/sirupsen/logrus"
1113
corev1 "k8s.io/api/core/v1"
1214
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,20 +45,31 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
4345

4446
if run.Status.MetricResults == nil {
4547
run.Status.MetricResults = make([]v1alpha1.MetricResult, 0)
46-
err := analysisutil.ValidateMetrics(run.Spec.Metrics)
47-
if err != nil {
48-
message := fmt.Sprintf("analysis spec invalid: %v", err)
49-
log.Warn(message)
50-
run.Status.Phase = v1alpha1.AnalysisPhaseError
51-
run.Status.Message = message
52-
c.recordAnalysisRunCompletionEvent(run)
53-
return run
54-
}
5548
}
5649

57-
tasks := generateMetricTasks(run)
50+
resolvedMetrics, err := getResolvedMetricsWithoutSecrets(run.Spec.Metrics, run.Spec.Args)
51+
if err != nil {
52+
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
53+
log.Warn(message)
54+
run.Status.Phase = v1alpha1.AnalysisPhaseError
55+
run.Status.Message = message
56+
c.recordAnalysisRunCompletionEvent(run)
57+
return run
58+
}
59+
60+
err = analysisutil.ValidateMetrics(resolvedMetrics)
61+
if err != nil {
62+
message := fmt.Sprintf("analysis spec invalid: %v", err)
63+
log.Warn(message)
64+
run.Status.Phase = v1alpha1.AnalysisPhaseError
65+
run.Status.Message = message
66+
c.recordAnalysisRunCompletionEvent(run)
67+
return run
68+
}
69+
70+
tasks := generateMetricTasks(run, resolvedMetrics)
5871
log.Infof("taking %d measurements", len(tasks))
59-
err := c.runMeasurements(run, tasks)
72+
err = c.runMeasurements(run, tasks)
6073
if err != nil {
6174
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
6275
log.Warn(message)
@@ -66,7 +79,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
6679
return run
6780
}
6881

69-
newStatus, newMessage := c.assessRunStatus(run)
82+
newStatus, newMessage := c.assessRunStatus(run, resolvedMetrics)
7083
if newStatus != run.Status.Phase {
7184
run.Status.Phase = newStatus
7285
run.Status.Message = newMessage
@@ -81,7 +94,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
8194
log.Warnf("Failed to garbage collect measurements: %v", err)
8295
}
8396

84-
nextReconcileTime := calculateNextReconcileTime(run)
97+
nextReconcileTime := calculateNextReconcileTime(run, resolvedMetrics)
8598
if nextReconcileTime != nil {
8699
enqueueSeconds := nextReconcileTime.Sub(time.Now())
87100
if enqueueSeconds < 0 {
@@ -93,6 +106,27 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
93106
return run
94107
}
95108

109+
func getResolvedMetricsWithoutSecrets(metrics []v1alpha1.Metric, args []v1alpha1.Argument) ([]v1alpha1.Metric, error) {
110+
newArgs := make([]v1alpha1.Argument, 0)
111+
for _, arg := range args {
112+
newArg := arg.DeepCopy()
113+
if newArg.ValueFrom != nil && newArg.ValueFrom.SecretKeyRef != nil {
114+
newArg.ValueFrom = nil
115+
newArg.Value = pointer.StringPtr("temp-for-secret")
116+
}
117+
newArgs = append(newArgs, *newArg)
118+
}
119+
resolvedMetrics := make([]v1alpha1.Metric, 0)
120+
for _, metric := range metrics {
121+
resolvedMetric, err := analysisutil.ResolveMetricArgs(metric, newArgs)
122+
if err != nil {
123+
return nil, err
124+
}
125+
resolvedMetrics = append(resolvedMetrics, *resolvedMetric)
126+
}
127+
return resolvedMetrics, nil
128+
}
129+
96130
func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun) {
97131
eventType := corev1.EventTypeNormal
98132
switch run.Status.Phase {
@@ -106,11 +140,12 @@ func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun)
106140
// sync, based on the last completion times that metric was measured (if ever). If the run is
107141
// terminating (e.g. due to manual termination or failing metric), will not schedule further
108142
// measurements other than to resume any in-flight measurements.
109-
func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
143+
func generateMetricTasks(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) []metricTask {
110144
log := logutil.WithAnalysisRun(run)
111145
var tasks []metricTask
112146
terminating := analysisutil.IsTerminating(run)
113-
for _, metric := range run.Spec.Metrics {
147+
148+
for i, metric := range metrics {
114149
if analysisutil.MetricCompleted(run, metric.Name) {
115150
continue
116151
}
@@ -124,7 +159,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
124159
// last measurement is still in-progress. need to complete it
125160
logCtx.Infof("resuming in-progress measurement")
126161
tasks = append(tasks, metricTask{
127-
metric: metric,
162+
metric: run.Spec.Metrics[i],
128163
incompleteMeasurement: lastMeasurement,
129164
})
130165
continue
@@ -149,7 +184,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
149184
}
150185
}
151186
// measurement never taken
152-
tasks = append(tasks, metricTask{metric: metric})
187+
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
153188
logCtx.Infof("running initial measurement")
154189
continue
155190
}
@@ -174,7 +209,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
174209
interval = metricInterval
175210
}
176211
if time.Now().After(lastMeasurement.FinishedAt.Add(interval)) {
177-
tasks = append(tasks, metricTask{metric: metric})
212+
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
178213
logCtx.Infof("running overdue measurement")
179214
continue
180215
}
@@ -238,7 +273,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
238273
var resultsLock sync.Mutex
239274
terminating := analysisutil.IsTerminating(run)
240275

241-
// resolve args for metricTasks
276+
// resolve args for metric tasks
242277
// get list of secret values for log redaction
243278
tasks, secrets, err := c.resolveArgs(tasks, run.Spec.Args, run.Namespace)
244279
if err != nil {
@@ -345,7 +380,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
345380
// assessRunStatus assesses the overall status of this AnalysisRun
346381
// If any metric is not yet completed, the AnalysisRun is still considered Running
347382
// Once all metrics are complete, the worst status is used as the overall AnalysisRun status
348-
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.AnalysisPhase, string) {
383+
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) (v1alpha1.AnalysisPhase, string) {
349384
var worstStatus v1alpha1.AnalysisPhase
350385
var worstMessage string
351386
terminating := analysisutil.IsTerminating(run)
@@ -360,7 +395,7 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
360395
}
361396

362397
// Iterate all metrics and update MetricResult.Phase fields based on latest measurement(s)
363-
for _, metric := range run.Spec.Metrics {
398+
for _, metric := range metrics {
364399
if result := analysisutil.GetResult(run, metric.Name); result != nil {
365400
log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name)
366401
metricStatus := assessMetricStatus(metric, *result, terminating)
@@ -497,9 +532,9 @@ func assessMetricFailureInconclusiveOrError(metric v1alpha1.Metric, result v1alp
497532

498533
// calculateNextReconcileTime calculates the next time that this AnalysisRun should be reconciled,
499534
// based on the earliest time of all metrics intervals, counts, and their finishedAt timestamps
500-
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time {
535+
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) *time.Time {
501536
var reconcileTime *time.Time
502-
for _, metric := range run.Spec.Metrics {
537+
for _, metric := range metrics {
503538
if analysisutil.MetricCompleted(run, metric.Name) {
504539
// NOTE: this also covers the case where metric.Count is reached
505540
continue

0 commit comments

Comments
 (0)