Skip to content

Commit 1608578

Browse files
authored
Merge pull request kubernetes#75110 from bertinatto/fix_race_watcher
Send events before adding watchers in traversePluginDir
2 parents b5d9aa5 + f564557 commit 1608578

File tree

3 files changed

+53
-46
lines changed

3 files changed

+53
-46
lines changed

pkg/kubelet/util/pluginwatcher/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_test(
2929
embed = [":go_default_library"],
3030
deps = [
3131
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
32+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
3233
"//vendor/github.com/stretchr/testify/require:go_default_library",
3334
"//vendor/k8s.io/klog:go_default_library",
3435
],

pkg/kubelet/util/pluginwatcher/plugin_watcher.go

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -102,31 +102,39 @@ func (w *Watcher) Start() error {
102102
}
103103
w.fsWatcher = fsWatcher
104104

105+
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
106+
if err := w.traversePluginDir(w.path); err != nil {
107+
w.Stop()
108+
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
109+
}
110+
111+
// Traverse deprecated plugin dir, if specified.
112+
if len(w.deprecatedPath) != 0 {
113+
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
114+
w.Stop()
115+
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
116+
}
117+
}
118+
105119
w.wg.Add(1)
106120
go func(fsWatcher *fsnotify.Watcher) {
107121
defer w.wg.Done()
122+
108123
for {
109124
select {
110125
case event := <-fsWatcher.Events:
111126
//TODO: Handle errors by taking corrective measures
112-
113-
w.wg.Add(1)
114-
func() {
115-
defer w.wg.Done()
116-
117-
if event.Op&fsnotify.Create == fsnotify.Create {
118-
err := w.handleCreateEvent(event)
119-
if err != nil {
120-
klog.Errorf("error %v when handling create event: %s", err, event)
121-
}
122-
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
123-
err := w.handleDeleteEvent(event)
124-
if err != nil {
125-
klog.Errorf("error %v when handling delete event: %s", err, event)
126-
}
127+
if event.Op&fsnotify.Create == fsnotify.Create {
128+
err := w.handleCreateEvent(event)
129+
if err != nil {
130+
klog.Errorf("error %v when handling create event: %s", err, event)
127131
}
128-
return
129-
}()
132+
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
133+
err := w.handleDeleteEvent(event)
134+
if err != nil {
135+
klog.Errorf("error %v when handling delete event: %s", err, event)
136+
}
137+
}
130138
continue
131139
case err := <-fsWatcher.Errors:
132140
if err != nil {
@@ -139,20 +147,6 @@ func (w *Watcher) Start() error {
139147
}
140148
}(fsWatcher)
141149

142-
// Traverse plugin dir after starting the plugin processing goroutine
143-
if err := w.traversePluginDir(w.path); err != nil {
144-
w.Stop()
145-
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
146-
}
147-
148-
// Traverse deprecated plugin dir, if specified.
149-
if len(w.deprecatedPath) != 0 {
150-
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
151-
w.Stop()
152-
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
153-
}
154-
}
155-
156150
return nil
157151
}
158152

@@ -211,14 +205,14 @@ func (w *Watcher) traversePluginDir(dir string) error {
211205
return fmt.Errorf("failed to watch %s, err: %v", path, err)
212206
}
213207
case mode&os.ModeSocket != 0:
214-
w.wg.Add(1)
215-
go func() {
216-
defer w.wg.Done()
217-
w.fsWatcher.Events <- fsnotify.Event{
218-
Name: path,
219-
Op: fsnotify.Create,
220-
}
221-
}()
208+
event := fsnotify.Event{
209+
Name: path,
210+
Op: fsnotify.Create,
211+
}
212+
//TODO: Handle errors by taking corrective measures
213+
if err := w.handleCreateEvent(event); err != nil {
214+
klog.Errorf("error %v when handling create event: %s", err, event)
215+
}
222216
default:
223217
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
224218
}

pkg/kubelet/util/pluginwatcher/plugin_watcher_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/stretchr/testify/require"
2929

30+
"k8s.io/apimachinery/pkg/util/wait"
3031
"k8s.io/klog"
3132
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
3233
)
@@ -173,9 +174,6 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
173174
plugins[i] = p
174175
}
175176

176-
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
177-
defer func() { require.NoError(t, w.Stop()) }()
178-
179177
var wg sync.WaitGroup
180178
for i := 0; i < len(plugins); i++ {
181179
wg.Add(1)
@@ -189,6 +187,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
189187
}(plugins[i])
190188
}
191189

190+
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
191+
defer func() { require.NoError(t, w.Stop()) }()
192+
192193
c := make(chan struct{})
193194
go func() {
194195
defer close(c)
@@ -198,7 +199,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
198199
select {
199200
case <-c:
200201
return
201-
case <-time.After(2 * time.Second):
202+
case <-time.After(wait.ForeverTestTimeout):
202203
t.Fatalf("Timeout while waiting for the plugin registration status")
203204
}
204205
}
@@ -238,11 +239,22 @@ func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing
238239
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
239240
hdlr.AddPluginName(pluginName)
240241

242+
c := make(chan struct{})
243+
go func() {
244+
defer close(c)
245+
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
246+
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
247+
}()
248+
241249
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
242250
defer func() { require.NoError(t, w.Stop()) }()
243251

244-
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
245-
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
252+
select {
253+
case <-c:
254+
return
255+
case <-time.After(wait.ForeverTestTimeout):
256+
t.Fatalf("Timeout while waiting for the plugin registration status")
257+
}
246258
}
247259

248260
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
@@ -259,7 +271,7 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam
259271
select {
260272
case event := <-eventChan:
261273
return event == expected
262-
case <-time.After(2 * time.Second):
274+
case <-time.After(wait.ForeverTestTimeout):
263275
t.Fatalf("Timed out while waiting for registration status %v", expected)
264276
}
265277

0 commit comments

Comments
 (0)