Skip to content

Commit caafb03

Browse files
committed
nfd-worker: Watch features.d changes
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
1 parent 04d835d commit caafb03

File tree

4 files changed

+136
-19
lines changed

4 files changed

+136
-19
lines changed

pkg/nfd-worker/nfd-worker-internal_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package nfdworker
1818

1919
import (
20+
"context"
2021
"os"
2122
"regexp"
2223
"strings"
@@ -102,7 +103,7 @@ func TestConfigParse(t *testing.T) {
102103
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`
103104

104105
Convey("and no core cmdline flags have been specified", func() {
105-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
106+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
106107

107108
Convey("core overrides should be in effect", func() {
108109
So(worker.config.Core.LabelSources, ShouldResemble, []string{"fake"})
@@ -114,7 +115,7 @@ func TestConfigParse(t *testing.T) {
114115
worker.args = Args{Overrides: ConfigOverrideArgs{
115116
LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"},
116117
FeatureSources: &utils.StringSliceVal{"cpu"}}}
117-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
118+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
118119

119120
Convey("core cmdline flags should be in effect instead overrides", func() {
120121
So(worker.config.Core.LabelSources, ShouldResemble, []string{"cpu", "kernel", "pci"})
@@ -150,7 +151,7 @@ sources:
150151

151152
Convey("and a proper config file is specified", func() {
152153
worker.args = Args{Overrides: ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"}}}
153-
So(worker.configure(f.Name(), ""), ShouldBeNil)
154+
So(worker.configure(context.Background(), f.Name(), ""), ShouldBeNil)
154155

155156
Convey("specified configuration should take effect", func() {
156157
// Verify core config
@@ -172,7 +173,7 @@ sources:
172173
Convey("and a proper config file and overrides are given", func() {
173174
worker.args = Args{Overrides: ConfigOverrideArgs{FeatureSources: &utils.StringSliceVal{"cpu"}}}
174175
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"pci": {"deviceClassWhitelist": ["03"]}}}`
175-
So(worker.configure(f.Name(), overrides), ShouldBeNil)
176+
So(worker.configure(context.Background(), f.Name(), overrides), ShouldBeNil)
176177

177178
Convey("overrides should take precedence over the config file", func() {
178179
// Verify core config
@@ -205,7 +206,7 @@ func TestNewNfdWorker(t *testing.T) {
205206
So(err, ShouldBeNil)
206207
})
207208
worker := w.(*nfdWorker)
208-
So(worker.configure("", ""), ShouldBeNil)
209+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
209210
Convey("all sources should be enabled and the whitelist regexp should be empty", func() {
210211
So(len(worker.featureSources), ShouldEqual, len(source.GetAllFeatureSources())-1)
211212
So(len(worker.labelSources), ShouldEqual, len(source.GetAllLabelSources())-1)
@@ -223,7 +224,7 @@ func TestNewNfdWorker(t *testing.T) {
223224
So(err, ShouldBeNil)
224225
})
225226
worker := w.(*nfdWorker)
226-
So(worker.configure("", ""), ShouldBeNil)
227+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
227228
Convey("proper sources should be enabled", func() {
228229
So(len(worker.featureSources), ShouldEqual, 1)
229230
So(worker.featureSources[0].Name(), ShouldEqual, "cpu")

pkg/nfd-worker/nfd-worker.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ type nfdWorker struct {
120120
kubernetesNamespace string
121121
k8sClient k8sclient.Interface
122122
nfdClient nfdclient.Interface
123-
stop chan struct{} // channel for signaling stop
123+
stop chan struct{} // channel for signaling stop
124+
sourceEvent chan *source.FeatureSource // channel for events from sources
124125
featureSources []source.FeatureSource
125126
labelSources []source.LabelSource
126127
ownerReference []metav1.OwnerReference
@@ -167,6 +168,7 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
167168
config: &NFDConfig{},
168169
kubernetesNamespace: utils.GetKubernetesNamespace(),
169170
stop: make(chan struct{}),
171+
sourceEvent: make(chan *source.FeatureSource),
170172
}
171173

172174
for _, o := range opts {
@@ -220,6 +222,19 @@ func (i *infiniteTicker) Reset(d time.Duration) {
220222
}
221223
}
222224

225+
// Publish labels.
226+
func (w *nfdWorker) publishNodeFeatureObject() error {
227+
// Get the set of feature labels.
228+
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
229+
230+
// Update the node with the feature labels.
231+
if !w.config.Core.NoPublish {
232+
return w.advertiseFeatures(labels)
233+
}
234+
235+
return nil
236+
}
237+
223238
// Run feature discovery.
224239
func (w *nfdWorker) runFeatureDiscovery() error {
225240
discoveryStart := time.Now()
@@ -237,12 +252,9 @@ func (w *nfdWorker) runFeatureDiscovery() error {
237252
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
238253
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
239254
}
240-
// Get the set of feature labels.
241-
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
242255

243-
// Update the node with the feature labels.
244-
if !w.config.Core.NoPublish {
245-
return w.advertiseFeatures(labels)
256+
if err := w.publishNodeFeatureObject(); err != nil {
257+
return err
246258
}
247259

248260
return nil
@@ -293,8 +305,11 @@ func (w *nfdWorker) setOwnerReference() error {
293305
func (w *nfdWorker) Run() error {
294306
klog.InfoS("Node Feature Discovery Worker", "version", version.Get(), "nodeName", utils.NodeName(), "namespace", w.kubernetesNamespace)
295307

308+
ctx, cancel := context.WithCancel(context.Background())
309+
defer cancel()
310+
296311
// Read configuration file
297-
err := w.configure(w.configFilePath, w.args.Options)
312+
err := w.configure(ctx, w.configFilePath, w.args.Options)
298313
if err != nil {
299314
return err
300315
}
@@ -341,6 +356,15 @@ func (w *nfdWorker) Run() error {
341356
return err
342357
}
343358

359+
case s := <-w.sourceEvent:
360+
if err := (*s).Discover(); err != nil {
361+
klog.ErrorS(err, "feature discovery failed", "source", (*s).Name())
362+
break
363+
}
364+
if err = w.publishNodeFeatureObject(); err != nil {
365+
return err
366+
}
367+
344368
case <-w.stop:
345369
klog.InfoS("shutting down nfd-worker")
346370
return nil
@@ -361,7 +385,7 @@ func (c *coreConfig) sanitize() {
361385
}
362386
}
363387

364-
func (w *nfdWorker) configureCore(c coreConfig) error {
388+
func (w *nfdWorker) configureCore(ctx context.Context, c coreConfig) error {
365389
// Handle klog
366390
err := klogutils.MergeKlogConfiguration(w.args.Klog, c.Klog)
367391
if err != nil {
@@ -438,6 +462,15 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
438462
return w.labelSources[i].Name() < w.labelSources[j].Name()
439463
})
440464

465+
eventSources := source.GetAllEventSources()
466+
for _, s := range eventSources {
467+
if ok := featureSources[s.Name()]; ok != nil {
468+
if err := s.SetNotifyChannel(ctx, w.sourceEvent); err != nil {
469+
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
470+
}
471+
}
472+
}
473+
441474
if klogV := klog.V(1); klogV.Enabled() {
442475
n := make([]string, len(w.featureSources))
443476
for i, s := range w.featureSources {
@@ -461,7 +494,7 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
461494
}
462495

463496
// Parse configuration options
464-
func (w *nfdWorker) configure(filepath string, overrides string) error {
497+
func (w *nfdWorker) configure(ctx context.Context, filepath string, overrides string) error {
465498
// Create a new default config
466499
c := newDefaultConfig()
467500
confSources := source.GetAllConfigurableSources()
@@ -516,7 +549,7 @@ func (w *nfdWorker) configure(filepath string, overrides string) error {
516549

517550
w.config = c
518551

519-
if err := w.configureCore(c.Core); err != nil {
552+
if err := w.configureCore(ctx, c.Core); err != nil {
520553
return err
521554
}
522555

source/local/local.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package local
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -26,6 +27,7 @@ import (
2627

2728
"k8s.io/klog/v2"
2829

30+
"github.com/fsnotify/fsnotify"
2931
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
3032
"sigs.k8s.io/node-feature-discovery/pkg/utils"
3133
"sigs.k8s.io/node-feature-discovery/source"
@@ -65,10 +67,11 @@ var (
6567
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
6668
)
6769

68-
// localSource implements the FeatureSource and LabelSource interfaces.
70+
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
6971
type localSource struct {
70-
features *nfdv1alpha1.Features
71-
config *Config
72+
features *nfdv1alpha1.Features
73+
config *Config
74+
fsWatcher *fsnotify.Watcher
7275
}
7376

7477
type Config struct {
@@ -87,6 +90,7 @@ var (
8790
_ source.FeatureSource = &src
8891
_ source.LabelSource = &src
8992
_ source.ConfigurableSource = &src
93+
_ source.EventSource = &src
9094
)
9195

9296
// Name method of the LabelSource interface
@@ -318,6 +322,63 @@ func getFileContent(fileName string) ([][]byte, error) {
318322
return lines, nil
319323
}
320324

325+
func (s *localSource) runNotifier(ctx context.Context, ch chan *source.FeatureSource) {
326+
rateLimit := time.NewTicker(time.Second)
327+
defer rateLimit.Stop()
328+
limit := false
329+
for {
330+
select {
331+
case event := <-s.fsWatcher.Events:
332+
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
333+
if event.Op&opAny != 0 {
334+
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
335+
if !limit {
336+
fs := source.FeatureSource(s)
337+
ch <- &fs
338+
limit = true
339+
}
340+
}
341+
case err := <-s.fsWatcher.Errors:
342+
klog.ErrorS(err, "failed to watch features.d changes")
343+
case <-rateLimit.C:
344+
limit = false
345+
case <-ctx.Done():
346+
return
347+
}
348+
}
349+
}
350+
351+
// SetNotifyChannel method of the EventSource Interface
352+
func (s *localSource) SetNotifyChannel(ctx context.Context, ch chan *source.FeatureSource) error {
353+
info, err := os.Stat(featureFilesDir)
354+
if err != nil {
355+
if !os.IsNotExist(err) {
356+
return err
357+
}
358+
}
359+
360+
if info != nil && info.IsDir() {
361+
if s.fsWatcher == nil {
362+
watcher, err := fsnotify.NewWatcher()
363+
if err != nil {
364+
return err
365+
}
366+
err = watcher.Add(featureFilesDir)
367+
if err != nil {
368+
errWatcher := watcher.Close()
369+
if errWatcher != nil {
370+
klog.ErrorS(errWatcher, "failed to close fsnotify watcher")
371+
}
372+
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
373+
}
374+
s.fsWatcher = watcher
375+
}
376+
go s.runNotifier(ctx, ch)
377+
}
378+
379+
return nil
380+
}
381+
321382
func init() {
322383
source.Register(&src)
323384
}

source/source.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package source
1919
//go:generate mockery --name=LabelSource --inpackage
2020

2121
import (
22+
"context"
2223
"fmt"
2324

2425
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
@@ -77,6 +78,16 @@ type SupplementalSource interface {
7778
DisableByDefault() bool
7879
}
7980

81+
// EventSource is an interface for a source that can send events
82+
type EventSource interface {
83+
FeatureSource
84+
85+
// SetNotifyChannel sets the notification channel used to send updates about feature changes.
86+
// The provided channel will receive a notification (a pointer to the FeatureSource) whenever
87+
// the source detects new or updated features, typically after a successful Discover operation.
88+
SetNotifyChannel(ctx context.Context, ch chan *FeatureSource) error
89+
}
90+
8091
// FeatureLabelValue represents the value of one feature label
8192
type FeatureLabelValue interface{}
8293

@@ -155,6 +166,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
155166
return all
156167
}
157168

169+
// GetAllEventSources returns all registered event sources
170+
func GetAllEventSources() map[string]EventSource {
171+
all := make(map[string]EventSource)
172+
for k, v := range sources {
173+
if s, ok := v.(EventSource); ok {
174+
all[k] = s
175+
}
176+
}
177+
return all
178+
}
179+
158180
// GetAllFeatures returns a combined set of all features from all feature
159181
// sources.
160182
func GetAllFeatures() *nfdv1alpha1.Features {

0 commit comments

Comments
 (0)