Skip to content

Commit a6ea87b

Browse files
committed
Fix integration tests
Signed-off-by: Jian Qiu <[email protected]>
1 parent 555357d commit a6ea87b

File tree

12 files changed

+69
-72
lines changed

12 files changed

+69
-72
lines changed

pkg/cloudevents/clients/store/informer.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"k8s.io/apimachinery/pkg/runtime"
1010
"k8s.io/apimachinery/pkg/watch"
1111
"k8s.io/client-go/tools/cache"
12-
"k8s.io/klog/v2"
13-
1412
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
1513
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
1614
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -52,8 +50,6 @@ func (s *AgentInformerWatcherStore[T]) Delete(resource runtime.Object) error {
5250
}
5351

5452
func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Context, action types.ResourceAction, resource T) error {
55-
logger := klog.FromContext(ctx)
56-
5753
switch action {
5854
case types.Added:
5955
newObj, err := utils.ToRuntimeObject(resource)
@@ -63,24 +59,22 @@ func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Contex
6359

6460
return s.Add(newObj)
6561
case types.Modified:
66-
newObj, err := meta.Accessor(resource)
62+
accessor, err := meta.Accessor(resource)
6763
if err != nil {
6864
return err
6965
}
7066

71-
lastObj, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName())
67+
lastObj, exists, err := s.Get(accessor.GetNamespace(), accessor.GetName())
7268
if err != nil {
7369
return err
7470
}
7571
if !exists {
76-
return fmt.Errorf("the resource %s/%s does not exist", newObj.GetNamespace(), newObj.GetName())
72+
return fmt.Errorf("the resource %s/%s does not exist", accessor.GetNamespace(), accessor.GetName())
7773
}
7874

79-
// prevent the resource from being updated if it is deleting
75+
// if resource is deleting, keep the deletiong timestamp
8076
if !lastObj.GetDeletionTimestamp().IsZero() {
81-
logger.Info("the resource is deleting, ignore the update",
82-
"resourceNamespace", newObj.GetNamespace(), "resourceName", newObj.GetName())
83-
return nil
77+
accessor.SetDeletionTimestamp(lastObj.GetDeletionTimestamp())
8478
}
8579

8680
updated, err := utils.ToRuntimeObject(resource)

pkg/cloudevents/clients/work/source/client/manifestwork.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"strconv"
78

89
"k8s.io/apimachinery/pkg/api/errors"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -87,7 +88,13 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor
8788
newWork := manifestWork.DeepCopy()
8889
newWork.UID = kubetypes.UID(utils.UID(c.sourceID, common.ManifestWorkGR.String(), c.namespace, newWork.Name))
8990
newWork.Namespace = c.namespace
90-
newWork.ResourceVersion = getWorkResourceVersion(manifestWork)
91+
92+
rv, generation, err := getWorkResourceVersion(manifestWork)
93+
if err != nil {
94+
return nil, errors.NewInternalError(err)
95+
}
96+
newWork.Generation = generation
97+
newWork.ResourceVersion = rv
9198

9299
if err := utils.EncodeManifests(newWork); err != nil {
93100
returnErr := errors.NewInternalError(err)
@@ -273,7 +280,12 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
273280
}
274281

275282
newWork := patchedWork.DeepCopy()
276-
newWork.ResourceVersion = getWorkResourceVersion(patchedWork)
283+
rv, generation, err := getWorkResourceVersion(patchedWork)
284+
if err != nil {
285+
return nil, errors.NewInternalError(err)
286+
}
287+
newWork.Generation = generation
288+
newWork.ResourceVersion = rv
277289

278290
if errs := utils.ValidateWork(newWork); len(errs) != 0 {
279291
returnErr := errors.NewInvalid(common.ManifestWorkGK, name, errs)
@@ -303,15 +315,27 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
303315
// firstly, if no annotation is set, we will get the the resource version from work itself,
304316
// if the wok does not have it, "0" will be returned, which means the version of the work
305317
// will not be maintained on source, the message broker guarantees the work update order.
306-
func getWorkResourceVersion(work *workv1.ManifestWork) string {
318+
func getWorkResourceVersion(work *workv1.ManifestWork) (string, int64, error) {
319+
var generation int64
320+
var err error
321+
307322
resourceVersion, ok := work.Annotations[common.CloudEventsResourceVersionAnnotationKey]
308323
if ok {
309-
return resourceVersion
324+
generation, err = strconv.ParseInt(resourceVersion, 10, 16)
325+
if err != nil {
326+
return "", 0, errors.NewInternalError(err)
327+
}
328+
}
329+
330+
if generation == 0 {
331+
generation = work.Generation
310332
}
311333

312-
if work.ResourceVersion != "" {
313-
return work.ResourceVersion
334+
if len(resourceVersion) == 0 && len(work.ResourceVersion) != 0 {
335+
resourceVersion = work.ResourceVersion
336+
} else {
337+
resourceVersion = "0"
314338
}
315339

316-
return "0"
340+
return resourceVersion, generation, nil
317341
}

pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ func TestManifestBundleDecode(t *testing.T) {
204204
}(),
205205
expectedWork: &workv1.ManifestWork{
206206
ObjectMeta: metav1.ObjectMeta{
207-
UID: kubetypes.UID("test"),
208-
ResourceVersion: "13",
207+
UID: kubetypes.UID("test"),
208+
Generation: 13,
209209
Annotations: map[string]string{
210210
"cloudevents.open-cluster-management.io/sequenceid": "1834773391719010304",
211211
},
@@ -225,13 +225,13 @@ func TestManifestBundleDecode(t *testing.T) {
225225
name: "decode a manifestbundle status cloudevent with meta and spec",
226226
event: func() *cloudevents.Event {
227227
metaJson, err := json.Marshal(metav1.ObjectMeta{
228-
UID: kubetypes.UID("test"),
229-
ResourceVersion: "13",
230-
Name: "work1",
231-
Namespace: "cluster1",
232-
Labels: map[string]string{"test1": "test1"},
233-
Annotations: map[string]string{"test2": "test2"},
234-
Finalizers: []string{"test"},
228+
UID: kubetypes.UID("test"),
229+
Generation: 13,
230+
Name: "work1",
231+
Namespace: "cluster1",
232+
Labels: map[string]string{"test1": "test1"},
233+
Annotations: map[string]string{"test2": "test2"},
234+
Finalizers: []string{"test"},
235235
})
236236
if err != nil {
237237
t.Fatal(err)
@@ -290,11 +290,11 @@ func TestManifestBundleDecode(t *testing.T) {
290290
}(),
291291
expectedWork: &workv1.ManifestWork{
292292
ObjectMeta: metav1.ObjectMeta{
293-
UID: kubetypes.UID("test"),
294-
ResourceVersion: "13",
295-
Name: "work1",
296-
Namespace: "cluster1",
297-
Labels: map[string]string{"test1": "test1"},
293+
UID: kubetypes.UID("test"),
294+
Generation: 13,
295+
Name: "work1",
296+
Namespace: "cluster1",
297+
Labels: map[string]string{"test1": "test1"},
298298
Annotations: map[string]string{
299299
"cloudevents.open-cluster-management.io/sequenceid": "1834773391719010304",
300300
"test2": "test2",

pkg/cloudevents/clients/work/store/base.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package store
33
import (
44
"context"
55
"fmt"
6-
"strconv"
76
"time"
87

98
"k8s.io/apimachinery/pkg/api/equality"
@@ -116,22 +115,10 @@ func (b *workProcessor) handleWork(ctx context.Context, work *workv1.ManifestWor
116115
return b.store.Delete(updatedWork)
117116
}
118117

119-
lastResourceVersion, err := strconv.Atoi(lastWork.ResourceVersion)
120-
if err != nil {
121-
logger.Error(err, "invalid resource version for work")
122-
return nil
123-
}
124-
125-
resourceVersion, err := strconv.Atoi(work.ResourceVersion)
126-
if err != nil {
127-
logger.Error(err, "invalid resource version for work")
128-
return nil
129-
}
130-
131118
// the current work's version is maintained on source and the agent's work is newer than source, ignore
132-
if lastResourceVersion != 0 && resourceVersion > lastResourceVersion {
133-
logger.Info("the work resource version is great than its generation, ignore",
134-
"agentResourceVersion", resourceVersion, "sourceResourceVersion", lastResourceVersion)
119+
if lastWork.Generation != 0 && work.Generation > lastWork.Generation {
120+
logger.Info("the work generation is great than its local generation, ignore",
121+
"agentGeneration", lastWork.Generation, "sourceGeneration", work.Generation)
135122
return nil
136123
}
137124

pkg/cloudevents/clients/work/store/informer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ func (s *AgentInformerWatcherStore) HandleReceivedResource(ctx context.Context,
187187
// restore the fields that are maintained by local agent.
188188
updatedWork.Finalizers = lastWork.Finalizers
189189
updatedWork.Status = lastWork.Status
190-
191190
return s.Update(updatedWork)
192191
case types.Deleted:
193192
// the manifestwork is deleting on the source, we just update its deletion timestamp.

pkg/cloudevents/generic/clients/agentclient.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,13 +298,13 @@ func (c *CloudEventAgentClient[T]) specAction(
298298
return types.Deleted, nil
299299
}
300300

301-
// if both the current and the last object have the resource version "0" or empty, then object
301+
// if both the current and the last object have the generation "0" or empty, then object
302302
// is considered as modified, the message broker guarantees the order of the messages
303303
if lastObj.GetGeneration() == 0 && obj.GetGeneration() == 0 {
304304
return types.Modified, nil
305305
}
306306

307-
if obj.GetGeneration() <= lastObj.GetGeneration() {
307+
if obj.GetGeneration() < lastObj.GetGeneration() {
308308
return evt, nil
309309
}
310310

pkg/cloudevents/generic/clients/agentclient_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,8 @@ func TestReceiveResourceSpec(t *testing.T) {
515515
return *evt
516516
}(),
517517
resources: []*generictesting.MockResource{
518-
{UID: kubetypes.UID("test1"), Generation: 2, ResourceVersion: "2", Namespace: "cluster1"},
519-
{UID: kubetypes.UID("test2"), Generation: 1, ResourceVersion: "1", Namespace: "cluster1"},
518+
{UID: kubetypes.UID("test1"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
519+
{UID: kubetypes.UID("test2"), Generation: 3, ResourceVersion: "1", Namespace: "cluster1"},
520520
},
521521
validate: func(event types.ResourceAction, resource *generictesting.MockResource) {
522522
if len(event) != 0 {

pkg/cloudevents/generic/clients/baseclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
143143
}
144144

145145
logger.V(2).Info("Sending event", "event", evt.Context)
146-
logger.V(5).Info("Sending event", "event", func() any { return evt.String() })
146+
logger.V(5).Info("Sending event", "event", evt.String())
147147
if err := c.transport.Send(ctx, evt); err != nil {
148148
return err
149149
}

test/integration-test.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ integration: test-cloudevents-integration test-basecontroller-integration test-s
3131

3232
test-cloudevents-integration:
3333
go test -c ./test/integration/cloudevents
34-
./cloudevents.test -ginkgo.slowSpecThreshold=15 -ginkgo.v -ginkgo.failFast
34+
./cloudevents.test -ginkgo.slowSpecThreshold=15 -ginkgo.v -ginkgo.failFast -v=5
3535
.PHONY: test-cloudevents-integration
3636

3737
test-basecontroller-integration: ensure-kubebuilder-tools

test/integration/cloudevents/cloudevents_resync_test.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ import (
66
"fmt"
77
jsonpatch "github.com/evanphx/json-patch/v5"
88
"k8s.io/apimachinery/pkg/types"
9-
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
109
"time"
1110

1211
"github.com/onsi/ginkgo"
1312
"github.com/onsi/gomega"
1413

1514
"k8s.io/apimachinery/pkg/api/meta"
1615
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/apimachinery/pkg/labels"
1816
"k8s.io/apimachinery/pkg/util/rand"
1917

2018
workv1 "open-cluster-management.io/api/work/v1"
@@ -82,29 +80,20 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - RESYNC", func() {
8280
gomega.Expect(err).ToNot(gomega.HaveOccurred())
8381

8482
ginkgo.By("start a work agent to resync the resources from agent")
85-
clientHolder, _, err := agent.StartWorkAgent(ctx, clusterName, mqttOptions, codec.NewManifestBundleCodec())
86-
87-
factory := workinformers.NewSharedInformerFactoryWithOptions(
88-
clientHolder.WorkInterface(),
89-
5*time.Minute,
90-
workinformers.WithNamespace(clusterName),
91-
)
92-
informer := factory.Work().V1().ManifestWorks()
93-
go informer.Informer().Run(ctx.Done())
83+
clientHolder, watchStore, err := agent.StartWorkAgent(ctx, clusterName, mqttOptions, codec.NewManifestBundleCodec())
9484

9585
gomega.Expect(err).ToNot(gomega.HaveOccurred())
96-
lister := informer.Lister().ManifestWorks(clusterName)
9786
agentWorkClient := clientHolder.ManifestWorks(clusterName)
9887

9988
ginkgo.By("ensure the resources is synced on the agent")
10089
gomega.Eventually(func() error {
101-
list, err := lister.List(labels.Everything())
90+
list, err := watchStore.List(clusterName, metav1.ListOptions{})
10291
if err != nil {
10392
return err
10493
}
10594

10695
// ensure there is only one work was synced on the cluster1
107-
if len(list) != 1 {
96+
if len(list.Items) != 1 {
10897
return fmt.Errorf("unexpected work list %v", list)
10998
}
11099

@@ -131,6 +120,9 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - RESYNC", func() {
131120
return err
132121
}
133122
patchData, err := jsonpatch.CreateMergePatch(workData, newWorkData)
123+
if err != nil {
124+
return err
125+
}
134126

135127
// only update the status on the agent local part
136128
if _, err := agentWorkClient.Patch(context.Background(), workName, types.MergePatchType, patchData, metav1.PatchOptions{}); err != nil {

0 commit comments

Comments
 (0)