Skip to content

Commit 4bebc62

Browse files
committed
Reconcile all nodes via a special event
This is to improve the error handling on `reconcileAllNodes()`. See [1] for details. Prior to this commit, the event on MC/MCP will be re-queued if `reconcileAllNodes()` hits an error. However, `syncMachineConfig()` (or `syncMachineConfigPool` respectively) is stateful, i.e., the result replies on the content of the caches that might be changed from the original event. With the commit, a special event stays between an MC/MCP event and `reconcileAllNodes()`. An error from the latter will re-queue the special event which basically means triggering another run of `reconcileAllNodes()`. [1]. #1144 (comment)
1 parent ff3778c commit 4bebc62

File tree

2 files changed

+105
-9
lines changed

2 files changed

+105
-9
lines changed

pkg/updatestatus/nodeinformer.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ func (c *nodeInformerController) sync(ctx context.Context, syncCtx factory.SyncC
107107

108108
var msg informerMsg
109109
switch t {
110+
case eventKindName:
111+
if name != eventNameReconcileAllNodes {
112+
return fmt.Errorf("invalid name in queue key %s with type %s", queueKey, t)
113+
}
114+
return c.reconcileAllNodes(syncCtx.Queue())
110115
case nodeKindName:
111116
node, err := c.nodes.Get(name)
112117
if err != nil {
@@ -163,13 +168,15 @@ func (c *nodeInformerController) sync(ctx context.Context, syncCtx factory.SyncC
163168
if changed := c.machineConfigVersionCache.forget(name); changed {
164169

165170
klog.V(2).Infof("Reconciling all nodes as machine config %q is deleted", name)
166-
return c.reconcileAllNodes(syncCtx.Queue())
171+
queueKeyFoReconcileAllNodes(syncCtx.Queue())
172+
return nil
167173
}
168174
return nil
169175
}
170176
if changed := c.machineConfigVersionCache.ingest(machineConfig); changed {
171177
klog.V(2).Infof("Reconciling all nodes as machine config %q is refreshed", name)
172-
return c.reconcileAllNodes(syncCtx.Queue())
178+
queueKeyFoReconcileAllNodes(syncCtx.Queue())
179+
return nil
173180
}
174181
return nil
175182
case machineConfigPoolKindName:
@@ -181,13 +188,15 @@ func (c *nodeInformerController) sync(ctx context.Context, syncCtx factory.SyncC
181188
// The pool was deleted
182189
if changed := c.machineConfigPoolSelectorCache.forget(name); changed {
183190
klog.V(2).Infof("Reconciling all nodes as machine config pool %q is deleted", name)
184-
return c.reconcileAllNodes(syncCtx.Queue())
191+
queueKeyFoReconcileAllNodes(syncCtx.Queue())
192+
return nil
185193
}
186194
return nil
187195
}
188196
if changed := c.machineConfigPoolSelectorCache.ingest(machineConfigPool); changed {
189197
klog.V(2).Infof("Reconciling all nodes as machine config pool %q is refreshed", name)
190-
return c.reconcileAllNodes(syncCtx.Queue())
198+
queueKeyFoReconcileAllNodes(syncCtx.Queue())
199+
return nil
191200
}
192201
return nil
193202
default:
@@ -358,6 +367,10 @@ func (c *machineConfigPoolSelectorCache) len() int {
358367
return len(c.cache)
359368
}
360369

370+
func queueKeyFoReconcileAllNodes(queue workqueue.TypedRateLimitingInterface[any]) {
371+
queue.Add(kindAndNameToQueueKey(eventKindName, eventNameReconcileAllNodes))
372+
}
373+
361374
func (c *nodeInformerController) reconcileAllNodes(queue workqueue.TypedRateLimitingInterface[any]) error {
362375
nodes, err := c.nodes.List(labels.Everything())
363376
if err != nil {
@@ -581,9 +594,11 @@ func assessNode(node *corev1.Node, mcp *machineconfigv1.MachineConfigPool, machi
581594
}
582595

583596
const (
584-
nodeKindName = "Node"
585-
machineConfigKindName = "MachineConfig"
586-
machineConfigPoolKindName = "MachineConfigPool"
597+
nodeKindName = "Node"
598+
machineConfigKindName = "MachineConfig"
599+
machineConfigPoolKindName = "MachineConfigPool"
600+
eventKindName = "Event"
601+
eventNameReconcileAllNodes = "reconcileAllNodes"
587602

588603
nodesInformerName = "ni"
589604
)
@@ -601,6 +616,11 @@ func nodeInformerControllerQueueKeys(object runtime.Object) []string {
601616
return nil
602617
}
603618
switch o := object.(type) {
619+
case *corev1.Event:
620+
if o.Name != eventNameReconcileAllNodes {
621+
panic(fmt.Sprintf("USC :: Unknown object %s with type: %T", o.Name, object))
622+
}
623+
return []string{kindAndNameToQueueKey(eventKindName, o.Name)}
604624
case *corev1.Node:
605625
return []string{kindAndNameToQueueKey(nodeKindName, o.Name)}
606626
case *machineconfigv1.MachineConfig:

pkg/updatestatus/nodeinformer_test.go

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,82 @@ func Test_sync_with_node(t *testing.T) {
11181118
}
11191119
}
11201120

1121+
func Test_sync_with_event(t *testing.T) {
1122+
1123+
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
1124+
for _, o := range []metav1.Object{&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "master-1"}},
1125+
&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker-1"}}} {
1126+
if err := nodeIndexer.Add(o); err != nil {
1127+
t.Fatalf("Failed to add object to indexer: %v", err)
1128+
}
1129+
}
1130+
nodeLister := corelistersv1.NewNodeLister(nodeIndexer)
1131+
1132+
testCases := []struct {
1133+
name string
1134+
1135+
object runtime.Object
1136+
1137+
expectedPanic bool
1138+
expectedErr error
1139+
expectedQueueLen int
1140+
}{
1141+
{
1142+
name: "reconcile for all nodes",
1143+
object: &corev1.Event{ObjectMeta: metav1.ObjectMeta{Name: "reconcileAllNodes"}},
1144+
expectedQueueLen: 2,
1145+
},
1146+
{
1147+
name: "panic with aaa",
1148+
object: &corev1.Event{ObjectMeta: metav1.ObjectMeta{Name: "a"}},
1149+
expectedPanic: true,
1150+
},
1151+
}
1152+
for _, tc := range testCases {
1153+
t.Run(tc.name, func(t *testing.T) {
1154+
defer func() {
1155+
if tc.expectedPanic {
1156+
if r := recover(); r == nil {
1157+
t.Errorf("The expected panic did not happen")
1158+
}
1159+
}
1160+
}()
1161+
1162+
var actualMsgs []informerMsg
1163+
var sendInsight sendInsightFn = func(insight informerMsg) {
1164+
actualMsgs = append(actualMsgs, insight)
1165+
}
1166+
1167+
controller := &nodeInformerController{
1168+
nodes: nodeLister,
1169+
sendInsight: sendInsight,
1170+
}
1171+
queueKey := nodeInformerControllerQueueKeys(tc.object)[0]
1172+
1173+
syncContext := newTestSyncContext(queueKey)
1174+
actualErr := controller.sync(context.TODO(), syncContext)
1175+
1176+
if diff := cmp.Diff(tc.expectedErr, actualErr, cmp.Comparer(func(x, y error) bool {
1177+
if x == nil || y == nil {
1178+
return x == nil && y == nil
1179+
}
1180+
return x.Error() == y.Error()
1181+
})); diff != "" {
1182+
t.Errorf("%s: error differs from expected:\n%s", tc.name, diff)
1183+
}
1184+
1185+
if diff := cmp.Diff(tc.expectedQueueLen, syncContext.Queue().Len()); diff != "" {
1186+
t.Errorf("queue length after sync differs from expected:\n%s", diff)
1187+
}
1188+
1189+
var expectedMsgs []informerMsg
1190+
if diff := cmp.Diff(expectedMsgs, actualMsgs, cmp.AllowUnexported(informerMsg{})); diff != "" {
1191+
t.Errorf("Sync messages differ from expected:\n%s", diff)
1192+
}
1193+
})
1194+
}
1195+
}
1196+
11211197
func Test_sync_with_mcp(t *testing.T) {
11221198
now := metav1.Now()
11231199

@@ -1157,7 +1233,7 @@ func Test_sync_with_mcp(t *testing.T) {
11571233
},
11581234
pools: []*machineconfigv1.MachineConfigPool{getMCP("master"), getMCP("worker"), getMCP("non-exist-mcp")},
11591235
mcpToRemove: "non-exist-mcp",
1160-
expectedQueueLen: 2,
1236+
expectedQueueLen: 1,
11611237
},
11621238
{
11631239
name: "reconcile for a new pool",
@@ -1168,7 +1244,7 @@ func Test_sync_with_mcp(t *testing.T) {
11681244
},
11691245
pools: []*machineconfigv1.MachineConfigPool{getMCP("master"), getMCP("worker")},
11701246
mcpToAdd: "infra",
1171-
expectedQueueLen: 2,
1247+
expectedQueueLen: 1,
11721248
},
11731249
{
11741250
name: "no-op if not-found",

0 commit comments

Comments
 (0)