
Commit 760903c

DRA kubelet: give DRA drivers a 30 second grace period for updates
When a DaemonSet gets updated, the old pod is stopped first and only then is the new one started. Previously, the kubelet removed all of the driver's ResourceSlices as soon as the old pod had unregistered, which forced the new pod to recreate all of them.

Now the kubelet waits 30 seconds before it deletes ResourceSlices. If a new driver instance registers during that grace period, nothing is deleted at all: the new driver finds the existing ResourceSlices and only needs to update them if something changed.

The downside is that when a driver gets removed permanently, there is a delay during which pods might still get scheduled to the node even though the driver is not going to run there anymore, and those pods will then be stuck.
1 parent 0490b9f commit 760903c
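The mechanism behind this is a delayed cleanup that gets canceled when the same driver registers again before the delay expires. Below is a minimal, self-contained sketch of that pattern; the type and function names are made up for illustration, and the real kubelet code (shown in the diffs below) deletes ResourceSlices instead of invoking a callback.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// cleanupScheduler delays a cleanup action and cancels it if the same
// name shows up again before the delay expires. This mirrors the
// pendingWipes bookkeeping added in this commit.
type cleanupScheduler struct {
	mutex   sync.Mutex
	pending map[string]*context.CancelCauseFunc
	delay   time.Duration
	cleanup func(name string)
}

// deregister schedules a delayed cleanup for name.
func (s *cleanupScheduler) deregister(name string) {
	ctx, cancel := context.WithCancelCause(context.Background())

	s.mutex.Lock()
	defer s.mutex.Unlock()
	if old := s.pending[name]; old != nil {
		(*old)(errors.New("deregistered again"))
	}
	s.pending[name] = &cancel

	go func() {
		defer func() {
			s.mutex.Lock()
			defer s.mutex.Unlock()
			cancel(errors.New("cleanup done"))
			// Only remove our own entry; it may already have been replaced.
			if s.pending[name] == &cancel {
				delete(s.pending, name)
			}
		}()
		select {
		case <-ctx.Done():
			// Canceled, e.g. because the driver came back in time.
			return
		case <-time.After(s.delay):
			s.cleanup(name)
		}
	}()
}

// register cancels a pending cleanup, if any.
func (s *cleanupScheduler) register(name string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if cancel := s.pending[name]; cancel != nil {
		(*cancel)(errors.New("new instance registered"))
		delete(s.pending, name)
	}
}

func main() {
	s := &cleanupScheduler{
		pending: make(map[string]*context.CancelCauseFunc),
		delay:   100 * time.Millisecond,
		cleanup: func(name string) { fmt.Println("wiping", name) },
	}
	s.deregister("driver.example.com")
	s.register("driver.example.com") // comes back in time, nothing gets wiped
	time.Sleep(200 * time.Millisecond)
}
```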

File tree

6 files changed: +202, -26 lines


pkg/kubelet/cm/dra/manager.go

Lines changed: 14 additions & 1 deletion
```diff
@@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
 }
 
 func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
-	return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
+	// The time that DRA drivers have to come back after being unregistered
+	// before the kubelet removes their ResourceSlices.
+	//
+	// This must be long enough to actually allow stopping a pod and
+	// starting the replacement (otherwise ResourceSlices get deleted
+	// unnecessarily) and not too long (otherwise the time window where
+	// pods might still get scheduled to the node after removal of a
+	// driver is too long).
+	//
+	// 30 seconds might be long enough for a simple container restart.
+	// If a DRA driver wants to be sure that slices don't get wiped,
+	// it should use rolling updates.
+	wipingDelay := 30 * time.Second
+	return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
 }
 
 // Start starts the reconcile loop of the manager.
```

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -580,7 +580,7 @@ func TestPrepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -717,7 +717,7 @@ func TestUnprepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -887,7 +887,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
 	}
 	defer draServerInfo.teardownFn()
 
-	plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+	plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 	if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
 		t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
 	}
```

pkg/kubelet/cm/dra/plugin/registration.go

Lines changed: 76 additions & 6 deletions
```diff
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"slices"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -53,6 +54,18 @@ type RegistrationHandler struct {
 	backgroundCtx context.Context
 	kubeClient    kubernetes.Interface
 	getNode       func() (*v1.Node, error)
+	wipingDelay   time.Duration
+
+	mutex sync.Mutex
+
+	// pendingWipes maps a plugin name to a cancel function for
+	// wiping of that plugin's ResourceSlices. Entries get added
+	// in DeRegisterPlugin and check in RegisterPlugin. If
+	// wiping is pending during RegisterPlugin, it gets canceled.
+	//
+	// Must use pointers to functions because the entries have to
+	// be comparable.
+	pendingWipes map[string]*context.CancelCauseFunc
 }
 
 var _ cache.PluginHandler = &RegistrationHandler{}
@@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{}
 // Must only be called once per process because it manages global state.
 // If a kubeClient is provided, then it synchronizes ResourceSlices
 // with the resource information provided by plugins.
-func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
+func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
 	handler := &RegistrationHandler{
 		// The context and thus logger should come from the caller.
 		backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
 		kubeClient:    kubeClient,
 		getNode:       getNode,
+		wipingDelay:   wipingDelay,
+		pendingWipes:  make(map[string]*context.CancelCauseFunc),
 	}
 
 	// When kubelet starts up, no DRA driver has registered yet. None of
@@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
 	// to start up.
 	//
 	// This has to run in the background.
-	go handler.wipeResourceSlices("")
+	logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
+	ctx := klog.NewContext(handler.backgroundCtx, logger)
+	go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
 
 	return handler
 }
 
 // wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
-func (h *RegistrationHandler) wipeResourceSlices(driver string) {
+// Wiping will delay for a while and can be canceled by canceling the context.
+func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
 	if h.kubeClient == nil {
 		return
 	}
-	ctx := h.backgroundCtx
 	logger := klog.FromContext(ctx)
 
+	if delay != 0 {
+		// Before we start deleting, give the driver time to bounce back.
+		// Perhaps it got removed as part of a DaemonSet update and the
+		// replacement pod is about to start.
+		logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay)
+		select {
+		case <-ctx.Done():
+			logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx))
+		case <-time.After(delay):
+			logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay)
+		}
+	}
+
 	backoff := wait.Backoff{
 		Duration: time.Second,
 		Factor:   2,
@@ -184,6 +214,15 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
 		logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
 	}
 
+	// Now cancel any pending ResourceSlice wiping for this plugin.
+	// Only needs to be done once.
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if cancel := h.pendingWipes[pluginName]; cancel != nil {
+		(*cancel)(errors.New("new plugin instance registered"))
+		delete(h.pendingWipes, pluginName)
+	}
+
 	return nil
 }
 
@@ -225,11 +264,42 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
 		logger := klog.FromContext(p.backgroundCtx)
 		logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
 
+		// Prepare for canceling the background wiping. This needs to run
+		// in the context of the registration handler, the one from
+		// the plugin is canceled.
+		logger = klog.FromContext(h.backgroundCtx)
+		logger = klog.LoggerWithName(logger, "driver-cleanup")
+		logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
+		ctx, cancel := context.WithCancelCause(h.backgroundCtx)
+		ctx = klog.NewContext(ctx, logger)
+
 		// Clean up the ResourceSlices for the deleted Plugin since it
 		// may have died without doing so itself and might never come
 		// back.
-		go h.wipeResourceSlices(pluginName)
-
+		//
+		// May get canceled if the plugin comes back quickly enough
+		// (see RegisterPlugin).
+		h.mutex.Lock()
+		defer h.mutex.Unlock()
+		if cancel := h.pendingWipes[pluginName]; cancel != nil {
+			(*cancel)(errors.New("plugin deregistered a second time"))
+		}
+		h.pendingWipes[pluginName] = &cancel
+
+		go func() {
+			defer func() {
+				h.mutex.Lock()
+				defer h.mutex.Unlock()
+
+				// Cancel our own context, but remove it from the map only if it
+				// is the current entry. Perhaps it already got replaced.
+				cancel(errors.New("wiping done"))
+				if h.pendingWipes[pluginName] == &cancel {
+					delete(h.pendingWipes, pluginName)
+				}
+			}()
+			h.wipeResourceSlices(ctx, h.wipingDelay, pluginName)
+		}()
 		return
 	}
 
```
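A side note on the "Must use pointers to functions because the entries have to be comparable" comment above: Go function values can only be compared to nil, so a map of plain cancel functions could not answer "is this still my entry?". Storing a pointer to the local cancel variable gives each deregistration a unique, comparable identity, which is what the `h.pendingWipes[pluginName] == &cancel` check relies on. A tiny standalone illustration (names are made up for this example):

```go
package main

import "fmt"

func main() {
	f := func() {}
	g := func() {}

	// fmt.Println(f == g) // compile error: func can only be compared to nil

	// Pointers to the variables holding the functions are comparable,
	// so they can serve as an identity check, as pendingWipes does.
	p := &f
	fmt.Println(p == &f) // true: same variable
	fmt.Println(p == &g) // false: different variable
}
```

In DeRegisterPlugin, each call creates a fresh cancel variable, so &cancel uniquely identifies that particular wiping attempt and the cleanup goroutine only deletes its own map entry.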

pkg/kubelet/cm/dra/plugin/registration_test.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) {
 			}
 
 			// The handler wipes all slices at startup.
-			handler := NewRegistrationHandler(client, getFakeNode)
+			handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
 			requireNoSlices := func() {
 				t.Helper()
 				if client == nil {
```

test/e2e/dra/deploy.go

Lines changed: 49 additions & 15 deletions
```diff
@@ -66,6 +66,7 @@ import (
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
 	"k8s.io/kubernetes/test/e2e/storage/utils"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/yaml"
 )
 
@@ -214,7 +215,6 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
 		// not run on all nodes.
 		resources.Nodes = nodes.NodeNames
 	}
-	ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
 	d.SetUp(nodes, resources, devicesPerNode...)
 	ginkgo.DeferCleanup(d.TearDown)
 }
@@ -227,12 +227,22 @@ type MethodInstance struct {
 type Driver struct {
 	f       *framework.Framework
 	ctx     context.Context
-	cleanup []func() // executed first-in-first-out
+	cleanup []func(context.Context) // executed first-in-first-out
 	wg      sync.WaitGroup
 	serviceAccountName string
 
+	// NameSuffix can be set while registering a test to deploy different
+	// drivers in the same test namespace.
 	NameSuffix string
-	Name       string
+
+	// InstanceSuffix can be set while registering a test to deploy two different
+	// instances of the same driver. Used to generate unique objects in the API server.
+	// The socket path is still the same.
+	InstanceSuffix string
+
+	// Name gets derived automatically from the current test namespace and
+	// (if set) the NameSuffix while setting up the driver for a test.
+	Name string
 
 	// Nodes contains entries for each node selected for a test when the test runs.
 	// In addition, there is one entry for a fictional node.
@@ -263,9 +273,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	ctx, cancel := context.WithCancel(context.Background())
 	logger := klog.FromContext(ctx)
 	logger = klog.LoggerWithValues(logger, "driverName", d.Name)
+	if d.InstanceSuffix != "" {
+		instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
+		logger = klog.LoggerWithValues(logger, "instance", instance)
+	}
 	ctx = klog.NewContext(ctx, logger)
 	d.ctx = ctx
-	d.cleanup = append(d.cleanup, cancel)
+	d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
 
 	if !resources.NodeLocal {
 		// Publish one resource pool with "network-attached" devices.
@@ -323,28 +337,32 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	}
 
 	// Create service account and corresponding RBAC rules.
-	d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account"
+	d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
 	content := pluginPermissions
 	content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
-	content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name)
+	content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
 	d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
 
+	// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
+	// the lifecycle explicitly, in particular run two pods per node long enough to
+	// run checks.
 	instanceKey := "app.kubernetes.io/instance"
 	rsName := ""
 	numNodes := int32(len(nodes.NodeNames))
 	pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
 	registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
 	registrarSocketFilename := d.Name + "-reg.sock"
+	instanceName := d.Name + d.InstanceSuffix
 	err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
 		switch item := item.(type) {
 		case *appsv1.ReplicaSet:
-			item.Name += d.NameSuffix
+			item.Name += d.NameSuffix + d.InstanceSuffix
 			rsName = item.Name
 			item.Spec.Replicas = &numNodes
-			item.Spec.Selector.MatchLabels[instanceKey] = d.Name
-			item.Spec.Template.Labels[instanceKey] = d.Name
+			item.Spec.Selector.MatchLabels[instanceKey] = instanceName
+			item.Spec.Template.Labels[instanceKey] = instanceName
 			item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName
-			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
+			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName
 			item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
 					NodeSelectorTerms: []v1.NodeSelectorTerm{
@@ -376,7 +394,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
 		framework.ExpectNoError(err, "all kubelet plugin proxies running")
 	}
-	requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
+	requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
 	framework.ExpectNoError(err, "create label selector requirement")
 	selector := labels.NewSelector().Add(*requirement)
 	pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
@@ -446,9 +464,20 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 			kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
 		)
 		framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
-		d.cleanup = append(d.cleanup, func() {
+		d.cleanup = append(d.cleanup, func(ctx context.Context) {
 			// Depends on cancel being called first.
 			plugin.Stop()
+
+			// Also explicitly stop all pods.
+			ginkgo.By("scaling down driver proxy pods for " + d.Name)
+			rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
+			framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
+			rs.Spec.Replicas = ptr.To(int32(0))
+			rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
+			framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
+			if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
+				framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
+			}
 		})
 		d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
 	}
@@ -717,22 +746,27 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
 	return writer
 }
 
-func (d *Driver) TearDown() {
+func (d *Driver) TearDown(ctx context.Context) {
 	for _, c := range d.cleanup {
-		c()
+		c(ctx)
 	}
 	d.cleanup = nil
 	d.wg.Wait()
 }
 
+// IsGone checks that the kubelet is done with the driver.
+// This is done by waiting for the kubelet to remove the
+// driver's ResourceSlices, which takes at least 5 minutes
+// because of the delay in the kubelet. Only use this in slow
+// tests...
 func (d *Driver) IsGone(ctx context.Context) {
 	gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) {
 		slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
 		if err != nil {
 			return nil, err
 		}
 		return slices.Items, err
-	}).Should(gomega.BeEmpty())
+	}).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty())
 }
 
 func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
```
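Because `ginkgo.DeferCleanup(d.IsGone)` was removed from `Run`, slice removal is no longer verified automatically after every test; a test that cares about it now calls `IsGone` itself after tearing the driver down. A hypothetical sketch of such a spec (the spec wiring and the `driver` variable are assumptions for illustration, not part of this commit):

```go
ginkgo.It("removes ResourceSlices after the driver is gone", func(ctx context.Context) {
	// Stops the plugin and scales the driver proxy ReplicaSet down to zero replicas.
	driver.TearDown(ctx)

	// The kubelet only wipes the slices after its grace period has expired,
	// so this polls with the generous timeout built into IsGone.
	driver.IsGone(ctx)
})
```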
