Skip to content

Commit b9ea21a

Browse files
committed
nfd-worker: Watch features.d changes
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
1 parent 04d835d commit b9ea21a

File tree

3 files changed

+111
-3
lines changed

3 files changed

+111
-3
lines changed

pkg/nfd-worker/nfd-worker.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type nfdWorker struct {
121121
k8sClient k8sclient.Interface
122122
nfdClient nfdclient.Interface
123123
stop chan struct{} // channel for signaling stop
124+
sourceEvent chan string // channel for events from soures
124125
featureSources []source.FeatureSource
125126
labelSources []source.LabelSource
126127
ownerReference []metav1.OwnerReference
@@ -248,6 +249,36 @@ func (w *nfdWorker) runFeatureDiscovery() error {
248249
return nil
249250
}
250251

252+
// Run feature discovery.
253+
func (w *nfdWorker) runFeatureDiscoveryBySourceName(source string) error {
254+
discoveryStart := time.Now()
255+
for _, s := range w.featureSources {
256+
if s.Name() == source {
257+
currentSourceStart := time.Now()
258+
if err := s.Discover(); err != nil {
259+
klog.ErrorS(err, "feature discovery failed", "source", s.Name())
260+
}
261+
klog.V(3).InfoS("feature discovery completed", "featureSource", s.Name(), "duration", time.Since(currentSourceStart))
262+
}
263+
}
264+
265+
discoveryDuration := time.Since(discoveryStart)
266+
klog.V(2).InfoS("feature discovery of all sources completed", "duration", discoveryDuration)
267+
featureDiscoveryDuration.WithLabelValues(utils.NodeName()).Observe(discoveryDuration.Seconds())
268+
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
269+
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
270+
}
271+
// Get the set of feature labels.
272+
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
273+
274+
// Update the node with the feature labels.
275+
if !w.config.Core.NoPublish {
276+
return w.advertiseFeatures(labels)
277+
}
278+
279+
return nil
280+
}
281+
251282
// Set owner ref
252283
func (w *nfdWorker) setOwnerReference() error {
253284
ownerReference := []metav1.OwnerReference{}
@@ -304,6 +335,12 @@ func (w *nfdWorker) Run() error {
304335
labelTrigger.Reset(w.config.Core.SleepInterval.Duration)
305336
defer labelTrigger.Stop()
306337

338+
w.sourceEvent = make(chan string)
339+
eventSources := source.GetAllEventSources()
340+
for _, s := range eventSources {
341+
s.SetNotifyChannel(w.sourceEvent)
342+
}
343+
307344
httpMux := http.NewServeMux()
308345

309346
// Register to metrics server
@@ -341,6 +378,12 @@ func (w *nfdWorker) Run() error {
341378
return err
342379
}
343380

381+
case sourceName := <-w.sourceEvent:
382+
err = w.runFeatureDiscoveryBySourceName(sourceName)
383+
if err != nil {
384+
return err
385+
}
386+
344387
case <-w.stop:
345388
klog.InfoS("shutting down nfd-worker")
346389
return nil

source/local/local.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"k8s.io/klog/v2"
2828

29+
"github.com/fsnotify/fsnotify"
2930
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
3031
"sigs.k8s.io/node-feature-discovery/pkg/utils"
3132
"sigs.k8s.io/node-feature-discovery/source"
@@ -65,10 +66,11 @@ var (
6566
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
6667
)
6768

68-
// localSource implements the FeatureSource and LabelSource interfaces.
69+
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
6970
type localSource struct {
70-
features *nfdv1alpha1.Features
71-
config *Config
71+
features *nfdv1alpha1.Features
72+
config *Config
73+
fsWatcher *fsnotify.Watcher
7274
}
7375

7476
type Config struct {
@@ -87,6 +89,7 @@ var (
8789
_ source.FeatureSource = &src
8890
_ source.LabelSource = &src
8991
_ source.ConfigurableSource = &src
92+
_ source.EventSource = &src
9093
)
9194

9295
// Name method of the LabelSource interface
@@ -318,6 +321,49 @@ func getFileContent(fileName string) ([][]byte, error) {
318321
return lines, nil
319322
}
320323

324+
func (s *localSource) runNotifier(ch chan string) {
325+
for {
326+
select {
327+
case event := <-s.fsWatcher.Events:
328+
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
329+
if event.Op&opAny != 0 {
330+
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
331+
ch <- s.Name()
332+
}
333+
case err := <-s.fsWatcher.Errors:
334+
klog.ErrorS(err, "failed to to watch features.d changes")
335+
}
336+
time.Sleep(1 * time.Second)
337+
}
338+
}
339+
340+
// SetNotifyChannel method of the EventSource Interface
341+
func (s *localSource) SetNotifyChannel(ch chan string) error {
342+
info, err := os.Stat(featureFilesDir)
343+
if err != nil {
344+
if !os.IsNotExist(err) {
345+
return err
346+
}
347+
}
348+
349+
if info != nil && info.IsDir() {
350+
watcher, err := fsnotify.NewWatcher()
351+
if err != nil {
352+
return err
353+
}
354+
355+
err = watcher.Add(featureFilesDir)
356+
if err != nil {
357+
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
358+
}
359+
s.fsWatcher = watcher
360+
}
361+
362+
go s.runNotifier(ch)
363+
364+
return nil
365+
}
366+
321367
func init() {
322368
source.Register(&src)
323369
}

source/source.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ type SupplementalSource interface {
7777
DisableByDefault() bool
7878
}
7979

80+
// EventSource is an interface for a source that can send events
81+
type EventSource interface {
82+
Source
83+
84+
// SetNotifyChannel sets the channel
85+
SetNotifyChannel(chan string) error
86+
}
87+
8088
// FeatureLabelValue represents the value of one feature label
8189
type FeatureLabelValue interface{}
8290

@@ -155,6 +163,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
155163
return all
156164
}
157165

166+
// GetAllEventSources returns all registered event sources
167+
func GetAllEventSources() map[string]EventSource {
168+
all := make(map[string]EventSource)
169+
for k, v := range sources {
170+
if s, ok := v.(EventSource); ok {
171+
all[k] = s
172+
}
173+
}
174+
return all
175+
}
176+
158177
// GetAllFeatures returns a combined set of all features from all feature
159178
// sources.
160179
func GetAllFeatures() *nfdv1alpha1.Features {

0 commit comments

Comments
 (0)