Skip to content

Commit c9b4cf3

Browse files
authored
Merge pull request kubernetes#87342 from Huang-Wei/move-predicates-to-kubelet
Move GeneralPredicates logic to kubelet.
2 parents 95fcc5f + 3f8b202 commit c9b4cf3

File tree

10 files changed

+245
-339
lines changed

10 files changed

+245
-339
lines changed

pkg/kubelet/lifecycle/BUILD

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ go_library(
2121
"//pkg/kubelet/container:go_default_library",
2222
"//pkg/kubelet/types:go_default_library",
2323
"//pkg/kubelet/util/format:go_default_library",
24-
"//pkg/scheduler/algorithm/predicates:go_default_library",
24+
"//pkg/scheduler/framework/plugins/helper:go_default_library",
25+
"//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
26+
"//pkg/scheduler/framework/plugins/nodename:go_default_library",
27+
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
28+
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
2529
"//pkg/scheduler/nodeinfo:go_default_library",
2630
"//pkg/security/apparmor:go_default_library",
2731
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -40,11 +44,16 @@ go_test(
4044
],
4145
embed = [":go_default_library"],
4246
deps = [
47+
"//pkg/apis/core/v1/helper:go_default_library",
4348
"//pkg/kubelet/container:go_default_library",
4449
"//pkg/kubelet/util/format:go_default_library",
50+
"//pkg/scheduler/framework/plugins/nodename:go_default_library",
51+
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
52+
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
4553
"//pkg/scheduler/nodeinfo:go_default_library",
4654
"//staging/src/k8s.io/api/core/v1:go_default_library",
4755
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
56+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
4857
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
4958
],
5059
)

pkg/kubelet/lifecycle/admission_failure_handler_stub.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package lifecycle
1818

1919
import (
2020
"k8s.io/api/core/v1"
21-
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
2221
)
2322

2423
// AdmissionFailureHandlerStub is an AdmissionFailureHandler that does not perform any handling of admission failure.
@@ -31,6 +30,6 @@ func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub {
3130
return &AdmissionFailureHandlerStub{}
3231
}
3332

34-
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
35-
return false, failureReasons, nil
33+
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error) {
34+
return failureReasons, nil
3635
}

pkg/kubelet/lifecycle/predicate.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ import (
2020
"fmt"
2121

2222
"k8s.io/klog"
23+
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
24+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
25+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
26+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
27+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
2328

2429
"k8s.io/api/core/v1"
2530
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
2631
"k8s.io/kubernetes/pkg/kubelet/util/format"
27-
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
2832
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
2933
)
3034

@@ -35,7 +39,7 @@ type pluginResourceUpdateFuncType func(*schedulernodeinfo.NodeInfo, *PodAdmitAtt
3539
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
3640
// This allows for the graceful handling of pod admission failure.
3741
type AdmissionFailureHandler interface {
38-
HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error)
42+
HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error)
3943
}
4044

4145
type predicateAdmitHandler struct {
@@ -89,7 +93,8 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
8993
// the Resource Class API in the future.
9094
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
9195

92-
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
96+
reasons, err := GeneralPredicates(podWithoutMissingExtendedResources, nodeInfo)
97+
fit := len(reasons) == 0 && err == nil
9398
if err != nil {
9499
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
95100
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
@@ -100,7 +105,8 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
100105
}
101106
}
102107
if !fit {
103-
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
108+
reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
109+
fit = len(reasons) == 0 && err == nil
104110
if err != nil {
105111
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
106112
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
@@ -126,11 +132,11 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
126132
// If there are failed predicates, we only return the first one as a reason.
127133
r := reasons[0]
128134
switch re := r.(type) {
129-
case *predicates.PredicateFailureError:
135+
case *PredicateFailureError:
130136
reason = re.PredicateName
131137
message = re.Error()
132138
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
133-
case *predicates.InsufficientResourceError:
139+
case *InsufficientResourceError:
134140
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
135141
message = re.Error()
136142
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
@@ -168,3 +174,68 @@ func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulernodeinfo.Nod
168174
}
169175
return podCopy
170176
}
177+
178+
// InsufficientResourceError is an error type that indicates what kind of resource limit is
179+
// hit and caused the unfitting failure.
180+
type InsufficientResourceError struct {
181+
noderesources.InsufficientResource
182+
}
183+
184+
func (e *InsufficientResourceError) Error() string {
185+
return fmt.Sprintf("Node didn't have enough resource: %s, requested: %d, used: %d, capacity: %d",
186+
e.ResourceName, e.Requested, e.Used, e.Capacity)
187+
}
188+
189+
// PredicateFailureReason interface represents the failure reason of a predicate.
190+
type PredicateFailureReason interface {
191+
GetReason() string
192+
}
193+
194+
// GetReason returns the reason of the InsufficientResourceError.
195+
func (e *InsufficientResourceError) GetReason() string {
196+
return fmt.Sprintf("Insufficient %v", e.ResourceName)
197+
}
198+
199+
// GetInsufficientAmount returns the amount of the insufficient resource of the error.
200+
func (e *InsufficientResourceError) GetInsufficientAmount() int64 {
201+
return e.Requested - (e.Capacity - e.Used)
202+
}
203+
204+
// PredicateFailureError describes a failure error of predicate.
205+
type PredicateFailureError struct {
206+
PredicateName string
207+
PredicateDesc string
208+
}
209+
210+
func (e *PredicateFailureError) Error() string {
211+
return fmt.Sprintf("Predicate %s failed", e.PredicateName)
212+
}
213+
214+
// GetReason returns the reason of the PredicateFailureError.
215+
func (e *PredicateFailureError) GetReason() string {
216+
return e.PredicateDesc
217+
}
218+
219+
// GeneralPredicates checks a group of predicates that the kubelet cares about.
220+
func GeneralPredicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) ([]PredicateFailureReason, error) {
221+
if nodeInfo.Node() == nil {
222+
return nil, fmt.Errorf("node not found")
223+
}
224+
225+
var reasons []PredicateFailureReason
226+
for _, r := range noderesources.Fits(pod, nodeInfo, nil) {
227+
reasons = append(reasons, &InsufficientResourceError{InsufficientResource: r})
228+
}
229+
230+
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, nodeInfo.Node()) {
231+
reasons = append(reasons, &PredicateFailureError{nodeaffinity.Name, nodeaffinity.ErrReason})
232+
}
233+
if !nodename.Fits(pod, nodeInfo) {
234+
reasons = append(reasons, &PredicateFailureError{nodename.Name, nodename.ErrReason})
235+
}
236+
if !nodeports.Fits(pod, nodeInfo) {
237+
reasons = append(reasons, &PredicateFailureError{nodeports.Name, nodeports.ErrReason})
238+
}
239+
240+
return reasons, nil
241+
}

pkg/kubelet/lifecycle/predicate_test.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ import (
2222

2323
"k8s.io/api/core/v1"
2424
"k8s.io/apimachinery/pkg/api/resource"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
27+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
28+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
29+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
2530
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
2631
)
2732

@@ -110,3 +115,147 @@ func makeTestNode(allocatable v1.ResourceList) *v1.Node {
110115
},
111116
}
112117
}
118+
119+
var (
120+
extendedResourceA = v1.ResourceName("example.com/aaa")
121+
hugePageResourceA = v1helper.HugePageResourceName(resource.MustParse("2Mi"))
122+
)
123+
124+
func makeResources(milliCPU, memory, pods, extendedA, storage, hugePageA int64) v1.NodeResources {
125+
return v1.NodeResources{
126+
Capacity: v1.ResourceList{
127+
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
128+
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
129+
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
130+
extendedResourceA: *resource.NewQuantity(extendedA, resource.DecimalSI),
131+
v1.ResourceEphemeralStorage: *resource.NewQuantity(storage, resource.BinarySI),
132+
hugePageResourceA: *resource.NewQuantity(hugePageA, resource.BinarySI),
133+
},
134+
}
135+
}
136+
137+
func makeAllocatableResources(milliCPU, memory, pods, extendedA, storage, hugePageA int64) v1.ResourceList {
138+
return v1.ResourceList{
139+
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
140+
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
141+
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
142+
extendedResourceA: *resource.NewQuantity(extendedA, resource.DecimalSI),
143+
v1.ResourceEphemeralStorage: *resource.NewQuantity(storage, resource.BinarySI),
144+
hugePageResourceA: *resource.NewQuantity(hugePageA, resource.BinarySI),
145+
}
146+
}
147+
148+
func newResourcePod(usage ...schedulernodeinfo.Resource) *v1.Pod {
149+
containers := []v1.Container{}
150+
for _, req := range usage {
151+
containers = append(containers, v1.Container{
152+
Resources: v1.ResourceRequirements{Requests: req.ResourceList()},
153+
})
154+
}
155+
return &v1.Pod{
156+
Spec: v1.PodSpec{
157+
Containers: containers,
158+
},
159+
}
160+
}
161+
162+
func newPodWithPort(hostPorts ...int) *v1.Pod {
163+
networkPorts := []v1.ContainerPort{}
164+
for _, port := range hostPorts {
165+
networkPorts = append(networkPorts, v1.ContainerPort{HostPort: int32(port)})
166+
}
167+
return &v1.Pod{
168+
Spec: v1.PodSpec{
169+
Containers: []v1.Container{
170+
{
171+
Ports: networkPorts,
172+
},
173+
},
174+
},
175+
}
176+
}
177+
178+
func TestGeneralPredicates(t *testing.T) {
179+
resourceTests := []struct {
180+
pod *v1.Pod
181+
nodeInfo *schedulernodeinfo.NodeInfo
182+
node *v1.Node
183+
fits bool
184+
name string
185+
wErr error
186+
reasons []PredicateFailureReason
187+
}{
188+
{
189+
pod: &v1.Pod{},
190+
nodeInfo: schedulernodeinfo.NewNodeInfo(
191+
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 9, Memory: 19})),
192+
node: &v1.Node{
193+
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
194+
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
195+
},
196+
fits: true,
197+
wErr: nil,
198+
name: "no resources/port/host requested always fits",
199+
},
200+
{
201+
pod: newResourcePod(schedulernodeinfo.Resource{MilliCPU: 8, Memory: 10}),
202+
nodeInfo: schedulernodeinfo.NewNodeInfo(
203+
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 19})),
204+
node: &v1.Node{
205+
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
206+
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
207+
},
208+
fits: false,
209+
wErr: nil,
210+
reasons: []PredicateFailureReason{
211+
&InsufficientResourceError{InsufficientResource: noderesources.InsufficientResource{ResourceName: v1.ResourceCPU, Requested: 8, Used: 5, Capacity: 10}},
212+
&InsufficientResourceError{InsufficientResource: noderesources.InsufficientResource{ResourceName: v1.ResourceMemory, Requested: 10, Used: 19, Capacity: 20}},
213+
},
214+
name: "not enough cpu and memory resource",
215+
},
216+
{
217+
pod: &v1.Pod{
218+
Spec: v1.PodSpec{
219+
NodeName: "machine2",
220+
},
221+
},
222+
nodeInfo: schedulernodeinfo.NewNodeInfo(),
223+
node: &v1.Node{
224+
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
225+
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
226+
},
227+
fits: false,
228+
wErr: nil,
229+
reasons: []PredicateFailureReason{&PredicateFailureError{nodename.Name, nodename.ErrReason}},
230+
name: "host not match",
231+
},
232+
{
233+
pod: newPodWithPort(123),
234+
nodeInfo: schedulernodeinfo.NewNodeInfo(newPodWithPort(123)),
235+
node: &v1.Node{
236+
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
237+
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 0, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 0, 0, 0)},
238+
},
239+
fits: false,
240+
wErr: nil,
241+
reasons: []PredicateFailureReason{&PredicateFailureError{nodeports.Name, nodeports.ErrReason}},
242+
name: "hostport conflict",
243+
},
244+
}
245+
for _, test := range resourceTests {
246+
t.Run(test.name, func(t *testing.T) {
247+
test.nodeInfo.SetNode(test.node)
248+
reasons, err := GeneralPredicates(test.pod, test.nodeInfo)
249+
fits := len(reasons) == 0 && err == nil
250+
if err != nil {
251+
t.Errorf("unexpected error: %v", err)
252+
}
253+
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
254+
t.Errorf("unexpected failure reasons: %v, want: %v", reasons, test.reasons)
255+
}
256+
if fits != test.fits {
257+
t.Errorf("expected: %v got %v", test.fits, fits)
258+
}
259+
})
260+
}
261+
}

pkg/kubelet/preemption/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ go_library(
1919
"//pkg/kubelet/metrics:go_default_library",
2020
"//pkg/kubelet/types:go_default_library",
2121
"//pkg/kubelet/util/format:go_default_library",
22-
"//pkg/scheduler/algorithm/predicates:go_default_library",
2322
"//staging/src/k8s.io/api/core/v1:go_default_library",
2423
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
2524
"//vendor/k8s.io/klog:go_default_library",

pkg/kubelet/preemption/preemption.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"k8s.io/kubernetes/pkg/kubelet/metrics"
3232
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
3333
"k8s.io/kubernetes/pkg/kubelet/util/format"
34-
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
3534
)
3635

3736
const message = "Preempted in order to admit critical pod"
@@ -61,16 +60,16 @@ func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPod
6160

6261
// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
6362
// to allow admission of the pod despite its previous failure.
64-
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
63+
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
6564
if !kubetypes.IsCriticalPod(admitPod) {
66-
return false, failureReasons, nil
65+
return failureReasons, nil
6766
}
6867
// InsufficientResourceError is not a reason to reject a critical pod.
6968
// Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
70-
nonResourceReasons := []predicates.PredicateFailureReason{}
69+
nonResourceReasons := []lifecycle.PredicateFailureReason{}
7170
resourceReasons := []*admissionRequirement{}
7271
for _, reason := range failureReasons {
73-
if r, ok := reason.(*predicates.InsufficientResourceError); ok {
72+
if r, ok := reason.(*lifecycle.InsufficientResourceError); ok {
7473
resourceReasons = append(resourceReasons, &admissionRequirement{
7574
resourceName: r.ResourceName,
7675
quantity: r.GetInsufficientAmount(),
@@ -81,11 +80,11 @@ func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, f
8180
}
8281
if len(nonResourceReasons) > 0 {
8382
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
84-
return false, nonResourceReasons, nil
83+
return nonResourceReasons, nil
8584
}
8685
err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
8786
// if no error is returned, preemption succeeded and the pod is safe to admit.
88-
return err == nil, nil, err
87+
return nil, err
8988
}
9089

9190
// evictPodsToFreeRequests takes a list of insufficient resources, and attempts to free them by evicting pods

pkg/scheduler/algorithm/BUILD

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@ filegroup(
1616

1717
filegroup(
1818
name = "all-srcs",
19-
srcs = [
20-
":package-srcs",
21-
"//pkg/scheduler/algorithm/predicates:all-srcs",
22-
],
19+
srcs = [":package-srcs"],
2320
tags = ["automanaged"],
2421
visibility = ["//visibility:public"],
2522
)

0 commit comments

Comments (0)