Skip to content

Commit f1883c9

Browse files
mortentk8s-ci-robot
authored andcommitted
Support scale subresource for PDBs (kubernetes#76294)
* Support scale subresource for PDBs * Check group in finder functions * Small fixes and more tests
1 parent cdff17a commit f1883c9

File tree

16 files changed

+808
-50
lines changed

16 files changed

+808
-50
lines changed

cmd/kube-controller-manager/app/policy.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ package app
2222

2323
import (
2424
"k8s.io/apimachinery/pkg/runtime/schema"
25+
"k8s.io/client-go/dynamic"
26+
"k8s.io/client-go/scale"
2527
"k8s.io/kubernetes/pkg/controller/disruption"
2628

2729
"net/http"
@@ -40,14 +42,25 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error
4042
resource, group+"/"+version)
4143
return nil, false, nil
4244
}
45+
46+
client := ctx.ClientBuilder.ClientOrDie("disruption-controller")
47+
config := ctx.ClientBuilder.ConfigOrDie("disruption-controller")
48+
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
49+
scaleClient, err := scale.NewForConfig(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
50+
if err != nil {
51+
return nil, false, err
52+
}
53+
4354
go disruption.NewDisruptionController(
4455
ctx.InformerFactory.Core().V1().Pods(),
4556
ctx.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
4657
ctx.InformerFactory.Core().V1().ReplicationControllers(),
4758
ctx.InformerFactory.Apps().V1().ReplicaSets(),
4859
ctx.InformerFactory.Apps().V1().Deployments(),
4960
ctx.InformerFactory.Apps().V1().StatefulSets(),
50-
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
61+
client,
62+
ctx.RESTMapper,
63+
scaleClient,
5164
).Run(ctx.Stop)
5265
return nil, true, nil
5366
}

pkg/controller/disruption/BUILD

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ go_library(
1919
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
2020
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
2121
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
22+
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
2223
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
24+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
2325
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
2426
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
2527
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@@ -34,6 +36,7 @@ go_library(
3436
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
3537
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
3638
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
39+
"//staging/src/k8s.io/client-go/scale:go_default_library",
3740
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
3841
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
3942
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
@@ -49,13 +52,20 @@ go_test(
4952
"//pkg/apis/core/install:go_default_library",
5053
"//pkg/controller:go_default_library",
5154
"//staging/src/k8s.io/api/apps/v1:go_default_library",
55+
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
5256
"//staging/src/k8s.io/api/core/v1:go_default_library",
5357
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
5458
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
59+
"//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
5560
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
61+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
62+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
63+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
5664
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
5765
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
5866
"//staging/src/k8s.io/client-go/informers:go_default_library",
67+
"//staging/src/k8s.io/client-go/scale/fake:go_default_library",
68+
"//staging/src/k8s.io/client-go/testing:go_default_library",
5969
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
6070
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
6171
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",

pkg/controller/disruption/disruption.go

Lines changed: 105 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
policy "k8s.io/api/policy/v1beta1"
2727
apiequality "k8s.io/apimachinery/pkg/api/equality"
2828
"k8s.io/apimachinery/pkg/api/errors"
29+
apimeta "k8s.io/apimachinery/pkg/api/meta"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/runtime/schema"
3032
"k8s.io/apimachinery/pkg/types"
3133
"k8s.io/apimachinery/pkg/util/intstr"
3234
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -41,6 +43,7 @@ import (
4143
appsv1listers "k8s.io/client-go/listers/apps/v1"
4244
corelisters "k8s.io/client-go/listers/core/v1"
4345
policylisters "k8s.io/client-go/listers/policy/v1beta1"
46+
scaleclient "k8s.io/client-go/scale"
4447
"k8s.io/client-go/tools/cache"
4548
"k8s.io/client-go/tools/record"
4649
"k8s.io/client-go/util/workqueue"
@@ -67,6 +70,9 @@ type updater func(*policy.PodDisruptionBudget) error
6770

6871
type DisruptionController struct {
6972
kubeClient clientset.Interface
73+
mapper apimeta.RESTMapper
74+
75+
scaleNamespacer scaleclient.ScalesGetter
7076

7177
pdbLister policylisters.PodDisruptionBudgetLister
7278
pdbListerSynced cache.InformerSynced
@@ -105,7 +111,7 @@ type controllerAndScale struct {
105111

106112
// podControllerFinder is a function type that maps a pod to a list of
107113
// controllers and their scale.
108-
type podControllerFinder func(*v1.Pod) (*controllerAndScale, error)
114+
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
109115

110116
func NewDisruptionController(
111117
podInformer coreinformers.PodInformer,
@@ -115,6 +121,8 @@ func NewDisruptionController(
115121
dInformer appsv1informers.DeploymentInformer,
116122
ssInformer appsv1informers.StatefulSetInformer,
117123
kubeClient clientset.Interface,
124+
restMapper apimeta.RESTMapper,
125+
scaleNamespacer scaleclient.ScalesGetter,
118126
) *DisruptionController {
119127
dc := &DisruptionController{
120128
kubeClient: kubeClient,
@@ -157,19 +165,19 @@ func NewDisruptionController(
157165
dc.ssLister = ssInformer.Lister()
158166
dc.ssListerSynced = ssInformer.Informer().HasSynced
159167

168+
dc.mapper = restMapper
169+
dc.scaleNamespacer = scaleNamespacer
170+
160171
return dc
161172
}
162173

163-
// TODO(mml): When controllerRef is implemented (#2210), we *could* simply
164-
// return controllers without their scales, and access scale type-generically
165-
// via the scale subresource. That may not be as much of a win as it sounds,
166-
// however. We are accessing everything through the pkg/client/cache API that
167-
// we have to set up and tune to the types we know we'll be accessing anyway,
168-
// and we may well need further tweaks just to be able to access scale
169-
// subresources.
174+
// The workload resources do implement the scale subresource, so it would
175+
// be possible to only check the scale subresource here. But since there is no
176+
// way to take advantage of listers with scale subresources, we use the workload
177+
// resources directly and only fall back to the scale subresource when needed.
170178
func (dc *DisruptionController) finders() []podControllerFinder {
171179
return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
172-
dc.getPodStatefulSet}
180+
dc.getPodStatefulSet, dc.getScaleController}
173181
}
174182

175183
var (
@@ -180,15 +188,12 @@ var (
180188
)
181189

182190
// getPodReplicaSet finds a replicaset which has no matching deployments.
183-
func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndScale, error) {
184-
controllerRef := metav1.GetControllerOf(pod)
185-
if controllerRef == nil {
186-
return nil, nil
191+
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
192+
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
193+
if !ok || err != nil {
194+
return nil, err
187195
}
188-
if controllerRef.Kind != controllerKindRS.Kind {
189-
return nil, nil
190-
}
191-
rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
196+
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
192197
if err != nil {
193198
// The only possible error is NotFound, which is ok here.
194199
return nil, nil
@@ -204,16 +209,13 @@ func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndSca
204209
return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
205210
}
206211

207-
// getPodStatefulSet returns the statefulset managing the given pod.
208-
func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndScale, error) {
209-
controllerRef := metav1.GetControllerOf(pod)
210-
if controllerRef == nil {
211-
return nil, nil
212-
}
213-
if controllerRef.Kind != controllerKindSS.Kind {
214-
return nil, nil
212+
// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
213+
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
214+
ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
215+
if !ok || err != nil {
216+
return nil, err
215217
}
216-
ss, err := dc.ssLister.StatefulSets(pod.Namespace).Get(controllerRef.Name)
218+
ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
217219
if err != nil {
218220
// The only possible error is NotFound, which is ok here.
219221
return nil, nil
@@ -226,15 +228,12 @@ func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndSc
226228
}
227229

228230
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
229-
func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndScale, error) {
230-
controllerRef := metav1.GetControllerOf(pod)
231-
if controllerRef == nil {
232-
return nil, nil
233-
}
234-
if controllerRef.Kind != controllerKindRS.Kind {
235-
return nil, nil
231+
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
232+
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
233+
if !ok || err != nil {
234+
return nil, err
236235
}
237-
rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
236+
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
238237
if err != nil {
239238
// The only possible error is NotFound, which is ok here.
240239
return nil, nil
@@ -246,8 +245,10 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca
246245
if controllerRef == nil {
247246
return nil, nil
248247
}
249-
if controllerRef.Kind != controllerKindDep.Kind {
250-
return nil, nil
248+
249+
ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
250+
if !ok || err != nil {
251+
return nil, err
251252
}
252253
deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
253254
if err != nil {
@@ -260,15 +261,12 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca
260261
return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
261262
}
262263

263-
func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*controllerAndScale, error) {
264-
controllerRef := metav1.GetControllerOf(pod)
265-
if controllerRef == nil {
266-
return nil, nil
267-
}
268-
if controllerRef.Kind != controllerKindRC.Kind {
269-
return nil, nil
264+
func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
265+
ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
266+
if !ok || err != nil {
267+
return nil, err
270268
}
271-
rc, err := dc.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name)
269+
rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
272270
if err != nil {
273271
// The only possible error is NotFound, which is ok here.
274272
return nil, nil
@@ -279,6 +277,55 @@ func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*contr
279277
return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
280278
}
281279

280+
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
281+
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
282+
if err != nil {
283+
return nil, err
284+
}
285+
286+
gk := schema.GroupKind{
287+
Group: gv.Group,
288+
Kind: controllerRef.Kind,
289+
}
290+
291+
mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
292+
if err != nil {
293+
return nil, err
294+
}
295+
gr := mapping.Resource.GroupResource()
296+
297+
scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name)
298+
if err != nil {
299+
if errors.IsNotFound(err) {
300+
return nil, nil
301+
}
302+
return nil, err
303+
}
304+
if scale.UID != controllerRef.UID {
305+
return nil, nil
306+
}
307+
return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
308+
}
309+
310+
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
311+
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
312+
if err != nil {
313+
return false, err
314+
}
315+
316+
if controllerRef.Kind != expectedKind {
317+
return false, nil
318+
}
319+
320+
for _, group := range expectedGroups {
321+
if group == gv.Group {
322+
return true, nil
323+
}
324+
}
325+
326+
return false, nil
327+
}
328+
282329
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
283330
defer utilruntime.HandleCrash()
284331
defer dc.queue.ShutDown()
@@ -583,10 +630,23 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget
583630
// 1. Find the controller for each pod. If any pod has 0 controllers,
584631
// that's an error. With ControllerRef, a pod can only have 1 controller.
585632
for _, pod := range pods {
633+
controllerRef := metav1.GetControllerOf(pod)
634+
if controllerRef == nil {
635+
err = fmt.Errorf("found no controller ref for pod %q", pod.Name)
636+
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error())
637+
return
638+
}
639+
640+
// If we already know the scale of the controller there is no need to do anything.
641+
if _, found := controllerScale[controllerRef.UID]; found {
642+
continue
643+
}
644+
645+
// Check all the supported controllers to find the desired scale.
586646
foundController := false
587647
for _, finder := range dc.finders() {
588648
var controllerNScale *controllerAndScale
589-
controllerNScale, err = finder(pod)
649+
controllerNScale, err = finder(controllerRef, pod.Namespace)
590650
if err != nil {
591651
return
592652
}

0 commit comments

Comments
 (0)