Skip to content

Commit 0c559a0

Browse files
authored
feat: ability to auto-create Services for each template in an Experiment (argoproj#1158)
* feat: Modify Experiment Controller to auto-create service for each Experiment Template Signed-off-by: khhirani <[email protected]>
1 parent 68cbef9 commit 0c559a0

29 files changed

+1788
-443
lines changed

controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func NewManager(
199199
AnalysisRunInformer: analysisRunInformer,
200200
AnalysisTemplateInformer: analysisTemplateInformer,
201201
ClusterAnalysisTemplateInformer: clusterAnalysisTemplateInformer,
202+
ServiceInformer: servicesInformer,
202203
ResyncPeriod: resyncPeriod,
203204
RolloutWorkQueue: rolloutWorkqueue,
204205
ExperimentWorkQueue: experimentWorkqueue,

experiments/analysisrun_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,8 @@ func TestAssessAnalysisRunStatusesAfterTemplateSuccess(t *testing.T) {
460460
ar2.Status.Phase = test.second
461461
e.Status.AnalysisRuns[1].Phase = test.second
462462
f := newFixture(t, e, rs, ar1, ar2)
463+
f.expectPatchReplicaSetAction(rs) // Add scaleDownDelay annotation to RS
464+
f.expectGetReplicaSetAction(rs) // Happens during scale down logic
463465
if test.expected != v1alpha1.AnalysisPhaseRunning {
464466
patchIdx := f.expectPatchExperimentAction(e)
465467
f.run(getKey(e, t))
@@ -478,6 +480,7 @@ func TestAssessAnalysisRunStatusesAfterTemplateSuccess(t *testing.T) {
478480
func TestFailExperimentWhenAnalysisFails(t *testing.T) {
479481
templates := generateTemplates("bar")
480482
e := newExperiment("foo", templates, "")
483+
e.Spec.ScaleDownDelaySeconds = pointer.Int32Ptr(0)
481484
e.Spec.Analyses = []v1alpha1.ExperimentAnalysisTemplateRef{
482485
{
483486
Name: "success-rate",
@@ -490,6 +493,7 @@ func TestFailExperimentWhenAnalysisFails(t *testing.T) {
490493
}
491494
e.Status.Phase = v1alpha1.AnalysisPhaseRunning
492495
e.Spec.Duration = "5m"
496+
e.Spec.ScaleDownDelaySeconds = pointer.Int32Ptr(0)
493497
e.Status.AvailableAt = secondsAgo(60)
494498
rs := templateToRS(e, templates[0], 1)
495499
ar1 := analysisTemplateToRun("success-rate", e, &v1alpha1.AnalysisTemplateSpec{})
@@ -551,7 +555,8 @@ func TestFailExperimentWhenAnalysisFails(t *testing.T) {
551555
f := newFixture(t, e, rs, ar1, ar2)
552556

553557
if test.expected == v1alpha1.AnalysisPhaseFailed {
554-
f.expectUpdateReplicaSetAction(rs) // scale down to 0
558+
// No scale down delay actions since scaleDownDelay seconds is 0
559+
f.expectUpdateReplicaSetAction(rs)
555560
}
556561
patchIdx := f.expectPatchExperimentAction(e)
557562
f.run(getKey(e, t))
@@ -593,6 +598,7 @@ func TestCompleteExperimentOnSuccessfulRequiredAnalysisRun(t *testing.T) {
593598

594599
f := newFixture(t, e, rs, ar)
595600
defer f.Close()
601+
f.expectGetReplicaSetAction(rs)
596602
f.expectUpdateReplicaSetAction(rs)
597603
patchIndex := f.expectPatchExperimentAction(e)
598604
f.run(getKey(e, t))
@@ -643,6 +649,7 @@ func TestDoNotCompleteExperimentWithRemainingRequiredAnalysisRun(t *testing.T) {
643649

644650
f := newFixture(t, e, rs, ar, ar2)
645651
defer f.Close()
652+
f.expectGetReplicaSetAction(rs)
646653
f.expectUpdateReplicaSetAction(rs)
647654
patchIndex := f.expectPatchExperimentAction(e)
648655
f.run(getKey(e, t))
@@ -653,6 +660,7 @@ func TestDoNotCompleteExperimentWithRemainingRequiredAnalysisRun(t *testing.T) {
653660
func TestCompleteExperimentWithNoRequiredAnalysis(t *testing.T) {
654661
templates := generateTemplates("bar")
655662
e := newExperiment("foo", templates, "1m")
663+
e.Spec.ScaleDownDelaySeconds = pointer.Int32Ptr(0)
656664
e.Spec.Analyses = []v1alpha1.ExperimentAnalysisTemplateRef{
657665
{
658666
Name: "success-rate",
@@ -692,6 +700,7 @@ func TestCompleteExperimentWithNoRequiredAnalysis(t *testing.T) {
692700
func TestTerminateAnalysisRuns(t *testing.T) {
693701
templates := generateTemplates("bar")
694702
e := newExperiment("foo", templates, "")
703+
e.Spec.ScaleDownDelaySeconds = pointer.Int32Ptr(0)
695704
e.Spec.Analyses = []v1alpha1.ExperimentAnalysisTemplateRef{
696705
{
697706
Name: "success-rate",

experiments/controller.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"time"
66

7+
informersv1 "k8s.io/client-go/informers/core/v1"
8+
listersv1 "k8s.io/client-go/listers/core/v1"
9+
710
log "github.com/sirupsen/logrus"
811
appsv1 "k8s.io/api/apps/v1"
912
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -48,6 +51,7 @@ type Controller struct {
4851
analysisTemplateLister listers.AnalysisTemplateLister
4952
clusterAnalysisTemplateLister listers.ClusterAnalysisTemplateLister
5053
analysisRunLister listers.AnalysisRunLister
54+
serviceLister listersv1.ServiceLister
5155

5256
replicaSetSynced cache.InformerSynced
5357
experimentSynced cache.InformerSynced
@@ -83,6 +87,7 @@ type ControllerConfig struct {
8387
AnalysisRunInformer informers.AnalysisRunInformer
8488
AnalysisTemplateInformer informers.AnalysisTemplateInformer
8589
ClusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer
90+
ServiceInformer informersv1.ServiceInformer
8691
ResyncPeriod time.Duration
8792
RolloutWorkQueue workqueue.RateLimitingInterface
8893
ExperimentWorkQueue workqueue.RateLimitingInterface
@@ -107,6 +112,7 @@ func NewController(cfg ControllerConfig) *Controller {
107112
analysisTemplateLister: cfg.AnalysisTemplateInformer.Lister(),
108113
clusterAnalysisTemplateLister: cfg.ClusterAnalysisTemplateInformer.Lister(),
109114
analysisRunLister: cfg.AnalysisRunInformer.Lister(),
115+
serviceLister: cfg.ServiceInformer.Lister(),
110116
metricsServer: cfg.MetricsServer,
111117
rolloutWorkqueue: cfg.RolloutWorkQueue,
112118
experimentWorkqueue: cfg.ExperimentWorkQueue,
@@ -275,16 +281,24 @@ func (ec *Controller) syncHandler(key string) error {
275281
return err
276282
}
277283

284+
templateServices, err := ec.getServicesForExperiment(experiment)
285+
if err != nil {
286+
return err
287+
}
288+
278289
exCtx := newExperimentContext(
279290
experiment,
280291
templateRSs,
292+
templateServices,
281293
ec.kubeclientset,
282294
ec.argoProjClientset,
283295
ec.replicaSetLister,
284296
ec.analysisTemplateLister,
285297
ec.clusterAnalysisTemplateLister,
286298
ec.analysisRunLister,
299+
ec.serviceLister,
287300
ec.recorder,
301+
ec.resyncPeriod,
288302
ec.enqueueExperimentAfter,
289303
)
290304

experiments/controller_test.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88
"testing"
99
"time"
1010

11+
"k8s.io/apimachinery/pkg/util/intstr"
12+
13+
"github.com/stretchr/testify/assert"
1114
"github.com/undefinedlabs/go-mpatch"
1215

1316
"github.com/argoproj/argo-rollouts/utils/queue"
1417

15-
"github.com/stretchr/testify/assert"
1618
appsv1 "k8s.io/api/apps/v1"
1719
corev1 "k8s.io/api/core/v1"
1820
"k8s.io/apimachinery/pkg/api/equality"
@@ -70,6 +72,7 @@ type fixture struct {
7072
analysisRunLister []*v1alpha1.AnalysisRun
7173
analysisTemplateLister []*v1alpha1.AnalysisTemplate
7274
clusterAnalysisTemplateLister []*v1alpha1.ClusterAnalysisTemplate
75+
serviceLister []*corev1.Service
7376
// Actions expected to happen on the client.
7477
kubeactions []core.Action
7578
actions []core.Action
@@ -102,6 +105,9 @@ func newFixture(t *testing.T, objects ...runtime.Object) *fixture {
102105
case *appsv1.ReplicaSet:
103106
f.kubeobjects = append(f.kubeobjects, obj)
104107
f.replicaSetLister = append(f.replicaSetLister, obj.(*appsv1.ReplicaSet))
108+
case *corev1.Service:
109+
f.kubeobjects = append(f.kubeobjects, obj)
110+
f.serviceLister = append(f.serviceLister, obj.(*corev1.Service))
105111
}
106112
}
107113
f.client = fake.NewSimpleClientset(f.objects...)
@@ -234,6 +240,32 @@ func newCondition(reason string, experiment *v1alpha1.Experiment) *v1alpha1.Expe
234240
return nil
235241
}
236242

243+
func templateToService(ex *v1alpha1.Experiment, template v1alpha1.TemplateSpec, replicaSet appsv1.ReplicaSet) *corev1.Service {
244+
if template.Service != nil {
245+
service := &corev1.Service{
246+
ObjectMeta: metav1.ObjectMeta{
247+
Name: replicaSet.Name,
248+
Namespace: replicaSet.Namespace,
249+
Annotations: map[string]string{
250+
v1alpha1.ExperimentNameAnnotationKey: ex.Name,
251+
v1alpha1.ExperimentTemplateNameAnnotationKey: template.Name,
252+
},
253+
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ex, experimentKind)},
254+
},
255+
Spec: corev1.ServiceSpec{
256+
Selector: replicaSet.Spec.Selector.MatchLabels,
257+
Ports: []corev1.ServicePort{{
258+
Protocol: "TCP",
259+
Port: int32(80),
260+
TargetPort: intstr.FromInt(8080),
261+
}},
262+
},
263+
}
264+
return service
265+
}
266+
return nil
267+
}
268+
237269
func templateToRS(ex *v1alpha1.Experiment, template v1alpha1.TemplateSpec, availableReplicas int32) *appsv1.ReplicaSet {
238270
rsLabels := map[string]string{}
239271
for k, v := range template.Selector.MatchLabels {
@@ -335,6 +367,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
335367
AnalysisRunInformer: i.Argoproj().V1alpha1().AnalysisRuns(),
336368
AnalysisTemplateInformer: i.Argoproj().V1alpha1().AnalysisTemplates(),
337369
ClusterAnalysisTemplateInformer: i.Argoproj().V1alpha1().ClusterAnalysisTemplates(),
370+
ServiceInformer: k8sI.Core().V1().Services(),
338371
ResyncPeriod: resync(),
339372
RolloutWorkQueue: rolloutWorkqueue,
340373
ExperimentWorkQueue: experimentWorkqueue,
@@ -371,6 +404,10 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
371404
k8sI.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
372405
}
373406

407+
for _, r := range f.serviceLister {
408+
k8sI.Core().V1().Services().Informer().GetIndexer().Add(r)
409+
}
410+
374411
for _, r := range f.analysisRunLister {
375412
i.Argoproj().V1alpha1().AnalysisRuns().Informer().GetIndexer().Add(r)
376413
}
@@ -484,7 +521,9 @@ func filterInformerActions(actions []core.Action) []core.Action {
484521
action.Matches("list", "clusteranalysistemplates") ||
485522
action.Matches("watch", "clusteranalysistemplates") ||
486523
action.Matches("list", "analysisruns") ||
487-
action.Matches("watch", "analysisruns") {
524+
action.Matches("watch", "analysisruns") ||
525+
action.Matches("watch", "services") ||
526+
action.Matches("list", "services") {
488527
continue
489528
}
490529
ret = append(ret, action)
@@ -493,6 +532,18 @@ func filterInformerActions(actions []core.Action) []core.Action {
493532
return ret
494533
}
495534

535+
func (f *fixture) expectCreateServiceAction(service *corev1.Service) int {
536+
len := len(f.kubeactions)
537+
f.kubeactions = append(f.kubeactions, core.NewCreateAction(schema.GroupVersionResource{Resource: "services"}, service.Namespace, service))
538+
return len
539+
}
540+
541+
func (f *fixture) expectDeleteServiceAction(service *corev1.Service) int {
542+
len := len(f.kubeactions)
543+
f.kubeactions = append(f.kubeactions, core.NewDeleteAction(schema.GroupVersionResource{Resource: "services"}, service.Namespace, service.Name))
544+
return len
545+
}
546+
496547
func (f *fixture) expectCreateReplicaSetAction(r *appsv1.ReplicaSet) int {
497548
len := len(f.kubeactions)
498549
f.kubeactions = append(f.kubeactions, core.NewCreateAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r))
@@ -586,6 +637,27 @@ func (f *fixture) getCreatedReplicaSet(index int) *appsv1.ReplicaSet {
586637
return rs
587638
}
588639

640+
func (f *fixture) verifyPatchedReplicaSetAddScaleDownDelay(index int, scaleDownDelaySeconds int32) {
641+
action := filterInformerActions(f.kubeclient.Actions())[index]
642+
patchAction, ok := action.(core.PatchAction)
643+
if !ok {
644+
assert.Fail(f.t, "Expected Patch action, not %s", action.GetVerb())
645+
}
646+
now := metav1.Now().Add(time.Duration(scaleDownDelaySeconds) * time.Second).UTC().Format(time.RFC3339)
647+
patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, now)
648+
assert.Equal(f.t, string(patchAction.GetPatch()), patch)
649+
}
650+
651+
func (f *fixture) verifyPatchedReplicaSetRemoveScaleDownDelayAnnotation(index int) {
652+
action := filterInformerActions(f.kubeclient.Actions())[index]
653+
patchAction, ok := action.(core.PatchAction)
654+
if !ok {
655+
assert.Fail(f.t, "Expected Patch action, not %s", action.GetVerb())
656+
}
657+
patch := fmt.Sprintf(removeScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
658+
assert.Equal(f.t, string(patchAction.GetPatch()), patch)
659+
}
660+
589661
func (f *fixture) getUpdatedReplicaSet(index int) *appsv1.ReplicaSet {
590662
action := filterInformerActions(f.kubeclient.Actions())[index]
591663
updateAction, ok := action.(core.UpdateAction)

0 commit comments

Comments
 (0)