Skip to content

Commit b2fb0f7

Browse files
authored
Merge pull request kubernetes#82572 from tnozicka/fix-rs-expectations
Fix RS expectations for recreate case
2 parents 6d0994f + ce52643 commit b2fb0f7

File tree

4 files changed

+336
-59
lines changed

4 files changed

+336
-59
lines changed

pkg/controller/replicaset/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
go_test(
4747
name = "go_default_test",
4848
srcs = [
49+
"init_test.go",
4950
"replica_set_test.go",
5051
"replica_set_utils_test.go",
5152
],
@@ -72,6 +73,7 @@ go_test(
7273
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
7374
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
7475
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
76+
"//vendor/k8s.io/klog:go_default_library",
7577
],
7678
)
7779

pkg/controller/replicaset/init_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package replicaset
18+
19+
import (
20+
"k8s.io/klog"
21+
)
22+
23+
func init() {
24+
klog.InitFlags(nil)
25+
}

pkg/controller/replicaset/replica_set.go

Lines changed: 83 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,9 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
140140
}
141141

142142
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
143-
AddFunc: rsc.enqueueReplicaSet,
143+
AddFunc: rsc.addRS,
144144
UpdateFunc: rsc.updateRS,
145-
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
146-
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
147-
// way of achieving this is by performing a `stop` operation on the replica set.
148-
DeleteFunc: rsc.enqueueReplicaSet,
145+
DeleteFunc: rsc.deleteRS,
149146
})
150147
rsc.rsLister = rsInformer.Lister()
151148
rsc.rsListerSynced = rsInformer.Informer().HasSynced
@@ -266,11 +263,50 @@ func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controll
266263
return rs
267264
}
268265

266+
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
267+
key, err := controller.KeyFunc(rs)
268+
if err != nil {
269+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
270+
return
271+
}
272+
273+
rsc.queue.Add(key)
274+
}
275+
276+
func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
277+
key, err := controller.KeyFunc(rs)
278+
if err != nil {
279+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
280+
return
281+
}
282+
283+
rsc.queue.AddAfter(key, duration)
284+
}
285+
286+
func (rsc *ReplicaSetController) addRS(obj interface{}) {
287+
rs := obj.(*apps.ReplicaSet)
288+
klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
289+
rsc.enqueueRS(rs)
290+
}
291+
269292
// callback when RS is updated
270293
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
271294
oldRS := old.(*apps.ReplicaSet)
272295
curRS := cur.(*apps.ReplicaSet)
273296

297+
// TODO: make a KEP and fix informers to always call the delete event handler on re-create
298+
if curRS.UID != oldRS.UID {
299+
key, err := controller.KeyFunc(oldRS)
300+
if err != nil {
301+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
302+
return
303+
}
304+
rsc.deleteRS(cache.DeletedFinalStateUnknown{
305+
Key: key,
306+
Obj: oldRS,
307+
})
308+
}
309+
274310
// You might imagine that we only really need to enqueue the
275311
// replica set when Spec changes, but it is safer to sync any
276312
// time this function is triggered. That way a full informer
@@ -286,7 +322,36 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
286322
if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
287323
klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
288324
}
289-
rsc.enqueueReplicaSet(cur)
325+
rsc.enqueueRS(curRS)
326+
}
327+
328+
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
329+
rs, ok := obj.(*apps.ReplicaSet)
330+
if !ok {
331+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
332+
if !ok {
333+
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
334+
return
335+
}
336+
rs, ok = tombstone.Obj.(*apps.ReplicaSet)
337+
if !ok {
338+
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
339+
return
340+
}
341+
}
342+
343+
key, err := controller.KeyFunc(rs)
344+
if err != nil {
345+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
346+
return
347+
}
348+
349+
klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)
350+
351+
// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
352+
rsc.expectations.DeleteExpectations(key)
353+
354+
rsc.queue.Add(key)
290355
}
291356

292357
// When a pod is created, enqueue the replica set that manages it and update its expectations.
@@ -312,7 +377,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
312377
}
313378
klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
314379
rsc.expectations.CreationObserved(rsKey)
315-
rsc.enqueueReplicaSet(rs)
380+
rsc.queue.Add(rsKey)
316381
return
317382
}
318383

@@ -326,7 +391,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
326391
}
327392
klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
328393
for _, rs := range rss {
329-
rsc.enqueueReplicaSet(rs)
394+
rsc.enqueueRS(rs)
330395
}
331396
}
332397

@@ -363,7 +428,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
363428
if controllerRefChanged && oldControllerRef != nil {
364429
// The ControllerRef was changed. Sync the old controller, if any.
365430
if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
366-
rsc.enqueueReplicaSet(rs)
431+
rsc.enqueueRS(rs)
367432
}
368433
}
369434

@@ -374,7 +439,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
374439
return
375440
}
376441
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
377-
rsc.enqueueReplicaSet(rs)
442+
rsc.enqueueRS(rs)
378443
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
379444
// the Pod status which in turn will trigger a requeue of the owning replica set thus
380445
// having its status updated with the newly available replica. For now, we can fake the
@@ -386,7 +451,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
386451
klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
387452
// Add a second to avoid milliseconds skew in AddAfter.
388453
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
389-
rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
454+
rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
390455
}
391456
return
392457
}
@@ -400,7 +465,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
400465
}
401466
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
402467
for _, rs := range rss {
403-
rsc.enqueueReplicaSet(rs)
468+
rsc.enqueueRS(rs)
404469
}
405470
}
406471
}
@@ -438,31 +503,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
438503
}
439504
rsKey, err := controller.KeyFunc(rs)
440505
if err != nil {
506+
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
441507
return
442508
}
443509
klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
444510
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
445-
rsc.enqueueReplicaSet(rs)
446-
}
447-
448-
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
449-
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
450-
key, err := controller.KeyFunc(obj)
451-
if err != nil {
452-
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
453-
return
454-
}
455-
rsc.queue.Add(key)
456-
}
457-
458-
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
459-
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
460-
key, err := controller.KeyFunc(obj)
461-
if err != nil {
462-
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
463-
return
464-
}
465-
rsc.queue.AddAfter(key, after)
511+
rsc.queue.Add(rsKey)
466512
}
467513

468514
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -485,7 +531,7 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool {
485531
return true
486532
}
487533

488-
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
534+
utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
489535
rsc.queue.AddRateLimited(key)
490536

491537
return true
@@ -498,7 +544,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
498544
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
499545
rsKey, err := controller.KeyFunc(rs)
500546
if err != nil {
501-
utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
547+
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
502548
return nil
503549
}
504550
if diff < 0 {
@@ -608,7 +654,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
608654
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
609655
// invoked concurrently with the same key.
610656
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
611-
612657
startTime := time.Now()
613658
defer func() {
614659
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
@@ -631,7 +676,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
631676
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
632677
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
633678
if err != nil {
634-
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
679+
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
635680
return nil
636681
}
637682

@@ -670,7 +715,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
670715
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
671716
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
672717
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
673-
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
718+
rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
674719
}
675720
return manageReplicasErr
676721
}

0 commit comments

Comments
 (0)