Skip to content

Commit 2c23fe1

Browse files
committed
DRA kubelet: list supported gRPC services during registration
Listing supported gRPC services (e.g. drav1alpha3.Node, drav1beta1.DRAPlugin) during registration enables the kubelet to determine in advance which methods it can call. Versioning by Kubernetes release makes less sense because it doesn't say anything about which gRPC service is supported. New ones might get added and obsolete ones removed. Some services might be optional. In the past, this versioning support wasn't really used. At least one version had to be provided and kubelet tried to use the plugin with the highest version. This version comparison gets dropped. In the unlikely situation that different plugins register under the same name, the most recent one is used. Because advertising gRPC services is a new convention, plugins only reporting some version are treated as providing the old alpha gRPC service.
1 parent 437be1e commit 2c23fe1

File tree

9 files changed

+171
-165
lines changed

9 files changed

+171
-165
lines changed

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ func TestPrepareResources(t *testing.T) {
581581
defer draServerInfo.teardownFn()
582582

583583
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
584-
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
584+
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
585585
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
586586
}
587587
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
@@ -718,7 +718,7 @@ func TestUnprepareResources(t *testing.T) {
718718
defer draServerInfo.teardownFn()
719719

720720
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
721-
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
721+
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
722722
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
723723
}
724724
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
@@ -888,7 +888,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
888888
defer draServerInfo.teardownFn()
889889

890890
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
891-
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{"1.27"}, nil); err != nil {
891+
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
892892
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
893893
}
894894
defer plg.DeRegisterPlugin(driverName)

pkg/kubelet/cm/dra/plugin/plugin.go

Lines changed: 24 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@ import (
2525
"time"
2626

2727
"google.golang.org/grpc"
28-
"google.golang.org/grpc/codes"
2928
"google.golang.org/grpc/connectivity"
3029
"google.golang.org/grpc/credentials/insecure"
3130
"google.golang.org/grpc/status"
3231

33-
utilversion "k8s.io/apimachinery/pkg/util/version"
3432
"k8s.io/klog/v2"
3533
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
3634
drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
@@ -59,27 +57,19 @@ type Plugin struct {
5957
backgroundCtx context.Context
6058
cancel func(cause error)
6159

62-
mutex sync.Mutex
63-
conn *grpc.ClientConn
64-
supportedAPI apiVersion
65-
endpoint string
66-
highestSupportedVersion *utilversion.Version
67-
clientCallTimeout time.Duration
60+
mutex sync.Mutex
61+
conn *grpc.ClientConn
62+
endpoint string
63+
chosenService string // e.g. drapbv1beta1.DRAPluginService
64+
clientCallTimeout time.Duration
6865
}
6966

70-
type apiVersion string
71-
72-
const (
73-
apiV1alpha4 = apiVersion("v1alpha4")
74-
apiV1beta1 = apiVersion("v1beta1")
75-
)
76-
77-
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, apiVersion, error) {
67+
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
7868
p.mutex.Lock()
7969
defer p.mutex.Unlock()
8070

8171
if p.conn != nil {
82-
return p.conn, p.supportedAPI, nil
72+
return p.conn, nil
8373
}
8474

8575
ctx := p.backgroundCtx
@@ -101,18 +91,18 @@ func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, apiVersion, error) {
10191
grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)),
10292
)
10393
if err != nil {
104-
return nil, "", err
94+
return nil, err
10595
}
10696

10797
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
10898
defer cancel()
10999

110100
if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
111-
return nil, "", errors.New("timed out waiting for gRPC connection to be ready")
101+
return nil, errors.New("timed out waiting for gRPC connection to be ready")
112102
}
113103

114104
p.conn = conn
115-
return p.conn, "", nil
105+
return p.conn, nil
116106
}
117107

118108
func (p *Plugin) NodePrepareResources(
@@ -123,7 +113,7 @@ func (p *Plugin) NodePrepareResources(
123113
logger := klog.FromContext(ctx)
124114
logger.V(4).Info("Calling NodePrepareResources rpc", "request", req)
125115

126-
conn, supportedAPI, err := p.getOrCreateGRPCConn()
116+
conn, err := p.getOrCreateGRPCConn()
127117
if err != nil {
128118
return nil, err
129119
}
@@ -132,29 +122,17 @@ func (p *Plugin) NodePrepareResources(
132122
defer cancel()
133123

134124
var response *drapbv1beta1.NodePrepareResourcesResponse
135-
switch supportedAPI {
136-
case apiV1beta1:
125+
switch p.chosenService {
126+
case drapbv1beta1.DRAPluginService:
137127
nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
138128
response, err = nodeClient.NodePrepareResources(ctx, req)
139-
case apiV1alpha4:
129+
case drapbv1alpha4.NodeService:
140130
nodeClient := drapbv1alpha4.NewNodeClient(conn)
141131
response, err = nodeClient.NodePrepareResources(ctx, req)
142132
default:
143-
// Try it, fall back if necessary.
144-
supportedAPI = apiV1beta1
145-
nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
146-
response, err = nodeClient.NodePrepareResources(ctx, req)
147-
if err != nil && status.Convert(err).Code() == codes.Unimplemented {
148-
supportedAPI = apiV1alpha4
149-
nodeClient := drapbv1alpha4.NewNodeClient(conn)
150-
response, err = nodeClient.NodePrepareResources(ctx, req)
151-
}
152-
if err == nil || status.Convert(err).Code() != codes.Unimplemented {
153-
// Store discovered version for future use.
154-
p.mutex.Lock()
155-
p.supportedAPI = supportedAPI
156-
p.mutex.Unlock()
157-
}
133+
// Shouldn't happen, validateSupportedServices should only
134+
// return services we support here.
135+
return nil, fmt.Errorf("internal error: unsupported chosen service: %q", p.chosenService)
158136
}
159137
logger.V(4).Info("Done calling NodePrepareResources rpc", "response", response, "err", err)
160138
return response, err
@@ -168,7 +146,7 @@ func (p *Plugin) NodeUnprepareResources(
168146
logger := klog.FromContext(ctx)
169147
logger.V(4).Info("Calling NodeUnprepareResource rpc", "request", req)
170148

171-
conn, supportedAPI, err := p.getOrCreateGRPCConn()
149+
conn, err := p.getOrCreateGRPCConn()
172150
if err != nil {
173151
return nil, err
174152
}
@@ -177,29 +155,17 @@ func (p *Plugin) NodeUnprepareResources(
177155
defer cancel()
178156

179157
var response *drapbv1beta1.NodeUnprepareResourcesResponse
180-
switch supportedAPI {
181-
case apiV1beta1:
158+
switch p.chosenService {
159+
case drapbv1beta1.DRAPluginService:
182160
nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
183161
response, err = nodeClient.NodeUnprepareResources(ctx, req)
184-
case apiV1alpha4:
162+
case drapbv1alpha4.NodeService:
185163
nodeClient := drapbv1alpha4.NewNodeClient(conn)
186164
response, err = nodeClient.NodeUnprepareResources(ctx, req)
187165
default:
188-
// Try it, fall back if necessary.
189-
supportedAPI = apiV1beta1
190-
nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
191-
response, err = nodeClient.NodeUnprepareResources(ctx, req)
192-
if err != nil && status.Convert(err).Code() == codes.Unimplemented {
193-
supportedAPI = apiV1alpha4
194-
nodeClient := drapbv1alpha4.NewNodeClient(conn)
195-
response, err = nodeClient.NodeUnprepareResources(ctx, req)
196-
}
197-
if err == nil || status.Convert(err).Code() != codes.Unimplemented {
198-
// Store discovered version for future use.
199-
p.mutex.Lock()
200-
p.supportedAPI = supportedAPI
201-
p.mutex.Unlock()
202-
}
166+
// Shouldn't happen, validateSupportedServices should only
167+
// return services we support here.
168+
return nil, fmt.Errorf("internal error: unsupported chosen service: %q", p.chosenService)
203169
}
204170
logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err)
205171
return response, err

pkg/kubelet/cm/dra/plugin/plugin_test.go

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,20 @@ import (
2727

2828
"github.com/stretchr/testify/assert"
2929
"google.golang.org/grpc"
30-
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
30+
drapbv1alpha4 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
31+
drapbv1beta1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
3132
"k8s.io/kubernetes/test/utils/ktesting"
3233
)
3334

34-
const (
35-
v1alpha4Version = "v1alpha4"
36-
)
37-
38-
type fakeV1alpha4GRPCServer struct {
39-
drapb.UnimplementedDRAPluginServer
35+
type fakeGRPCServer struct {
36+
drapbv1beta1.UnimplementedDRAPluginServer
4037
}
4138

42-
var _ drapb.DRAPluginServer = &fakeV1alpha4GRPCServer{}
39+
var _ drapbv1beta1.DRAPluginServer = &fakeGRPCServer{}
4340

44-
func (f *fakeV1alpha4GRPCServer) NodePrepareResources(ctx context.Context, in *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
45-
return &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{"claim-uid": {
46-
Devices: []*drapb.Device{
41+
func (f *fakeGRPCServer) NodePrepareResources(ctx context.Context, in *drapbv1beta1.NodePrepareResourcesRequest) (*drapbv1beta1.NodePrepareResourcesResponse, error) {
42+
return &drapbv1beta1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1beta1.NodePrepareResourceResponse{"claim-uid": {
43+
Devices: []*drapbv1beta1.Device{
4744
{
4845
RequestNames: []string{"test-request"},
4946
CDIDeviceIDs: []string{"test-cdi-id"},
@@ -52,14 +49,14 @@ func (f *fakeV1alpha4GRPCServer) NodePrepareResources(ctx context.Context, in *d
5249
}}}, nil
5350
}
5451

55-
func (f *fakeV1alpha4GRPCServer) NodeUnprepareResources(ctx context.Context, in *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
52+
func (f *fakeGRPCServer) NodeUnprepareResources(ctx context.Context, in *drapbv1beta1.NodeUnprepareResourcesRequest) (*drapbv1beta1.NodeUnprepareResourcesResponse, error) {
5653

57-
return &drapb.NodeUnprepareResourcesResponse{}, nil
54+
return &drapbv1beta1.NodeUnprepareResourcesResponse{}, nil
5855
}
5956

6057
type tearDown func()
6158

62-
func setupFakeGRPCServer(version string) (string, tearDown, error) {
59+
func setupFakeGRPCServer(service string) (string, tearDown, error) {
6360
p, err := os.MkdirTemp("", "dra_plugin")
6461
if err != nil {
6562
return "", nil, err
@@ -81,12 +78,14 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) {
8178
}
8279

8380
s := grpc.NewServer()
84-
switch version {
85-
case v1alpha4Version:
86-
fakeGRPCServer := &fakeV1alpha4GRPCServer{}
87-
drapb.RegisterDRAPluginServer(s, fakeGRPCServer)
81+
fakeGRPCServer := &fakeGRPCServer{}
82+
switch service {
83+
case drapbv1beta1.DRAPluginService:
84+
drapbv1beta1.RegisterDRAPluginServer(s, fakeGRPCServer)
85+
case drapbv1alpha4.NodeService:
86+
drapbv1alpha4.RegisterNodeServer(s, fakeGRPCServer)
8887
default:
89-
return "", nil, fmt.Errorf("unsupported version: %s", version)
88+
return "", nil, fmt.Errorf("unsupported gRPC service: %s", service)
9089
}
9190

9291
go func() {
@@ -104,7 +103,8 @@ func setupFakeGRPCServer(version string) (string, tearDown, error) {
104103

105104
func TestGRPCConnIsReused(t *testing.T) {
106105
tCtx := ktesting.Init(t)
107-
addr, teardown, err := setupFakeGRPCServer(v1alpha4Version)
106+
service := drapbv1beta1.DRAPluginService
107+
addr, teardown, err := setupFakeGRPCServer(service)
108108
if err != nil {
109109
t.Fatal(err)
110110
}
@@ -119,10 +119,11 @@ func TestGRPCConnIsReused(t *testing.T) {
119119
name: pluginName,
120120
backgroundCtx: tCtx,
121121
endpoint: addr,
122+
chosenService: service,
122123
clientCallTimeout: defaultClientCallTimeout,
123124
}
124125

125-
conn, _, err := p.getOrCreateGRPCConn()
126+
conn, err := p.getOrCreateGRPCConn()
126127
defer func() {
127128
err := conn.Close()
128129
if err != nil {
@@ -148,8 +149,8 @@ func TestGRPCConnIsReused(t *testing.T) {
148149
return
149150
}
150151

151-
req := &drapb.NodePrepareResourcesRequest{
152-
Claims: []*drapb.Claim{
152+
req := &drapbv1beta1.NodePrepareResourcesRequest{
153+
Claims: []*drapbv1beta1.Claim{
153154
{
154155
Namespace: "dummy-namespace",
155156
UID: "dummy-uid",
@@ -233,21 +234,27 @@ func TestNewDRAPluginClient(t *testing.T) {
233234

234235
func TestNodeUnprepareResources(t *testing.T) {
235236
for _, test := range []struct {
236-
description string
237-
serverSetup func(string) (string, tearDown, error)
238-
serverVersion string
239-
request *drapb.NodeUnprepareResourcesRequest
237+
description string
238+
serverSetup func(string) (string, tearDown, error)
239+
service string
240+
request *drapbv1beta1.NodeUnprepareResourcesRequest
240241
}{
241242
{
242-
description: "server supports v1alpha4",
243-
serverSetup: setupFakeGRPCServer,
244-
serverVersion: v1alpha4Version,
245-
request: &drapb.NodeUnprepareResourcesRequest{},
243+
description: "server supports v1alpha4",
244+
serverSetup: setupFakeGRPCServer,
245+
service: drapbv1alpha4.NodeService,
246+
request: &drapbv1beta1.NodeUnprepareResourcesRequest{},
247+
},
248+
{
249+
description: "server supports v1beta1",
250+
serverSetup: setupFakeGRPCServer,
251+
service: drapbv1beta1.DRAPluginService,
252+
request: &drapbv1beta1.NodeUnprepareResourcesRequest{},
246253
},
247254
} {
248255
t.Run(test.description, func(t *testing.T) {
249256
tCtx := ktesting.Init(t)
250-
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
257+
addr, teardown, err := setupFakeGRPCServer(test.service)
251258
if err != nil {
252259
t.Fatal(err)
253260
}
@@ -258,10 +265,11 @@ func TestNodeUnprepareResources(t *testing.T) {
258265
name: pluginName,
259266
backgroundCtx: tCtx,
260267
endpoint: addr,
268+
chosenService: test.service,
261269
clientCallTimeout: defaultClientCallTimeout,
262270
}
263271

264-
conn, _, err := p.getOrCreateGRPCConn()
272+
conn, err := p.getOrCreateGRPCConn()
265273
defer func() {
266274
err := conn.Close()
267275
if err != nil {

0 commit comments

Comments
 (0)