Skip to content

Commit ec12727

Browse files
committed
DRA kubeletplugin: revise socket path handling
When supporting rolling updates, we cannot use the same fixed socket paths for old and new pod. With the revised API, the caller no longer specifies the full socket paths, only directories. The logic about how to name sockets then can be in the helper. While at it, avoid passing a context to the gRPC helper code when all that the helper code needs is a logger. That leads to confusion about whether cancellation has an effect.
1 parent c6252da commit ec12727

File tree

11 files changed

+326
-134
lines changed

11 files changed

+326
-134
lines changed

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/doc.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,52 @@ limitations under the License.
1616

1717
// Package kubeletplugin provides helper functions for running a dynamic
1818
// resource allocation kubelet plugin.
19+
//
20+
// A DRA driver using this package can be deployed as a DaemonSet on suitable
21+
// nodes. Node labeling, for example through NFD
22+
// (https://github.com/kubernetes-sigs/node-feature-discovery), can be used
23+
// to run the driver only on nodes which have the necessary hardware.
24+
//
25+
// The service account of the DaemonSet must have sufficient RBAC permissions
26+
// to read ResourceClaims and to create and update ResourceSlices, if
27+
// the driver intends to publish per-node ResourceSlices. It is good
28+
// security practice (but not required) to limit access to ResourceSlices
29+
// associated with the node a specific Pod is running on. This can be done
30+
// with a Validating Admission Policy (VAP). For more information,
31+
// see the deployment of the DRA example driver
32+
// (https://github.com/kubernetes-sigs/dra-example-driver/tree/main/deployments/helm/dra-example-driver/templates).
33+
//
34+
// Traditionally, the kubelet has not supported rolling updates of plugins.
35+
// Therefore the DaemonSet must not set `maxSurge` to a value larger than
36+
// zero. With the default `maxSurge: 0`, updating the DaemonSet of the driver
37+
// will first shut down the old driver Pod, then start the replacement.
38+
//
39+
// This leads to a short downtime for operations that need the driver:
40+
// - Pods cannot start unless the claims they depend on were already
41+
// prepared for use.
42+
// - Cleanup after the last pod which used a claim gets delayed
43+
// until the driver is available again. The pod is not marked
44+
// as terminated. This prevents reusing the resources used by
45+
// the pod for other pods.
46+
// - Running pods are *not* affected as far as Kubernetes is
47+
// concerned. However, a DRA driver might provide required runtime
48+
// services. Vendors need to document this.
49+
//
50+
// Note that the second point also means that draining a node should
51+
// first evict normal pods, then the driver DaemonSet Pod.
52+
//
53+
// Starting with Kubernetes 1.33, the kubelet supports rolling updates
54+
// such that old and new Pod run at the same time for a short while
55+
// and hand over work gracefully, with no downtime.
56+
// However, there is no mechanism for determining in advance whether
57+
// the node the DaemonSet runs on supports that. Trying
58+
// to do a rolling update with a kubelet which does not support it yet
59+
// will fail because shutting down the old Pod unregisters the driver
60+
// even though the new Pod is running. See https://github.com/kubernetes/kubernetes/pull/129832
61+
// for details (TODO: link to doc after merging instead).
62+
//
63+
// A DRA driver can either require 1.33 as minimal Kubernetes version or
64+
// provide two variants of its DaemonSet deployment. In the variant with
65+
// support for rolling updates, `maxSurge` can be set to a non-zero
66+
// value. Administrators have to be careful about running the right variant.
1967
package kubeletplugin

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 76 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"net"
24+
"path"
2425
"sync"
2526

2627
"google.golang.org/grpc"
@@ -36,6 +37,13 @@ import (
3637
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
3738
)
3839

40+
const (
41+
// KubeletPluginsDir is the default directory for [PluginDataDirectoryPath].
42+
KubeletPluginsDir = "/var/lib/kubelet/plugins"
43+
// KubeletRegistryDir is the default for [RegistrarDirectoryPath]
44+
KubeletRegistryDir = "/var/lib/kubelet/plugins_registry"
45+
)
46+
3947
// DRAPlugin is the interface that needs to be implemented by a DRA driver to
4048
// use this helper package. The helper package then implements the gRPC
4149
// interface expected by the kubelet by wrapping the DRAPlugin implementation.
@@ -165,63 +173,77 @@ func GRPCVerbosity(level int) Option {
165173
}
166174
}
167175

168-
// RegistrarSocketPath sets the file path for a Unix domain socket.
169-
// If RegistrarListener is not used, then Start will remove
170-
// a file at that path, should one exist, and creates a socket
171-
// itself. Otherwise it uses the provided listener and only
172-
// removes the socket at the specified path during shutdown.
176+
// RegistrarDirectoryPath sets the path to the directory where the kubelet
177+
// expects to find registration sockets of plugins. Typically this is
178+
// /var/lib/kubelet/plugins_registry with /var/lib/kubelet being the kubelet's
179+
// data directory.
173180
//
174-
// At least one of these two options is required.
175-
func RegistrarSocketPath(path string) Option {
181+
// This is also the default. Some Kubernetes clusters may use a different data directory.
182+
// This path must be the same inside and outside of the driver's container.
183+
// The directory must exist.
184+
func RegistrarDirectoryPath(path string) Option {
176185
return func(o *options) error {
177-
o.pluginRegistrationEndpoint.path = path
186+
o.pluginRegistrationEndpoint.dir = path
178187
return nil
179188
}
180189
}
181190

182-
// RegistrarListener sets an already created listener for the plugin
183-
// registration API. Can be combined with RegistrarSocketPath.
191+
// RegistrarSocketFilename sets the name of the socket inside the directory where
192+
// the kubelet watches for registration sockets (see RegistrarDirectoryPath).
193+
//
194+
// Usually DRA drivers should not need this option. It is provided to
195+
// support updates from an installation which used an older release of
196+
// of the helper code.
184197
//
185-
// At least one of these two options is required.
186-
func RegistrarListener(listener net.Listener) Option {
198+
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
199+
// it is <driver name>-<uid>-reg.sock.
200+
func RegistrarSocketFilename(name string) Option {
187201
return func(o *options) error {
188-
o.pluginRegistrationEndpoint.listener = listener
202+
o.pluginRegistrationEndpoint.file = name
189203
return nil
190204
}
191205
}
192206

193-
// PluginSocketPath sets the file path for a Unix domain socket.
194-
// If PluginListener is not used, then Start will remove
195-
// a file at that path, should one exist, and creates a socket
196-
// itself. Otherwise it uses the provided listener and only
197-
// removes the socket at the specified path during shutdown.
207+
// RegistrarListener configures how to create the registrar socket.
208+
// The default is to remove the file if it exists and to then
209+
// create a socket.
198210
//
199-
// At least one of these two options is required.
200-
func PluginSocketPath(path string) Option {
211+
// This is used in Kubernetes for end-to-end testing. The default should
212+
// be fine for DRA drivers.
213+
func RegistrarListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
201214
return func(o *options) error {
202-
o.draEndpoint.path = path
215+
o.pluginRegistrationEndpoint.listenFunc = listen
203216
return nil
204217
}
205218
}
206219

207-
// PluginListener sets an already created listener for the dynamic resource
208-
// allocation plugin API. Can be combined with PluginSocketPath.
220+
// PluginDataDirectoryPath sets the path where the DRA driver creates the
221+
// "dra.sock" socket that the kubelet connects to for the DRA-specific gRPC calls.
222+
// It is also used to coordinate between different Pods when using rolling
223+
// updates. It must not be shared with other kubelet plugins.
224+
//
225+
// The default is /var/lib/kubelet/plugins/<driver name>. This directory
226+
// does not need to be inside the kubelet data directory, as long as
227+
// the kubelet can access it.
209228
//
210-
// At least one of these two options is required.
211-
func PluginListener(listener net.Listener) Option {
229+
// This path must be the same inside and outside of the driver's container.
230+
// The directory must exist.
231+
func PluginDataDirectoryPath(path string) Option {
212232
return func(o *options) error {
213-
o.draEndpoint.listener = listener
233+
o.pluginDataDirectoryPath = path
214234
return nil
215235
}
216236
}
217237

218-
// KubeletPluginSocketPath defines how kubelet will connect to the dynamic
219-
// resource allocation plugin. This corresponds to PluginSocketPath, except
220-
// that PluginSocketPath defines the path in the filesystem of the caller and
221-
// KubeletPluginSocketPath in the filesystem of kubelet.
222-
func KubeletPluginSocketPath(path string) Option {
238+
// PluginListener configures how to create the registrar socket.
239+
// The default is to remove the file if it exists and to then
240+
// create a socket.
241+
//
242+
// This is used in Kubernetes for end-to-end testing. The default should
243+
// be fine for DRA drivers.
244+
func PluginListener(listen func(ctx context.Context, path string) (net.Listener, error)) Option {
223245
return func(o *options) error {
224-
o.draAddress = path
246+
o.draEndpointListen = listen
225247
return nil
226248
}
227249
}
@@ -306,9 +328,9 @@ type options struct {
306328
driverName string
307329
nodeName string
308330
nodeUID types.UID
309-
draEndpoint endpoint
310-
draAddress string
311331
pluginRegistrationEndpoint endpoint
332+
pluginDataDirectoryPath string
333+
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
312334
unaryInterceptors []grpc.UnaryServerInterceptor
313335
streamInterceptors []grpc.StreamServerInterceptor
314336
kubeClient kubernetes.Interface
@@ -349,14 +371,17 @@ type Helper struct {
349371
// a name to all log entries.
350372
//
351373
// If the plugin will be used to publish resources, [KubeClient] and [NodeName]
352-
// options are mandatory.
374+
// options are mandatory. Otherwise only [DriverName] is mandatory.
353375
func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helper, finalErr error) {
354376
logger := klog.FromContext(ctx)
355377
o := options{
356378
logger: klog.Background(),
357379
grpcVerbosity: 6, // Logs requests and responses, which can be large.
358380
serialize: true,
359381
nodeV1beta1: true,
382+
pluginRegistrationEndpoint: endpoint{
383+
dir: KubeletRegistryDir,
384+
},
360385
}
361386
for _, option := range opts {
362387
if err := option(&o); err != nil {
@@ -367,17 +392,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
367392
if o.driverName == "" {
368393
return nil, errors.New("driver name must be set")
369394
}
370-
if o.draAddress == "" {
371-
return nil, errors.New("DRA address must be set")
395+
if o.pluginRegistrationEndpoint.file == "" {
396+
o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
372397
}
373-
var emptyEndpoint endpoint
374-
if o.draEndpoint == emptyEndpoint {
375-
return nil, errors.New("a Unix domain socket path and/or listener must be set for the kubelet plugin")
398+
if o.pluginDataDirectoryPath == "" {
399+
o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
376400
}
377-
if o.pluginRegistrationEndpoint == emptyEndpoint {
378-
return nil, errors.New("a Unix domain socket path and/or listener must be set for the registrar")
379-
}
380-
381401
d := &Helper{
382402
driverName: o.driverName,
383403
nodeName: o.nodeName,
@@ -412,7 +432,12 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
412432

413433
// Run the node plugin gRPC server first to ensure that it is ready.
414434
var supportedServices []string
415-
pluginServer, err := startGRPCServer(klog.NewContext(ctx, klog.LoggerWithName(logger, "dra")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
435+
draEndpoint := endpoint{
436+
dir: o.pluginDataDirectoryPath,
437+
file: "dra.sock", // "dra" is hard-coded.
438+
listenFunc: o.draEndpointListen,
439+
}
440+
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
416441
if o.nodeV1beta1 {
417442
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
418443
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
@@ -428,7 +453,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
428453
}
429454

430455
// Now make it available to kubelet.
431-
registrar, err := startRegistrar(klog.NewContext(ctx, klog.LoggerWithName(logger, "registrar")), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, o.draAddress, o.pluginRegistrationEndpoint)
456+
registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint)
432457
if err != nil {
433458
return nil, fmt.Errorf("start registrar: %v", err)
434459
}
@@ -510,6 +535,11 @@ func (d *Helper) PublishResources(_ context.Context, resources resourceslice.Dri
510535
// our background context, not the one passed into this
511536
// function, and thus is connected to the lifecycle of the
512537
// plugin.
538+
//
539+
// TODO: don't delete ResourceSlices, not even on a clean shutdown.
540+
// We either support rolling updates and want to hand over seamlessly
541+
// or don't and then perhaps restart the pod quickly enough that
542+
// the kubelet hasn't deleted ResourceSlices yet.
513543
controllerCtx := d.backgroundCtx
514544
controllerLogger := klog.FromContext(controllerCtx)
515545
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubeletplugin
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"net"
24+
"os"
25+
"path"
26+
)
27+
28+
// endpoint defines where and how to listen for incoming connections.
29+
// The listener always gets closed when shutting down.
30+
//
31+
// If the listen function is not set, a new listener for a Unix domain socket gets
32+
// created at the path.
33+
type endpoint struct {
34+
dir, file string
35+
listenFunc func(ctx context.Context, socketpath string) (net.Listener, error)
36+
}
37+
38+
func (e endpoint) path() string {
39+
return path.Join(e.dir, e.file)
40+
}
41+
42+
func (e endpoint) listen(ctx context.Context) (net.Listener, error) {
43+
socketpath := e.path()
44+
45+
if e.listenFunc != nil {
46+
return e.listenFunc(ctx, socketpath)
47+
}
48+
49+
// Remove stale sockets, listen would fail otherwise.
50+
if err := e.removeSocket(); err != nil {
51+
return nil, err
52+
}
53+
cfg := net.ListenConfig{}
54+
listener, err := cfg.Listen(ctx, "unix", socketpath)
55+
if err != nil {
56+
if removeErr := e.removeSocket(); removeErr != nil {
57+
err = errors.Join(err, err)
58+
}
59+
return nil, err
60+
}
61+
return &unixListener{Listener: listener, endpoint: e}, nil
62+
}
63+
64+
func (e endpoint) removeSocket() error {
65+
if err := os.Remove(e.path()); err != nil && !os.IsNotExist(err) {
66+
return fmt.Errorf("remove Unix domain socket: %w", err)
67+
}
68+
return nil
69+
}
70+
71+
// unixListener adds removing of the Unix domain socket on Close.
72+
type unixListener struct {
73+
net.Listener
74+
endpoint endpoint
75+
}
76+
77+
func (l *unixListener) Close() error {
78+
err := l.Listener.Close()
79+
if removeErr := l.endpoint.removeSocket(); removeErr != nil {
80+
err = errors.Join(err, err)
81+
}
82+
return err
83+
}

0 commit comments

Comments
 (0)