Skip to content

Commit fd74333

Browse files
authored
Merge pull request kubernetes#93622 from knight42/test/plugin-register-timeout
Fix race condition in pluginWatcher
2 parents f55eac0 + de46e81 commit fd74333

File tree

2 files changed: 52 additions (+52) and 38 deletions (-38).

pkg/kubelet/pluginmanager/plugin_manager_test.go

Lines changed: 41 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,9 @@ import (
2020
"fmt"
2121
"io/ioutil"
2222
"os"
23+
"path/filepath"
24+
"reflect"
25+
"strconv"
2326
"sync"
2427
"testing"
2528
"time"
@@ -29,6 +32,7 @@ import (
2932
"k8s.io/apimachinery/pkg/util/wait"
3033
"k8s.io/client-go/tools/record"
3134
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
35+
3236
"k8s.io/kubernetes/pkg/kubelet/config"
3337
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
3438
)
@@ -39,41 +43,41 @@ var (
3943
)
4044

4145
type fakePluginHandler struct {
42-
validatePluginCalled bool
43-
registerPluginCalled bool
44-
deregisterPluginCalled bool
46+
events []string
4547
sync.RWMutex
4648
}
4749

4850
func newFakePluginHandler() *fakePluginHandler {
49-
return &fakePluginHandler{
50-
validatePluginCalled: false,
51-
registerPluginCalled: false,
52-
deregisterPluginCalled: false,
53-
}
51+
return &fakePluginHandler{}
5452
}
5553

5654
// ValidatePlugin is a fake method
5755
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
5856
f.Lock()
5957
defer f.Unlock()
60-
f.validatePluginCalled = true
58+
f.events = append(f.events, "validate "+pluginName)
6159
return nil
6260
}
6361

6462
// RegisterPlugin is a fake method
6563
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
6664
f.Lock()
6765
defer f.Unlock()
68-
f.registerPluginCalled = true
66+
f.events = append(f.events, "register "+pluginName)
6967
return nil
7068
}
7169

7270
// DeRegisterPlugin is a fake method
7371
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
7472
f.Lock()
7573
defer f.Unlock()
76-
f.deregisterPluginCalled = true
74+
f.events = append(f.events, "deregister "+pluginName)
75+
}
76+
77+
func (f *fakePluginHandler) Reset() {
78+
f.Lock()
79+
defer f.Unlock()
80+
f.events = nil
7781
}
7882

7983
func init() {
@@ -90,15 +94,17 @@ func cleanup(t *testing.T) {
9094
os.MkdirAll(socketDir, 0755)
9195
}
9296

93-
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
97+
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
98+
expected := []string{"validate " + pluginName, "register " + pluginName}
9499
err := retryWithExponentialBackOff(
95-
time.Duration(500*time.Millisecond),
100+
100*time.Millisecond,
96101
func() (bool, error) {
97102
fakePluginHandler.Lock()
98103
defer fakePluginHandler.Unlock()
99-
if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
104+
if reflect.DeepEqual(fakePluginHandler.events, expected) {
100105
return true, nil
101106
}
107+
t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
102108
return false, nil
103109
},
104110
)
@@ -134,19 +140,29 @@ func TestPluginRegistration(t *testing.T) {
134140
fakeHandler := newFakePluginHandler()
135141
pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
136142

137-
// Add a new plugin
138-
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
139-
pluginName := "example-plugin"
140-
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
141-
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
142-
143-
// Verify that the plugin is registered
144-
waitForRegistration(t, fakeHandler)
143+
const maxDepth = 3
144+
// Make sure the plugin manager is aware of the socket in subdirectories
145+
for i := 0; i < maxDepth; i++ {
146+
fakeHandler.Reset()
147+
pluginDir := socketDir
148+
149+
for j := 0; j < i; j++ {
150+
pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
151+
}
152+
require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
153+
socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
154+
155+
// Add a new plugin
156+
pluginName := fmt.Sprintf("example-plugin-%d", i)
157+
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
158+
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
159+
160+
// Verify that the plugin is registered
161+
waitForRegistration(t, fakeHandler, pluginName)
162+
}
145163
}
146164

147-
func newTestPluginManager(
148-
sockDir string) PluginManager {
149-
165+
func newTestPluginManager(sockDir string) PluginManager {
150166
pm := NewPluginManager(
151167
sockDir,
152168
&record.FakeRecorder{},

pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"os"
2222
"runtime"
2323
"strings"
24-
"time"
2524

2625
"github.com/fsnotify/fsnotify"
2726
"k8s.io/klog/v2"
@@ -36,7 +35,6 @@ type Watcher struct {
3635
path string
3736
fs utilfs.Filesystem
3837
fsWatcher *fsnotify.Watcher
39-
stopped chan struct{}
4038
desiredStateOfWorld cache.DesiredStateOfWorld
4139
}
4240

@@ -53,8 +51,6 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *
5351
func (w *Watcher) Start(stopCh <-chan struct{}) error {
5452
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
5553

56-
w.stopped = make(chan struct{})
57-
5854
// Creates the directory to be watched if it doesn't exist yet,
5955
// and walks through the directory to discover the existing plugins.
6056
if err := w.init(); err != nil {
@@ -73,7 +69,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
7369
}
7470

7571
go func(fsWatcher *fsnotify.Watcher) {
76-
defer close(w.stopped)
7772
for {
7873
select {
7974
case event := <-fsWatcher.Events:
@@ -93,14 +88,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
9388
}
9489
continue
9590
case <-stopCh:
96-
// In case of plugin watcher being stopped by plugin manager, stop
97-
// probing the creation/deletion of plugin sockets.
98-
// Also give all pending go routines a chance to complete
99-
select {
100-
case <-w.stopped:
101-
case <-time.After(11 * time.Second):
102-
klog.Errorf("timeout on stopping watcher")
103-
}
10491
w.fsWatcher.Close()
10592
return
10693
}
@@ -123,6 +110,12 @@ func (w *Watcher) init() error {
123110
// Walks through the plugin directory to discover any existing plugin sockets.
124111
// Ignore all errors except root dir not being walkable
125112
func (w *Watcher) traversePluginDir(dir string) error {
113+
// watch the new dir
114+
err := w.fsWatcher.Add(dir)
115+
if err != nil {
116+
return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
117+
}
118+
// traverse existing children in the dir
126119
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
127120
if err != nil {
128121
if path == dir {
@@ -133,6 +126,11 @@ func (w *Watcher) traversePluginDir(dir string) error {
133126
return nil
134127
}
135128

129+
// do not call fsWatcher.Add twice on the root dir to avoid potential problems.
130+
if path == dir {
131+
return nil
132+
}
133+
136134
switch mode := info.Mode(); {
137135
case mode.IsDir():
138136
if err := w.fsWatcher.Add(path); err != nil {

0 commit comments

Comments
 (0)