Skip to content

Commit c6252da

Browse files
committed
DRA kubeletplugin: turn helper into wrapper
The goal is to simplify writing DRA drivers. This is also a first step towards supporting seamless upgrades. DRA drivers no longer need to implement the kubelet plugin API directly. Instead, the helper wraps an implementation of an interface. The helper then provides common functionality: - retrieve and validate ResourceClaims - serialize gRPC calls (enabled by default, can be opted out) - gRPC logging The definition of that interface is meant to be comprehensive enough that a correct DRA driver can be implemented by following the documentation of the package, without having to cross-reference KEPs. The DRAPlugin interface used to be the abstract API of the helper. Now it's what the DRA driver kubelet plugin needs to implement. The helper is a concrete Server struct with no exported fields. It only exports the methods that drivers need when using the helper. While at it, support for the v1alpha4 API gets removed from the helper, which implies removing the corresponding E2E tests. The kubelet implementation will be dropped separately.
1 parent be32ca6 commit c6252da

File tree

7 files changed

+396
-198
lines changed

7 files changed

+396
-198
lines changed

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 281 additions & 98 deletions
Large diffs are not rendered by default.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubeletplugin
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/types"
21+
)
22+
23+
// NamespacedObject comprises a resource name with a mandatory namespace
24+
// and optional UID. It gets rendered as "<namespace>/<name>:[<uid>]"
25+
// (text output) or as an object (JSON output).
26+
type NamespacedObject struct {
27+
types.NamespacedName
28+
UID types.UID
29+
}
30+
31+
// String returns the general purpose string representation
32+
func (n NamespacedObject) String() string {
33+
if n.UID != "" {
34+
return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID)
35+
}
36+
return n.Namespace + string(types.Separator) + n.Name
37+
}
38+
39+
// MarshalLog emits a struct containing required key/value pair
40+
func (n NamespacedObject) MarshalLog() interface{} {
41+
return struct {
42+
Name string `json:"name"`
43+
Namespace string `json:"namespace,omitempty"`
44+
UID types.UID `json:"uid,omitempty"`
45+
}{
46+
Name: n.Name,
47+
Namespace: n.Namespace,
48+
UID: n.UID,
49+
}
50+
}

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,18 @@ func startGRPCServer(valueCtx context.Context, grpcVerbosity int, unaryIntercept
8282

8383
// Run a gRPC server. It will close the listening socket when
8484
// shutting down, so we don't need to do that.
85+
// The per-context logger always gets initialized because
86+
// there might be log output inside the method implementations.
8587
var opts []grpc.ServerOption
86-
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{unaryContextInterceptor(valueCtx)}
87-
finalStreamInterceptors := []grpc.StreamServerInterceptor{streamContextInterceptor(valueCtx)}
88-
if grpcVerbosity >= 0 {
89-
finalUnaryInterceptors = append(finalUnaryInterceptors, s.interceptor)
90-
finalStreamInterceptors = append(finalStreamInterceptors, s.streamInterceptor)
88+
finalUnaryInterceptors := []grpc.UnaryServerInterceptor{
89+
unaryContextInterceptor(valueCtx),
90+
s.interceptor,
9191
}
9292
finalUnaryInterceptors = append(finalUnaryInterceptors, unaryInterceptors...)
93+
finalStreamInterceptors := []grpc.StreamServerInterceptor{
94+
streamContextInterceptor(valueCtx),
95+
s.streamInterceptor,
96+
}
9397
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
9498
opts = append(opts, grpc.ChainUnaryInterceptor(finalUnaryInterceptors...))
9599
opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
@@ -164,7 +168,6 @@ func (m mergeCtx) Value(i interface{}) interface{} {
164168
// sequentially increasing request ID and adds that logger to the context. It
165169
// also logs request and response.
166170
func (s *grpcServer) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
167-
168171
requestID := atomic.AddInt64(&requestID, 1)
169172
logger := klog.FromContext(ctx)
170173
logger = klog.LoggerWithValues(logger, "requestID", requestID, "method", info.FullMethod)

test/e2e/dra/deploy.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
430430
kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)),
431431
kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)),
432432
kubeletplugin.KubeletPluginSocketPath(draAddr),
433-
kubeletplugin.NodeV1alpha4(d.NodeV1alpha4),
434433
kubeletplugin.NodeV1beta1(d.NodeV1beta1),
435434
)
436435
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)

test/e2e/dra/dra.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1737,15 +1737,13 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
17371737
})
17381738
})
17391739

1740-
multipleDrivers := func(nodeV1alpha4, nodeV1beta1 bool) {
1740+
multipleDrivers := func(nodeV1beta1 bool) {
17411741
nodes := NewNodes(f, 1, 4)
17421742
driver1 := NewDriver(f, nodes, perNode(2, nodes))
1743-
driver1.NodeV1alpha4 = nodeV1alpha4
17441743
driver1.NodeV1beta1 = nodeV1beta1
17451744
b1 := newBuilder(f, driver1)
17461745

17471746
driver2 := NewDriver(f, nodes, perNode(2, nodes))
1748-
driver2.NodeV1alpha4 = nodeV1alpha4
17491747
driver2.NodeV1beta1 = nodeV1beta1
17501748
driver2.NameSuffix = "-other"
17511749
b2 := newBuilder(f, driver2)
@@ -1769,16 +1767,14 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
17691767
b1.testPod(ctx, f, pod)
17701768
})
17711769
}
1772-
multipleDriversContext := func(prefix string, nodeV1alpha4, nodeV1beta1 bool) {
1770+
multipleDriversContext := func(prefix string, nodeV1beta1 bool) {
17731771
ginkgo.Context(prefix, func() {
1774-
multipleDrivers(nodeV1alpha4, nodeV1beta1)
1772+
multipleDrivers(nodeV1beta1)
17751773
})
17761774
}
17771775

17781776
ginkgo.Context("multiple drivers", func() {
1779-
multipleDriversContext("using only drapbv1alpha4", true, false)
1780-
multipleDriversContext("using only drapbv1beta1", false, true)
1781-
multipleDriversContext("using both drav1alpha4 and drapbv1beta1", true, true)
1777+
multipleDriversContext("using only drapbv1beta1", true)
17821778
})
17831779

17841780
ginkgo.It("runs pod after driver starts", func(ctx context.Context) {

test/e2e/dra/test-driver/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ RUNTIME_CONFIG="resource.k8s.io/v1alpha3" FEATURE_GATES=DynamicResourceAllocatio
6060

6161
In another:
6262
```
63-
sudo mkdir -p /var/run/cdi && sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
63+
sudo mkdir -p /var/run/cdi
64+
sudo mkdir -p /var/lib/kubelet/plugins
65+
sudo mkdir -p /var/lib/kubelet/plugins_registry
66+
sudo chmod a+rx /var/lib/kubelet /var/lib/kubelet/plugins
67+
sudo chmod a+rwx /var/run/cdi /var/lib/kubelet/plugins_registry
6468
KUBECONFIG=/var/run/kubernetes/admin.kubeconfig go run ./test/e2e/dra/test-driver -v=5 kubelet-plugin --node-name=127.0.0.1
6569
```
6670

0 commit comments

Comments
 (0)