44 "context"
55 "fmt"
66 "net/http"
7+ "strconv"
78
89 "k8s.io/apimachinery/pkg/api/errors"
910 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -52,27 +53,34 @@ func (c *ManifestWorkSourceClient) SetNamespace(namespace string) {
5253}
5354
5455func (c * ManifestWorkSourceClient ) Create (ctx context.Context , manifestWork * workv1.ManifestWork , opts metav1.CreateOptions ) (* workv1.ManifestWork , error ) {
56+ var returnErr * errors.StatusError
57+
58+ defer func () {
59+ if returnErr != nil {
60+ metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
61+ } else {
62+ metrics .IncreaseWorkProcessedCounter ("create" , metav1 .StatusSuccess )
63+ }
64+ }()
65+
5566 if manifestWork .Namespace != "" && manifestWork .Namespace != c .namespace {
56- returnErr : = errors .NewInvalid (common .ManifestWorkGK , manifestWork .Name , field.ErrorList {
67+ returnErr = errors .NewInvalid (common .ManifestWorkGK , manifestWork .Name , field.ErrorList {
5768 field .Invalid (
5869 field .NewPath ("metadata" ).Child ("namespace" ),
5970 manifestWork .Namespace ,
6071 fmt .Sprintf ("does not match the namespace %s" , c .namespace ),
6172 ),
6273 })
63- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
6474 return nil , returnErr
6575 }
6676
6777 _ , exists , err := c .watcherStore .Get (c .namespace , manifestWork .Name )
6878 if err != nil {
69- returnErr := errors .NewInternalError (err )
70- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
79+ returnErr = errors .NewInternalError (err )
7180 return nil , returnErr
7281 }
7382 if exists {
74- returnErr := errors .NewAlreadyExists (common .ManifestWorkGR , manifestWork .Name )
75- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
83+ returnErr = errors .NewAlreadyExists (common .ManifestWorkGR , manifestWork .Name )
7684 return nil , returnErr
7785 }
7886
@@ -87,34 +95,36 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor
8795 newWork := manifestWork .DeepCopy ()
8896 newWork .UID = kubetypes .UID (utils .UID (c .sourceID , common .ManifestWorkGR .String (), c .namespace , newWork .Name ))
8997 newWork .Namespace = c .namespace
90- newWork .ResourceVersion = getWorkResourceVersion (manifestWork )
98+
99+ rv , generation , err := getWorkResourceVersion (manifestWork )
100+ if err != nil {
101+ returnErr = errors .NewInternalError (err )
102+ return nil , returnErr
103+ }
104+ newWork .Generation = generation
105+ newWork .ResourceVersion = rv
91106
92107 if err := utils .EncodeManifests (newWork ); err != nil {
93- returnErr := errors .NewInternalError (err )
94- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
108+ returnErr = errors .NewInternalError (err )
95109 return nil , returnErr
96110 }
97111
98112 if errs := utils .ValidateWork (newWork ); len (errs ) != 0 {
99- returnErr := errors .NewInvalid (common .ManifestWorkGK , manifestWork .Name , errs )
100- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
113+ returnErr = errors .NewInvalid (common .ManifestWorkGK , manifestWork .Name , errs )
101114 return nil , returnErr
102115 }
103116
104117 if err := c .cloudEventsClient .Publish (ctx , eventType , newWork ); err != nil {
105- returnErr := cloudeventserrors .ToStatusError (common .ManifestWorkGR , manifestWork .Name , err )
106- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
118+ returnErr = cloudeventserrors .ToStatusError (common .ManifestWorkGR , manifestWork .Name , err )
107119 return nil , returnErr
108120 }
109121
110122 // add the new work to the local cache.
111123 if err := c .watcherStore .Add (newWork ); err != nil {
112- returnErr := errors .NewInternalError (err )
113- metrics .IncreaseWorkProcessedCounter ("create" , string (returnErr .ErrStatus .Reason ))
124+ returnErr = errors .NewInternalError (err )
114125 return nil , returnErr
115126 }
116127
117- metrics .IncreaseWorkProcessedCounter ("create" , metav1 .StatusSuccess )
118128 return newWork .DeepCopy (), nil
119129}
120130
@@ -238,29 +248,34 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
238248 logger := klog .FromContext (ctx )
239249 logger .V (4 ).Info ("patching manifestwork" , "manifestWorkName" , name )
240250
251+ var returnErr * errors.StatusError
252+ defer func () {
253+ if returnErr != nil {
254+ metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
255+ } else {
256+ metrics .IncreaseWorkProcessedCounter ("patch" , metav1 .StatusSuccess )
257+ }
258+ }()
259+
241260 if len (subresources ) != 0 {
242261 msg := fmt .Sprintf ("unsupported to update subresources %v" , subresources )
243- returnErr := errors .NewGenericServerResponse (http .StatusMethodNotAllowed , "patch" , common .ManifestWorkGR , name , msg , 0 , false )
244- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
262+ returnErr = errors .NewGenericServerResponse (http .StatusMethodNotAllowed , "patch" , common .ManifestWorkGR , name , msg , 0 , false )
245263 return nil , returnErr
246264 }
247265
248266 lastWork , exists , err := c .watcherStore .Get (c .namespace , name )
249267 if err != nil {
250- returnErr := errors .NewInternalError (err )
251- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
268+ returnErr = errors .NewInternalError (err )
252269 return nil , returnErr
253270 }
254271 if ! exists {
255- returnErr := errors .NewNotFound (common .ManifestWorkGR , name )
256- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
272+ returnErr = errors .NewNotFound (common .ManifestWorkGR , name )
257273 return nil , returnErr
258274 }
259275
260276 patchedWork , err := utils .Patch (pt , lastWork , data )
261277 if err != nil {
262- returnErr := errors .NewInternalError (err )
263- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
278+ returnErr = errors .NewInternalError (err )
264279 return nil , returnErr
265280 }
266281
@@ -273,28 +288,30 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
273288 }
274289
275290 newWork := patchedWork .DeepCopy ()
276- newWork .ResourceVersion = getWorkResourceVersion (patchedWork )
291+ rv , generation , err := getWorkResourceVersion (patchedWork )
292+ if err != nil {
293+ returnErr = errors .NewInternalError (err )
294+ return nil , returnErr
295+ }
296+ newWork .Generation = generation
297+ newWork .ResourceVersion = rv
277298
278299 if errs := utils .ValidateWork (newWork ); len (errs ) != 0 {
279- returnErr := errors .NewInvalid (common .ManifestWorkGK , name , errs )
280- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
300+ returnErr = errors .NewInvalid (common .ManifestWorkGK , name , errs )
281301 return nil , returnErr
282302 }
283303
284304 if err := c .cloudEventsClient .Publish (ctx , eventType , newWork ); err != nil {
285- returnErr := cloudeventserrors .ToStatusError (common .ManifestWorkGR , name , err )
286- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
305+ returnErr = cloudeventserrors .ToStatusError (common .ManifestWorkGR , name , err )
287306 return nil , returnErr
288307 }
289308
290309 // modify the updated work in the local cache.
291310 if err := c .watcherStore .Update (newWork ); err != nil {
292- returnErr := errors .NewInternalError (err )
293- metrics .IncreaseWorkProcessedCounter ("patch" , string (returnErr .ErrStatus .Reason ))
311+ returnErr = errors .NewInternalError (err )
294312 return nil , returnErr
295313 }
296314
297- metrics .IncreaseWorkProcessedCounter ("patch" , metav1 .StatusSuccess )
298315 return newWork .DeepCopy (), nil
299316}
300317
@@ -303,15 +320,29 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
303320// firstly, if no annotation is set, we will get the the resource version from work itself,
304321// if the wok does not have it, "0" will be returned, which means the version of the work
305322// will not be maintained on source, the message broker guarantees the work update order.
306- func getWorkResourceVersion (work * workv1.ManifestWork ) string {
323+ func getWorkResourceVersion (work * workv1.ManifestWork ) (string , int64 , error ) {
324+ var generation int64
325+ var err error
326+
307327 resourceVersion , ok := work .Annotations [common .CloudEventsResourceVersionAnnotationKey ]
308328 if ok {
309- return resourceVersion
329+ generation , err = strconv .ParseInt (resourceVersion , 10 , 64 )
330+ if err != nil {
331+ return "" , 0 , errors .NewInternalError (err )
332+ }
310333 }
311334
312- if work .ResourceVersion != "" {
313- return work .ResourceVersion
335+ if generation == 0 {
336+ generation = work .Generation
337+ }
338+
339+ if len (resourceVersion ) == 0 {
340+ if len (work .ResourceVersion ) != 0 {
341+ resourceVersion = work .ResourceVersion
342+ } else {
343+ resourceVersion = "0"
344+ }
314345 }
315346
316- return "0"
347+ return resourceVersion , generation , nil
317348}
0 commit comments