diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index f54a901ec5..dc57bcf6cb 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -121,6 +121,7 @@ type nfdWorker struct { k8sClient k8sclient.Interface nfdClient nfdclient.Interface stop chan struct{} // channel for signaling stop + sourceEvent chan string // channel for events from soures featureSources []source.FeatureSource labelSources []source.LabelSource ownerReference []metav1.OwnerReference @@ -248,6 +249,36 @@ func (w *nfdWorker) runFeatureDiscovery() error { return nil } +// Run feature discovery. +func (w *nfdWorker) runFeatureDiscoveryBySourceName(source string) error { + discoveryStart := time.Now() + for _, s := range w.featureSources { + if s.Name() == source { + currentSourceStart := time.Now() + if err := s.Discover(); err != nil { + klog.ErrorS(err, "feature discovery failed", "source", s.Name()) + } + klog.V(3).InfoS("feature discovery completed", "featureSource", s.Name(), "duration", time.Since(currentSourceStart)) + } + } + + discoveryDuration := time.Since(discoveryStart) + klog.V(2).InfoS("feature discovery of all sources completed", "duration", discoveryDuration) + featureDiscoveryDuration.WithLabelValues(utils.NodeName()).Observe(discoveryDuration.Seconds()) + if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 { + klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration) + } + // Get the set of feature labels. + labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp) + + // Update the node with the feature labels. + if !w.config.Core.NoPublish { + return w.advertiseFeatures(labels) + } + + return nil +} + // Set owner ref func (w *nfdWorker) setOwnerReference() error { ownerReference := []metav1.OwnerReference{} @@ -304,6 +335,12 @@ func (w *nfdWorker) Run() error { labelTrigger.Reset(w.config.Core.SleepInterval.Duration) defer labelTrigger.Stop() + w.sourceEvent = make(chan string) + eventSources := source.GetAllEventSources() + for _, s := range eventSources { + s.SetNotifyChannel(w.sourceEvent) + } + httpMux := http.NewServeMux() // Register to metrics server @@ -341,6 +378,12 @@ func (w *nfdWorker) Run() error { return err } + case sourceName := <-w.sourceEvent: + err = w.runFeatureDiscoveryBySourceName(sourceName) + if err != nil { + return err + } + case <-w.stop: klog.InfoS("shutting down nfd-worker") return nil diff --git a/source/local/local.go b/source/local/local.go index ebad5a1cca..58a4744e84 100644 --- a/source/local/local.go +++ b/source/local/local.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog/v2" + "github.com/fsnotify/fsnotify" nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/source" @@ -65,10 +66,11 @@ var ( featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/" ) -// localSource implements the FeatureSource and LabelSource interfaces. +// localSource implements the FeatureSource, LabelSource, EventSource interfaces. type localSource struct { - features *nfdv1alpha1.Features - config *Config + features *nfdv1alpha1.Features + config *Config + fsWatcher *fsnotify.Watcher } type Config struct { @@ -87,6 +89,7 @@ var ( _ source.FeatureSource = &src _ source.LabelSource = &src _ source.ConfigurableSource = &src + _ source.EventSource = &src ) // Name method of the LabelSource interface @@ -318,6 +321,49 @@ func getFileContent(fileName string) ([][]byte, error) { return lines, nil } +func (s *localSource) runNotifier(ch chan string) { + for { + select { + case event := <-s.fsWatcher.Events: + opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod + if event.Op&opAny != 0 { + klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op) + ch <- s.Name() + } + case err := <-s.fsWatcher.Errors: + klog.ErrorS(err, "failed to to watch features.d changes") + } + time.Sleep(1 * time.Second) + } +} + +// SetNotifyChannel method of the EventSource Interface +func (s *localSource) SetNotifyChannel(ch chan string) error { + info, err := os.Stat(featureFilesDir) + if err != nil { + if !os.IsNotExist(err) { + return err + } + } + + if info != nil && info.IsDir() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + + err = watcher.Add(featureFilesDir) + if err != nil { + return fmt.Errorf("unable to access %v: %w", featureFilesDir, err) + } + s.fsWatcher = watcher + } + + go s.runNotifier(ch) + + return nil +} + func init() { source.Register(&src) } diff --git a/source/source.go b/source/source.go index 24b27b4ffd..8365f9851b 100644 --- a/source/source.go +++ b/source/source.go @@ -77,6 +77,14 @@ type SupplementalSource interface { DisableByDefault() bool } +// EventSource is an interface for a source that can send events +type EventSource interface { + Source + + // SetNotifyChannel sets the channel + SetNotifyChannel(chan string) error +} + // FeatureLabelValue represents the value of one feature label type FeatureLabelValue interface{} @@ -155,6 +163,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource { return all } +// GetAllEventSources returns all registered event sources +func GetAllEventSources() map[string]EventSource { + all := make(map[string]EventSource) + for k, v := range sources { + if s, ok := v.(EventSource); ok { + all[k] = s + } + } + return all +} + // GetAllFeatures returns a combined set of all features from all feature // sources. func GetAllFeatures() *nfdv1alpha1.Features {