Commit dc1914c

ReplicaSet controller should count terminating pods in the status
1 parent 2843779 commit dc1914c

6 files changed: +375 −32 lines

6 files changed

+375
-32
lines changed

pkg/controller/controller_utils.go

Lines changed: 15 additions & 0 deletions
@@ -973,6 +973,21 @@ func compareMaxContainerRestarts(pi *v1.Pod, pj *v1.Pod) *bool {
 	return nil
 }
 
+// FilterClaimedPods returns pods that are controlled by the controller and match the selector.
+func FilterClaimedPods(controller metav1.Object, selector labels.Selector, pods []*v1.Pod) []*v1.Pod {
+	var result []*v1.Pod
+	for _, pod := range pods {
+		if !metav1.IsControlledBy(pod, controller) {
+			// It's an orphan or owned by someone else.
+			continue
+		}
+		if selector.Matches(labels.Set(pod.Labels)) {
+			result = append(result, pod)
+		}
+	}
+	return result
+}
+
 // FilterActivePods returns pods that have not terminated.
 func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
 	var result []*v1.Pod
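For orientation, a minimal usage sketch of the new helper (not part of the commit; variable names are hypothetical): it keeps only pods whose controlling owner reference points at the given object and whose labels match the selector, mirroring how syncReplicaSet calls it further down.

	// Sketch only: rs is the object being synced (any metav1.Object works),
	// and pods were listed from the informer cache.
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		return err
	}
	claimed := controller.FilterClaimedPods(rs, selector, pods)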

pkg/controller/controller_utils_test.go

Lines changed: 123 additions & 0 deletions
@@ -55,6 +55,7 @@ import (
 	"k8s.io/kubernetes/test/utils/ktesting"
 	testingclock "k8s.io/utils/clock/testing"
 	"k8s.io/utils/pointer"
+	"k8s.io/utils/ptr"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/assert"
@@ -433,6 +434,128 @@ func TestCountTerminatingPods(t *testing.T) {
 	assert.Len(t, terminatingList, int(2))
 }
 
+func TestClaimedPodFiltering(t *testing.T) {
+	rsUUID := uuid.NewUUID()
+
+	type podData struct {
+		podName         string
+		ownerReferences []metav1.OwnerReference
+		labels          map[string]string
+	}
+
+	type test struct {
+		name         string
+		pods         []podData
+		wantPodNames []string
+	}
+
+	tests := []test{
+		{
+			name: "Filters claimed pods",
+			pods: []podData{
+				// single owner reference
+				{podName: "claimed-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "wrong-selector-1", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "non-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: rsUUID, Controller: nil},
+				}},
+				{podName: "other-controller-1", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(true)},
+				}},
+				{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(true)},
+				}},
+				{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{}},
+				// additional controller owner reference set to controller=false
+				{podName: "claimed-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "wrong-selector-2", labels: map[string]string{"foo": "baz"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "non-controller-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+					{UID: rsUUID, Controller: ptr.To(false)},
+				}},
+				{podName: "other-controller-2", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+					{UID: uuid.NewUUID(), Controller: ptr.To(true)},
+				}},
+				{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+					{UID: uuid.NewUUID(), Controller: ptr.To(true)},
+				}},
+				{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID(), Controller: ptr.To(false)},
+				}},
+				// additional controller owner reference set to controller=nil
+				{podName: "claimed-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "wrong-selector-3", labels: nil, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+					{UID: rsUUID, Controller: ptr.To(true)},
+				}},
+				{podName: "non-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+					{UID: rsUUID, Controller: nil},
+				}},
+				{podName: "other-controller-3", labels: map[string]string{"foo": "bar"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+					{UID: uuid.NewUUID(), Controller: ptr.To(true)},
+				}},
+				{podName: "other-workload-1", labels: map[string]string{"foo": "bee"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+				}},
+				{podName: "standalone-pod-1", labels: map[string]string{"foo": "beetle"}, ownerReferences: []metav1.OwnerReference{
+					{UID: uuid.NewUUID()},
+				}},
+			},
+			wantPodNames: []string{"claimed-1", "claimed-2", "claimed-3"},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
+			rs := newReplicaSet("test-claim", 3, rsUUID)
+			var pods []*v1.Pod
+			for _, p := range test.pods {
+				pods = append(pods, &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:            p.podName,
+						Namespace:       rs.Namespace,
+						Labels:          p.labels,
+						OwnerReferences: p.ownerReferences,
+					},
+					Status: v1.PodStatus{Phase: v1.PodRunning},
+				})
+			}
+
+			selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
+			if err != nil {
+				t.Fatalf("Couldn't get selector for object %#v: %v", rs, err)
+			}
+			got := FilterClaimedPods(rs, selector, pods)
+			gotNames := sets.NewString()
+			for _, pod := range got {
+				gotNames.Insert(pod.Name)
+			}
+
+			if diff := cmp.Diff(test.wantPodNames, gotNames.List()); diff != "" {
+				t.Errorf("Active pod names (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func TestActivePodFiltering(t *testing.T) {
 	logger, _ := ktesting.NewTestContext(t)
 	type podData struct {

pkg/controller/replicaset/replica_set.go

Lines changed: 17 additions & 10 deletions
@@ -45,6 +45,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	appsinformers "k8s.io/client-go/informers/apps/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
@@ -60,6 +61,7 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/replicaset/metrics"
+	"k8s.io/kubernetes/pkg/features"
 )
 
 const (
@@ -564,10 +566,10 @@ func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
 }
 
 // manageReplicas checks and updates replicas for the given ReplicaSet.
-// Does NOT modify <filteredPods>.
+// Does NOT modify <activePods>.
 // It will requeue the replica set in case of an error while creating/deleting pods.
-func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
-	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
+func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, activePods []*v1.Pod, rs *apps.ReplicaSet) error {
+	diff := len(activePods) - int(*(rs.Spec.Replicas))
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
@@ -627,7 +629,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
 		utilruntime.HandleError(err)
 
 		// Choose which Pods to delete, preferring those in earlier phases of startup.
-		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
+		podsToDelete := getPodsToDelete(activePods, relatedPods, diff)
 
 		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
 		// deleted, so we know to record their expectations exactly once either
@@ -707,22 +709,27 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
 	if err != nil {
 		return err
 	}
-	// Ignore inactive pods.
-	filteredPods := controller.FilterActivePods(logger, allPods)
 
-	// NOTE: filteredPods are pointing to objects from cache - if you need to
+	// NOTE: activePods and terminatingPods are pointing to objects from cache - if you need to
 	// modify them, you need to copy it first.
-	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
+	allActivePods := controller.FilterActivePods(logger, allPods)
+	activePods, err := rsc.claimPods(ctx, rs, selector, allActivePods)
 	if err != nil {
 		return err
 	}
 
+	var terminatingPods []*v1.Pod
+	if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
+		allTerminatingPods := controller.FilterTerminatingPods(allPods)
+		terminatingPods = controller.FilterClaimedPods(rs, selector, allTerminatingPods)
+	}
+
 	var manageReplicasErr error
 	if rsNeedsSync && rs.DeletionTimestamp == nil {
-		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
+		manageReplicasErr = rsc.manageReplicas(ctx, activePods, rs)
 	}
 	rs = rs.DeepCopy()
-	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
+	newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr)
 
 	// Always updates status as pods come up or die.
 	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
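Condensed, the gated flow introduced above looks as follows (a paraphrase of the diff for readability, not a literal excerpt): terminating pods are gathered and claimed only when the DeploymentPodReplacementPolicy feature gate is enabled; otherwise calculateStatus receives a nil slice and leaves the new field unset.

	var terminatingPods []*v1.Pod
	if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
		terminatingPods = controller.FilterClaimedPods(rs, selector, controller.FilterTerminatingPods(allPods))
	}
	// Gate off: TerminatingReplicas stays nil. Gate on: it becomes int32(len(terminatingPods)).
	newStatus := calculateStatus(rs, activePods, terminatingPods, manageReplicasErr)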

pkg/controller/replicaset/replica_set_utils.go

Lines changed: 27 additions & 5 deletions
@@ -29,8 +29,11 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/utils/ptr"
 )
 
 // updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
@@ -42,6 +45,7 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
 		rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
 		rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
 		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
+		ptr.Equal(rs.Status.TerminatingReplicas, newStatus.TerminatingReplicas) &&
 		rs.Generation == rs.Status.ObservedGeneration &&
 		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
 		return rs, nil
@@ -56,11 +60,16 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
 	var getErr, updateErr error
 	var updatedRS *apps.ReplicaSet
 	for i, rs := 0, rs; ; i++ {
+		terminatingReplicasUpdateInfo := ""
+		if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
+			terminatingReplicasUpdateInfo = fmt.Sprintf("terminatingReplicas %s->%s, ", derefInt32ToStr(rs.Status.TerminatingReplicas), derefInt32ToStr(newStatus.TerminatingReplicas))
+		}
 		logger.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
 			fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
 			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
 			fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
 			fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
+			terminatingReplicasUpdateInfo +
 			fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
 
 		rs.Status = newStatus
@@ -83,18 +92,18 @@ func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface
 	return nil, updateErr
 }
 
-func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
+func calculateStatus(rs *apps.ReplicaSet, activePods []*v1.Pod, terminatingPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
 	newStatus := rs.Status
 	// Count the number of pods that have labels matching the labels of the pod
 	// template of the replica set, the matching pods may have more
 	// labels than are in the template. Because the label of podTemplateSpec is
 	// a superset of the selector of the replica set, so the possible
-	// matching pods must be part of the filteredPods.
+	// matching pods must be part of the activePods.
 	fullyLabeledReplicasCount := 0
 	readyReplicasCount := 0
 	availableReplicasCount := 0
 	templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
-	for _, pod := range filteredPods {
+	for _, pod := range activePods {
 		if templateLabel.Matches(labels.Set(pod.Labels)) {
 			fullyLabeledReplicasCount++
 		}
@@ -106,10 +115,15 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas
 		}
 	}
 
+	var terminatingReplicasCount *int32
+	if utilfeature.DefaultFeatureGate.Enabled(features.DeploymentPodReplacementPolicy) {
+		terminatingReplicasCount = ptr.To(int32(len(terminatingPods)))
+	}
+
 	failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
 	if manageReplicasErr != nil && failureCond == nil {
 		var reason string
-		if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
+		if diff := len(activePods) - int(*(rs.Spec.Replicas)); diff < 0 {
 			reason = "FailedCreate"
 		} else if diff > 0 {
 			reason = "FailedDelete"
@@ -120,10 +134,11 @@ func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicas
 		RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
 	}
 
-	newStatus.Replicas = int32(len(filteredPods))
+	newStatus.Replicas = int32(len(activePods))
 	newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
 	newStatus.ReadyReplicas = int32(readyReplicasCount)
 	newStatus.AvailableReplicas = int32(availableReplicasCount)
+	newStatus.TerminatingReplicas = terminatingReplicasCount
 	return newStatus
 }
 
@@ -175,3 +190,10 @@ func filterOutCondition(conditions []apps.ReplicaSetCondition, condType apps.Rep
 	}
 	return newConditions
 }
+
+func derefInt32ToStr(ptr *int32) string {
+	if ptr == nil {
+		return "nil"
+	}
+	return fmt.Sprintf("%d", *ptr)
+}
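The helper exists so the V(4) log line above can tell an unset pointer apart from a zero count; a quick illustration with hypothetical values:

	derefInt32ToStr(nil)              // "nil"
	derefInt32ToStr(ptr.To(int32(3))) // "3"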
