Skip to content

Commit 0a4e1d3

Browse files
authored
Merge pull request #6666 from liaolecheng/feature/stateful-workload-cluster-failover
Build gracefulEviction task with ClusterFailoverBehavior
2 parents e95003c + 4b3997f commit 0a4e1d3

File tree

8 files changed

+578
-303
lines changed

8 files changed

+578
-303
lines changed

pkg/controllers/applicationfailover/common.go

Lines changed: 3 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,19 @@ limitations under the License.
1717
package applicationfailover
1818

1919
import (
20-
"bytes"
21-
"encoding/json"
2220
"fmt"
2321
"sync"
2422

2523
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2624
"k8s.io/apimachinery/pkg/types"
2725
"k8s.io/apimachinery/pkg/util/sets"
28-
"k8s.io/client-go/util/jsonpath"
2926
"k8s.io/klog/v2"
3027
"k8s.io/utils/ptr"
3128

3229
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
3330
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
3431
"github.com/karmada-io/karmada/pkg/features"
32+
"github.com/karmada-io/karmada/pkg/util/helper"
3533
)
3634

3735
type workloadUnhealthyMap struct {
@@ -126,55 +124,6 @@ func distinguishUnhealthyClustersWithOthers(aggregatedStatusItems []workv1alpha2
126124
return unhealthyClusters, others
127125
}
128126

129-
func buildPreservedLabelState(statePreservation *policyv1alpha1.StatePreservation, rawStatus []byte) (map[string]string, error) {
130-
results := make(map[string]string, len(statePreservation.Rules))
131-
for _, rule := range statePreservation.Rules {
132-
value, err := parseJSONValue(rawStatus, rule.JSONPath)
133-
if err != nil {
134-
klog.ErrorS(err, "Failed to parse value with jsonPath from status",
135-
"jsonPath", rule.JSONPath,
136-
"status", string(rawStatus))
137-
return nil, err
138-
}
139-
results[rule.AliasLabelName] = value
140-
}
141-
142-
return results, nil
143-
}
144-
145-
func parseJSONValue(rawStatus []byte, jsonPath string) (string, error) {
146-
template := jsonPath
147-
j := jsonpath.New(jsonPath)
148-
j.AllowMissingKeys(false)
149-
err := j.Parse(template)
150-
if err != nil {
151-
return "", err
152-
}
153-
154-
buf := new(bytes.Buffer)
155-
unmarshalled := make(map[string]interface{})
156-
_ = json.Unmarshal(rawStatus, &unmarshalled)
157-
err = j.Execute(buf, unmarshalled)
158-
if err != nil {
159-
return "", err
160-
}
161-
return buf.String(), nil
162-
}
163-
164-
func findTargetStatusItemByCluster(aggregatedStatusItems []workv1alpha2.AggregatedStatusItem, cluster string) (workv1alpha2.AggregatedStatusItem, bool) {
165-
if len(aggregatedStatusItems) == 0 {
166-
return workv1alpha2.AggregatedStatusItem{}, false
167-
}
168-
169-
for index, statusItem := range aggregatedStatusItems {
170-
if statusItem.ClusterName == cluster {
171-
return aggregatedStatusItems[index], true
172-
}
173-
}
174-
175-
return workv1alpha2.AggregatedStatusItem{}, false
176-
}
177-
178127
func getClusterNamesFromTargetClusters(targetClusters []workv1alpha2.TargetCluster) []string {
179128
if targetClusters == nil {
180129
return nil
@@ -195,11 +144,11 @@ func buildTaskOptions(failoverBehavior *policyv1alpha1.ApplicationFailoverBehavi
195144

196145
if features.FeatureGate.Enabled(features.StatefulFailoverInjection) {
197146
if failoverBehavior.StatePreservation != nil && len(failoverBehavior.StatePreservation.Rules) != 0 {
198-
targetStatusItem, exist := findTargetStatusItemByCluster(aggregatedStatus, cluster)
147+
targetStatusItem, exist := helper.FindTargetStatusItemByCluster(aggregatedStatus, cluster)
199148
if !exist || targetStatusItem.Status == nil || targetStatusItem.Status.Raw == nil {
200149
return nil, fmt.Errorf("the application status has not yet been collected from Cluster(%s)", cluster)
201150
}
202-
preservedLabelState, err := buildPreservedLabelState(failoverBehavior.StatePreservation, targetStatusItem.Status.Raw)
151+
preservedLabelState, err := helper.BuildPreservedLabelState(failoverBehavior.StatePreservation, targetStatusItem.Status.Raw)
203152
if err != nil {
204153
return nil, err
205154
}

pkg/controllers/applicationfailover/common_test.go

Lines changed: 0 additions & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package applicationfailover
1919
import (
2020
"fmt"
2121
"reflect"
22-
"strings"
2322
"testing"
2423

2524
"github.com/stretchr/testify/assert"
@@ -187,123 +186,6 @@ func TestDistinguishUnhealthyClustersWithOthers(t *testing.T) {
187186
}
188187
}
189188

190-
func Test_parseJSONValue(t *testing.T) {
191-
// This json value describes a DeploymentList object, it contains two deployment elements.
192-
var deploymentListStrBytes = []byte(`{"apiVersion":"v1","items":[{"apiVersion":"apps/v1","kind":"Deployment","metadata":{"creationTimestamp":"2024-11-27T07:59:13Z","generation":2,"labels":{"app":"nginx","propagationpolicy.karmada.io/permanent-id":"89a95e21-57ec-4f5d-b8c6-15bc196c1449"},"name":"nginx-01","namespace":"default","resourceVersion":"1148","uid":"12cf1e9f-61fd-4e47-a14e-e844165c7f93"},"spec":{"progressDeadlineSeconds":600,"replicas":2,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"image":"nginx","imagePullPolicy":"Always","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}},"status":{"availableReplicas":2,"observedGeneration":2,"readyReplicas":2,"replicas":2,"updatedReplicas":2}},{"apiVersion":"apps/v1","kind":"Deployment","metadata":{"creationTimestamp":"2024-11-27T07:59:13Z","generation":2,"labels":{"app":"nginx","propagationpolicy.karmada.io/permanent-id":"89a95e21-57ec-4f5d-b8c6-15bc196c1449"},"name":"nginx-02","namespace":"default","resourceVersion":"1149","uid":"12cf1e9f-61fd-4e47-a14e-e844165c7f93"},"spec":{"progressDeadlineSeconds":600,"replicas":2,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"image":"nginx","imagePullPolicy":"Always","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}},"status":{"availableReplicas":2,"observedGeneration":2,"readyReplicas":2,"replicas":2,"updatedReplicas":2}}],"kind":"List","metadata":{"resourceVersion":""}}`)
193-
type args struct {
194-
rawStatus []byte
195-
jsonPath string
196-
}
197-
tests := []struct {
198-
name string
199-
args args
200-
want string
201-
wantErr assert.ErrorAssertionFunc
202-
}{
203-
// Build the following test cases from the perspective of parsing application state
204-
{
205-
name: "target field not found",
206-
args: args{
207-
rawStatus: []byte(`{"readyReplicas": 2}`),
208-
jsonPath: "{ .replicas }",
209-
},
210-
wantErr: assert.Error,
211-
},
212-
{
213-
name: "invalid jsonPath",
214-
args: args{
215-
rawStatus: []byte(`{"readyReplicas": 2}`),
216-
jsonPath: "{ %replicas }",
217-
},
218-
wantErr: assert.Error,
219-
},
220-
{
221-
name: "success to parse",
222-
args: args{
223-
rawStatus: []byte(`{"replicas": 2}`),
224-
jsonPath: "{ .replicas }",
225-
},
226-
wantErr: assert.NoError,
227-
want: "2",
228-
},
229-
// Build the following test cases in terms of what the function supports (which we don't use now).
230-
// Please refer to Function Support: https://kubernetes.io/docs/reference/kubectl/jsonpath/
231-
{
232-
name: "the current object parse",
233-
args: args{
234-
rawStatus: deploymentListStrBytes,
235-
jsonPath: "{ @ }",
236-
},
237-
wantErr: assert.NoError,
238-
want: string(deploymentListStrBytes),
239-
},
240-
{
241-
name: "child operator parse",
242-
args: args{
243-
rawStatus: deploymentListStrBytes,
244-
jsonPath: "{ ['kind'] }",
245-
},
246-
wantErr: assert.NoError,
247-
want: "List",
248-
},
249-
{
250-
name: "recursive descent parse",
251-
args: args{
252-
rawStatus: deploymentListStrBytes,
253-
jsonPath: "{ ..resourceVersion }",
254-
},
255-
wantErr: assert.NoError,
256-
want: "1148 1149",
257-
},
258-
{
259-
name: "wildcard get all objects parse",
260-
args: args{
261-
rawStatus: deploymentListStrBytes,
262-
jsonPath: "{ .items[*].metadata.name }",
263-
},
264-
wantErr: assert.NoError,
265-
want: "nginx-01 nginx-02",
266-
},
267-
{
268-
name: "subscript operator parse",
269-
args: args{
270-
rawStatus: deploymentListStrBytes,
271-
jsonPath: "{ .items[0].metadata.name }",
272-
},
273-
wantErr: assert.NoError,
274-
want: "nginx-01",
275-
},
276-
{
277-
name: "filter parse",
278-
args: args{
279-
rawStatus: deploymentListStrBytes,
280-
jsonPath: "{ .items[?(@.metadata.name==\"nginx-01\")].metadata.name }",
281-
},
282-
wantErr: assert.NoError,
283-
want: "nginx-01",
284-
},
285-
{
286-
name: "iterate list parse",
287-
args: args{
288-
rawStatus: deploymentListStrBytes,
289-
jsonPath: "{range .items[*]}[{.metadata.name}, {.metadata.namespace}] {end}",
290-
},
291-
wantErr: assert.NoError,
292-
want: "[nginx-01, default] [nginx-02, default]",
293-
},
294-
}
295-
for _, tt := range tests {
296-
t.Run(tt.name, func(t *testing.T) {
297-
got, err := parseJSONValue(tt.args.rawStatus, tt.args.jsonPath)
298-
if !tt.wantErr(t, err, fmt.Sprintf("parseJSONValue(%s, %v)", tt.args.rawStatus, tt.args.jsonPath)) {
299-
return
300-
}
301-
got = strings.Trim(got, " ")
302-
assert.Equalf(t, tt.want, got, "parseJSONValue(%s, %v)", tt.args.rawStatus, tt.args.jsonPath)
303-
})
304-
}
305-
}
306-
307189
func Test_getClusterNamesFromTargetClusters(t *testing.T) {
308190
type args struct {
309191
targetClusters []workv1alpha2.TargetCluster
@@ -338,125 +220,6 @@ func Test_getClusterNamesFromTargetClusters(t *testing.T) {
338220
}
339221
}
340222

341-
func Test_findTargetStatusItemByCluster(t *testing.T) {
342-
type args struct {
343-
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
344-
cluster string
345-
}
346-
tests := []struct {
347-
name string
348-
args args
349-
want workv1alpha2.AggregatedStatusItem
350-
wantExist bool
351-
}{
352-
{
353-
name: "nil aggregatedStatusItems",
354-
args: args{
355-
aggregatedStatusItems: nil,
356-
cluster: "c1",
357-
},
358-
want: workv1alpha2.AggregatedStatusItem{},
359-
wantExist: false,
360-
},
361-
{
362-
name: "cluster exist in the aggregatedStatusItems",
363-
args: args{
364-
aggregatedStatusItems: []workv1alpha2.AggregatedStatusItem{
365-
{ClusterName: "c1"},
366-
{ClusterName: "c2"},
367-
},
368-
cluster: "c1",
369-
},
370-
want: workv1alpha2.AggregatedStatusItem{ClusterName: "c1"},
371-
wantExist: true,
372-
},
373-
{
374-
name: "cluster does not exist in the aggregatedStatusItems",
375-
args: args{
376-
aggregatedStatusItems: []workv1alpha2.AggregatedStatusItem{
377-
{ClusterName: "c1"},
378-
{ClusterName: "c2"},
379-
},
380-
cluster: "c?",
381-
},
382-
want: workv1alpha2.AggregatedStatusItem{},
383-
wantExist: false,
384-
},
385-
}
386-
for _, tt := range tests {
387-
t.Run(tt.name, func(t *testing.T) {
388-
got, got1 := findTargetStatusItemByCluster(tt.args.aggregatedStatusItems, tt.args.cluster)
389-
assert.Equalf(t, tt.want, got, "findTargetStatusItemByCluster(%v, %v)", tt.args.aggregatedStatusItems, tt.args.cluster)
390-
assert.Equalf(t, tt.wantExist, got1, "findTargetStatusItemByCluster(%v, %v)", tt.args.aggregatedStatusItems, tt.args.cluster)
391-
})
392-
}
393-
}
394-
395-
func Test_buildPreservedLabelState(t *testing.T) {
396-
type args struct {
397-
statePreservation *policyv1alpha1.StatePreservation
398-
rawStatus []byte
399-
}
400-
tests := []struct {
401-
name string
402-
args args
403-
want map[string]string
404-
wantErr assert.ErrorAssertionFunc
405-
}{
406-
{
407-
name: "successful case",
408-
args: args{
409-
statePreservation: &policyv1alpha1.StatePreservation{
410-
Rules: []policyv1alpha1.StatePreservationRule{
411-
{AliasLabelName: "key-a", JSONPath: "{ .replicas }"},
412-
{AliasLabelName: "key-b", JSONPath: "{ .health }"},
413-
},
414-
},
415-
rawStatus: []byte(`{"replicas": 2, "health": true}`),
416-
},
417-
wantErr: assert.NoError,
418-
want: map[string]string{"key-a": "2", "key-b": "true"},
419-
},
420-
{
421-
name: "one statePreservation rule exist not found field",
422-
args: args{
423-
statePreservation: &policyv1alpha1.StatePreservation{
424-
Rules: []policyv1alpha1.StatePreservationRule{
425-
{AliasLabelName: "key-a", JSONPath: "{ .replicas }"},
426-
{AliasLabelName: "key-b", JSONPath: "{ .notfound }"},
427-
},
428-
},
429-
rawStatus: []byte(`{"replicas": 2, "health": true}`),
430-
},
431-
wantErr: assert.Error,
432-
want: nil,
433-
},
434-
{
435-
name: "one statePreservation rule has invalid jsonPath",
436-
args: args{
437-
statePreservation: &policyv1alpha1.StatePreservation{
438-
Rules: []policyv1alpha1.StatePreservationRule{
439-
{AliasLabelName: "key-a", JSONPath: "{ .replicas }"},
440-
{AliasLabelName: "key-b", JSONPath: "{ %health }"},
441-
},
442-
},
443-
rawStatus: []byte(`{"replicas": 2, "health": true}`),
444-
},
445-
wantErr: assert.Error,
446-
want: nil,
447-
},
448-
}
449-
for _, tt := range tests {
450-
t.Run(tt.name, func(t *testing.T) {
451-
got, err := buildPreservedLabelState(tt.args.statePreservation, tt.args.rawStatus)
452-
if !tt.wantErr(t, err, fmt.Sprintf("buildPreservedLabelState(%v, %s)", tt.args.statePreservation, tt.args.rawStatus)) {
453-
return
454-
}
455-
assert.Equalf(t, tt.want, got, "buildPreservedLabelState(%v, %s)", tt.args.statePreservation, tt.args.rawStatus)
456-
})
457-
}
458-
}
459-
460223
func Test_buildTaskOptions(t *testing.T) {
461224
type args struct {
462225
failoverBehavior *policyv1alpha1.ApplicationFailoverBehavior

0 commit comments

Comments
 (0)