Skip to content

Commit dab2a83

Browse files
committed
Fix conflicts patching scale subresources
1 parent 5655350 commit dab2a83

File tree

9 files changed

+753
-132
lines changed

9 files changed

+753
-132
lines changed

pkg/registry/apps/deployment/storage/storage.go

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -279,39 +279,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
279279

280280
// Update alters scale subset of Deployment object.
281281
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
282-
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
283-
if err != nil {
284-
return nil, false, errors.NewNotFound(apps.Resource("deployments/scale"), name)
285-
}
286-
deployment := obj.(*apps.Deployment)
287-
288-
oldScale, err := scaleFromDeployment(deployment)
289-
if err != nil {
290-
return nil, false, err
291-
}
292-
293-
obj, err = objInfo.UpdatedObject(ctx, oldScale)
294-
if err != nil {
295-
return nil, false, err
296-
}
297-
if obj == nil {
298-
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
299-
}
300-
scale, ok := obj.(*autoscaling.Scale)
301-
if !ok {
302-
return nil, false, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", obj))
303-
}
304-
305-
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
306-
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), name, errs)
307-
}
308-
309-
deployment.Spec.Replicas = scale.Spec.Replicas
310-
deployment.ResourceVersion = scale.ResourceVersion
311-
obj, _, err = r.store.Update(
282+
obj, _, err := r.store.Update(
312283
ctx,
313-
deployment.Name,
314-
rest.DefaultUpdatedObjectInfo(deployment),
284+
name,
285+
&scaleUpdatedObjectInfo{name, objInfo},
315286
toScaleCreateValidation(createValidation),
316287
toScaleUpdateValidation(updateValidation),
317288
false,
@@ -320,7 +291,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
320291
if err != nil {
321292
return nil, false, err
322293
}
323-
deployment = obj.(*apps.Deployment)
294+
deployment := obj.(*apps.Deployment)
324295
newScale, err := scaleFromDeployment(deployment)
325296
if err != nil {
326297
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
@@ -376,3 +347,62 @@ func scaleFromDeployment(deployment *apps.Deployment) (*autoscaling.Scale, error
376347
},
377348
}, nil
378349
}
350+
351+
// scaleUpdatedObjectInfo transforms existing deployment -> existing scale -> new scale -> new deployment
352+
type scaleUpdatedObjectInfo struct {
353+
name string
354+
reqObjInfo rest.UpdatedObjectInfo
355+
}
356+
357+
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
358+
return i.reqObjInfo.Preconditions()
359+
}
360+
361+
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
362+
deployment, ok := oldObj.DeepCopyObject().(*apps.Deployment)
363+
if !ok {
364+
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be Deployment, got %T", deployment))
365+
}
366+
// if zero-value, the existing object does not exist
367+
if len(deployment.ResourceVersion) == 0 {
368+
return nil, errors.NewNotFound(apps.Resource("deployments/scale"), i.name)
369+
}
370+
371+
// deployment -> old scale
372+
oldScale, err := scaleFromDeployment(deployment)
373+
if err != nil {
374+
return nil, err
375+
}
376+
377+
// old scale -> new scale
378+
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
379+
if err != nil {
380+
return nil, err
381+
}
382+
if newScaleObj == nil {
383+
return nil, errors.NewBadRequest("nil update passed to Scale")
384+
}
385+
scale, ok := newScaleObj.(*autoscaling.Scale)
386+
if !ok {
387+
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
388+
}
389+
390+
// validate
391+
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
392+
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), deployment.Name, errs)
393+
}
394+
395+
// validate precondition if specified (resourceVersion matching is handled by storage)
396+
if len(scale.UID) > 0 && scale.UID != deployment.UID {
397+
return nil, errors.NewConflict(
398+
apps.Resource("deployments/scale"),
399+
deployment.Name,
400+
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, deployment.UID),
401+
)
402+
}
403+
404+
// move replicas/resourceVersion fields to object and return
405+
deployment.Spec.Replicas = scale.Spec.Replicas
406+
deployment.ResourceVersion = scale.ResourceVersion
407+
return deployment, nil
408+
}

pkg/registry/apps/deployment/storage/storage_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import (
2121
"fmt"
2222
"net/http"
2323
"reflect"
24+
"sync"
2425
"testing"
2526

2627
apiequality "k8s.io/apimachinery/pkg/api/equality"
2728
"k8s.io/apimachinery/pkg/api/errors"
29+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2931
"k8s.io/apimachinery/pkg/fields"
3032
"k8s.io/apimachinery/pkg/labels"
@@ -451,3 +453,123 @@ func TestCategories(t *testing.T) {
451453
expected := []string{"all"}
452454
registrytest.AssertCategories(t, storage.Deployment, expected)
453455
}
456+
457+
func TestScalePatchErrors(t *testing.T) {
458+
storage, server := newStorage(t)
459+
defer server.Terminate(t)
460+
validObj := validNewDeployment()
461+
resourceStore := storage.Deployment.Store
462+
scaleStore := storage.Scale
463+
464+
defer resourceStore.DestroyFunc()
465+
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
466+
467+
{
468+
applyNotFoundPatch := func() rest.TransformFunc {
469+
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
470+
t.Errorf("notfound patch called")
471+
return currentObject, nil
472+
}
473+
}
474+
_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
475+
if !apierrors.IsNotFound(err) {
476+
t.Errorf("expected notfound, got %v", err)
477+
}
478+
}
479+
480+
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
481+
t.Errorf("Unexpected error: %v", err)
482+
}
483+
484+
{
485+
applyBadUIDPatch := func() rest.TransformFunc {
486+
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
487+
currentObject.(*autoscaling.Scale).UID = "123"
488+
return currentObject, nil
489+
}
490+
}
491+
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
492+
if !apierrors.IsConflict(err) {
493+
t.Errorf("expected conflict, got %v", err)
494+
}
495+
}
496+
497+
{
498+
applyBadResourceVersionPatch := func() rest.TransformFunc {
499+
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
500+
currentObject.(*autoscaling.Scale).ResourceVersion = "123"
501+
return currentObject, nil
502+
}
503+
}
504+
_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
505+
if !apierrors.IsConflict(err) {
506+
t.Errorf("expected conflict, got %v", err)
507+
}
508+
}
509+
}
510+
511+
func TestScalePatchConflicts(t *testing.T) {
512+
storage, server := newStorage(t)
513+
defer server.Terminate(t)
514+
validObj := validNewDeployment()
515+
resourceStore := storage.Deployment.Store
516+
scaleStore := storage.Scale
517+
518+
defer resourceStore.DestroyFunc()
519+
ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
520+
if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
521+
t.Fatalf("Unexpected error: %v", err)
522+
}
523+
applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
524+
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
525+
currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
526+
return currentObject, nil
527+
}
528+
}
529+
stopCh := make(chan struct{})
530+
wg := &sync.WaitGroup{}
531+
wg.Add(1)
532+
go func() {
533+
defer wg.Done()
534+
// continuously submits a patch that updates a label and verifies the label update was effective
535+
labelName := "timestamp"
536+
for i := 0; ; i++ {
537+
select {
538+
case <-stopCh:
539+
return
540+
default:
541+
expectedLabelValue := fmt.Sprint(i)
542+
updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
543+
if err != nil {
544+
t.Errorf("error patching main resource: %v", err)
545+
return
546+
}
547+
gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
548+
if gotLabelValue != expectedLabelValue {
549+
t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
550+
return
551+
}
552+
}
553+
}
554+
}()
555+
556+
// continuously submits a scale patch of replicas for a monotonically increasing replica value
557+
applyReplicaPatch := func(replicas int) rest.TransformFunc {
558+
return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
559+
currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
560+
return currentObject, nil
561+
}
562+
}
563+
for i := 0; i < 100; i++ {
564+
result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
565+
if err != nil {
566+
t.Fatalf("error patching scale: %v", err)
567+
}
568+
scale := result.(*autoscaling.Scale)
569+
if scale.Spec.Replicas != int32(i) {
570+
t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
571+
}
572+
}
573+
close(stopCh)
574+
wg.Wait()
575+
}

pkg/registry/apps/replicaset/storage/storage.go

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
138138
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
139139
}
140140

141-
// ScaleREST implements a Scale for Deployment.
141+
// ScaleREST implements a Scale for ReplicaSet.
142142
type ScaleREST struct {
143143
store *genericregistry.Store
144144
}
@@ -182,40 +182,10 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
182182

183183
// Update alters scale subset of ReplicaSet object.
184184
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
185-
obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
186-
if err != nil {
187-
return nil, false, errors.NewNotFound(apps.Resource("replicasets/scale"), name)
188-
}
189-
rs := obj.(*apps.ReplicaSet)
190-
191-
oldScale, err := scaleFromReplicaSet(rs)
192-
if err != nil {
193-
return nil, false, err
194-
}
195-
196-
// TODO: should this pass admission?
197-
obj, err = objInfo.UpdatedObject(ctx, oldScale)
198-
if err != nil {
199-
return nil, false, err
200-
}
201-
if obj == nil {
202-
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
203-
}
204-
scale, ok := obj.(*autoscaling.Scale)
205-
if !ok {
206-
return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
207-
}
208-
209-
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
210-
return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs)
211-
}
212-
213-
rs.Spec.Replicas = scale.Spec.Replicas
214-
rs.ResourceVersion = scale.ResourceVersion
215-
obj, _, err = r.store.Update(
185+
obj, _, err := r.store.Update(
216186
ctx,
217-
rs.Name,
218-
rest.DefaultUpdatedObjectInfo(rs),
187+
name,
188+
&scaleUpdatedObjectInfo{name, objInfo},
219189
toScaleCreateValidation(createValidation),
220190
toScaleUpdateValidation(updateValidation),
221191
false,
@@ -224,7 +194,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
224194
if err != nil {
225195
return nil, false, err
226196
}
227-
rs = obj.(*apps.ReplicaSet)
197+
rs := obj.(*apps.ReplicaSet)
228198
newScale, err := scaleFromReplicaSet(rs)
229199
if err != nil {
230200
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))
@@ -280,3 +250,62 @@ func scaleFromReplicaSet(rs *apps.ReplicaSet) (*autoscaling.Scale, error) {
280250
},
281251
}, nil
282252
}
253+
254+
// scaleUpdatedObjectInfo transforms existing replicaset -> existing scale -> new scale -> new replicaset
255+
type scaleUpdatedObjectInfo struct {
256+
name string
257+
reqObjInfo rest.UpdatedObjectInfo
258+
}
259+
260+
func (i *scaleUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
261+
return i.reqObjInfo.Preconditions()
262+
}
263+
264+
func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
265+
replicaset, ok := oldObj.DeepCopyObject().(*apps.ReplicaSet)
266+
if !ok {
267+
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be ReplicaSet, got %T", replicaset))
268+
}
269+
// if zero-value, the existing object does not exist
270+
if len(replicaset.ResourceVersion) == 0 {
271+
return nil, errors.NewNotFound(apps.Resource("replicasets/scale"), i.name)
272+
}
273+
274+
// replicaset -> old scale
275+
oldScale, err := scaleFromReplicaSet(replicaset)
276+
if err != nil {
277+
return nil, err
278+
}
279+
280+
// old scale -> new scale
281+
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
282+
if err != nil {
283+
return nil, err
284+
}
285+
if newScaleObj == nil {
286+
return nil, errors.NewBadRequest("nil update passed to Scale")
287+
}
288+
scale, ok := newScaleObj.(*autoscaling.Scale)
289+
if !ok {
290+
return nil, errors.NewBadRequest(fmt.Sprintf("expected input object type to be Scale, but %T", newScaleObj))
291+
}
292+
293+
// validate
294+
if errs := autoscalingvalidation.ValidateScale(scale); len(errs) > 0 {
295+
return nil, errors.NewInvalid(autoscaling.Kind("Scale"), replicaset.Name, errs)
296+
}
297+
298+
// validate precondition if specified (resourceVersion matching is handled by storage)
299+
if len(scale.UID) > 0 && scale.UID != replicaset.UID {
300+
return nil, errors.NewConflict(
301+
apps.Resource("replicasets/scale"),
302+
replicaset.Name,
303+
fmt.Errorf("Precondition failed: UID in precondition: %v, UID in object meta: %v", scale.UID, replicaset.UID),
304+
)
305+
}
306+
307+
// move replicas/resourceVersion fields to object and return
308+
replicaset.Spec.Replicas = scale.Spec.Replicas
309+
replicaset.ResourceVersion = scale.ResourceVersion
310+
return replicaset, nil
311+
}

0 commit comments

Comments
 (0)