Skip to content

Commit f2428d6

Browse files
authored
Merge pull request kubernetes#125163 from pohly/dra-kubelet-api-version-independent-no-rest-proxy
DRA: make kubelet independent of the resource.k8s.io API version
2 parents 5fc7032 + a7396fd commit f2428d6

File tree

25 files changed

+1660
-1618
lines changed

25 files changed

+1660
-1618
lines changed

pkg/kubelet/cm/dra/manager.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
clientset "k8s.io/client-go/kubernetes"
3131
"k8s.io/dynamic-resource-allocation/resourceclaim"
3232
"k8s.io/klog/v2"
33-
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
33+
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
3434
dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
3535
"k8s.io/kubernetes/pkg/kubelet/config"
3636
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -224,13 +224,9 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
224224
// Loop through all plugins and prepare for calling NodePrepareResources.
225225
for _, resourceHandle := range claimInfo.ResourceHandles {
226226
claim := &drapb.Claim{
227-
Namespace: claimInfo.Namespace,
228-
Uid: string(claimInfo.ClaimUID),
229-
Name: claimInfo.ClaimName,
230-
ResourceHandle: resourceHandle.Data,
231-
}
232-
if resourceHandle.StructuredData != nil {
233-
claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
227+
Namespace: claimInfo.Namespace,
228+
Uid: string(claimInfo.ClaimUID),
229+
Name: claimInfo.ClaimName,
234230
}
235231
pluginName := resourceHandle.DriverName
236232
batches[pluginName] = append(batches[pluginName], claim)
@@ -455,13 +451,9 @@ func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, cla
455451
// Loop through all plugins and prepare for calling NodeUnprepareResources.
456452
for _, resourceHandle := range claimInfo.ResourceHandles {
457453
claim := &drapb.Claim{
458-
Namespace: claimInfo.Namespace,
459-
Uid: string(claimInfo.ClaimUID),
460-
Name: claimInfo.ClaimName,
461-
ResourceHandle: resourceHandle.Data,
462-
}
463-
if resourceHandle.StructuredData != nil {
464-
claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
454+
Namespace: claimInfo.Namespace,
455+
Uid: string(claimInfo.ClaimUID),
456+
Name: claimInfo.ClaimName,
465457
}
466458
pluginName := resourceHandle.DriverName
467459
batches[pluginName] = append(batches[pluginName], claim)

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"k8s.io/apimachinery/pkg/util/sets"
3737
"k8s.io/client-go/kubernetes/fake"
3838
"k8s.io/dynamic-resource-allocation/resourceclaim"
39-
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
39+
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
4040
"k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
4141
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
4242
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -48,16 +48,16 @@ const (
4848
)
4949

5050
type fakeDRADriverGRPCServer struct {
51-
drapbv1.UnimplementedNodeServer
51+
drapb.UnimplementedNodeServer
5252
driverName string
5353
timeout *time.Duration
5454
prepareResourceCalls atomic.Uint32
5555
unprepareResourceCalls atomic.Uint32
56-
prepareResourcesResponse *drapbv1.NodePrepareResourcesResponse
57-
unprepareResourcesResponse *drapbv1.NodeUnprepareResourcesResponse
56+
prepareResourcesResponse *drapb.NodePrepareResourcesResponse
57+
unprepareResourcesResponse *drapb.NodeUnprepareResourcesResponse
5858
}
5959

60-
func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
60+
func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
6161
s.prepareResourceCalls.Add(1)
6262

6363
if s.timeout != nil {
@@ -67,8 +67,8 @@ func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req
6767
if s.prepareResourcesResponse == nil {
6868
deviceName := "claim-" + req.Claims[0].Uid
6969
result := s.driverName + "/" + driverClassName + "=" + deviceName
70-
return &drapbv1.NodePrepareResourcesResponse{
71-
Claims: map[string]*drapbv1.NodePrepareResourceResponse{
70+
return &drapb.NodePrepareResourcesResponse{
71+
Claims: map[string]*drapb.NodePrepareResourceResponse{
7272
req.Claims[0].Uid: {
7373
CDIDevices: []string{result},
7474
},
@@ -79,16 +79,16 @@ func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req
7979
return s.prepareResourcesResponse, nil
8080
}
8181

82-
func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
82+
func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
8383
s.unprepareResourceCalls.Add(1)
8484

8585
if s.timeout != nil {
8686
time.Sleep(*s.timeout)
8787
}
8888

8989
if s.unprepareResourcesResponse == nil {
90-
return &drapbv1.NodeUnprepareResourcesResponse{
91-
Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{
90+
return &drapb.NodeUnprepareResourcesResponse{
91+
Claims: map[string]*drapb.NodeUnprepareResourceResponse{
9292
req.Claims[0].Uid: {},
9393
},
9494
}, nil
@@ -108,7 +108,7 @@ type fakeDRAServerInfo struct {
108108
teardownFn tearDown
109109
}
110110

111-
func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.Duration, prepareResourcesResponse *drapbv1.NodePrepareResourcesResponse, unprepareResourcesResponse *drapbv1.NodeUnprepareResourcesResponse) (fakeDRAServerInfo, error) {
111+
func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.Duration, prepareResourcesResponse *drapb.NodePrepareResourcesResponse, unprepareResourcesResponse *drapb.NodeUnprepareResourcesResponse) (fakeDRAServerInfo, error) {
112112
socketDir, err := os.MkdirTemp("", "dra")
113113
if err != nil {
114114
return fakeDRAServerInfo{
@@ -147,7 +147,7 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.
147147
fakeDRADriverGRPCServer.timeout = &timeout
148148
}
149149

150-
drapbv1.RegisterNodeServer(s, fakeDRADriverGRPCServer)
150+
drapb.RegisterNodeServer(s, fakeDRADriverGRPCServer)
151151

152152
go func() {
153153
go s.Serve(l)
@@ -345,7 +345,7 @@ func TestPrepareResources(t *testing.T) {
345345
pod *v1.Pod
346346
claimInfo *ClaimInfo
347347
resourceClaim *resourcev1alpha2.ResourceClaim
348-
resp *drapbv1.NodePrepareResourcesResponse
348+
resp *drapb.NodePrepareResourcesResponse
349349
wantErr bool
350350
wantTimeout bool
351351
wantResourceSkipped bool
@@ -484,7 +484,7 @@ func TestPrepareResources(t *testing.T) {
484484
},
485485
},
486486
},
487-
resp: &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{"test-reserved": nil}},
487+
resp: &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{"test-reserved": nil}},
488488
expectedCDIDevices: []string{},
489489
ExpectedPrepareCalls: 1,
490490
},
@@ -541,7 +541,7 @@ func TestPrepareResources(t *testing.T) {
541541
},
542542
},
543543
},
544-
resp: &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{"test-reserved": nil}},
544+
resp: &drapb.NodePrepareResourcesResponse{Claims: map[string]*drapb.NodePrepareResourceResponse{"test-reserved": nil}},
545545
expectedCDIDevices: []string{},
546546
ExpectedPrepareCalls: 1,
547547
},
@@ -748,8 +748,8 @@ func TestPrepareResources(t *testing.T) {
748748
},
749749
},
750750
},
751-
resp: &drapbv1.NodePrepareResourcesResponse{
752-
Claims: map[string]*drapbv1.NodePrepareResourceResponse{
751+
resp: &drapb.NodePrepareResourcesResponse{
752+
Claims: map[string]*drapb.NodePrepareResourceResponse{
753753
"test-reserved": {CDIDevices: []string{fmt.Sprintf("%s/%s=claim-test-reserved", driverName, driverClassName)}},
754754
},
755755
},
@@ -810,8 +810,8 @@ func TestPrepareResources(t *testing.T) {
810810
},
811811
},
812812
},
813-
resp: &drapbv1.NodePrepareResourcesResponse{
814-
Claims: map[string]*drapbv1.NodePrepareResourceResponse{
813+
resp: &drapb.NodePrepareResourcesResponse{
814+
Claims: map[string]*drapb.NodePrepareResourceResponse{
815815
"test-reserved": {CDIDevices: []string{fmt.Sprintf("%s/%s=claim-test-reserved", driverName, driverClassName)}},
816816
},
817817
},
@@ -884,8 +884,8 @@ func TestPrepareResources(t *testing.T) {
884884
},
885885
},
886886
},
887-
resp: &drapbv1.NodePrepareResourcesResponse{
888-
Claims: map[string]*drapbv1.NodePrepareResourceResponse{
887+
resp: &drapb.NodePrepareResourcesResponse{
888+
Claims: map[string]*drapb.NodePrepareResourceResponse{
889889
"test-reserved": {CDIDevices: []string{fmt.Sprintf("%s/%s=claim-test-reserved", driverName, driverClassName)}},
890890
},
891891
},
@@ -977,7 +977,7 @@ func TestUnprepareResources(t *testing.T) {
977977
driverName string
978978
pod *v1.Pod
979979
claimInfo *ClaimInfo
980-
resp *drapbv1.NodeUnprepareResourcesResponse
980+
resp *drapb.NodeUnprepareResourcesResponse
981981
wantErr bool
982982
wantTimeout bool
983983
wantResourceSkipped bool
@@ -1117,7 +1117,7 @@ func TestUnprepareResources(t *testing.T) {
11171117
},
11181118
},
11191119
},
1120-
resp: &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{"test-reserved": {}}},
1120+
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{"test-reserved": {}}},
11211121
wantErr: true,
11221122
wantTimeout: true,
11231123
expectedUnprepareCalls: 1,
@@ -1168,7 +1168,7 @@ func TestUnprepareResources(t *testing.T) {
11681168
},
11691169
prepared: true,
11701170
},
1171-
resp: &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{"": {}}},
1171+
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{"": {}}},
11721172
expectedUnprepareCalls: 1,
11731173
},
11741174
{
@@ -1217,7 +1217,7 @@ func TestUnprepareResources(t *testing.T) {
12171217
},
12181218
prepared: false,
12191219
},
1220-
resp: &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{"": {}}},
1220+
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{"": {}}},
12211221
expectedUnprepareCalls: 1,
12221222
},
12231223
{
@@ -1267,7 +1267,7 @@ func TestUnprepareResources(t *testing.T) {
12671267
},
12681268
prepared: true,
12691269
},
1270-
resp: &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{"test-reserved": nil}},
1270+
resp: &drapb.NodeUnprepareResourcesResponse{Claims: map[string]*drapb.NodeUnprepareResourceResponse{"test-reserved": nil}},
12711271
expectedUnprepareCalls: 1,
12721272
},
12731273
} {

pkg/kubelet/cm/dra/plugin/client.go

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ package plugin
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
23+
"net"
24+
"sync"
2225
"time"
2326

2427
"google.golang.org/grpc"
28+
"google.golang.org/grpc/connectivity"
29+
"google.golang.org/grpc/credentials/insecure"
2530

31+
utilversion "k8s.io/apimachinery/pkg/util/version"
2632
"k8s.io/klog/v2"
27-
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
33+
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
2834
)
2935

3036
const PluginClientTimeout = 45 * time.Second
3137

32-
func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
38+
// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA
39+
// driver kubelet plugin which need to be called by kubelet. The wrapper
40+
// handles gRPC connection management and logging. Connections are reused
41+
// across different NewDRAPluginClient calls.
42+
func NewDRAPluginClient(pluginName string) (*Plugin, error) {
3343
if pluginName == "" {
3444
return nil, fmt.Errorf("plugin name is empty")
3545
}
@@ -42,35 +52,64 @@ func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
4252
return existingPlugin, nil
4353
}
4454

45-
func (p *plugin) NodePrepareResources(
46-
ctx context.Context,
47-
req *drapb.NodePrepareResourcesRequest,
48-
opts ...grpc.CallOption,
49-
) (*drapb.NodePrepareResourcesResponse, error) {
55+
type Plugin struct {
56+
backgroundCtx context.Context
57+
cancel func(cause error)
58+
59+
mutex sync.Mutex
60+
conn *grpc.ClientConn
61+
endpoint string
62+
highestSupportedVersion *utilversion.Version
63+
clientTimeout time.Duration
64+
}
65+
66+
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
67+
p.mutex.Lock()
68+
defer p.mutex.Unlock()
69+
70+
if p.conn != nil {
71+
return p.conn, nil
72+
}
73+
74+
ctx := p.backgroundCtx
5075
logger := klog.FromContext(ctx)
51-
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
5276

53-
conn, err := p.getOrCreateGRPCConn()
77+
network := "unix"
78+
logger.V(4).Info("Creating new gRPC connection", "protocol", network, "endpoint", p.endpoint)
79+
// grpc.Dial is deprecated. grpc.NewClient should be used instead.
80+
// For now this gets ignored because this function is meant to establish
81+
// the connection, with the one second timeout below. Perhaps that
82+
// approach should be reconsidered?
83+
//nolint:staticcheck
84+
conn, err := grpc.Dial(
85+
p.endpoint,
86+
grpc.WithTransportCredentials(insecure.NewCredentials()),
87+
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
88+
return (&net.Dialer{}).DialContext(ctx, network, target)
89+
}),
90+
)
5491
if err != nil {
5592
return nil, err
5693
}
5794

58-
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
95+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
5996
defer cancel()
6097

61-
nodeClient := drapb.NewNodeClient(conn)
62-
response, err := nodeClient.NodePrepareResources(ctx, req)
63-
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
64-
return response, err
98+
if ok := conn.WaitForStateChange(ctx, connectivity.Connecting); !ok {
99+
return nil, errors.New("timed out waiting for gRPC connection to be ready")
100+
}
101+
102+
p.conn = conn
103+
return p.conn, nil
65104
}
66105

67-
func (p *plugin) NodeUnprepareResources(
106+
func (p *Plugin) NodePrepareResources(
68107
ctx context.Context,
69-
req *drapb.NodeUnprepareResourcesRequest,
108+
req *drapb.NodePrepareResourcesRequest,
70109
opts ...grpc.CallOption,
71-
) (*drapb.NodeUnprepareResourcesResponse, error) {
110+
) (*drapb.NodePrepareResourcesResponse, error) {
72111
logger := klog.FromContext(ctx)
73-
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
112+
logger.V(4).Info("Calling NodePrepareResources rpc", "request", req)
74113

75114
conn, err := p.getOrCreateGRPCConn()
76115
if err != nil {
@@ -81,24 +120,29 @@ func (p *plugin) NodeUnprepareResources(
81120
defer cancel()
82121

83122
nodeClient := drapb.NewNodeClient(conn)
84-
response, err := nodeClient.NodeUnprepareResources(ctx, req)
85-
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
123+
response, err := nodeClient.NodePrepareResources(ctx, req)
124+
logger.V(4).Info("Done calling NodePrepareResources rpc", "response", response, "err", err)
86125
return response, err
87126
}
88127

89-
func (p *plugin) NodeListAndWatchResources(
128+
func (p *Plugin) NodeUnprepareResources(
90129
ctx context.Context,
91-
req *drapb.NodeListAndWatchResourcesRequest,
130+
req *drapb.NodeUnprepareResourcesRequest,
92131
opts ...grpc.CallOption,
93-
) (drapb.Node_NodeListAndWatchResourcesClient, error) {
132+
) (*drapb.NodeUnprepareResourcesResponse, error) {
94133
logger := klog.FromContext(ctx)
95-
logger.V(4).Info(log("calling NodeListAndWatchResources rpc"), "request", req)
134+
logger.V(4).Info("Calling NodeUnprepareResource rpc", "request", req)
96135

97136
conn, err := p.getOrCreateGRPCConn()
98137
if err != nil {
99138
return nil, err
100139
}
101140

141+
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
142+
defer cancel()
143+
102144
nodeClient := drapb.NewNodeClient(conn)
103-
return nodeClient.NodeListAndWatchResources(ctx, req, opts...)
145+
response, err := nodeClient.NodeUnprepareResources(ctx, req)
146+
logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err)
147+
return response, err
104148
}

0 commit comments

Comments
 (0)