Skip to content

Commit f093118

Browse files
juliev0dpadhiar
andauthored
feat: Gracefully recover when AnalysisRuns have errors (#968)
Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com> Co-authored-by: Dillen Padhiar <38965141+dpadhiar@users.noreply.github.com>
1 parent b22007e commit f093118

File tree

6 files changed

+245
-26
lines changed

6 files changed

+245
-26
lines changed

internal/controller/monovertexrollout/monovertexrollout_progressive.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ func (r *MonoVertexRolloutReconciler) AssessUpgradingChild(
111111
// Success window passed, launch AnalysisRuns or declare success
112112
childStatus.BasicAssessmentEndTime = &metav1.Time{Time: currentTime}
113113
childStatus.BasicAssessmentResult = apiv1.AssessmentResultSuccess
114-
return r.checkAnalysisTemplates(ctx, mvtxRollout, existingUpgradingChildDef)
114+
assessment, err := r.checkAnalysisTemplates(ctx, mvtxRollout, existingUpgradingChildDef)
115+
return assessment, "", err
115116
}
116117

117118
numaLogger.Debugf("Assessment succeeded for upgrading child %s, but success window has not passed yet", existingUpgradingChildDef.GetName())
@@ -122,7 +123,8 @@ func (r *MonoVertexRolloutReconciler) AssessUpgradingChild(
122123
if childStatus.BasicAssessmentResult == apiv1.AssessmentResultSuccess {
123124
childStatus.ChildStatus.Raw = nil
124125
childStatus.FailureReasons = nil
125-
return r.checkAnalysisTemplates(ctx, mvtxRollout, existingUpgradingChildDef)
126+
assessment, err := r.checkAnalysisTemplates(ctx, mvtxRollout, existingUpgradingChildDef)
127+
return assessment, "", err
126128
}
127129
return childStatus.BasicAssessmentResult, "Basic assessment failed", nil
128130
}
@@ -134,7 +136,7 @@ func (r *MonoVertexRolloutReconciler) AssessUpgradingChild(
134136
// otherwise it returns success.
135137
func (r *MonoVertexRolloutReconciler) checkAnalysisTemplates(ctx context.Context,
136138
mvtxRollout *apiv1.MonoVertexRollout,
137-
existingUpgradingChildDef *unstructured.Unstructured) (apiv1.AssessmentResult, string, error) {
139+
existingUpgradingChildDef *unstructured.Unstructured) (apiv1.AssessmentResult, error) {
138140

139141
numaLogger := logger.FromContext(ctx)
140142
analysis := mvtxRollout.GetAnalysis()
@@ -144,11 +146,15 @@ func (r *MonoVertexRolloutReconciler) checkAnalysisTemplates(ctx context.Context
144146
numaLogger.Debugf("Performing analysis for upgrading child %s", existingUpgradingChildDef.GetName())
145147
analysisStatus, err := progressive.PerformAnalysis(ctx, existingUpgradingChildDef, mvtxRollout, mvtxRollout.GetAnalysis(), mvtxRollout.GetAnalysisStatus(), r.client)
146148
if err != nil {
147-
return apiv1.AssessmentResultUnknown, "", err
149+
return apiv1.AssessmentResultUnknown, err
150+
}
151+
assessment, err := progressive.AssessAnalysisStatus(ctx, existingUpgradingChildDef, analysisStatus)
152+
if err != nil {
153+
return apiv1.AssessmentResultUnknown, err
148154
}
149-
return progressive.AssessAnalysisStatus(ctx, existingUpgradingChildDef, analysisStatus)
155+
return assessment, nil
150156
}
151-
return apiv1.AssessmentResultSuccess, "", nil
157+
return apiv1.AssessmentResultSuccess, nil
152158
}
153159

154160
// CheckForDifferences checks to see if the monovertex definition matches the spec and the required metadata

internal/controller/pipelinerollout/pipelinerollout_progressive.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ func (r *PipelineRolloutReconciler) AssessUpgradingChild(
166166
// Success window passed, launch AnalysisRun or declared success
167167
childStatus.BasicAssessmentEndTime = &metav1.Time{Time: currentTime}
168168
childStatus.BasicAssessmentResult = apiv1.AssessmentResultSuccess
169-
return r.checkAnalysisTemplates(ctx, pipelineRollout, existingUpgradingChildDef)
169+
assessment, err := r.checkAnalysisTemplates(ctx, pipelineRollout, existingUpgradingChildDef)
170+
return assessment, "", err
170171
}
171172

172173
numaLogger.Debugf("Assessment succeeded for upgrading child %s, but success window has not passed yet", existingUpgradingChildDef.GetName())
@@ -177,7 +178,8 @@ func (r *PipelineRolloutReconciler) AssessUpgradingChild(
177178
if childStatus.BasicAssessmentResult == apiv1.AssessmentResultSuccess {
178179
childStatus.ChildStatus.Raw = nil
179180
childStatus.FailureReasons = nil
180-
return r.checkAnalysisTemplates(ctx, pipelineRollout, existingUpgradingChildDef)
181+
assessment, err := r.checkAnalysisTemplates(ctx, pipelineRollout, existingUpgradingChildDef)
182+
return assessment, "", err
181183
}
182184
return childStatus.BasicAssessmentResult, "Basic assessment failed", nil
183185
}
@@ -188,14 +190,14 @@ func (r *PipelineRolloutReconciler) AssessUpgradingChild(
188190
// checkAnalysisTemplates checks if there are any analysis templates to run and runs them if so.
189191
func (r *PipelineRolloutReconciler) checkAnalysisTemplates(ctx context.Context,
190192
pipelineRollout *apiv1.PipelineRollout,
191-
existingUpgradingChildDef *unstructured.Unstructured) (apiv1.AssessmentResult, string, error) {
193+
existingUpgradingChildDef *unstructured.Unstructured) (apiv1.AssessmentResult, error) {
192194

193195
numaLogger := logger.FromContext(ctx)
194196
analysis := pipelineRollout.GetAnalysis()
195197

196198
globalConfig, err := config.GetConfigManagerInstance().GetConfig()
197199
if err != nil {
198-
return apiv1.AssessmentResultUnknown, "", fmt.Errorf("error getting the global config: %v", err)
200+
return apiv1.AssessmentResultUnknown, fmt.Errorf("error getting the global config: %v", err)
199201
}
200202

201203
// only check for and create AnalysisRun if templates are specified
@@ -204,11 +206,15 @@ func (r *PipelineRolloutReconciler) checkAnalysisTemplates(ctx context.Context,
204206
numaLogger.Debugf("Performing analysis for upgrading child %s", existingUpgradingChildDef.GetName())
205207
analysisStatus, err := progressive.PerformAnalysis(ctx, existingUpgradingChildDef, pipelineRollout, pipelineRollout.GetAnalysis(), pipelineRollout.GetAnalysisStatus(), r.client)
206208
if err != nil {
207-
return apiv1.AssessmentResultUnknown, "", err
209+
return apiv1.AssessmentResultUnknown, err
210+
}
211+
assessment, err := progressive.AssessAnalysisStatus(ctx, existingUpgradingChildDef, analysisStatus)
212+
if err != nil {
213+
return apiv1.AssessmentResultUnknown, err
208214
}
209-
return progressive.AssessAnalysisStatus(ctx, existingUpgradingChildDef, analysisStatus)
215+
return assessment, nil
210216
}
211-
return apiv1.AssessmentResultSuccess, "", nil
217+
return apiv1.AssessmentResultSuccess, nil
212218
}
213219

214220
// CheckForDifferences checks to see if the pipeline definition matches the spec and the required metadata

internal/controller/progressive/analysis.go

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,44 @@ func GetAnalysisTemplatesFromRefs(ctx context.Context, templateRefs *[]argorollo
8989
return uniqueTemplates, uniqueClusterTemplates, nil
9090
}
9191

92+
// dedupMetrics returns a copy of the given metrics slice with duplicates removed by metric name.
93+
// The first occurrence of each name is kept.
94+
func dedupMetrics(metrics []argorolloutsv1.Metric) []argorolloutsv1.Metric {
95+
seen := make(map[string]bool)
96+
out := make([]argorolloutsv1.Metric, 0, len(metrics))
97+
for _, m := range metrics {
98+
if seen[m.Name] {
99+
continue
100+
}
101+
seen[m.Name] = true
102+
out = append(out, m)
103+
}
104+
return out
105+
}
106+
107+
// dedupMetricsForAnalysisTemplates removes duplicate metrics from the given AnalysisTemplates and
108+
// ClusterAnalysisTemplates. When templates are merged (e.g. from nested refs), the same metric may
109+
// appear multiple times; this function ensures each metric appears at most once per template.
110+
// It returns the same template slices with metrics deduplicated, or an error if processing fails.
111+
func dedupMetricsForAnalysisTemplates(ctx context.Context, templates []*argorolloutsv1.AnalysisTemplate, clusterTemplates []*argorolloutsv1.ClusterAnalysisTemplate) ([]*argorolloutsv1.AnalysisTemplate, []*argorolloutsv1.ClusterAnalysisTemplate, error) {
112+
numaLogger := logger.FromContext(ctx)
113+
for index, t := range templates {
114+
deduped := dedupMetrics(t.Spec.Metrics)
115+
if len(deduped) < len(t.Spec.Metrics) {
116+
numaLogger.Warnf("AnalysisTemplate %s had duplicate metrics; using first occurrence only", t.Name)
117+
}
118+
templates[index].Spec.Metrics = deduped
119+
}
120+
for index, t := range clusterTemplates {
121+
deduped := dedupMetrics(t.Spec.Metrics)
122+
if len(deduped) < len(t.Spec.Metrics) {
123+
numaLogger.Warnf("ClusterAnalysisTemplate %s had duplicate metrics; using first occurrence only", t.Name)
124+
}
125+
clusterTemplates[index].Spec.Metrics = deduped
126+
}
127+
return templates, clusterTemplates, nil
128+
}
129+
92130
/*
93131
CreateAnalysisRun finds all templates specified in the Analysis field in the spec of a rollout and creates the resulting AnalysisRun in k8s.
94132
@@ -115,6 +153,12 @@ func CreateAnalysisRun(ctx context.Context, analysis apiv1.Analysis, existingUpg
115153
return err
116154
}
117155

156+
// temporary code to take care of an issue in which incoming AnalysisTemplates have a duplicate metric
157+
analysisTemplates, clusterAnalysisTemplates, err = dedupMetricsForAnalysisTemplates(ctx, analysisTemplates, clusterAnalysisTemplates)
158+
if err != nil {
159+
return err
160+
}
161+
118162
// set special arguments for child name and namespace
119163
childName := existingUpgradingChildDef.GetName()
120164
childNamespace := existingUpgradingChildDef.GetNamespace()
@@ -206,33 +250,41 @@ func PerformAnalysis(
206250
func AssessAnalysisStatus(
207251
ctx context.Context,
208252
existingUpgradingChildDef *unstructured.Unstructured,
209-
analysisStatus *apiv1.AnalysisStatus) (apiv1.AssessmentResult, string, error) {
253+
analysisStatus *apiv1.AnalysisStatus) (apiv1.AssessmentResult, error) {
210254
numaLogger := logger.FromContext(ctx)
211255

212256
// make sure we haven't gone past the max time allowed for an AnalysisRun
213257
analysisRunTimeout, err := getAnalysisRunTimeout(ctx)
214258
if err != nil {
215-
return apiv1.AssessmentResultUnknown, "", err
259+
return apiv1.AssessmentResultUnknown, err
216260
}
217261

218-
// if analysisStatus is set with an AnalysisRun's name, we must also check that it is in a Completed phase to declare success
262+
// if analysisStatus is set, we must also check that it is in a Completed phase to declare success
219263
if analysisStatus != nil && analysisStatus.AnalysisRunName != "" {
220264
numaLogger.WithValues("namespace", existingUpgradingChildDef.GetNamespace(), "name", existingUpgradingChildDef.GetName()).
221265
Debugf("AnalysisRun %s is in phase %s", analysisStatus.AnalysisRunName, analysisStatus.Phase)
222266
switch analysisStatus.Phase {
223267
case argorolloutsv1.AnalysisPhaseSuccessful:
224-
return apiv1.AssessmentResultSuccess, "", nil
225-
case argorolloutsv1.AnalysisPhaseError, argorolloutsv1.AnalysisPhaseFailed, argorolloutsv1.AnalysisPhaseInconclusive:
226-
return apiv1.AssessmentResultFailure, fmt.Sprintf("AnalysisRun %s is in phase %s", analysisStatus.AnalysisRunName, analysisStatus.Phase), nil
268+
return apiv1.AssessmentResultSuccess, nil
269+
case argorolloutsv1.AnalysisPhaseFailed:
270+
return apiv1.AssessmentResultFailure, nil
271+
case argorolloutsv1.AnalysisPhaseError, argorolloutsv1.AnalysisPhaseInconclusive:
272+
// Decision not to consider this a failure since it's either a misconfiguration of the AnalysisTemplate or unavailable Provider, etc
273+
numaLogger.WithValues("namespace", existingUpgradingChildDef.GetNamespace(), "name", existingUpgradingChildDef.GetName()).
274+
Warnf("AnalysisRun %s is in phase %s", analysisStatus.AnalysisRunName, analysisStatus.Phase)
275+
return apiv1.AssessmentResultSuccess, nil
227276
default:
228277
// if analysisRun is not completed yet, we check if it has exceeded the analysisRunTimeout
278+
// Decision not to consider this a failure since it's either a misconfiguration of the AnalysisTemplate or Argo Rollouts unavailable, etc
229279
if time.Since(analysisStatus.StartTime.Time) >= analysisRunTimeout {
230-
return apiv1.AssessmentResultFailure, fmt.Sprintf("AnalysisRun %s in phase %s has exceeded the analysisRunTimeout", analysisStatus.AnalysisRunName, analysisStatus.Phase), nil
280+
numaLogger.WithValues("namespace", existingUpgradingChildDef.GetNamespace(), "name", existingUpgradingChildDef.GetName()).
281+
Warnf("AnalysisRun %s has exceeded the analysisRunTimeout", analysisStatus.AnalysisRunName)
282+
return apiv1.AssessmentResultSuccess, nil
231283
}
232-
return apiv1.AssessmentResultUnknown, "", nil
284+
return apiv1.AssessmentResultUnknown, nil
233285
}
234286
}
235287

236288
// no AnalysisRun so by default we can mark this successful
237-
return apiv1.AssessmentResultSuccess, "", nil
289+
return apiv1.AssessmentResultSuccess, nil
238290
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
Copyright 2023.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package progressive
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
argorolloutsv1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
24+
"github.com/stretchr/testify/assert"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
)
27+
28+
func metric(name string) argorolloutsv1.Metric {
29+
return argorolloutsv1.Metric{
30+
Name: name,
31+
Provider: argorolloutsv1.MetricProvider{
32+
Prometheus: &argorolloutsv1.PrometheusMetric{
33+
Query: "up",
34+
},
35+
},
36+
SuccessCondition: "true",
37+
}
38+
}
39+
40+
func metricNames(metrics []argorolloutsv1.Metric) []string {
41+
names := make([]string, len(metrics))
42+
for i := range metrics {
43+
names[i] = metrics[i].Name
44+
}
45+
return names
46+
}
47+
48+
func TestDedupMetricsForAnalysisTemplates(t *testing.T) {
49+
ctx := context.Background()
50+
51+
testCases := []struct {
52+
name string
53+
templates []*argorolloutsv1.AnalysisTemplate
54+
clusterTemplates []*argorolloutsv1.ClusterAnalysisTemplate
55+
expectedTemplateMetrics map[string][]string // template name -> expected metric names in order
56+
expectedClusterTemplateMetrics map[string][]string // cluster template name -> expected metric names in order
57+
}{
58+
{
59+
name: "nil and empty slices",
60+
templates: nil,
61+
clusterTemplates: nil,
62+
expectedTemplateMetrics: nil,
63+
expectedClusterTemplateMetrics: nil,
64+
},
65+
{
66+
name: "empty templates no change",
67+
templates: []*argorolloutsv1.AnalysisTemplate{},
68+
clusterTemplates: []*argorolloutsv1.ClusterAnalysisTemplate{},
69+
expectedTemplateMetrics: map[string][]string{},
70+
expectedClusterTemplateMetrics: map[string][]string{},
71+
},
72+
{
73+
name: "AnalysisTemplate with no duplicates unchanged",
74+
templates: []*argorolloutsv1.AnalysisTemplate{{
75+
ObjectMeta: metav1.ObjectMeta{Name: "t1", Namespace: "ns"},
76+
Spec: argorolloutsv1.AnalysisTemplateSpec{
77+
Metrics: []argorolloutsv1.Metric{metric("a"), metric("b")},
78+
},
79+
}},
80+
clusterTemplates: nil,
81+
expectedTemplateMetrics: map[string][]string{"t1": {"a", "b"}},
82+
expectedClusterTemplateMetrics: nil,
83+
},
84+
{
85+
name: "AnalysisTemplate with duplicate metric names",
86+
templates: []*argorolloutsv1.AnalysisTemplate{{
87+
ObjectMeta: metav1.ObjectMeta{Name: "at"},
88+
Spec: argorolloutsv1.AnalysisTemplateSpec{
89+
Metrics: []argorolloutsv1.Metric{metric("m1"), metric("m1"), metric("m2")},
90+
},
91+
}},
92+
clusterTemplates: nil,
93+
expectedTemplateMetrics: map[string][]string{"at": {"m1", "m2"}},
94+
expectedClusterTemplateMetrics: nil,
95+
},
96+
{
97+
name: "ClusterAnalysisTemplate with duplicate metric names",
98+
templates: nil,
99+
clusterTemplates: []*argorolloutsv1.ClusterAnalysisTemplate{{
100+
ObjectMeta: metav1.ObjectMeta{Name: "ct1"},
101+
Spec: argorolloutsv1.AnalysisTemplateSpec{
102+
Metrics: []argorolloutsv1.Metric{metric("cluster-metric"), metric("cluster-metric")},
103+
},
104+
}},
105+
expectedTemplateMetrics: nil,
106+
expectedClusterTemplateMetrics: map[string][]string{"ct1": {"cluster-metric"}},
107+
},
108+
{
109+
name: "both template types with duplicate metric names",
110+
templates: []*argorolloutsv1.AnalysisTemplate{{
111+
ObjectMeta: metav1.ObjectMeta{Name: "at"},
112+
Spec: argorolloutsv1.AnalysisTemplateSpec{
113+
Metrics: []argorolloutsv1.Metric{metric("m1"), metric("m1"), metric("m2")},
114+
},
115+
}},
116+
clusterTemplates: []*argorolloutsv1.ClusterAnalysisTemplate{{
117+
ObjectMeta: metav1.ObjectMeta{Name: "ct1"},
118+
Spec: argorolloutsv1.AnalysisTemplateSpec{
119+
Metrics: []argorolloutsv1.Metric{metric("cluster-metric"), metric("cluster-metric")},
120+
},
121+
}},
122+
expectedTemplateMetrics: map[string][]string{"at": {"m1", "m2"}},
123+
expectedClusterTemplateMetrics: map[string][]string{"ct1": {"cluster-metric"}},
124+
},
125+
}
126+
127+
for _, tc := range testCases {
128+
t.Run(tc.name, func(t *testing.T) {
129+
gotT, gotC, err := dedupMetricsForAnalysisTemplates(ctx, tc.templates, tc.clusterTemplates)
130+
assert.NoError(t, err)
131+
132+
if tc.expectedTemplateMetrics == nil {
133+
assert.Nil(t, gotT)
134+
} else {
135+
assert.Len(t, gotT, len(tc.expectedTemplateMetrics))
136+
for _, tpl := range gotT {
137+
wantNames, ok := tc.expectedTemplateMetrics[tpl.Name]
138+
assert.True(t, ok, "unexpected AnalysisTemplate name %q", tpl.Name)
139+
assert.Equal(t, wantNames, metricNames(tpl.Spec.Metrics), "AnalysisTemplate %q", tpl.Name)
140+
}
141+
}
142+
143+
if tc.expectedClusterTemplateMetrics == nil {
144+
assert.Nil(t, gotC)
145+
} else {
146+
assert.Len(t, gotC, len(tc.expectedClusterTemplateMetrics))
147+
for _, tpl := range gotC {
148+
wantNames, ok := tc.expectedClusterTemplateMetrics[tpl.Name]
149+
assert.True(t, ok, "unexpected ClusterAnalysisTemplate name %q", tpl.Name)
150+
assert.Equal(t, wantNames, metricNames(tpl.Spec.Metrics), "ClusterAnalysisTemplate %q", tpl.Name)
151+
}
152+
}
153+
})
154+
}
155+
}

tests/e2e/progressive-analysis-monovertex-e2e/analysis_monovertex_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ OR
130130
)`,
131131
},
132132
},
133-
SuccessCondition: "result[0] > 0",
133+
SuccessCondition: "len(result) > 0 && result[0] > 0",
134134
},
135135
},
136136
Args: []argov1alpha1.Argument{
@@ -204,7 +204,7 @@ var _ = Describe("Progressive MonoVertex E2E", Serial, func() {
204204
VerifyMonoVertexProgressiveFailure(monoVertexRolloutName, monoVertexScaleMinMaxJSONString, updatedMonoVertexSpec, monoVertexScaleTo, false)
205205

206206
// Verify the AnalysisRun status is Failed
207-
VerifyAnalysisRunStatus("mvtx-example-1", GetInstanceName(analysisRunName, 1), argov1alpha1.AnalysisPhaseError)
207+
VerifyAnalysisRunStatus("mvtx-example-1", GetInstanceName(analysisRunName, 1), argov1alpha1.AnalysisPhaseFailed)
208208
VerifyAnalysisRunStatus("mvtx-example-2", GetInstanceName(analysisRunName, 1), argov1alpha1.AnalysisPhaseSuccessful)
209209

210210
DeleteMonoVertexRollout(monoVertexRolloutName)

0 commit comments

Comments
 (0)