Skip to content

Commit 2339ddc

Browse files
authored
feat: add workload-ref/generation to rollout (argoproj#1198)
- Add a "rollout.argoproj.io/workload-generation" annotation to the rollout metadata, which equals to the generation of reference workload - workload-generation is updated when the referenced workload is updated. - status.workloadObservedGeneration records the observed generation of the rollout Signed-off-by: Hui Kang <[email protected]>
1 parent d4c8aab commit 2339ddc

File tree

16 files changed

+393
-35
lines changed

16 files changed

+393
-35
lines changed

controller/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func NewManager(
146146
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
147147
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")
148148

149-
refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer())
149+
refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer())
150150
apiFactory := api.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), secretInformer.Informer(), configMapInformer.Informer())
151151
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, apiFactory)
152152
notificationsController := controller.NewController(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,

docs/migrating.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ After creation Rollout will spinup required number of Pods side-by-side with the
108108
Rollout won't try to manage existing Deployment Pods. That means you can safely update add Rollout
109109
to the production environment without any interruption but you are going to run twice more Pods during migration.
110110

111+
Argo-rollouts controller patches the spec of rollout object with an annotation of `rollout.argoproj.io/workload-generation`, which equals the generation of referenced deployment. Users can detect if the rollout matches desired generation of deployment by checking the `workloadObservedGeneration` in the rollout status.
112+
111113
**Traffic Management During Migration**
112114

113115
The Rollout offers traffic management functionality that manages routing rules and flows the traffic to different

manifests/crds/rollout-crd.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2820,6 +2820,8 @@ spec:
28202820
updatedReplicas:
28212821
format: int32
28222822
type: integer
2823+
workloadObservedGeneration:
2824+
type: string
28232825
type: object
28242826
required:
28252827
- spec

manifests/install.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12509,6 +12509,8 @@ spec:
1250912509
updatedReplicas:
1251012510
format: int32
1251112511
type: integer
12512+
workloadObservedGeneration:
12513+
type: string
1251212514
type: object
1251312515
required:
1251412516
- spec

manifests/namespace-install.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12509,6 +12509,8 @@ spec:
1250912509
updatedReplicas:
1251012510
format: int32
1251112511
type: integer
12512+
workloadObservedGeneration:
12513+
type: string
1251212514
type: object
1251312515
required:
1251412516
- spec

pkg/apiclient/rollout/rollout.swagger.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,10 @@
12231223
"type": "string",
12241224
"title": "The generation observed by the rollout controller from metadata.generation\n+optional"
12251225
},
1226+
"workloadObservedGeneration": {
1227+
"type": "string",
1228+
"title": "The generation of referenced workload observed by the rollout controller\n+optional"
1229+
},
12261230
"conditions": {
12271231
"type": "array",
12281232
"items": {

pkg/apis/rollouts/v1alpha1/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,9 @@ type RolloutStatus struct {
664664
// The generation observed by the rollout controller from metadata.generation
665665
// +optional
666666
ObservedGeneration string `json:"observedGeneration,omitempty" protobuf:"bytes,13,opt,name=observedGeneration"`
667+
// The generation of referenced workload observed by the rollout controller
668+
// +optional
669+
WorkloadObservedGeneration string `json:"workloadObservedGeneration,omitempty" protobuf:"bytes,24,opt,name=workloadObservedGeneration"`
667670
// Conditions a list of conditions a rollout can have.
668671
// +optional
669672
Conditions []RolloutCondition `json:"conditions,omitempty" protobuf:"bytes,14,rep,name=conditions"`

rollout/sync.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,16 @@ func (c *rolloutContext) persistRolloutStatus(newStatus *v1alpha1.RolloutStatus)
691691

692692
prevStatus := c.rollout.Status
693693
c.pauseContext.CalculatePauseStatus(newStatus)
694+
if c.rollout.Spec.TemplateResolvedFromRef {
695+
workloadRefObservation, _ := annotations.GetWorkloadGenerationAnnotation(c.rollout)
696+
currentWorkloadObservedGeneration, _ := strconv.ParseInt(newStatus.WorkloadObservedGeneration, 10, 32)
697+
if workloadRefObservation != int32(currentWorkloadObservedGeneration) {
698+
newStatus.WorkloadObservedGeneration = strconv.Itoa(int(workloadRefObservation))
699+
}
700+
} else {
701+
newStatus.WorkloadObservedGeneration = ""
702+
}
703+
694704
newStatus.ObservedGeneration = strconv.Itoa(int(c.rollout.Generation))
695705
newStatus.Phase, newStatus.Message = rolloututil.CalculateRolloutPhase(c.rollout.Spec, *newStatus)
696706

rollout/sync_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,54 @@ func TestReconcileRevisionHistoryLimit(t *testing.T) {
215215
}
216216
}
217217

218+
func TestPersistWorkloadRefGeneration(t *testing.T) {
219+
replica := int32(1)
220+
r := &v1alpha1.Rollout{
221+
Spec: v1alpha1.RolloutSpec{
222+
Replicas: &replica,
223+
},
224+
}
225+
fake := fake.Clientset{}
226+
roCtx := &rolloutContext{
227+
rollout: r,
228+
log: logutil.WithRollout(r),
229+
reconcilerBase: reconcilerBase{
230+
argoprojclientset: &fake,
231+
},
232+
pauseContext: &pauseContext{
233+
rollout: r,
234+
},
235+
}
236+
237+
tests := []struct {
238+
annotatedRefGeneration string
239+
currentObserved string
240+
}{
241+
{"1", ""},
242+
{"2", "1"},
243+
{"", "1"},
244+
}
245+
246+
for _, tc := range tests {
247+
newStatus := &v1alpha1.RolloutStatus{
248+
UpdatedReplicas: int32(1),
249+
AvailableReplicas: int32(1),
250+
}
251+
252+
if tc.annotatedRefGeneration != "" {
253+
annotations.SetRolloutWorkloadRefGeneration(r, tc.annotatedRefGeneration)
254+
r.Spec.TemplateResolvedFromRef = true
255+
256+
newStatus.WorkloadObservedGeneration = tc.currentObserved
257+
} else {
258+
r.Spec.TemplateResolvedFromRef = false
259+
annotations.RemoveRolloutWorkloadRefGeneration(r)
260+
}
261+
roCtx.persistRolloutStatus(newStatus)
262+
assert.Equal(t, tc.annotatedRefGeneration, newStatus.WorkloadObservedGeneration)
263+
}
264+
}
265+
218266
// TestCanaryPromoteFull verifies skip pause, analysis, steps when promote full is set for a canary rollout
219267
func TestCanaryPromoteFull(t *testing.T) {
220268
f := newFixture(t)

rollout/temlateref.go

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strconv"
78
"sync"
89
"time"
910

1011
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
12+
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
13+
"github.com/argoproj/argo-rollouts/utils/annotations"
1114
unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"
1215

16+
log "github.com/sirupsen/logrus"
1317
corev1 "k8s.io/api/core/v1"
1418
"k8s.io/apimachinery/pkg/api/errors"
1519
"k8s.io/apimachinery/pkg/api/meta"
@@ -21,7 +25,6 @@ import (
2125
"k8s.io/client-go/dynamic/dynamicinformer"
2226
"k8s.io/client-go/informers"
2327
"k8s.io/client-go/tools/cache"
24-
"k8s.io/client-go/util/workqueue"
2528
)
2629

2730
const (
@@ -57,16 +60,16 @@ type informerBasedTemplateResolver struct {
5760
discoClient discovery.DiscoveryInterface
5861
ctx context.Context
5962
cancelContext context.CancelFunc
60-
rolloutWorkQueue workqueue.Interface
6163
rolloutsInformer cache.SharedIndexInformer
64+
argoprojclientset clientset.Interface
6265
}
6366

6467
// NewInformerBasedWorkloadRefResolver create new instance of workload ref resolver.
6568
func NewInformerBasedWorkloadRefResolver(
6669
namespace string,
6770
dynamicClient dynamic.Interface,
6871
discoClient discovery.DiscoveryInterface,
69-
rolloutWorkQueue workqueue.Interface,
72+
agrgoProjClientset clientset.Interface,
7073
rolloutsInformer cache.SharedIndexInformer,
7174
) *informerBasedTemplateResolver {
7275
ctx, cancelContext := context.WithCancel(context.TODO())
@@ -88,9 +91,9 @@ func NewInformerBasedWorkloadRefResolver(
8891
cancelContext: cancelContext,
8992
informerResyncDuration: time.Minute * 5,
9093
informerSyncTimeout: time.Minute,
94+
argoprojclientset: agrgoProjClientset,
9195
dynamicClient: dynamicClient,
9296
discoClient: discoClient,
93-
rolloutWorkQueue: rolloutWorkQueue,
9497
rolloutsInformer: rolloutsInformer,
9598
}
9699
}
@@ -122,6 +125,7 @@ func remarshalMap(objMap map[string]interface{}, res interface{}) error {
122125
// Resolve verifies if given rollout has template reference and resolves pod template
123126
func (r *informerBasedTemplateResolver) Resolve(rollout *v1alpha1.Rollout) error {
124127
if rollout.Spec.WorkloadRef == nil {
128+
annotations.RemoveRolloutWorkloadRefGeneration(rollout)
125129
return nil
126130
}
127131

@@ -170,6 +174,14 @@ func (r *informerBasedTemplateResolver) Resolve(rollout *v1alpha1.Rollout) error
170174
}
171175
}
172176

177+
// initialize rollout workload-generation annotation
178+
workloadMeta, err := meta.Accessor(obj)
179+
if err != nil {
180+
return err
181+
}
182+
generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10)
183+
annotations.SetRolloutWorkloadRefGeneration(rollout, generation)
184+
173185
return nil
174186
}
175187

@@ -198,36 +210,60 @@ func (r *informerBasedTemplateResolver) newInformerForGVK(gvk schema.GroupVersio
198210
nil)
199211
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
200212
AddFunc: func(obj interface{}) {
201-
r.requeueReferencedRollouts(obj, gvk)
213+
r.updateRolloutsReferenceAnnotation(obj, gvk)
202214
},
203215
UpdateFunc: func(oldObj, newObj interface{}) {
204-
r.requeueReferencedRollouts(newObj, gvk)
216+
r.updateRolloutsReferenceAnnotation(newObj, gvk)
205217
},
206218
DeleteFunc: func(obj interface{}) {
207-
r.requeueReferencedRollouts(obj, gvk)
219+
r.updateRolloutsReferenceAnnotation(obj, gvk)
208220
},
209221
})
210222
return informer, nil
211223

212224
}
213225

214-
// requeueReferencedRollouts re-queues all rollouts referenced by given object
215-
func (r *informerBasedTemplateResolver) requeueReferencedRollouts(obj interface{}, gvk schema.GroupVersionKind) {
216-
roMeta, err := meta.Accessor(obj)
226+
// updateRolloutsReferenceAnnotation update the annotation of all rollouts referenced by given object
227+
func (r *informerBasedTemplateResolver) updateRolloutsReferenceAnnotation(obj interface{}, gvk schema.GroupVersionKind) {
228+
workloadMeta, err := meta.Accessor(obj)
217229
if err != nil {
218230
return
219231
}
232+
220233
rollouts, err := r.rolloutsInformer.GetIndexer().ByIndex(templateRefIndexName, refKey(v1alpha1.ObjectRef{
221234
Kind: gvk.Kind,
222235
APIVersion: gvk.GroupVersion().String(),
223-
Name: roMeta.GetName(),
224-
}, roMeta.GetNamespace()))
236+
Name: workloadMeta.GetName(),
237+
}, workloadMeta.GetNamespace()))
225238
if err != nil {
226239
return
227240
}
241+
242+
var updateAnnotation func(ro *v1alpha1.Rollout)
243+
244+
generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10)
245+
updateAnnotation = func(ro *v1alpha1.Rollout) {
246+
updated := annotations.SetRolloutWorkloadRefGeneration(ro, generation)
247+
if updated {
248+
// update the annotation causes the rollout to be requeued and the template will be resolved to the referred
249+
// workload during next reconciliation
250+
ro.Spec.Template.Spec.Containers = []corev1.Container{}
251+
_, err := r.argoprojclientset.ArgoprojV1alpha1().Rollouts(ro.Namespace).Update(context.TODO(), ro, v1.UpdateOptions{})
252+
if err != nil {
253+
log.Errorf("Cannot update the workload-ref/annotation for %s/%s", ro.GetName(), ro.GetNamespace())
254+
}
255+
}
256+
}
228257
for _, ro := range rollouts {
229-
if key, err := cache.MetaNamespaceKeyFunc(ro); err == nil {
230-
r.rolloutWorkQueue.Add(key)
258+
rollout, ok := ro.(*v1alpha1.Rollout)
259+
if ok {
260+
updateAnnotation(rollout)
261+
} else {
262+
un, ok := ro.(*unstructured.Unstructured)
263+
if ok {
264+
rollout := unstructuredutil.ObjectToRollout(un)
265+
updateAnnotation(rollout)
266+
}
231267
}
232268
}
233269
}

0 commit comments

Comments
 (0)