Skip to content

Commit 227c2e7

Browse files
authored
Merge pull request kubernetes#123720 from HirazawaUi/fix-slow-dra-test
kubelet: fix slow dra unit test
2 parents 20d0ab7 + 10b6319 commit 227c2e7

File tree

11 files changed

+51
-19
lines changed

11 files changed

+51
-19
lines changed

pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package v1beta1
1919
import (
2020
"fmt"
2121
"os"
22+
"time"
2223

2324
core "k8s.io/api/core/v1"
2425
"k8s.io/klog/v2"
@@ -37,7 +38,7 @@ func (s *server) GetPluginHandler() cache.PluginHandler {
3738
return s
3839
}
3940

40-
func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
41+
func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
4142
klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)
4243
return s.connectClient(pluginName, endpoint)
4344
}

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type fakeDRAServerInfo struct {
8484
teardownFn tearDown
8585
}
8686

87-
func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error) {
87+
func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.Duration) (fakeDRAServerInfo, error) {
8888
socketDir, err := os.MkdirTemp("", "dra")
8989
if err != nil {
9090
return fakeDRAServerInfo{
@@ -117,7 +117,7 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error)
117117
driverName: driverName,
118118
}
119119
if shouldTimeout {
120-
timeout := plugin.PluginClientTimeout + time.Second
120+
timeout := *pluginClientTimeout * 2
121121
fakeDRADriverGRPCServer.timeout = &timeout
122122
}
123123

@@ -758,14 +758,20 @@ func TestPrepareResources(t *testing.T) {
758758
}
759759
}
760760

761-
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
761+
var pluginClientTimeout *time.Duration
762+
if test.wantTimeout {
763+
timeout := time.Millisecond * 20
764+
pluginClientTimeout = &timeout
765+
}
766+
767+
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout)
762768
if err != nil {
763769
t.Fatal(err)
764770
}
765771
defer draServerInfo.teardownFn()
766772

767773
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
768-
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
774+
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
769775
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
770776
}
771777
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
@@ -1058,14 +1064,20 @@ func TestUnprepareResources(t *testing.T) {
10581064
t.Fatalf("failed to create a new instance of the claimInfoCache, err: %v", err)
10591065
}
10601066

1061-
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
1067+
var pluginClientTimeout *time.Duration
1068+
if test.wantTimeout {
1069+
timeout := time.Millisecond * 20
1070+
pluginClientTimeout = &timeout
1071+
}
1072+
1073+
draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout)
10621074
if err != nil {
10631075
t.Fatal(err)
10641076
}
10651077
defer draServerInfo.teardownFn()
10661078

10671079
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
1068-
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
1080+
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}, pluginClientTimeout); err != nil {
10691081
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
10701082
}
10711083
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests

pkg/kubelet/cm/dra/plugin/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (p *plugin) NodePrepareResources(
154154
return nil, err
155155
}
156156

157-
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
157+
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
158158
defer cancel()
159159

160160
version := p.getVersion()
@@ -183,7 +183,7 @@ func (p *plugin) NodeUnprepareResources(
183183
return nil, err
184184
}
185185

186-
ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
186+
ctx, cancel := context.WithTimeout(ctx, p.clientTimeout)
187187
defer cancel()
188188

189189
version := p.getVersion()

pkg/kubelet/cm/dra/plugin/client_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,9 @@ func TestNodeUnprepareResource(t *testing.T) {
268268
defer teardown()
269269

270270
p := &plugin{
271-
endpoint: addr,
272-
version: v1alpha3Version,
271+
endpoint: addr,
272+
version: v1alpha3Version,
273+
clientTimeout: PluginClientTimeout,
273274
}
274275

275276
conn, err := p.getOrCreateGRPCConn()

pkg/kubelet/cm/dra/plugin/plugin.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type plugin struct {
4949
endpoint string
5050
version string
5151
highestSupportedVersion *utilversion.Version
52+
clientTimeout time.Duration
5253
}
5354

5455
func (p *plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
@@ -116,19 +117,27 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
116117
}
117118

118119
// RegisterPlugin is called when a plugin can be registered.
119-
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
120+
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
120121
klog.InfoS("Register new DRA plugin", "name", pluginName, "endpoint", endpoint)
121122

122123
highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, versions)
123124
if err != nil {
124125
return err
125126
}
126127

128+
var timeout time.Duration
129+
if pluginClientTimeout == nil {
130+
timeout = PluginClientTimeout
131+
} else {
132+
timeout = *pluginClientTimeout
133+
}
134+
127135
pluginInstance := &plugin{
128136
conn: nil,
129137
endpoint: endpoint,
130138
version: v1alpha3Version,
131139
highestSupportedVersion: highestSupportedVersion,
140+
clientTimeout: timeout,
132141
}
133142

134143
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key

pkg/kubelet/cm/dra/plugin/plugin_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) {
5656
description: "plugin already registered with a higher supported version",
5757
handler: func() *RegistrationHandler {
5858
handler := newRegistrationHandler()
59-
if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil {
59+
if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}, nil); err != nil {
6060
t.Fatal(err)
6161
}
6262
return handler

pkg/kubelet/pluginmanager/cache/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ limitations under the License.
1616

1717
package cache
1818

19+
import "time"
20+
1921
// PluginHandler is an interface a client of the pluginwatcher API needs to implement in
2022
// order to consume plugins
2123
// The PluginHandler follows the simple following state machine:
@@ -51,7 +53,7 @@ type PluginHandler interface {
5153
// RegisterPlugin is called so that the plugin can be registered by any
5254
// plugin consumer
5355
// Error encountered here can still be Notified to the plugin.
54-
RegisterPlugin(pluginName, endpoint string, versions []string) error
56+
RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error
5557
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
5658
// been deleted.
5759
DeRegisterPlugin(pluginName string)

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
121121
if err != nil {
122122
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
123123
}
124-
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
124+
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, nil); err != nil {
125125
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
126126
}
127127

pkg/kubelet/pluginmanager/plugin_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
5959
}
6060

6161
// RegisterPlugin is a fake method
62-
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
62+
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
6363
f.Lock()
6464
defer f.Unlock()
6565
f.events = append(f.events, "register "+pluginName)

pkg/kubelet/pluginmanager/reconciler/reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions
127127
}
128128

129129
// RegisterPlugin is a dummy implementation
130-
func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
130+
func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
131131
return nil
132132
}
133133

0 commit comments

Comments
 (0)