Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pkg/nfd-worker/nfd-worker-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"golang.org/x/net/context"

. "github.com/smartystreets/goconvey/convey"
"github.com/vektra/errors"
fakeclient "k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -102,7 +104,7 @@ func TestConfigParse(t *testing.T) {
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`

Convey("and no core cmdline flags have been specified", func() {
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)

Convey("core overrides should be in effect", func() {
So(worker.config.Core.LabelSources, ShouldResemble, []string{"fake"})
Expand All @@ -114,7 +116,7 @@ func TestConfigParse(t *testing.T) {
worker.args = Args{Overrides: ConfigOverrideArgs{
LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"},
FeatureSources: &utils.StringSliceVal{"cpu"}}}
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)

Convey("core cmdline flags should be in effect instead overrides", func() {
So(worker.config.Core.LabelSources, ShouldResemble, []string{"cpu", "kernel", "pci"})
Expand Down Expand Up @@ -150,7 +152,7 @@ sources:

Convey("and a proper config file is specified", func() {
worker.args = Args{Overrides: ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"}}}
So(worker.configure(f.Name(), ""), ShouldBeNil)
So(worker.configure(context.Background(), f.Name(), ""), ShouldBeNil)

Convey("specified configuration should take effect", func() {
// Verify core config
Expand All @@ -172,7 +174,7 @@ sources:
Convey("and a proper config file and overrides are given", func() {
worker.args = Args{Overrides: ConfigOverrideArgs{FeatureSources: &utils.StringSliceVal{"cpu"}}}
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"pci": {"deviceClassWhitelist": ["03"]}}}`
So(worker.configure(f.Name(), overrides), ShouldBeNil)
So(worker.configure(context.Background(), f.Name(), overrides), ShouldBeNil)

Convey("overrides should take precedence over the config file", func() {
// Verify core config
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestNewNfdWorker(t *testing.T) {
So(err, ShouldBeNil)
})
worker := w.(*nfdWorker)
So(worker.configure("", ""), ShouldBeNil)
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
Convey("all sources should be enabled and the whitelist regexp should be empty", func() {
So(len(worker.featureSources), ShouldEqual, len(source.GetAllFeatureSources())-1)
So(len(worker.labelSources), ShouldEqual, len(source.GetAllLabelSources())-1)
Expand All @@ -223,7 +225,7 @@ func TestNewNfdWorker(t *testing.T) {
So(err, ShouldBeNil)
})
worker := w.(*nfdWorker)
So(worker.configure("", ""), ShouldBeNil)
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
Convey("proper sources should be enabled", func() {
So(len(worker.featureSources), ShouldEqual, 1)
So(worker.featureSources[0].Name(), ShouldEqual, "cpu")
Expand Down
51 changes: 41 additions & 10 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ type nfdWorker struct {
kubernetesNamespace string
k8sClient k8sclient.Interface
nfdClient nfdclient.Interface
stop chan struct{} // channel for signaling stop
stop chan struct{} // channel for signaling stop
sourceEvent chan *source.FeatureSource // channel for events from sources
featureSources []source.FeatureSource
labelSources []source.LabelSource
ownerReference []metav1.OwnerReference
Expand Down Expand Up @@ -167,6 +168,7 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}),
sourceEvent: make(chan *source.FeatureSource),
}

for _, o := range opts {
Expand Down Expand Up @@ -220,6 +222,19 @@ func (i *infiniteTicker) Reset(d time.Duration) {
}
}

// Publish labels.
func (w *nfdWorker) publishNodeFeatureObject() error {
// Get the set of feature labels.
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)

// Update the node with the feature labels.
if !w.config.Core.NoPublish {
return w.advertiseFeatures(labels)
}

return nil
}

// Run feature discovery.
func (w *nfdWorker) runFeatureDiscovery() error {
discoveryStart := time.Now()
Expand All @@ -237,12 +252,9 @@ func (w *nfdWorker) runFeatureDiscovery() error {
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
}
// Get the set of feature labels.
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)

// Update the node with the feature labels.
if !w.config.Core.NoPublish {
return w.advertiseFeatures(labels)
if err := w.publishNodeFeatureObject(); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -293,8 +305,11 @@ func (w *nfdWorker) setOwnerReference() error {
func (w *nfdWorker) Run() error {
klog.InfoS("Node Feature Discovery Worker", "version", version.Get(), "nodeName", utils.NodeName(), "namespace", w.kubernetesNamespace)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Read configuration file
err := w.configure(w.configFilePath, w.args.Options)
err := w.configure(ctx, w.configFilePath, w.args.Options)
if err != nil {
return err
}
Expand Down Expand Up @@ -341,6 +356,15 @@ func (w *nfdWorker) Run() error {
return err
}

case s := <-w.sourceEvent:
if err := (*s).Discover(); err != nil {
klog.ErrorS(err, "feature discovery failed", "source", (*s).Name())
break
}
if err = w.publishNodeFeatureObject(); err != nil {
return err
}

case <-w.stop:
klog.InfoS("shutting down nfd-worker")
return nil
Expand All @@ -361,7 +385,7 @@ func (c *coreConfig) sanitize() {
}
}

func (w *nfdWorker) configureCore(c coreConfig) error {
func (w *nfdWorker) configureCore(ctx context.Context, c coreConfig) error {
// Handle klog
err := klogutils.MergeKlogConfiguration(w.args.Klog, c.Klog)
if err != nil {
Expand Down Expand Up @@ -438,6 +462,13 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
return w.labelSources[i].Name() < w.labelSources[j].Name()
})

eventSources := source.GetAllEventSources()
for _, s := range eventSources {
if err := s.SetNotifyChannel(ctx, w.sourceEvent); err != nil {
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
}
}

if klogV := klog.V(1); klogV.Enabled() {
n := make([]string, len(w.featureSources))
for i, s := range w.featureSources {
Expand All @@ -461,7 +492,7 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
}

// Parse configuration options
func (w *nfdWorker) configure(filepath string, overrides string) error {
func (w *nfdWorker) configure(ctx context.Context, filepath string, overrides string) error {
// Create a new default config
c := newDefaultConfig()
confSources := source.GetAllConfigurableSources()
Expand Down Expand Up @@ -516,7 +547,7 @@ func (w *nfdWorker) configure(filepath string, overrides string) error {

w.config = c

if err := w.configureCore(c.Core); err != nil {
if err := w.configureCore(ctx, c.Core); err != nil {
return err
}

Expand Down
67 changes: 64 additions & 3 deletions source/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"strings"
"time"

"golang.org/x/net/context"
"k8s.io/klog/v2"

"github.com/fsnotify/fsnotify"
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/source"
Expand Down Expand Up @@ -65,10 +67,11 @@ var (
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
)

// localSource implements the FeatureSource and LabelSource interfaces.
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
type localSource struct {
features *nfdv1alpha1.Features
config *Config
features *nfdv1alpha1.Features
config *Config
fsWatcher *fsnotify.Watcher
}

type Config struct {
Expand All @@ -87,6 +90,7 @@ var (
_ source.FeatureSource = &src
_ source.LabelSource = &src
_ source.ConfigurableSource = &src
_ source.EventSource = &src
)

// Name method of the LabelSource interface
Expand Down Expand Up @@ -318,6 +322,63 @@ func getFileContent(fileName string) ([][]byte, error) {
return lines, nil
}

func (s *localSource) runNotifier(ctx context.Context, ch chan *source.FeatureSource) {
rateLimit := time.NewTicker(time.Second)
defer rateLimit.Stop()
limit := false
for {
select {
Copy link

Copilot AI Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runNotifier goroutine runs indefinitely without a way to stop it. Consider adding a context or stop channel to allow graceful shutdown and prevent goroutine leaks.

Suggested change
select {
func (s *localSource) runNotifier(ctx context.Context, ch chan struct{}) {
for {
select {
case <-ctx.Done():
return

Copilot uses AI. Check for mistakes.
case event := <-s.fsWatcher.Events:
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
if event.Op&opAny != 0 {
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
if !limit {
fs := source.FeatureSource(s)
ch <- &fs
limit = true
}
}
case err := <-s.fsWatcher.Errors:
klog.ErrorS(err, "failed to watch features.d changes")
case <-rateLimit.C:
limit = false
case <-ctx.Done():
return
}
}
}

// SetNotifyChannel method of the EventSource Interface
func (s *localSource) SetNotifyChannel(ctx context.Context, ch chan *source.FeatureSource) error {
info, err := os.Stat(featureFilesDir)
if err != nil {
if !os.IsNotExist(err) {
return err
}
}

if info != nil && info.IsDir() {
if s.fsWatcher == nil {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
err = watcher.Add(featureFilesDir)
if err != nil {
errWatcher := watcher.Close()
if errWatcher != nil {
klog.ErrorS(errWatcher, "failed to close fsnotify watcher")
}
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
}
s.fsWatcher = watcher
}
go s.runNotifier(ctx, ch)
}

Copy link

Copilot AI Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting the goroutine unconditionally even when fsWatcher is nil (when directory doesn't exist) will cause the goroutine to block indefinitely on nil channel reads, leading to a goroutine leak.

Suggested change
go s.runNotifier(ch)
}

Copilot uses AI. Check for mistakes.
return nil
}

func init() {
source.Register(&src)
}
23 changes: 23 additions & 0 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package source
import (
"fmt"

"golang.org/x/net/context"

nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
)

Expand Down Expand Up @@ -77,6 +79,16 @@ type SupplementalSource interface {
DisableByDefault() bool
}

// EventSource is an interface for a source that can send events
type EventSource interface {
FeatureSource

// SetNotifyChannel sets the notification channel used to send updates about feature changes.
// The provided channel will receive a notification (a pointer to the FeatureSource) whenever
// the source detects new or updated features, typically after a successful Discover operation.
SetNotifyChannel(ctx context.Context, ch chan *FeatureSource) error
}

// FeatureLabelValue represents the value of one feature label
type FeatureLabelValue interface{}

Expand Down Expand Up @@ -155,6 +167,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
return all
}

// GetAllEventSources returns all registered event sources
func GetAllEventSources() map[string]EventSource {
all := make(map[string]EventSource)
for k, v := range sources {
if s, ok := v.(EventSource); ok {
all[k] = s
}
}
return all
}

// GetAllFeatures returns a combined set of all features from all feature
// sources.
func GetAllFeatures() *nfdv1alpha1.Features {
Expand Down