Skip to content

Commit 23235b3

Browse files
committed
Fix integration tests
Signed-off-by: Jian Qiu <[email protected]>
1 parent 555357d commit 23235b3

File tree

12 files changed

+129
-110
lines changed

12 files changed

+129
-110
lines changed

pkg/cloudevents/clients/store/informer.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"k8s.io/apimachinery/pkg/runtime"
1010
"k8s.io/apimachinery/pkg/watch"
1111
"k8s.io/client-go/tools/cache"
12-
"k8s.io/klog/v2"
13-
1412
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
1513
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
1614
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -52,8 +50,6 @@ func (s *AgentInformerWatcherStore[T]) Delete(resource runtime.Object) error {
5250
}
5351

5452
func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Context, action types.ResourceAction, resource T) error {
55-
logger := klog.FromContext(ctx)
56-
5753
switch action {
5854
case types.Added:
5955
newObj, err := utils.ToRuntimeObject(resource)
@@ -63,24 +59,22 @@ func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Contex
6359

6460
return s.Add(newObj)
6561
case types.Modified:
66-
newObj, err := meta.Accessor(resource)
62+
accessor, err := meta.Accessor(resource)
6763
if err != nil {
6864
return err
6965
}
7066

71-
lastObj, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName())
67+
lastObj, exists, err := s.Get(accessor.GetNamespace(), accessor.GetName())
7268
if err != nil {
7369
return err
7470
}
7571
if !exists {
76-
return fmt.Errorf("the resource %s/%s does not exist", newObj.GetNamespace(), newObj.GetName())
72+
return fmt.Errorf("the resource %s/%s does not exist", accessor.GetNamespace(), accessor.GetName())
7773
}
7874

79-
// prevent the resource from being updated if it is deleting
75+
// if resource is deleting, keep the deletion timestamp
8076
if !lastObj.GetDeletionTimestamp().IsZero() {
81-
logger.Info("the resource is deleting, ignore the update",
82-
"resourceNamespace", newObj.GetNamespace(), "resourceName", newObj.GetName())
83-
return nil
77+
accessor.SetDeletionTimestamp(lastObj.GetDeletionTimestamp())
8478
}
8579

8680
updated, err := utils.ToRuntimeObject(resource)
@@ -99,10 +93,6 @@ func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Contex
9993
return nil
10094
}
10195

102-
if len(newObj.GetFinalizers()) != 0 {
103-
return nil
104-
}
105-
10696
last, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName())
10797
if err != nil {
10898
return err
@@ -116,6 +106,19 @@ func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Contex
116106
return err
117107
}
118108

109+
// trigger an update event if the object is deleting.
110+
// Only need to update generation/finalizer/deletionTimeStamp of the object.
111+
if len(newObj.GetFinalizers()) != 0 {
112+
accessor, err := meta.Accessor(deletingObj)
113+
if err != nil {
114+
return err
115+
}
116+
accessor.SetDeletionTimestamp(newObj.GetDeletionTimestamp())
117+
accessor.SetFinalizers(newObj.GetFinalizers())
118+
accessor.SetGeneration(newObj.GetGeneration())
119+
return s.Update(deletingObj)
120+
}
121+
119122
return s.Delete(deletingObj)
120123
default:
121124
return fmt.Errorf("unsupported resource action %s", action)

pkg/cloudevents/clients/work/source/client/manifestwork.go

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"strconv"
78

89
"k8s.io/apimachinery/pkg/api/errors"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -52,27 +53,34 @@ func (c *ManifestWorkSourceClient) SetNamespace(namespace string) {
5253
}
5354

5455
func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) {
56+
var returnErr *errors.StatusError
57+
58+
defer func() {
59+
if returnErr != nil {
60+
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
61+
} else {
62+
metrics.IncreaseWorkProcessedCounter("create", metav1.StatusSuccess)
63+
}
64+
}()
65+
5566
if manifestWork.Namespace != "" && manifestWork.Namespace != c.namespace {
56-
returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, field.ErrorList{
67+
returnErr = errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, field.ErrorList{
5768
field.Invalid(
5869
field.NewPath("metadata").Child("namespace"),
5970
manifestWork.Namespace,
6071
fmt.Sprintf("does not match the namespace %s", c.namespace),
6172
),
6273
})
63-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
6474
return nil, returnErr
6575
}
6676

6777
_, exists, err := c.watcherStore.Get(c.namespace, manifestWork.Name)
6878
if err != nil {
69-
returnErr := errors.NewInternalError(err)
70-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
79+
returnErr = errors.NewInternalError(err)
7180
return nil, returnErr
7281
}
7382
if exists {
74-
returnErr := errors.NewAlreadyExists(common.ManifestWorkGR, manifestWork.Name)
75-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
83+
returnErr = errors.NewAlreadyExists(common.ManifestWorkGR, manifestWork.Name)
7684
return nil, returnErr
7785
}
7886

@@ -87,34 +95,36 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor
8795
newWork := manifestWork.DeepCopy()
8896
newWork.UID = kubetypes.UID(utils.UID(c.sourceID, common.ManifestWorkGR.String(), c.namespace, newWork.Name))
8997
newWork.Namespace = c.namespace
90-
newWork.ResourceVersion = getWorkResourceVersion(manifestWork)
98+
99+
rv, generation, err := getWorkResourceVersion(manifestWork)
100+
if err != nil {
101+
returnErr = errors.NewInternalError(err)
102+
return nil, returnErr
103+
}
104+
newWork.Generation = generation
105+
newWork.ResourceVersion = rv
91106

92107
if err := utils.EncodeManifests(newWork); err != nil {
93-
returnErr := errors.NewInternalError(err)
94-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
108+
returnErr = errors.NewInternalError(err)
95109
return nil, returnErr
96110
}
97111

98112
if errs := utils.ValidateWork(newWork); len(errs) != 0 {
99-
returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs)
100-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
113+
returnErr = errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs)
101114
return nil, returnErr
102115
}
103116

104117
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
105-
returnErr := cloudeventserrors.ToStatusError(common.ManifestWorkGR, manifestWork.Name, err)
106-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
118+
returnErr = cloudeventserrors.ToStatusError(common.ManifestWorkGR, manifestWork.Name, err)
107119
return nil, returnErr
108120
}
109121

110122
// add the new work to the local cache.
111123
if err := c.watcherStore.Add(newWork); err != nil {
112-
returnErr := errors.NewInternalError(err)
113-
metrics.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
124+
returnErr = errors.NewInternalError(err)
114125
return nil, returnErr
115126
}
116127

117-
metrics.IncreaseWorkProcessedCounter("create", metav1.StatusSuccess)
118128
return newWork.DeepCopy(), nil
119129
}
120130

@@ -238,29 +248,34 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
238248
logger := klog.FromContext(ctx)
239249
logger.V(4).Info("patching manifestwork", "manifestWorkName", name)
240250

251+
var returnErr *errors.StatusError
252+
defer func() {
253+
if returnErr != nil {
254+
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
255+
} else {
256+
metrics.IncreaseWorkProcessedCounter("patch", metav1.StatusSuccess)
257+
}
258+
}()
259+
241260
if len(subresources) != 0 {
242261
msg := fmt.Sprintf("unsupported to update subresources %v", subresources)
243-
returnErr := errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, msg, 0, false)
244-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
262+
returnErr = errors.NewGenericServerResponse(http.StatusMethodNotAllowed, "patch", common.ManifestWorkGR, name, msg, 0, false)
245263
return nil, returnErr
246264
}
247265

248266
lastWork, exists, err := c.watcherStore.Get(c.namespace, name)
249267
if err != nil {
250-
returnErr := errors.NewInternalError(err)
251-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
268+
returnErr = errors.NewInternalError(err)
252269
return nil, returnErr
253270
}
254271
if !exists {
255-
returnErr := errors.NewNotFound(common.ManifestWorkGR, name)
256-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
272+
returnErr = errors.NewNotFound(common.ManifestWorkGR, name)
257273
return nil, returnErr
258274
}
259275

260276
patchedWork, err := utils.Patch(pt, lastWork, data)
261277
if err != nil {
262-
returnErr := errors.NewInternalError(err)
263-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
278+
returnErr = errors.NewInternalError(err)
264279
return nil, returnErr
265280
}
266281

@@ -273,28 +288,30 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
273288
}
274289

275290
newWork := patchedWork.DeepCopy()
276-
newWork.ResourceVersion = getWorkResourceVersion(patchedWork)
291+
rv, generation, err := getWorkResourceVersion(patchedWork)
292+
if err != nil {
293+
returnErr = errors.NewInternalError(err)
294+
return nil, returnErr
295+
}
296+
newWork.Generation = generation
297+
newWork.ResourceVersion = rv
277298

278299
if errs := utils.ValidateWork(newWork); len(errs) != 0 {
279-
returnErr := errors.NewInvalid(common.ManifestWorkGK, name, errs)
280-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
300+
returnErr = errors.NewInvalid(common.ManifestWorkGK, name, errs)
281301
return nil, returnErr
282302
}
283303

284304
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
285-
returnErr := cloudeventserrors.ToStatusError(common.ManifestWorkGR, name, err)
286-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
305+
returnErr = cloudeventserrors.ToStatusError(common.ManifestWorkGR, name, err)
287306
return nil, returnErr
288307
}
289308

290309
// modify the updated work in the local cache.
291310
if err := c.watcherStore.Update(newWork); err != nil {
292-
returnErr := errors.NewInternalError(err)
293-
metrics.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
311+
returnErr = errors.NewInternalError(err)
294312
return nil, returnErr
295313
}
296314

297-
metrics.IncreaseWorkProcessedCounter("patch", metav1.StatusSuccess)
298315
return newWork.DeepCopy(), nil
299316
}
300317

@@ -303,15 +320,29 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
303320
// firstly, if no annotation is set, we will get the the resource version from work itself,
304321
// if the wok does not have it, "0" will be returned, which means the version of the work
305322
// will not be maintained on source, the message broker guarantees the work update order.
306-
func getWorkResourceVersion(work *workv1.ManifestWork) string {
323+
func getWorkResourceVersion(work *workv1.ManifestWork) (string, int64, error) {
324+
var generation int64
325+
var err error
326+
307327
resourceVersion, ok := work.Annotations[common.CloudEventsResourceVersionAnnotationKey]
308328
if ok {
309-
return resourceVersion
329+
generation, err = strconv.ParseInt(resourceVersion, 10, 64)
330+
if err != nil {
331+
return "", 0, errors.NewInternalError(err)
332+
}
310333
}
311334

312-
if work.ResourceVersion != "" {
313-
return work.ResourceVersion
335+
if generation == 0 {
336+
generation = work.Generation
337+
}
338+
339+
if len(resourceVersion) == 0 {
340+
if len(work.ResourceVersion) != 0 {
341+
resourceVersion = work.ResourceVersion
342+
} else {
343+
resourceVersion = "0"
344+
}
314345
}
315346

316-
return "0"
347+
return resourceVersion, generation, nil
317348
}

pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ func TestManifestBundleDecode(t *testing.T) {
204204
}(),
205205
expectedWork: &workv1.ManifestWork{
206206
ObjectMeta: metav1.ObjectMeta{
207-
UID: kubetypes.UID("test"),
208-
ResourceVersion: "13",
207+
UID: kubetypes.UID("test"),
208+
Generation: 13,
209209
Annotations: map[string]string{
210210
"cloudevents.open-cluster-management.io/sequenceid": "1834773391719010304",
211211
},
@@ -225,13 +225,13 @@ func TestManifestBundleDecode(t *testing.T) {
225225
name: "decode a manifestbundle status cloudevent with meta and spec",
226226
event: func() *cloudevents.Event {
227227
metaJson, err := json.Marshal(metav1.ObjectMeta{
228-
UID: kubetypes.UID("test"),
229-
ResourceVersion: "13",
230-
Name: "work1",
231-
Namespace: "cluster1",
232-
Labels: map[string]string{"test1": "test1"},
233-
Annotations: map[string]string{"test2": "test2"},
234-
Finalizers: []string{"test"},
228+
UID: kubetypes.UID("test"),
229+
Generation: 13,
230+
Name: "work1",
231+
Namespace: "cluster1",
232+
Labels: map[string]string{"test1": "test1"},
233+
Annotations: map[string]string{"test2": "test2"},
234+
Finalizers: []string{"test"},
235235
})
236236
if err != nil {
237237
t.Fatal(err)
@@ -290,11 +290,11 @@ func TestManifestBundleDecode(t *testing.T) {
290290
}(),
291291
expectedWork: &workv1.ManifestWork{
292292
ObjectMeta: metav1.ObjectMeta{
293-
UID: kubetypes.UID("test"),
294-
ResourceVersion: "13",
295-
Name: "work1",
296-
Namespace: "cluster1",
297-
Labels: map[string]string{"test1": "test1"},
293+
UID: kubetypes.UID("test"),
294+
Generation: 13,
295+
Name: "work1",
296+
Namespace: "cluster1",
297+
Labels: map[string]string{"test1": "test1"},
298298
Annotations: map[string]string{
299299
"cloudevents.open-cluster-management.io/sequenceid": "1834773391719010304",
300300
"test2": "test2",

pkg/cloudevents/clients/work/store/base.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package store
33
import (
44
"context"
55
"fmt"
6-
"strconv"
76
"time"
87

98
"k8s.io/apimachinery/pkg/api/equality"
@@ -116,22 +115,10 @@ func (b *workProcessor) handleWork(ctx context.Context, work *workv1.ManifestWor
116115
return b.store.Delete(updatedWork)
117116
}
118117

119-
lastResourceVersion, err := strconv.Atoi(lastWork.ResourceVersion)
120-
if err != nil {
121-
logger.Error(err, "invalid resource version for work")
122-
return nil
123-
}
124-
125-
resourceVersion, err := strconv.Atoi(work.ResourceVersion)
126-
if err != nil {
127-
logger.Error(err, "invalid resource version for work")
128-
return nil
129-
}
130-
131118
// the current work's version is maintained on source and the agent's work is newer than source, ignore
132-
if lastResourceVersion != 0 && resourceVersion > lastResourceVersion {
133-
logger.Info("the work resource version is great than its generation, ignore",
134-
"agentResourceVersion", resourceVersion, "sourceResourceVersion", lastResourceVersion)
119+
if lastWork.Generation != 0 && work.Generation > lastWork.Generation {
120+
logger.Info("the work generation is greater than its local generation, ignore",
121+
"localGeneration", lastWork.Generation, "remoteGeneration", work.Generation)
135122
return nil
136123
}
137124

pkg/cloudevents/clients/work/store/informer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ func (s *AgentInformerWatcherStore) HandleReceivedResource(ctx context.Context,
187187
// restore the fields that are maintained by local agent.
188188
updatedWork.Finalizers = lastWork.Finalizers
189189
updatedWork.Status = lastWork.Status
190-
191190
return s.Update(updatedWork)
192191
case types.Deleted:
193192
// the manifestwork is deleting on the source, we just update its deletion timestamp.
@@ -199,9 +198,12 @@ func (s *AgentInformerWatcherStore) HandleReceivedResource(ctx context.Context,
199198
return nil
200199
}
201200

202-
// we should only update the deletionTimestamp or the local work
201+
// update the deletionTimeStamp and generation of last work.
202+
// generation needs to be updated because it is possible that generation still change after
203+
// the object is in deleting state.
203204
updatedWork := lastWork.DeepCopy()
204205
updatedWork.DeletionTimestamp = work.DeletionTimestamp
206+
updatedWork.Generation = work.Generation
205207
return s.Update(updatedWork)
206208
default:
207209
return fmt.Errorf("unsupported resource action %s", action)

0 commit comments

Comments
 (0)