Skip to content

Commit 582b421

Browse files
committed
DRA kubeletplugin: add RollingUpdate
When the new RollingUpdate option is used, the DRA driver gets deployed such that it uses unique socket paths and uses file locking to serialize gRPC calls. This enables the kubelet to pick arbitrarily between two concurrently instances. The handover is seamless (no downtime, no removal of ResourceSlices by the kubelet). For file locking, the fileutils package from etcd is used because that was already a Kubernetes dependency. Unfortunately that package brings in some additional indirect dependency for DRA drivers (zap, multierr), but those seem acceptable.
1 parent b471c2c commit 582b421

File tree

7 files changed

+235
-23
lines changed

7 files changed

+235
-23
lines changed

pkg/kubelet/pluginmanager/pluginwatcher/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ setting `maxSurge` to a value larger than zero enables such a seamless upgrade.
3232

3333
### In a plugin
3434

35+
*Note*: For DRA, the
36+
[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin)
37+
helper package offers the `RollingUpdate` option which implements the socket
38+
handling as described in this section.
39+
3540
To support seamless upgrades, each plugin instance must use a unique
3641
socket filename. Otherwise the following could happen:
3742
- The old instance is registered with `plugin.example.com-reg.sock`.

staging/src/k8s.io/dynamic-resource-allocation/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/google/go-cmp v0.7.0
1313
github.com/onsi/gomega v1.35.1
1414
github.com/stretchr/testify v1.10.0
15+
go.etcd.io/etcd/client/pkg/v3 v3.5.16
1516
google.golang.org/grpc v1.68.1
1617
k8s.io/api v0.0.0
1718
k8s.io/apimachinery v0.0.0
@@ -57,6 +58,8 @@ require (
5758
github.com/x448/float16 v0.8.4 // indirect
5859
go.opentelemetry.io/otel v1.33.0 // indirect
5960
go.opentelemetry.io/otel/trace v1.33.0 // indirect
61+
go.uber.org/multierr v1.11.0 // indirect
62+
go.uber.org/zap v1.27.0 // indirect
6063
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
6164
golang.org/x/net v0.33.0 // indirect
6265
golang.org/x/oauth2 v0.27.0 // indirect

staging/src/k8s.io/dynamic-resource-allocation/go.sum

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 112 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ import (
2121
"errors"
2222
"fmt"
2323
"net"
24+
"os"
2425
"path"
2526
"sync"
2627

2728
"google.golang.org/grpc"
2829
"k8s.io/klog/v2"
2930

31+
"go.etcd.io/etcd/client/pkg/v3/fileutil"
3032
resourceapi "k8s.io/api/resource/v1beta1"
3133
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3234
"k8s.io/apimachinery/pkg/types"
@@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option {
195197
// support updates from an installation which used an older release of
196198
// of the helper code.
197199
//
198-
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
200+
// The default is <driver name>-reg.sock. When rolling updates are enabled,
199201
// it is <driver name>-<uid>-reg.sock.
202+
//
203+
// This option and [RollingUpdate] are mutually exclusive.
200204
func RegistrarSocketFilename(name string) Option {
201205
return func(o *options) error {
202206
o.pluginRegistrationEndpoint.file = name
@@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener,
248252
}
249253
}
250254

255+
// RollingUpdate can be used to enable support for running two plugin instances
256+
// in parallel while a newer instance replaces the older. When enabled, both
257+
// instances must share the same plugin data directory and driver name.
258+
// They create different sockets to allow the kubelet to connect to both at
259+
// the same time.
260+
//
261+
// There is no guarantee which of the two instances are used by kubelet.
262+
// For example, it can happen that a claim gets prepared by one instance
263+
// and then needs to be unprepared by the other. Kubelet then may fall back
264+
// to the first one again for some other operation. In practice this means
265+
// that each instance must be entirely stateless across method calls.
266+
// Serialization (on by default, see [Serialize]) ensures that methods
267+
// are serialized across all instances through file locking. The plugin
268+
// implementation can load shared state from a file at the start
269+
// of a call, execute and then store the updated shared state again.
270+
//
271+
// Passing a non-empty uid enables rolling updates, an empty uid disables it.
272+
// The uid must be the pod UID. A DaemonSet can pass that into the driver container
273+
// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef).
274+
//
275+
// Because new instances cannot remove stale sockets of older instances,
276+
// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM
277+
// and stop the helper instead of quitting immediately.
278+
//
279+
// This depends on support in the kubelet which was added in Kubernetes 1.33.
280+
// Don't use this if it is not certain that the kubelet has that support!
281+
//
282+
// This option and [RegistrarSocketFilename] are mutually exclusive.
283+
func RollingUpdate(uid types.UID) Option {
284+
return func(o *options) error {
285+
o.rollingUpdateUID = uid
286+
287+
// TODO: ask the kubelet whether that pod is still running and
288+
// clean up leftover sockets?
289+
return nil
290+
}
291+
}
292+
251293
// GRPCInterceptor is called for each incoming gRPC method call. This option
252294
// may be used more than once and each interceptor will get called.
253295
func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
@@ -322,6 +364,17 @@ func Serialize(enabled bool) Option {
322364
}
323365
}
324366

367+
// FlockDir changes where lock files are created and locked. A lock file
368+
// is needed when serializing gRPC calls and rolling updates are enabled.
369+
// The directory must exist and be reserved for exclusive use by the
370+
// driver. The default is the plugin data directory.
371+
func FlockDirectoryPath(path string) Option {
372+
return func(o *options) error {
373+
o.flockDirectoryPath = path
374+
return nil
375+
}
376+
}
377+
325378
type options struct {
326379
logger klog.Logger
327380
grpcVerbosity int
@@ -330,11 +383,13 @@ type options struct {
330383
nodeUID types.UID
331384
pluginRegistrationEndpoint endpoint
332385
pluginDataDirectoryPath string
386+
rollingUpdateUID types.UID
333387
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
334388
unaryInterceptors []grpc.UnaryServerInterceptor
335389
streamInterceptors []grpc.StreamServerInterceptor
336390
kubeClient kubernetes.Interface
337391
serialize bool
392+
flockDirectoryPath string
338393
nodeV1beta1 bool
339394
}
340395

@@ -344,17 +399,18 @@ type Helper struct {
344399
// backgroundCtx is for activities that are started later.
345400
backgroundCtx context.Context
346401
// cancel cancels the backgroundCtx.
347-
cancel func(cause error)
348-
wg sync.WaitGroup
349-
registrar *nodeRegistrar
350-
pluginServer *grpcServer
351-
plugin DRAPlugin
352-
driverName string
353-
nodeName string
354-
nodeUID types.UID
355-
kubeClient kubernetes.Interface
356-
serialize bool
357-
grpcMutex sync.Mutex
402+
cancel func(cause error)
403+
wg sync.WaitGroup
404+
registrar *nodeRegistrar
405+
pluginServer *grpcServer
406+
plugin DRAPlugin
407+
driverName string
408+
nodeName string
409+
nodeUID types.UID
410+
kubeClient kubernetes.Interface
411+
serialize bool
412+
grpcMutex sync.Mutex
413+
grpcLockFilePath string
358414

359415
// Information about resource publishing changes concurrently and thus
360416
// must be protected by the mutex. The controller gets started only
@@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
392448
if o.driverName == "" {
393449
return nil, errors.New("driver name must be set")
394450
}
451+
if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" {
452+
return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive")
453+
}
454+
uidPart := ""
455+
if o.rollingUpdateUID != "" {
456+
uidPart = "-" + string(o.rollingUpdateUID)
457+
}
395458
if o.pluginRegistrationEndpoint.file == "" {
396-
o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
459+
o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock"
397460
}
398461
if o.pluginDataDirectoryPath == "" {
399462
o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
400463
}
464+
401465
d := &Helper{
402466
driverName: o.driverName,
403467
nodeName: o.nodeName,
@@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
406470
serialize: o.serialize,
407471
plugin: plugin,
408472
}
473+
if o.rollingUpdateUID != "" {
474+
dir := o.pluginDataDirectoryPath
475+
if o.flockDirectoryPath != "" {
476+
dir = o.flockDirectoryPath
477+
}
478+
// Enable file locking, required for concurrently running pods.
479+
d.grpcLockFilePath = path.Join(dir, "serialize.lock")
480+
}
409481

410482
// Stop calls cancel and therefore both cancellation
411483
// and Stop cause goroutines to stop.
@@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
434506
var supportedServices []string
435507
draEndpoint := endpoint{
436508
dir: o.pluginDataDirectoryPath,
437-
file: "dra.sock", // "dra" is hard-coded.
509+
file: "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique, so we get a unique full path also without the UID.
438510
listenFunc: o.draEndpointListen,
439511
}
440512
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
@@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
575647
// serializeGRPCIfEnabled locks a mutex if serialization is enabled.
576648
// Either way it returns a method that the caller must invoke
577649
// via defer.
578-
func (d *Helper) serializeGRPCIfEnabled() func() {
650+
func (d *Helper) serializeGRPCIfEnabled() (func(), error) {
579651
if !d.serialize {
580-
return func() {}
652+
return func() {}, nil
581653
}
654+
655+
// If rolling updates are enabled, we cannot do only in-memory locking.
656+
// We must use file locking.
657+
if d.grpcLockFilePath != "" {
658+
file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666)
659+
if err != nil {
660+
return nil, fmt.Errorf("lock file: %w", err)
661+
}
662+
return func() {
663+
_ = file.Close()
664+
}, nil
665+
}
666+
582667
d.grpcMutex.Lock()
583-
return d.grpcMutex.Unlock
668+
return d.grpcMutex.Unlock, nil
584669
}
585670

586671
// nodePluginImplementation is a thin wrapper around the helper instance.
@@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req
597682
return nil, fmt.Errorf("get resource claims: %w", err)
598683
}
599684

600-
defer d.serializeGRPCIfEnabled()()
685+
unlock, err := d.serializeGRPCIfEnabled()
686+
if err != nil {
687+
return nil, fmt.Errorf("serialize gRPC: %w", err)
688+
}
689+
defer unlock()
601690

602691
result, err := d.plugin.PrepareResourceClaims(ctx, claims)
603692
if err != nil {
@@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims
659748

660749
// NodeUnprepareResources implements [draapi.NodeUnprepareResources].
661750
func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
662-
defer d.serializeGRPCIfEnabled()
751+
unlock, err := d.serializeGRPCIfEnabled()
752+
if err != nil {
753+
return nil, fmt.Errorf("serialize gRPC: %w", err)
754+
}
755+
defer unlock()
663756

664757
claims := make([]NamespacedObject, 0, len(req.Claims))
665758
for _, claim := range req.Claims {

test/e2e/dra/deploy.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ const (
7777

7878
type Nodes struct {
7979
NodeNames []string
80+
tempDir string
8081
}
8182

8283
type Resources struct {
@@ -112,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes
112113
}
113114

114115
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
116+
nodes.tempDir = ginkgo.GinkgoT().TempDir()
117+
115118
ginkgo.By("selecting nodes")
116119
// The kubelet plugin is harder. We deploy the builtin manifest
117120
// after patching in the driver name and all nodes on which we
@@ -219,6 +222,12 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
219222
ginkgo.DeferCleanup(d.TearDown)
220223
}
221224

225+
// NewGetSlices generates a function for gomega.Eventually/Consistently which
226+
// returns the ResourceSliceList.
227+
func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
228+
return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
229+
}
230+
222231
type MethodInstance struct {
223232
Nodename string
224233
FullMethod string
@@ -240,6 +249,10 @@ type Driver struct {
240249
// The socket path is still the same.
241250
InstanceSuffix string
242251

252+
// RollingUpdate can be set to true to enable using different socket names
253+
// for different pods and thus seamless upgrades. Must be supported by the kubelet!
254+
RollingUpdate bool
255+
243256
// Name gets derived automatically from the current test namespace and
244257
// (if set) the NameSuffix while setting up the driver for a test.
245258
Name string
@@ -351,7 +364,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
351364
numNodes := int32(len(nodes.NodeNames))
352365
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
353366
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
354-
registrarSocketFilename := d.Name + "-reg.sock"
355367
instanceName := d.Name + d.InstanceSuffix
356368
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
357369
switch item := item.(type) {
@@ -447,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
447459
// All listeners running in this pod use a new unique local port number
448460
// by atomically incrementing this variable.
449461
listenerPort := int32(9000)
462+
rollingUpdateUID := pod.UID
463+
serialize := true
464+
if !d.RollingUpdate {
465+
rollingUpdateUID = ""
466+
// A test might have to execute two gRPC calls in parallel, so only
467+
// serialize when we explicitly want to test a rolling update.
468+
serialize = false
469+
}
450470
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
451471
kubeletplugin.GRPCVerbosity(0),
452472
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -456,11 +476,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
456476
return d.streamInterceptor(nodename, srv, ss, info, handler)
457477
}),
458478

479+
kubeletplugin.RollingUpdate(rollingUpdateUID),
480+
kubeletplugin.Serialize(serialize),
481+
kubeletplugin.FlockDirectoryPath(nodes.tempDir),
482+
459483
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
460484
kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
461485

462486
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
463-
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
464487
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
465488
)
466489
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)

0 commit comments

Comments
 (0)