
Commit fe60c43

Merge pull request kubernetes#130514 from xigang/daemonset
Add workqueue for node updates in DaemonSetController
2 parents: 64621d1 + aa32537 · commit fe60c43

2 files changed: +134 −29 lines changed


pkg/controller/daemon/daemon_controller.go

Lines changed: 101 additions & 24 deletions
```diff
@@ -113,6 +113,8 @@ type DaemonSetsController struct {
 	historyStoreSynced cache.InformerSynced
 	// podLister get list/get pods from the shared informers's store
 	podLister corelisters.PodLister
+	// podIndexer allows looking up pods by node name.
+	podIndexer cache.Indexer
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
```
```diff
@@ -125,6 +127,10 @@ type DaemonSetsController struct {
 	// DaemonSet keys that need to be synced.
 	queue workqueue.TypedRateLimitingInterface[string]

+	// nodeUpdateQueue is a workqueue that processes node updates to ensure DaemonSets
+	// are properly reconciled when node properties change
+	nodeUpdateQueue workqueue.TypedRateLimitingInterface[string]
+
 	failedPodsBackoff *flowcontrol.Backoff
 }

```
```diff
@@ -159,6 +165,12 @@ func NewDaemonSetsController(
 				Name: "daemonset",
 			},
 		),
+		nodeUpdateQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{
+				Name: "daemonset-node-updates",
+			},
+		),
 	}

 	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
```
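The constructor uses client-go's generic rate-limited workqueue. A minimal, self-contained sketch of that queue's lifecycle, assuming a client-go version with the Typed* generic API; the queue name and the `process` function are illustrative, not part of the commit:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// process stands in for syncNodeUpdate; illustrative only.
func process(key string) error { return nil }

func main() {
	q := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "example-node-updates"},
	)
	defer q.ShutDown()

	// Producer side: event handlers enqueue plain node names.
	q.Add("node-1")

	// Consumer side: a worker pulls a key, processes it, and either
	// clears the rate-limiter state or re-queues with backoff.
	key, quit := q.Get()
	if quit {
		return
	}
	defer q.Done(key) // Done must always pair with Get

	if err := process(key); err != nil {
		q.AddRateLimited(key) // retry later with backoff
		return
	}
	q.Forget(key) // success: reset backoff state for this key
	fmt.Println("processed", key)
}
```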
```diff
@@ -204,6 +216,8 @@ func NewDaemonSetsController(
 	})
 	dsc.podLister = podInformer.Lister()
 	dsc.podStoreSynced = podInformer.Informer().HasSynced
+	controller.AddPodNodeNameIndexer(podInformer.Informer())
+	dsc.podIndexer = podInformer.Informer().GetIndexer()

 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
```
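`controller.AddPodNodeNameIndexer` registers a pods-by-node-name index on the shared pod informer, which `syncNodeUpdate` later queries through `dsc.podIndexer.ByIndex`. A rough sketch of how such an index is wired up with client-go; the index key and both helpers are illustrative, not the pkg/controller implementation:

```go
package podindex

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// podNodeNameIndex is an assumed index name; pkg/controller exports its own
// constant (PodNodeNameKeyIndex) for the real one.
const podNodeNameIndex = "spec.nodeName"

// addPodNodeNameIndexer registers an IndexFunc mapping each scheduled pod
// to the name of the node it runs on.
func addPodNodeNameIndexer(informer cache.SharedIndexInformer) error {
	return informer.AddIndexers(cache.Indexers{
		podNodeNameIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil // unscheduled pods are not indexed
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})
}

// podsOnNode fetches a node's pods in O(pods-on-node) via the index,
// the same call shape as dsc.podIndexer.ByIndex in syncNodeUpdate.
func podsOnNode(indexer cache.Indexer, nodeName string) ([]*v1.Pod, error) {
	objs, err := indexer.ByIndex(podNodeNameIndex, nodeName)
	if err != nil {
		return nil, err
	}
	pods := make([]*v1.Pod, 0, len(objs))
	for _, o := range objs {
		pods = append(pods, o.(*v1.Pod))
	}
	return pods, nil
}
```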
```diff
@@ -289,6 +303,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
 	defer dsc.eventBroadcaster.Shutdown()

 	defer dsc.queue.ShutDown()
+	defer dsc.nodeUpdateQueue.ShutDown()

 	logger := klog.FromContext(ctx)
 	logger.Info("Starting daemon sets controller")
```
```diff
@@ -300,6 +315,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {

 	for i := 0; i < workers; i++ {
 		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
+		go wait.UntilWithContext(ctx, dsc.runNodeUpdateWorker, time.Second)
 	}

 	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
```
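Each of the `workers` goroutines now runs a node-update loop next to the existing DaemonSet sync loop. A tiny sketch of the `wait.UntilWithContext` pattern used above; the timeout and worker body are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Like runWorker/runNodeUpdateWorker: the function is re-invoked one
	// second after it returns, until the context is cancelled.
	worker := func(ctx context.Context) { fmt.Println("draining queue...") }
	go wait.UntilWithContext(ctx, worker, time.Second)

	<-ctx.Done()
}
```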
```diff
@@ -643,18 +659,14 @@ func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{})
 }

 func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
-	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
-	dsList, err := dsc.dsLister.List(labels.Everything())
-	if err != nil {
-		logger.V(4).Info("Error enqueueing daemon sets", "err", err)
+	node, ok := obj.(*v1.Node)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("couldn't get node from object %#v", obj))
 		return
 	}
-	node := obj.(*v1.Node)
-	for _, ds := range dsList {
-		if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); shouldRun {
-			dsc.enqueueDaemonSet(ds)
-		}
-	}
+
+	logger.V(4).Info("Queuing node addition", "node", klog.KObj(node))
+	dsc.nodeUpdateQueue.Add(node.Name)
 }

 // shouldIgnoreNodeUpdate returns true if Node labels and taints have not changed, otherwise returns false.
```
667679
func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
668680
oldNode := old.(*v1.Node)
669681
curNode := cur.(*v1.Node)
682+
670683
if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
671684
return
672685
}
673686

674-
dsList, err := dsc.dsLister.List(labels.Everything())
675-
if err != nil {
676-
logger.V(4).Info("Error listing daemon sets", "err", err)
677-
return
678-
}
679-
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
680-
for _, ds := range dsList {
681-
// If NodeShouldRunDaemonPod needs to uses other than Labels and Taints (mutable) properties of node, it needs to update shouldIgnoreNodeUpdate.
682-
oldShouldRun, oldShouldContinueRunning := NodeShouldRunDaemonPod(oldNode, ds)
683-
currentShouldRun, currentShouldContinueRunning := NodeShouldRunDaemonPod(curNode, ds)
684-
if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
685-
dsc.enqueueDaemonSet(ds)
686-
}
687-
}
687+
logger.V(4).Info("Queuing node update", "node", klog.KObj(curNode))
688+
dsc.nodeUpdateQueue.Add(curNode.Name)
688689
}
689690

690691
// getDaemonPods returns daemon pods owned by the given ds.
```diff
@@ -1376,3 +1377,79 @@ func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods

 	return results
 }
+
+// runNodeUpdateWorker is a worker that processes node updates from the nodeUpdateQueue.
+func (dsc *DaemonSetsController) runNodeUpdateWorker(ctx context.Context) {
+	for dsc.processNextNodeUpdate(ctx) {
+	}
+}
+
+func (dsc *DaemonSetsController) processNextNodeUpdate(ctx context.Context) bool {
+	nodeName, quit := dsc.nodeUpdateQueue.Get()
+	if quit {
+		return false
+	}
+	defer dsc.nodeUpdateQueue.Done(nodeName)
+
+	err := dsc.syncNodeUpdate(ctx, nodeName)
+	if err == nil {
+		dsc.nodeUpdateQueue.Forget(nodeName)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %w", nodeName, err))
+	dsc.nodeUpdateQueue.AddRateLimited(nodeName)
+
+	return true
+}
+
+func (dsc *DaemonSetsController) syncNodeUpdate(ctx context.Context, nodeName string) error {
+	logger := klog.FromContext(ctx)
+
+	node, err := dsc.nodeLister.Get(nodeName)
+	if apierrors.IsNotFound(err) {
+		logger.V(3).Info("Node not found, skipping update", "node", nodeName)
+		return nil
+	}
+	if err != nil {
+		return fmt.Errorf("error getting node %s: %w", nodeName, err)
+	}
+
+	dsList, err := dsc.dsLister.List(labels.Everything())
+	if err != nil {
+		return fmt.Errorf("error listing daemon sets: %w", err)
+	}
+
+	podsOnNode, err := dsc.podIndexer.ByIndex(controller.PodNodeNameKeyIndex, nodeName)
+	if err != nil {
+		return fmt.Errorf("error getting pods by node name: %w", err)
+	}
+
+	podsByDS := make(map[string][]*v1.Pod)
+	for _, obj := range podsOnNode {
+		pod := obj.(*v1.Pod)
+		controllerRef := metav1.GetControllerOf(pod)
+		if controllerRef == nil || controllerRef.Kind != controllerKind.Kind {
+			continue
+		}
+		dsKey := cache.NewObjectName(pod.Namespace, controllerRef.Name).String()
+		podsByDS[dsKey] = append(podsByDS[dsKey], pod)
+	}
+
+	for _, ds := range dsList {
+		shouldRun, shouldContinueRunning := NodeShouldRunDaemonPod(node, ds)
+
+		dsKey, err := controller.KeyFunc(ds)
+		if err != nil {
+			return fmt.Errorf("error getting key for object %#v: %w", ds, err)
+		}
+		daemonPods := podsByDS[dsKey]
+		scheduled := len(daemonPods) > 0
+
+		if (shouldRun && !scheduled) || (!shouldContinueRunning && scheduled) {
+			dsc.enqueueDaemonSet(ds)
+		}
+	}
+
+	return nil
+}
```
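The condition at the end of `syncNodeUpdate` is the core of the change: a DaemonSet is enqueued only when the node's desired and observed pod placement disagree. A standalone sketch of that predicate; the case comments are one reading of the semantics, not text from the commit:

```go
package main

import "fmt"

// needsSync mirrors the enqueue condition in syncNodeUpdate.
func needsSync(shouldRun, shouldContinueRunning, scheduled bool) bool {
	return (shouldRun && !scheduled) || (!shouldContinueRunning && scheduled)
}

func main() {
	cases := []struct {
		name                             string
		shouldRun, shouldContinue, sched bool
	}{
		{"pod missing where one should run", true, true, false},        // -> sync
		{"pod present that must not keep running", false, false, true}, // -> sync
		{"steady state, pod where it belongs", true, true, true},       // -> no sync
		{"existing pod may keep running", false, true, true},           // -> no sync
	}
	for _, c := range cases {
		fmt.Printf("%-40s -> %v\n", c.name, needsSync(c.shouldRun, c.shouldContinue, c.sched))
	}
}
```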

pkg/controller/daemon/daemon_controller_test.go

Lines changed: 33 additions & 5 deletions
```diff
@@ -2494,6 +2494,13 @@ func TestUpdateNode(t *testing.T) {
 		t.Fatal(err)
 	}

+	manager.nodeUpdateQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
+		workqueue.DefaultTypedControllerRateLimiter[string](),
+		workqueue.TypedRateLimitingQueueConfig[string]{
+			Name: "test-daemon-node-updates",
+		},
+	)
+
 	expectedEvents := 0
 	if c.expectedEventsFunc != nil {
 		expectedEvents = c.expectedEventsFunc(strategy.Type)
```

```diff
@@ -2510,8 +2517,18 @@ func TestUpdateNode(t *testing.T) {
 		}
 	}

+	err = manager.nodeStore.Add(c.newNode)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	enqueued = false
 	manager.updateNode(logger, c.oldNode, c.newNode)
+
+	nodeKeys := getQueuedKeys(manager.nodeUpdateQueue)
+	for _, key := range nodeKeys {
+		manager.syncNodeUpdate(ctx, key)
+	}
 	if enqueued != c.shouldEnqueue {
 		t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
 	}
```
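`getQueuedKeys` is a test helper defined elsewhere in the file and is not part of this diff. A plausible sketch of such a helper, hypothetical rather than the committed code:

```go
package daemon // hypothetical placement alongside the tests

import "k8s.io/client-go/util/workqueue"

// getQueuedKeys drains every key currently in the queue, marking each as
// done so the test can replay them through syncNodeUpdate.
// Hypothetical reconstruction; the committed helper may differ.
func getQueuedKeys(queue workqueue.TypedRateLimitingInterface[string]) []string {
	var keys []string
	for queue.Len() > 0 {
		key, done := queue.Get()
		if done {
			return keys
		}
		queue.Done(key)
		keys = append(keys, key)
	}
	return keys
}
```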
```diff
@@ -2880,18 +2897,29 @@ func TestAddNode(t *testing.T) {
 		t.Fatal(err)
 	}
 	manager.addNode(logger, node1)
-	if got, want := manager.queue.Len(), 0; got != want {
+	if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
 		t.Fatalf("queue.Len() = %v, want %v", got, want)
 	}
+	key, done := manager.nodeUpdateQueue.Get()
+	if done {
+		t.Fatal("failed to get item from nodeUpdateQueue")
+	}
+	if key != node1.Name {
+		t.Fatalf("expected node name %v, got %v", node1.Name, key)
+	}
+	manager.nodeUpdateQueue.Done(key)

 	node2 := newNode("node2", simpleNodeLabel)
 	manager.addNode(logger, node2)
-	if got, want := manager.queue.Len(), 1; got != want {
+	if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
 		t.Fatalf("queue.Len() = %v, want %v", got, want)
 	}
-	key, done := manager.queue.Get()
-	if key == "" || done {
-		t.Fatalf("failed to enqueue controller for node %v", node2.Name)
+	key, done = manager.nodeUpdateQueue.Get()
+	if done {
+		t.Fatal("failed to get item from nodeUpdateQueue")
+	}
+	if key != node2.Name {
+		t.Fatalf("expected node name %v, got %v", node2.Name, key)
 	}
 }
```