
Commit 157f4b9

Merge pull request kubernetes#125753 from SergeyKanzhelev/devicePluginFailuresTests
device plugin failure tests
2 parents 0fa4b9e + 541f2af

File tree

2 files changed: +593 −0 lines changed

Lines changed: 356 additions & 0 deletions
@@ -0,0 +1,356 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2enode

import (
    "context"
    "fmt"
    "time"

    "github.com/onsi/ginkgo/v2"
    "github.com/onsi/gomega"

    v1 "k8s.io/api/core/v1"
    kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    admissionapi "k8s.io/pod-security-admission/api"

    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/uuid"
    "k8s.io/kubernetes/test/e2e/framework"
    "k8s.io/kubernetes/test/e2e_node/testdeviceplugin"
)

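// ResourceValue holds the allocatable and capacity counts that the node
// status reports for a single extended resource.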
type ResourceValue struct {
    Allocatable int
    Capacity    int
}

// Serial because the test restarts Kubelet
var _ = SIGDescribe("Device Plugin Failures:", framework.WithNodeConformance(), func() {
    f := framework.NewDefaultFramework("device-plugin-failures")
    f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

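    // getNodeResourceValues returns the allocatable and capacity values that
    // the local node currently reports for resourceName, or -1 for values
    // not present in the node status.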
    var getNodeResourceValues = func(ctx context.Context, resourceName string) ResourceValue {
        ginkgo.GinkgoHelper()
        node := getLocalNode(ctx, f)

        // -1 represents that the resource is not found
        result := ResourceValue{
            Allocatable: -1,
            Capacity:    -1,
        }

        for key, val := range node.Status.Capacity {
            resource := string(key)
            if resource == resourceName {
                result.Capacity = int(val.Value())
                break
            }
        }

        for key, val := range node.Status.Allocatable {
            resource := string(key)
            if resource == resourceName {
                result.Allocatable = int(val.Value())
                break
            }
        }

        return result
    }

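    // createPod builds a pod with a single busybox container that requests
    // `quantity` units of the given extended resource and then sleeps.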
    var createPod = func(resourceName string, quantity int) *v1.Pod {
        ginkgo.GinkgoHelper()
        rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(int64(quantity), resource.DecimalSI)}
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-failures-test-" + string(uuid.NewUUID())},
            Spec: v1.PodSpec{
                RestartPolicy: v1.RestartPolicyAlways,
                Containers: []v1.Container{{
                    Image:   busyboxImage,
                    Name:    "container-1",
                    Command: []string{"sh", "-c", fmt.Sprintf("env && sleep %s", sleepIntervalForever)},
                    Resources: v1.ResourceRequirements{
                        Limits:   rl,
                        Requests: rl,
                    },
                }},
            },
        }
        return pod
    }

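    // Timeouts used when waiting for the node status to reflect device
    // plugin updates.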
    nodeStatusUpdateTimeout := 1 * time.Minute
    devicePluginUpdateTimeout := 1 * time.Minute
    devicePluginGracefulTimeout := 5 * time.Minute // see endpointStopGracePeriod in pkg/kubelet/cm/devicemanager/types.go

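    // The kubelet queries GetDevicePluginOptions right after a plugin
    // registers; this test verifies that when that call fails, the plugin is
    // rejected and ListAndWatch is never started.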
ginkgo.It("when GetDevicePluginOptions fails, device plugin will not be used", func(ctx context.Context) {
104+
// randomizing so tests can run in parallel
105+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
106+
107+
expectedErr := fmt.Errorf("GetDevicePluginOptions failed")
108+
109+
plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
110+
if name == "GetDevicePluginOptions" {
111+
return expectedErr
112+
}
113+
return nil
114+
})
115+
116+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}})
117+
defer plugin.Stop() // should stop even if registration failed
118+
gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring("failed to get device plugin options")))
119+
gomega.Expect(err).To(gomega.MatchError(gomega.ContainSubstring(expectedErr.Error())))
120+
121+
gomega.Expect(plugin.WasCalled("ListAndWatch")).To(gomega.BeFalseBecause("plugin should not be used if GetDevicePluginOptions fails"))
122+
gomega.Expect(plugin.WasCalled("GetDevicePluginOptions")).To(gomega.BeTrueBecause("get device plugin options should be called exactly once"))
123+
gomega.Expect(plugin.Calls()).To(gomega.HaveLen(1))
124+
125+
// kubelet will not even register the resource
126+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
127+
})
128+
129+
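    // Allocatable counts only healthy devices, so flipping a device's health
    // should be reflected in the node status in both directions.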
ginkgo.It("will set allocatable to zero when a single device became unhealthy and then back to 1 if it got healthy again", func(ctx context.Context) {
130+
// randomizing so tests can run in parallel
131+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
132+
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
133+
plugin := testdeviceplugin.NewDevicePlugin(nil)
134+
135+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
136+
defer plugin.Stop() // should stop even if registration failed
137+
gomega.Expect(err).To(gomega.Succeed())
138+
139+
// at first the device is healthy
140+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
141+
142+
// now make the device unhealthy
143+
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
144+
plugin.UpdateDevices(devices)
145+
146+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
147+
148+
// now make the device healthy again
149+
devices[0].Health = kubeletdevicepluginv1beta1.Healthy
150+
plugin.UpdateDevices(devices)
151+
152+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
153+
})
154+
155+
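    // A device that is already allocated to a running pod keeps serving that
    // pod even after turning unhealthy; only new allocations are prevented.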
ginkgo.It("will set allocatable to zero when a single device became unhealthy, but capacity will stay at 1", func(ctx context.Context) {
156+
// randomizing so tests can run in parallel
157+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
158+
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
159+
plugin := testdeviceplugin.NewDevicePlugin(nil)
160+
161+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
162+
defer plugin.Stop() // should stop even if registration failed
163+
gomega.Expect(err).To(gomega.Succeed())
164+
165+
ginkgo.By("initial state: capacity and allocatable are set")
166+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
167+
168+
// schedule a pod that requests the device
169+
client := e2epod.NewPodClient(f)
170+
pod := client.Create(ctx, createPod(resourceName, 1))
171+
172+
// wait for the pod to be running
173+
gomega.Expect(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)).To(gomega.Succeed())
174+
175+
ginkgo.By("once pod is running, it does not affect allocatable value")
176+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 1, Capacity: 1}))
177+
178+
// now make the device unhealthy
179+
devices[0].Health = kubeletdevicepluginv1beta1.Unhealthy
180+
plugin.UpdateDevices(devices)
181+
182+
ginkgo.By("even when device became unhealthy. pod is still running and keeping the capacity")
183+
// we keep the allocatable at the same value even though device is not healthy any longer
184+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
185+
186+
// pod is not affected by the device becoming unhealthy
187+
188+
gomega.Consistently(func() v1.PodPhase {
189+
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
190+
return pod.Status.Phase
191+
}, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.Equal(v1.PodRunning))
192+
193+
// deleting the pod
194+
err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
195+
gomega.Expect(err).To(gomega.Succeed())
196+
197+
// wait for the pod to be deleted
198+
gomega.Eventually(func() error {
199+
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
200+
return err
201+
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.MatchError((gomega.ContainSubstring("not found"))))
202+
203+
ginkgo.By("when pod is deleted, nothing changes")
204+
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 1}))
205+
})
206+
207+
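    // With multiple devices, allocatable should track the number of healthy
    // devices while capacity keeps counting all reported devices.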
ginkgo.It("will lower allocatable to a number of unhealthy devices and then back if they became healthy again", func(ctx context.Context) {
208+
// randomizing so tests can run in parallel
209+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
210+
211+
devices := []kubeletdevicepluginv1beta1.Device{
212+
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
213+
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
214+
{ID: "2", Health: kubeletdevicepluginv1beta1.Healthy},
215+
{ID: "3", Health: kubeletdevicepluginv1beta1.Healthy},
216+
}
217+
plugin := testdeviceplugin.NewDevicePlugin(nil)
218+
219+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
220+
defer plugin.Stop() // should stop even if registration failed
221+
gomega.Expect(err).To(gomega.Succeed())
222+
223+
// at first all the devices are healthy
224+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
225+
226+
// now make one device unhealthy
227+
devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
228+
plugin.UpdateDevices(devices)
229+
230+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
231+
232+
// now make the device healthy again
233+
devices[3].Health = kubeletdevicepluginv1beta1.Healthy
234+
plugin.UpdateDevices(devices)
235+
236+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
237+
238+
// now make two devices unhealthy
239+
devices[1].Health = kubeletdevicepluginv1beta1.Unhealthy
240+
devices[3].Health = kubeletdevicepluginv1beta1.Unhealthy
241+
plugin.UpdateDevices(devices)
242+
243+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 4}))
244+
245+
// now make the device healthy again
246+
devices[3].Health = kubeletdevicepluginv1beta1.Healthy
247+
plugin.UpdateDevices(devices)
248+
249+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 3, Capacity: 4}))
250+
251+
// now make the device healthy again
252+
devices[1].Health = kubeletdevicepluginv1beta1.Healthy
253+
plugin.UpdateDevices(devices)
254+
255+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 4, Capacity: 4}))
256+
})
257+
258+
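    // When the ListAndWatch stream fails on the very first call, the kubelet
    // registers the resource with zero capacity and does not restart the
    // stream on its own.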
ginkgo.It("when ListAndWatch fails immediately, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
259+
// randomizing so tests can run in parallel
260+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
261+
devices := []kubeletdevicepluginv1beta1.Device{{ID: "testdevice", Health: kubeletdevicepluginv1beta1.Healthy}}
262+
263+
// Initially, there are no allocatable of this resource
264+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
265+
266+
plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
267+
if name == "ListAndWatch" {
268+
return fmt.Errorf("ListAndWatch failed")
269+
}
270+
return nil
271+
})
272+
273+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
274+
defer plugin.Stop() // should stop even if registration failed
275+
gomega.Expect(err).To(gomega.Succeed())
276+
277+
// kubelet registers the resource, but will not have any allocatable
278+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
279+
280+
// kubelet will never retry ListAndWatch (this will sleep for a long time)
281+
gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
282+
283+
// however kubelet will not delete the resource
284+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
285+
})
286+
287+
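    // When ListAndWatch fails after devices were already reported, the
    // devices are marked unhealthy right away, and the capacity is dropped
    // only after the endpointStopGracePeriod expires.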
ginkgo.It("when ListAndWatch fails after provisioning devices, node allocatable will be set to zero and kubelet will not retry to list resources", func(ctx context.Context) {
288+
// randomizing so tests can run in parallel
289+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
290+
devices := []kubeletdevicepluginv1beta1.Device{
291+
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
292+
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
293+
}
294+
295+
failing := false
296+
plugin := testdeviceplugin.NewDevicePlugin(func(name string) error {
297+
if name == "ListAndWatch" {
298+
if failing {
299+
return fmt.Errorf("ListAndWatch failed")
300+
}
301+
}
302+
return nil
303+
})
304+
305+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
306+
defer plugin.Stop() // should stop even if registration failed
307+
gomega.Expect(err).To(gomega.Succeed())
308+
309+
// at first the device is healthy
310+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
311+
312+
// let's make ListAndWatch fail
313+
failing = true
314+
315+
// kubelet will mark all devices as unhealthy
316+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
317+
318+
// kubelet will never retry ListAndWatch (this will sleep for a long time)
319+
gomega.Consistently(plugin.Calls, devicePluginUpdateTimeout, f.Timeouts.Poll).Should(gomega.HaveLen(2))
320+
321+
// however kubelet will not delete the resource and will keep the capacity
322+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
323+
324+
// after the graceful period devices capacity will reset to zero
325+
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
326+
})
327+
328+
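    // Stopping the plugin terminates the ListAndWatch stream, which the
    // kubelet is expected to handle like the stream failure above.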
ginkgo.It("when device plugin is stopped after provisioning devices, node allocatable will be set to zero", func(ctx context.Context) {
329+
// randomizing so tests can run in parallel
330+
resourceName := fmt.Sprintf("test.device/%s", f.UniqueName)
331+
devices := []kubeletdevicepluginv1beta1.Device{
332+
{ID: "0", Health: kubeletdevicepluginv1beta1.Healthy},
333+
{ID: "1", Health: kubeletdevicepluginv1beta1.Healthy},
334+
}
335+
336+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: -1, Capacity: -1}))
337+
338+
plugin := testdeviceplugin.NewDevicePlugin(nil)
339+
340+
err := plugin.RegisterDevicePlugin(ctx, f.UniqueName, resourceName, devices)
341+
defer plugin.Stop() // should stop even if registration failed
342+
gomega.Expect(err).To(gomega.Succeed())
343+
344+
// at first the device is healthy
345+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 2, Capacity: 2}))
346+
347+
// let's unload the plugin
348+
plugin.Stop()
349+
350+
// kubelet will mark all devices as unhealthy
351+
gomega.Eventually(getNodeResourceValues, nodeStatusUpdateTimeout, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 2}))
352+
353+
// after the graceful period devices capacity will reset to zero
354+
gomega.Eventually(getNodeResourceValues, devicePluginGracefulTimeout+1*time.Minute, f.Timeouts.Poll).WithContext(ctx).WithArguments(resourceName).Should(gomega.Equal(ResourceValue{Allocatable: 0, Capacity: 0}))
355+
})
356+
})
