Skip to content

Commit b0cb266

Browse files
authored
Merge pull request #6833 from ryanwuer/default-resource-interpreter-for-kubernetes-replicaset
feat: default resource interpreter for kubernetes replicaset
2 parents 03fb1bf + df054de commit b0cb266

File tree

11 files changed

+507
-1
lines changed

11 files changed

+507
-1
lines changed

pkg/resourceinterpreter/default/native/aggregatestatus.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type aggregateStatusInterpreter func(object *unstructured.Unstructured, aggregat
4343
func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggregateStatusInterpreter {
4444
s := make(map[schema.GroupVersionKind]aggregateStatusInterpreter)
4545
s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = aggregateDeploymentStatus
46+
s[appsv1.SchemeGroupVersion.WithKind(util.ReplicaSetKind)] = aggregateReplicaSetStatus
4647
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = aggregateServiceStatus
4748
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = aggregateIngressStatus
4849
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = aggregateJobStatus
@@ -120,6 +121,63 @@ func aggregateDeploymentStatus(object *unstructured.Unstructured, aggregatedStat
120121
return helper.ToUnstructured(deploy)
121122
}
122123

124+
func aggregateReplicaSetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
125+
rs := &appsv1.ReplicaSet{}
126+
err := helper.ConvertToTypedObject(object, rs)
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
oldStatus := &rs.Status
132+
newStatus := &appsv1.ReplicaSetStatus{}
133+
observedLatestResourceTemplateGenerationCount := 0
134+
for _, item := range aggregatedStatusItems {
135+
if item.Status == nil {
136+
continue
137+
}
138+
member := &WrappedReplicaSetStatus{}
139+
if err = json.Unmarshal(item.Status.Raw, member); err != nil {
140+
return nil, err
141+
}
142+
klog.V(3).Infof("Grab replicaset(%s/%s) status from cluster(%s), replicas: %d, ready: %d, available: %d",
143+
rs.Namespace, rs.Name, item.ClusterName, member.Replicas, member.ReadyReplicas, member.AvailableReplicas)
144+
145+
// `memberStatus.ObservedGeneration >= memberStatus.Generation` means the member's status corresponds the latest spec revision of the member replicaset.
146+
// `memberStatus.ResourceTemplateGeneration >= rs.Generation` means the member replicaset has been aligned with the latest spec revision of federated replicaset.
147+
// If both conditions are met, we consider the member's status corresponds the latest spec revision of federated replicaset.
148+
if member.ObservedGeneration >= member.Generation &&
149+
member.ResourceTemplateGeneration >= rs.Generation {
150+
observedLatestResourceTemplateGenerationCount++
151+
}
152+
153+
newStatus.Replicas += member.Replicas
154+
newStatus.ReadyReplicas += member.ReadyReplicas
155+
newStatus.AvailableReplicas += member.AvailableReplicas
156+
}
157+
158+
// The 'observedGeneration' is mainly used by GitOps tools(like 'Argo CD') to assess the health status.
159+
// For more details, please refer to https://argo-cd.readthedocs.io/en/stable/operator-manual/health/.
160+
newStatus.ObservedGeneration = oldStatus.ObservedGeneration
161+
if observedLatestResourceTemplateGenerationCount == len(aggregatedStatusItems) {
162+
newStatus.ObservedGeneration = rs.Generation
163+
}
164+
165+
if oldStatus.ObservedGeneration == newStatus.ObservedGeneration &&
166+
oldStatus.Replicas == newStatus.Replicas &&
167+
oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
168+
oldStatus.AvailableReplicas == newStatus.AvailableReplicas {
169+
klog.V(3).Infof("Ignore update replicaset(%s/%s) status as up to date", rs.Namespace, rs.Name)
170+
return object, nil
171+
}
172+
173+
oldStatus.ObservedGeneration = newStatus.ObservedGeneration
174+
oldStatus.Replicas = newStatus.Replicas
175+
oldStatus.ReadyReplicas = newStatus.ReadyReplicas
176+
oldStatus.AvailableReplicas = newStatus.AvailableReplicas
177+
178+
return helper.ToUnstructured(rs)
179+
}
180+
123181
func aggregateServiceStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
124182
service := &corev1.Service{}
125183
err := helper.ConvertToTypedObject(object, service)

pkg/resourceinterpreter/default/native/aggregatestatus_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,7 @@ func Test_aggregateHorizontalPodAutoscalerStatus(t *testing.T) {
12941294
func Test_getAllDefaultAggregateStatusInterpreter(t *testing.T) {
12951295
expectedKinds := []schema.GroupVersionKind{
12961296
{Group: "apps", Version: "v1", Kind: "Deployment"},
1297+
{Group: "apps", Version: "v1", Kind: "ReplicaSet"},
12971298
{Group: "apps", Version: "v1", Kind: "StatefulSet"},
12981299
{Group: "batch", Version: "v1", Kind: "Job"},
12991300
{Group: "", Version: "v1", Kind: "Pod"},

pkg/resourceinterpreter/default/native/dependencies.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type dependenciesInterpreter func(object *unstructured.Unstructured) ([]configv1
4242
func getAllDefaultDependenciesInterpreter() map[schema.GroupVersionKind]dependenciesInterpreter {
4343
s := make(map[schema.GroupVersionKind]dependenciesInterpreter)
4444
s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = getDeploymentDependencies
45+
s[appsv1.SchemeGroupVersion.WithKind(util.ReplicaSetKind)] = getReplicaSetDependencies
4546
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = getJobDependencies
4647
s[batchv1.SchemeGroupVersion.WithKind(util.CronJobKind)] = getCronJobDependencies
4748
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = getPodDependencies
@@ -66,6 +67,20 @@ func getDeploymentDependencies(object *unstructured.Unstructured) ([]configv1alp
6667
return helper.GetDependenciesFromPodTemplate(podObj)
6768
}
6869

70+
func getReplicaSetDependencies(object *unstructured.Unstructured) ([]configv1alpha1.DependentObjectReference, error) {
71+
replicaSetObj := &appsv1.ReplicaSet{}
72+
if err := helper.ConvertToTypedObject(object, replicaSetObj); err != nil {
73+
return nil, fmt.Errorf("failed to convert ReplicaSet from unstructured object: %v", err)
74+
}
75+
76+
podObj, err := lifted.GetPodFromTemplate(&replicaSetObj.Spec.Template, replicaSetObj, nil)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
return helper.GetDependenciesFromPodTemplate(podObj)
82+
}
83+
6984
func getJobDependencies(object *unstructured.Unstructured) ([]configv1alpha1.DependentObjectReference, error) {
7085
jobObj := &batchv1.Job{}
7186
err := helper.ConvertToTypedObject(object, jobObj)

pkg/resourceinterpreter/default/native/dependencies_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,113 @@ func Test_getDeploymentDependencies(t *testing.T) {
311311
}
312312
}
313313

314+
func Test_getReplicaSetDependencies(t *testing.T) {
315+
tests := []struct {
316+
name string
317+
object *unstructured.Unstructured
318+
want []configv1alpha1.DependentObjectReference
319+
wantErr bool
320+
}{
321+
{
322+
name: "replicaset without dependencies",
323+
object: &unstructured.Unstructured{
324+
Object: map[string]interface{}{
325+
"apiVersion": "apps/v1",
326+
"kind": "ReplicaSet",
327+
"metadata": map[string]interface{}{
328+
"name": "fake-replicaset",
329+
"generation": 1,
330+
"namespace": namespace,
331+
},
332+
"spec": map[string]interface{}{
333+
"replicas": 3,
334+
},
335+
},
336+
},
337+
want: nil,
338+
wantErr: false,
339+
},
340+
{
341+
name: "replicaset with dependencies 1",
342+
object: &unstructured.Unstructured{
343+
Object: map[string]interface{}{
344+
"apiVersion": "apps/v1",
345+
"kind": "ReplicaSet",
346+
"metadata": map[string]interface{}{
347+
"name": "fake-replicaset",
348+
"generation": 1,
349+
"namespace": namespace,
350+
},
351+
"spec": map[string]interface{}{
352+
"replicas": 3,
353+
"template": map[string]interface{}{
354+
"spec": testPairs[0].podSpecsWithDependencies.Object,
355+
},
356+
},
357+
},
358+
},
359+
want: testPairs[0].dependentObjectReference,
360+
wantErr: false,
361+
},
362+
{
363+
name: "replicaset with dependencies 2",
364+
object: &unstructured.Unstructured{
365+
Object: map[string]interface{}{
366+
"apiVersion": "apps/v1",
367+
"kind": "ReplicaSet",
368+
"metadata": map[string]interface{}{
369+
"name": "fake-replicaset",
370+
"namespace": namespace,
371+
},
372+
"spec": map[string]interface{}{
373+
"replicas": 3,
374+
"template": map[string]interface{}{
375+
"spec": testPairs[1].podSpecsWithDependencies.Object,
376+
},
377+
},
378+
},
379+
},
380+
want: testPairs[1].dependentObjectReference,
381+
wantErr: false,
382+
},
383+
{
384+
name: "replicaset with dependencies 3",
385+
object: &unstructured.Unstructured{
386+
Object: map[string]interface{}{
387+
"apiVersion": "apps/v1",
388+
"kind": "ReplicaSet",
389+
"metadata": map[string]interface{}{
390+
"name": "fake-replicaset",
391+
"namespace": namespace,
392+
},
393+
"spec": map[string]interface{}{
394+
"replicas": 3,
395+
"template": map[string]interface{}{
396+
"spec": testPairs[2].podSpecsWithDependencies.Object,
397+
},
398+
},
399+
},
400+
},
401+
want: testPairs[2].dependentObjectReference,
402+
wantErr: false,
403+
},
404+
}
405+
406+
for i := range tests {
407+
tt := tests[i]
408+
t.Run(tt.name, func(t *testing.T) {
409+
t.Parallel()
410+
got, err := getReplicaSetDependencies(tt.object)
411+
if (err != nil) != tt.wantErr {
412+
t.Errorf("getReplicaSetDependencies() err = %v, wantErr %v", err, tt.wantErr)
413+
}
414+
if !reflect.DeepEqual(got, tt.want) {
415+
t.Errorf("getReplicaSetDependencies() = %v, want %v", got, tt.want)
416+
}
417+
})
418+
}
419+
}
420+
314421
func Test_getJobDependencies(t *testing.T) {
315422
tests := []struct {
316423
name string
@@ -981,6 +1088,7 @@ func Test_getServiceImportDependencies(t *testing.T) {
9811088
func Test_getAllDefaultDependenciesInterpreter(t *testing.T) {
9821089
expectedKinds := []schema.GroupVersionKind{
9831090
{Group: "apps", Version: "v1", Kind: "Deployment"},
1091+
{Group: "apps", Version: "v1", Kind: "ReplicaSet"},
9841092
{Group: "batch", Version: "v1", Kind: "Job"},
9851093
{Group: "batch", Version: "v1", Kind: "CronJob"},
9861094
{Group: "", Version: "v1", Kind: "Pod"},

pkg/resourceinterpreter/default/native/reflectstatus.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type reflectStatusInterpreter func(object *unstructured.Unstructured) (*runtime.
4141
func getAllDefaultReflectStatusInterpreter() map[schema.GroupVersionKind]reflectStatusInterpreter {
4242
s := make(map[schema.GroupVersionKind]reflectStatusInterpreter)
4343
s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = reflectDeploymentStatus
44+
s[appsv1.SchemeGroupVersion.WithKind(util.ReplicaSetKind)] = reflectReplicaSetStatus
4445
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = reflectServiceStatus
4546
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = reflectIngressStatus
4647
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = reflectJobStatus
@@ -105,6 +106,58 @@ func reflectDeploymentStatus(object *unstructured.Unstructured) (*runtime.RawExt
105106
return grabStatusRaw, nil
106107
}
107108

109+
func reflectReplicaSetStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
110+
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
111+
if err != nil {
112+
klog.Errorf("Failed to get status field from %s(%s/%s), error: %v",
113+
object.GetKind(), object.GetNamespace(), object.GetName(), err)
114+
return nil, err
115+
}
116+
if !exist {
117+
klog.Errorf("Failed to grab status from %s(%s/%s) which should have status field.",
118+
object.GetKind(), object.GetNamespace(), object.GetName())
119+
return nil, nil
120+
}
121+
122+
replicaSetStatus := &appsv1.ReplicaSetStatus{}
123+
if err = helper.ConvertToTypedObject(statusMap, replicaSetStatus); err != nil {
124+
return nil, fmt.Errorf("failed to convert ReplicaSetStatus from map[string]interface{}: %v", err)
125+
}
126+
127+
resourceTemplateGenerationInt := int64(0)
128+
resourceTemplateGenerationStr := util.GetAnnotationValue(object.GetAnnotations(), v1alpha2.ResourceTemplateGenerationAnnotationKey)
129+
err = runtime.Convert_string_To_int64(&resourceTemplateGenerationStr, &resourceTemplateGenerationInt, nil)
130+
if err != nil {
131+
klog.Errorf("Failed to parse ReplicaSet(%s/%s) generation from annotation(%s:%s): %v", object.GetNamespace(), object.GetName(), v1alpha2.ResourceTemplateGenerationAnnotationKey, resourceTemplateGenerationStr, err)
132+
return nil, err
133+
}
134+
135+
grabStatus := &WrappedReplicaSetStatus{
136+
FederatedGeneration: FederatedGeneration{
137+
Generation: object.GetGeneration(),
138+
ResourceTemplateGeneration: resourceTemplateGenerationInt,
139+
},
140+
ReplicaSetStatus: appsv1.ReplicaSetStatus{
141+
Replicas: replicaSetStatus.Replicas,
142+
ReadyReplicas: replicaSetStatus.ReadyReplicas,
143+
AvailableReplicas: replicaSetStatus.AvailableReplicas,
144+
ObservedGeneration: replicaSetStatus.ObservedGeneration,
145+
},
146+
}
147+
148+
grabStatusRaw, err := helper.BuildStatusRawExtension(grabStatus)
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
// if status is empty struct, it actually means status haven't been collected from member cluster.
154+
if bytes.Equal(grabStatusRaw.Raw, []byte("{}")) {
155+
return nil, nil
156+
}
157+
158+
return grabStatusRaw, nil
159+
}
160+
108161
func reflectServiceStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
109162
serviceType, exist, err := unstructured.NestedString(object.Object, "spec", "type")
110163
if err != nil {

0 commit comments

Comments
 (0)