Skip to content

Commit ef1be6d

Browse files
committed
nfd-worker: Watch features.d changes
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
1 parent 04d835d commit ef1be6d

File tree

4 files changed

+136
-19
lines changed

4 files changed

+136
-19
lines changed

pkg/nfd-worker/nfd-worker-internal_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"testing"
2424
"time"
2525

26+
"golang.org/x/net/context"
27+
2628
. "github.com/smartystreets/goconvey/convey"
2729
"github.com/vektra/errors"
2830
fakeclient "k8s.io/client-go/kubernetes/fake"
@@ -102,7 +104,7 @@ func TestConfigParse(t *testing.T) {
102104
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`
103105

104106
Convey("and no core cmdline flags have been specified", func() {
105-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
107+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
106108

107109
Convey("core overrides should be in effect", func() {
108110
So(worker.config.Core.LabelSources, ShouldResemble, []string{"fake"})
@@ -114,7 +116,7 @@ func TestConfigParse(t *testing.T) {
114116
worker.args = Args{Overrides: ConfigOverrideArgs{
115117
LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"},
116118
FeatureSources: &utils.StringSliceVal{"cpu"}}}
117-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
119+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
118120

119121
Convey("core cmdline flags should be in effect instead overrides", func() {
120122
So(worker.config.Core.LabelSources, ShouldResemble, []string{"cpu", "kernel", "pci"})
@@ -150,7 +152,7 @@ sources:
150152

151153
Convey("and a proper config file is specified", func() {
152154
worker.args = Args{Overrides: ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"}}}
153-
So(worker.configure(f.Name(), ""), ShouldBeNil)
155+
So(worker.configure(context.Background(), f.Name(), ""), ShouldBeNil)
154156

155157
Convey("specified configuration should take effect", func() {
156158
// Verify core config
@@ -172,7 +174,7 @@ sources:
172174
Convey("and a proper config file and overrides are given", func() {
173175
worker.args = Args{Overrides: ConfigOverrideArgs{FeatureSources: &utils.StringSliceVal{"cpu"}}}
174176
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"pci": {"deviceClassWhitelist": ["03"]}}}`
175-
So(worker.configure(f.Name(), overrides), ShouldBeNil)
177+
So(worker.configure(context.Background(), f.Name(), overrides), ShouldBeNil)
176178

177179
Convey("overrides should take precedence over the config file", func() {
178180
// Verify core config
@@ -205,7 +207,7 @@ func TestNewNfdWorker(t *testing.T) {
205207
So(err, ShouldBeNil)
206208
})
207209
worker := w.(*nfdWorker)
208-
So(worker.configure("", ""), ShouldBeNil)
210+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
209211
Convey("all sources should be enabled and the whitelist regexp should be empty", func() {
210212
So(len(worker.featureSources), ShouldEqual, len(source.GetAllFeatureSources())-1)
211213
So(len(worker.labelSources), ShouldEqual, len(source.GetAllLabelSources())-1)
@@ -223,7 +225,7 @@ func TestNewNfdWorker(t *testing.T) {
223225
So(err, ShouldBeNil)
224226
})
225227
worker := w.(*nfdWorker)
226-
So(worker.configure("", ""), ShouldBeNil)
228+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
227229
Convey("proper sources should be enabled", func() {
228230
So(len(worker.featureSources), ShouldEqual, 1)
229231
So(worker.featureSources[0].Name(), ShouldEqual, "cpu")

pkg/nfd-worker/nfd-worker.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ type nfdWorker struct {
120120
kubernetesNamespace string
121121
k8sClient k8sclient.Interface
122122
nfdClient nfdclient.Interface
123-
stop chan struct{} // channel for signaling stop
123+
stop chan struct{} // channel for signaling stop
124+
sourceEvent chan *source.FeatureSource // channel for events from sources
124125
featureSources []source.FeatureSource
125126
labelSources []source.LabelSource
126127
ownerReference []metav1.OwnerReference
@@ -167,6 +168,7 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
167168
config: &NFDConfig{},
168169
kubernetesNamespace: utils.GetKubernetesNamespace(),
169170
stop: make(chan struct{}),
171+
sourceEvent: make(chan *source.FeatureSource),
170172
}
171173

172174
for _, o := range opts {
@@ -220,6 +222,19 @@ func (i *infiniteTicker) Reset(d time.Duration) {
220222
}
221223
}
222224

225+
// Publish labels.
226+
func (w *nfdWorker) publishNodeFeatureObject() error {
227+
// Get the set of feature labels.
228+
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
229+
230+
// Update the node with the feature labels.
231+
if !w.config.Core.NoPublish {
232+
return w.advertiseFeatures(labels)
233+
}
234+
235+
return nil
236+
}
237+
223238
// Run feature discovery.
224239
func (w *nfdWorker) runFeatureDiscovery() error {
225240
discoveryStart := time.Now()
@@ -237,12 +252,9 @@ func (w *nfdWorker) runFeatureDiscovery() error {
237252
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
238253
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
239254
}
240-
// Get the set of feature labels.
241-
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
242255

243-
// Update the node with the feature labels.
244-
if !w.config.Core.NoPublish {
245-
return w.advertiseFeatures(labels)
256+
if err := w.publishNodeFeatureObject(); err != nil {
257+
return err
246258
}
247259

248260
return nil
@@ -293,8 +305,11 @@ func (w *nfdWorker) setOwnerReference() error {
293305
func (w *nfdWorker) Run() error {
294306
klog.InfoS("Node Feature Discovery Worker", "version", version.Get(), "nodeName", utils.NodeName(), "namespace", w.kubernetesNamespace)
295307

308+
ctx, cancel := context.WithCancel(context.Background())
309+
defer cancel()
310+
296311
// Read configuration file
297-
err := w.configure(w.configFilePath, w.args.Options)
312+
err := w.configure(ctx, w.configFilePath, w.args.Options)
298313
if err != nil {
299314
return err
300315
}
@@ -341,6 +356,15 @@ func (w *nfdWorker) Run() error {
341356
return err
342357
}
343358

359+
case s := <-w.sourceEvent:
360+
if err := (*s).Discover(); err != nil {
361+
klog.ErrorS(err, "feature discovery failed", "source", (*s).Name())
362+
break
363+
}
364+
if err = w.publishNodeFeatureObject(); err != nil {
365+
return err
366+
}
367+
344368
case <-w.stop:
345369
klog.InfoS("shutting down nfd-worker")
346370
return nil
@@ -361,7 +385,7 @@ func (c *coreConfig) sanitize() {
361385
}
362386
}
363387

364-
func (w *nfdWorker) configureCore(c coreConfig) error {
388+
func (w *nfdWorker) configureCore(ctx context.Context, c coreConfig) error {
365389
// Handle klog
366390
err := klogutils.MergeKlogConfiguration(w.args.Klog, c.Klog)
367391
if err != nil {
@@ -438,6 +462,13 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
438462
return w.labelSources[i].Name() < w.labelSources[j].Name()
439463
})
440464

465+
eventSources := source.GetAllEventSources()
466+
for _, s := range eventSources {
467+
if err := s.SetNotifyChannel(ctx, w.sourceEvent); err != nil {
468+
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
469+
}
470+
}
471+
441472
if klogV := klog.V(1); klogV.Enabled() {
442473
n := make([]string, len(w.featureSources))
443474
for i, s := range w.featureSources {
@@ -461,7 +492,7 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
461492
}
462493

463494
// Parse configuration options
464-
func (w *nfdWorker) configure(filepath string, overrides string) error {
495+
func (w *nfdWorker) configure(ctx context.Context, filepath string, overrides string) error {
465496
// Create a new default config
466497
c := newDefaultConfig()
467498
confSources := source.GetAllConfigurableSources()
@@ -516,7 +547,7 @@ func (w *nfdWorker) configure(filepath string, overrides string) error {
516547

517548
w.config = c
518549

519-
if err := w.configureCore(c.Core); err != nil {
550+
if err := w.configureCore(ctx, c.Core); err != nil {
520551
return err
521552
}
522553

source/local/local.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424
"strings"
2525
"time"
2626

27+
"golang.org/x/net/context"
2728
"k8s.io/klog/v2"
2829

30+
"github.com/fsnotify/fsnotify"
2931
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
3032
"sigs.k8s.io/node-feature-discovery/pkg/utils"
3133
"sigs.k8s.io/node-feature-discovery/source"
@@ -65,10 +67,11 @@ var (
6567
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
6668
)
6769

68-
// localSource implements the FeatureSource and LabelSource interfaces.
70+
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
6971
type localSource struct {
70-
features *nfdv1alpha1.Features
71-
config *Config
72+
features *nfdv1alpha1.Features
73+
config *Config
74+
fsWatcher *fsnotify.Watcher
7275
}
7376

7477
type Config struct {
@@ -87,6 +90,7 @@ var (
8790
_ source.FeatureSource = &src
8891
_ source.LabelSource = &src
8992
_ source.ConfigurableSource = &src
93+
_ source.EventSource = &src
9094
)
9195

9296
// Name method of the LabelSource interface
@@ -318,6 +322,63 @@ func getFileContent(fileName string) ([][]byte, error) {
318322
return lines, nil
319323
}
320324

325+
func (s *localSource) runNotifier(ctx context.Context, ch chan *source.FeatureSource) {
326+
rateLimit := time.NewTicker(time.Second)
327+
defer rateLimit.Stop()
328+
limit := false
329+
for {
330+
select {
331+
case event := <-s.fsWatcher.Events:
332+
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
333+
if event.Op&opAny != 0 {
334+
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
335+
if !limit {
336+
fs := source.FeatureSource(s)
337+
ch <- &fs
338+
limit = true
339+
}
340+
}
341+
case err := <-s.fsWatcher.Errors:
342+
klog.ErrorS(err, "failed to watch features.d changes")
343+
case <-rateLimit.C:
344+
limit = false
345+
case <-ctx.Done():
346+
return
347+
}
348+
}
349+
}
350+
351+
// SetNotifyChannel method of the EventSource Interface
352+
func (s *localSource) SetNotifyChannel(ctx context.Context, ch chan *source.FeatureSource) error {
353+
info, err := os.Stat(featureFilesDir)
354+
if err != nil {
355+
if !os.IsNotExist(err) {
356+
return err
357+
}
358+
}
359+
360+
if info != nil && info.IsDir() {
361+
if s.fsWatcher == nil {
362+
watcher, err := fsnotify.NewWatcher()
363+
if err != nil {
364+
return err
365+
}
366+
err = watcher.Add(featureFilesDir)
367+
if err != nil {
368+
errWatcher := watcher.Close()
369+
if errWatcher != nil {
370+
klog.ErrorS(errWatcher, "failed to close fsnotify watcher")
371+
}
372+
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
373+
}
374+
s.fsWatcher = watcher
375+
}
376+
go s.runNotifier(ctx, ch)
377+
}
378+
379+
return nil
380+
}
381+
321382
func init() {
322383
source.Register(&src)
323384
}

source/source.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package source
2121
import (
2222
"fmt"
2323

24+
"golang.org/x/net/context"
25+
2426
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
2527
)
2628

@@ -77,6 +79,16 @@ type SupplementalSource interface {
7779
DisableByDefault() bool
7880
}
7981

82+
// EventSource is an interface for a source that can send events
83+
type EventSource interface {
84+
FeatureSource
85+
86+
// SetNotifyChannel sets the notification channel used to send updates about feature changes.
87+
// The provided channel will receive a notification (a pointer to the FeatureSource) whenever
88+
// the source detects new or updated features, typically after a successful Discover operation.
89+
SetNotifyChannel(ctx context.Context, ch chan *FeatureSource) error
90+
}
91+
8092
// FeatureLabelValue represents the value of one feature label
8193
type FeatureLabelValue interface{}
8294

@@ -155,6 +167,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
155167
return all
156168
}
157169

170+
// GetAllEventSources returns all registered event sources
171+
func GetAllEventSources() map[string]EventSource {
172+
all := make(map[string]EventSource)
173+
for k, v := range sources {
174+
if s, ok := v.(EventSource); ok {
175+
all[k] = s
176+
}
177+
}
178+
return all
179+
}
180+
158181
// GetAllFeatures returns a combined set of all features from all feature
159182
// sources.
160183
func GetAllFeatures() *nfdv1alpha1.Features {

0 commit comments

Comments
 (0)