Skip to content

Commit 531a50c

Browse files
committed
simplify pluginwatcher closing
1 parent 4990c5e commit 531a50c

File tree

1 file changed

+10
-22
lines changed

1 file changed

+10
-22
lines changed

pkg/kubelet/util/pluginwatcher/plugin_watcher.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ import (
3939
type Watcher struct {
4040
path string
4141
deprecatedPath string
42-
stopCh chan interface{}
42+
stopCh chan struct{}
43+
stopped chan struct{}
4344
fs utilfs.Filesystem
4445
fsWatcher *fsnotify.Watcher
45-
wg sync.WaitGroup
4646

4747
mutex sync.Mutex
4848
handlers map[string]PluginHandler
@@ -88,7 +88,8 @@ func (w *Watcher) getHandler(pluginType string) (PluginHandler, bool) {
8888
// Start watches for the creation of plugin sockets at the path
8989
func (w *Watcher) Start() error {
9090
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
91-
w.stopCh = make(chan interface{})
91+
w.stopCh = make(chan struct{})
92+
w.stopped = make(chan struct{})
9293

9394
// Creating the directory to be watched if it doesn't exist yet,
9495
// and walks through the directory to discover the existing plugins.
@@ -104,22 +105,20 @@ func (w *Watcher) Start() error {
104105

105106
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
106107
if err := w.traversePluginDir(w.path); err != nil {
107-
w.Stop()
108+
w.fsWatcher.Close()
108109
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
109110
}
110111

111112
// Traverse deprecated plugin dir, if specified.
112113
if len(w.deprecatedPath) != 0 {
113114
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
114-
w.Stop()
115+
w.fsWatcher.Close()
115116
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
116117
}
117118
}
118119

119-
w.wg.Add(1)
120-
go func(fsWatcher *fsnotify.Watcher) {
121-
defer w.wg.Done()
122-
120+
go func() {
121+
defer close(w.stopped)
123122
for {
124123
select {
125124
case event := <-fsWatcher.Events:
@@ -135,17 +134,15 @@ func (w *Watcher) Start() error {
135134
klog.Errorf("error %v when handling delete event: %s", err, event)
136135
}
137136
}
138-
continue
139137
case err := <-fsWatcher.Errors:
140138
if err != nil {
141139
klog.Errorf("fsWatcher received error: %v", err)
142140
}
143-
continue
144141
case <-w.stopCh:
145142
return
146143
}
147144
}
148-
}(fsWatcher)
145+
}()
149146

150147
return nil
151148
}
@@ -154,18 +151,9 @@ func (w *Watcher) Start() error {
154151
func (w *Watcher) Stop() error {
155152
close(w.stopCh)
156153

157-
c := make(chan struct{})
158-
var once sync.Once
159-
closeFunc := func() { close(c) }
160-
go func() {
161-
defer once.Do(closeFunc)
162-
w.wg.Wait()
163-
}()
164-
165154
select {
166-
case <-c:
155+
case <-w.stopped:
167156
case <-time.After(11 * time.Second):
168-
once.Do(closeFunc)
169157
return fmt.Errorf("timeout on stopping watcher")
170158
}
171159

0 commit comments

Comments
 (0)